general · 12 min read

Build a CSV Analysis Pipeline with DataStoryBot and Cron

Set up a recurring job that ingests daily CSV exports, generates data stories via DataStoryBot's API, and posts results to Slack or email.

By DataStoryBot Team

Most data analysis workflows are still manual. Someone exports a CSV, opens a notebook, writes some pandas, produces a chart, and pastes it into Slack. This works when you have one dataset and one person. It doesn't work when you have ten datasets, five audiences, and the person is on vacation.

This article builds a complete unattended pipeline: a cron job fires on a schedule, pulls one or more CSV files from a data source, sends each through the DataStoryBot API to generate a narrative with charts, and delivers results to Slack or email. The pipeline handles retries, logs every run, and fails loudly if something goes wrong.

By the end, you'll have a production-ready Python script and a crontab entry (or GitHub Actions equivalent) to run it automatically.

Architecture

The pipeline has five stages:

[Cron] → [Fetch CSVs] → [Upload to DataStoryBot] → [Analyze + Refine] → [Deliver to Slack / Email]

Each stage is a function. Failures are isolated — a network error fetching from S3 doesn't look the same as a DataStoryBot 500, and neither looks like a Slack webhook failure. This separation makes debugging straightforward when a run fails at 6 AM.

Prerequisites

  • Python 3.8+ with requests and python-dotenv installed
  • A Slack incoming webhook URL (optional) — free, available from the Slack App directory
  • A SendGrid API key (optional) — free tier handles 100 emails/day
  • A CSV source: a local directory, an S3 bucket, a data warehouse export, or a URL

No DataStoryBot API key is required during the current open beta.

Project Layout

csv_pipeline/
├── pipeline.py        # Main script
├── deliver.py         # Slack and email delivery functions
├── .env               # Secrets (never commit this)
├── requirements.txt
└── logs/
    └── pipeline.log
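A matching requirements.txt for this layout (unpinned here for brevity; boto3 is only needed if you fetch from S3):

```text
requests
python-dotenv
sendgrid
boto3
markdown
```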

Step 1: Fetch the CSV

In production, CSVs usually come from one of three places: a local directory where an export lands, an S3 bucket, or a database query. This example covers all three with a pluggable fetcher pattern.

import os
import requests
import boto3

def fetch_local(path):
    """Return path directly if it's a local file."""
    if not os.path.exists(path):
        raise FileNotFoundError(f"CSV not found: {path}")
    if os.path.getsize(path) == 0:
        raise ValueError(f"CSV is empty: {path}")
    return path


def fetch_s3(bucket, key, local_dest="/tmp/pipeline_input.csv"):
    """Download a CSV from S3 to a local temp file."""
    s3 = boto3.client("s3")
    s3.download_file(bucket, key, local_dest)
    return local_dest


def fetch_url(url, local_dest="/tmp/pipeline_input.csv"):
    """Download a CSV from a URL."""
    resp = requests.get(url, timeout=60, stream=True)
    resp.raise_for_status()
    with open(local_dest, "wb") as f:
        for chunk in resp.iter_content(chunk_size=8192):
            f.write(chunk)
    return local_dest
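If exports land in a directory with timestamped filenames rather than a fixed path, a small helper can pick the newest file to feed fetch_local. This is a sketch, not part of the pipeline above — `latest_csv` is a hypothetical name:

```python
import glob
import os


def latest_csv(directory):
    """Return the most recently modified CSV in a directory."""
    candidates = glob.glob(os.path.join(directory, "*.csv"))
    if not candidates:
        raise FileNotFoundError(f"No CSVs in {directory}")
    # getmtime picks the file the export job wrote last
    return max(candidates, key=os.path.getmtime)
```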

For the pipeline entry point, you configure which fetcher to use via environment variables. This keeps the script agnostic to the source:

import os
from dotenv import load_dotenv

load_dotenv()

CSV_SOURCE = os.getenv("CSV_SOURCE", "local")   # "local", "s3", or "url"
CSV_PATH   = os.getenv("CSV_PATH", "/data/daily_metrics.csv")
S3_BUCKET  = os.getenv("S3_BUCKET")
S3_KEY     = os.getenv("S3_KEY")
CSV_URL    = os.getenv("CSV_URL")

def get_csv():
    if CSV_SOURCE == "local":
        return fetch_local(CSV_PATH)
    elif CSV_SOURCE == "s3":
        return fetch_s3(S3_BUCKET, S3_KEY)
    elif CSV_SOURCE == "url":
        return fetch_url(CSV_URL)
    else:
        raise ValueError(f"Unknown CSV_SOURCE: {CSV_SOURCE}")
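A sample .env for the local-directory case — every value here is a placeholder to adapt to your environment:

```text
CSV_SOURCE=local
CSV_PATH=/data/daily_metrics.csv
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/XXX/YYY/ZZZ
SENDGRID_API_KEY=SG.xxxxx
EMAIL_FROM=reports@yourcompany.com
EMAIL_TO=team@yourcompany.com,boss@yourcompany.com
```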

Step 2: Upload, Analyze, and Refine

This is the DataStoryBot three-call flow described in the getting started guide, wrapped in a function that returns structured output. The function handles the full pipeline — upload, discover story angles, refine the top story, and download all charts.

import logging
import os

import requests

BASE_URL = "https://datastory.bot/api"
logger = logging.getLogger("pipeline")


def analyze_csv(csv_path, steering=None, timeout=300):
    """
    Run the full DataStoryBot pipeline for a single CSV.
    Returns a dict: {title, narrative, charts: [{bytes, caption}]}.
    """
    # --- Upload ---
    logger.info(f"Uploading {csv_path}")
    with open(csv_path, "rb") as f:
        upload_resp = requests.post(
            f"{BASE_URL}/upload",
            files={"file": (os.path.basename(csv_path), f, "text/csv")},
            timeout=60,
        )
    upload_resp.raise_for_status()
    upload_data = upload_resp.json()

    container_id = upload_data["containerId"]
    meta = upload_data["metadata"]
    logger.info(
        f"Uploaded: {meta['fileName']} — "
        f"{meta['rowCount']} rows, {meta['columnCount']} cols"
    )

    # --- Analyze ---
    analyze_payload = {"containerId": container_id}
    if steering:
        analyze_payload["steeringPrompt"] = steering

    logger.info("Running analysis...")
    analyze_resp = requests.post(
        f"{BASE_URL}/analyze",
        json=analyze_payload,
        timeout=timeout,
    )
    analyze_resp.raise_for_status()
    stories = analyze_resp.json()

    if not stories:
        raise RuntimeError("Analysis returned no stories. Check the CSV has analyzable data.")

    logger.info(f"Found {len(stories)} story angles:")
    for s in stories:
        logger.info(f"  - {s['title']}")

    # --- Refine the top story ---
    logger.info(f"Refining: {stories[0]['title']}")
    refine_resp = requests.post(
        f"{BASE_URL}/refine",
        json={
            "containerId": container_id,
            "selectedStoryTitle": stories[0]["title"],
        },
        timeout=timeout,
    )
    refine_resp.raise_for_status()
    report = refine_resp.json()

    # --- Download charts ---
    chart_images = []
    for chart in report.get("charts", []):
        img_resp = requests.get(
            f"{BASE_URL}/files/{container_id}/{chart['fileId']}",
            timeout=60,
        )
        img_resp.raise_for_status()
        chart_images.append({
            "bytes": img_resp.content,
            "caption": chart["caption"],
            "file_id": chart["fileId"],
        })
        logger.info(f"Downloaded chart: {chart['caption']}")

    return {
        "title": stories[0]["title"],
        "narrative": report["narrative"],
        "charts": chart_images,
        "container_id": container_id,
    }

Steering the Analysis

If you know what your stakeholders care about, pass a steeringPrompt to guide the analysis. For a daily sales report, that might look like:

result = analyze_csv(
    csv_path,
    steering=(
        "Focus on day-over-day changes. Flag any metric that moved more than 10% "
        "compared to yesterday. Highlight the top-performing and worst-performing segments."
    ),
)

For more on shaping the output, see steering prompts for controlled analysis.

Step 3: Retry Wrapper

Cron jobs fail silently. Wrapping the analysis in a retry loop means transient network errors or brief API unavailability don't kill the pipeline — and when it does fail permanently, you get a clear error to work with. Full retry logic for DataStoryBot is covered in error handling and retry patterns for data analysis APIs. Here's the focused version for this pipeline:

import time
import random


def analyze_with_retry(csv_path, steering=None, max_attempts=3):
    """
    Retry analyze_csv on transient failures with exponential backoff.
    Raises on the last attempt so the caller knows the run failed.
    """
    last_error = None
    for attempt in range(max_attempts):
        try:
            return analyze_csv(csv_path, steering=steering)
        except requests.HTTPError as e:
            # A Response is falsy for 4xx/5xx, so compare against None explicitly
            status = e.response.status_code if e.response is not None else 0
            # Don't retry permanent errors
            if status in (400, 401, 413, 422):
                raise
            last_error = e
        except requests.RequestException as e:
            last_error = e

        if attempt < max_attempts - 1:
            delay = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(
                f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s: {last_error}"
            )
            time.sleep(delay)

    raise RuntimeError(
        f"Pipeline failed after {max_attempts} attempts: {last_error}"
    )
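To sanity-check the retry cadence, here's a standalone sketch of the delay formula. `backoff_delays` is a hypothetical helper shown only to illustrate the schedule, not part of the pipeline:

```python
import random


def backoff_delays(max_attempts, seed=None):
    """Delays analyze_with_retry would sleep between attempts: 2^n plus jitter."""
    rng = random.Random(seed)
    # One sleep fewer than attempts: no sleep after the final failure
    return [(2 ** attempt) + rng.uniform(0, 1) for attempt in range(max_attempts - 1)]


# With 3 attempts there are 2 sleeps: roughly 1-2s, then 2-3s
delays = backoff_delays(3)
```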

Step 4: Deliver to Slack

Slack is the fastest way to get analysis results in front of a team. Use an incoming webhook to post the narrative text and attach chart images.

import json


SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")


def post_to_slack(result):
    """Post the narrative and first chart to a Slack channel via webhook."""
    if not SLACK_WEBHOOK_URL:
        logger.info("SLACK_WEBHOOK_URL not set — skipping Slack delivery")
        return

    # Trim the narrative for Slack's 3000-char block limit
    narrative = result["narrative"]
    if len(narrative) > 2800:
        narrative = narrative[:2800] + "\n\n_(truncated — see email for full report)_"

    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": result["title"],
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": narrative,
                }
            },
        ]
    }

    resp = requests.post(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    resp.raise_for_status()
    logger.info("Slack notification sent")

If you want to post chart images to Slack, upload the chart bytes with Slack's file upload flow (files.getUploadURLExternal followed by files.completeUploadExternal — the older files.upload method is deprecated). The webhook approach above is simpler and sufficient for narrative-only delivery. For most teams, narrative-in-Slack plus charts-in-email is the right split.

Step 5: Deliver via Email

For charts and formatted narratives, email is more appropriate than Slack. This uses SendGrid with inline chart images — the same approach covered in automating weekly data reports, adapted here for daily pipelines:

import base64
import markdown as md_lib
from datetime import date
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import (
    Mail, Attachment, FileContent, FileName,
    FileType, Disposition, ContentId, Content, MimeType,
)

SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
EMAIL_FROM       = os.getenv("EMAIL_FROM", "reports@yourcompany.com")
# Comma-separated list; filter out empties so an unset var yields [] (not [''])
EMAIL_TO         = [e.strip() for e in os.getenv("EMAIL_TO", "").split(",") if e.strip()]


def send_email(result):
    """Send the narrative and charts as an HTML email via SendGrid."""
    if not SENDGRID_API_KEY or not EMAIL_TO:
        logger.info("SendGrid config missing — skipping email delivery")
        return

    html_narrative = md_lib.markdown(
        result["narrative"],
        extensions=["tables", "fenced_code"],
    )

    charts_html = ""
    for chart in result["charts"]:
        cid = chart["file_id"]
        charts_html += f"""
        <div style="background:#141414;padding:16px;border-radius:8px;margin:16px 0;">
            <img src="cid:{cid}" alt="{chart['caption']}" width="600"
                 style="max-width:100%;height:auto;" />
            <p style="color:#999;font-size:13px;margin-top:8px;">{chart['caption']}</p>
        </div>
        """

    html_body = f"""
    <html>
    <body style="font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;
                 color:#333;max-width:680px;margin:0 auto;padding:24px;">
        <h1 style="font-size:22px;color:#111;">{result['title']}</h1>
        {html_narrative}
        <h2 style="font-size:18px;color:#111;margin-top:32px;">Charts</h2>
        {charts_html}
        <hr style="border:none;border-top:1px solid #eee;margin:32px 0;" />
        <p style="font-size:12px;color:#999;">
            Generated by <a href="https://datastory.bot">DataStoryBot</a>
            on {date.today().isoformat()}.
        </p>
    </body>
    </html>
    """

    message = Mail(
        from_email=EMAIL_FROM,
        to_emails=[e.strip() for e in EMAIL_TO if e.strip()],
        subject=f"Daily Data Report: {result['title']}",
        html_content=Content(MimeType.html, html_body),
    )

    for chart in result["charts"]:
        encoded = base64.b64encode(chart["bytes"]).decode("utf-8")
        # sendgrid's Mail.attachment setter appends when assigned a single
        # Attachment, so each iteration adds a chart rather than overwriting
        message.attachment = Attachment(
            FileContent(encoded),
            FileName(f"{chart['file_id']}.png"),
            FileType("image/png"),
            Disposition("inline"),
            ContentId(chart["file_id"]),
        )

    sg = SendGridAPIClient(SENDGRID_API_KEY)
    resp = sg.send(message)
    logger.info(f"Email sent: HTTP {resp.status_code}")

Step 6: The Main Entry Point

Tie everything together in pipeline.py. Logging goes to stdout (captured by cron to a log file) and to a rotating file handler:

#!/usr/bin/env python3
"""
Daily CSV analysis pipeline.
Fetches a CSV, analyzes it with DataStoryBot, and delivers results to Slack and email.
"""
import logging
import os
import sys
from datetime import datetime
from logging.handlers import RotatingFileHandler

# --- Logging setup ---
LOG_PATH = os.getenv("LOG_PATH", "logs/pipeline.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        RotatingFileHandler(LOG_PATH, maxBytes=5_000_000, backupCount=5),
    ],
)
logger = logging.getLogger("pipeline")

STEERING_PROMPT = os.getenv(
    "STEERING_PROMPT",
    "Focus on day-over-day changes and any anomalies or outliers."
)


def run():
    start = datetime.utcnow()
    logger.info(f"Pipeline started at {start.isoformat()}")

    try:
        # 1. Fetch CSV
        csv_path = get_csv()
        logger.info(f"CSV ready at {csv_path}")

        # 2. Analyze (with retry)
        result = analyze_with_retry(csv_path, steering=STEERING_PROMPT)

        # 3. Deliver
        post_to_slack(result)
        send_email(result)

        elapsed = (datetime.utcnow() - start).total_seconds()
        logger.info(f"Pipeline complete in {elapsed:.1f}s")

    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        # Exit non-zero so cron (or GitHub Actions) records the failure
        sys.exit(1)


if __name__ == "__main__":
    run()

The sys.exit(1) on failure is important for cron. Without it, cron treats every run as successful regardless of what happened.

Step 7: Cron Schedule

Add a crontab entry to run the pipeline every morning at 6:30 AM UTC:

# Edit the crontab
crontab -e

# Run every day at 6:30 AM UTC
30 6 * * * /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1

Common schedule patterns:

# Every weekday at 7:00 AM UTC
0 7 * * 1-5  /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1

# Every Monday (weekly report)
0 7 * * 1    /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1

# Every hour during business hours (Mon-Fri, 8 AM to 6 PM UTC)
0 8-18 * * 1-5  /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1

Use crontab.guru to validate expressions before deploying. Getting the timezone wrong is the most common cron mistake — cron runs in the system timezone, not your local one. On most cloud VMs, that's UTC.
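To get emailed when a run exits non-zero, combine cron's MAILTO with an on-failure echo. Cron mails whatever output a command produces, and the log redirect above otherwise swallows everything — the address below is a placeholder:

```text
MAILTO=alerts@yourcompany.com
30 6 * * * /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1 || echo "pipeline.py exited non-zero"
```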

Alternative: GitHub Actions

If you don't control a server, GitHub Actions scheduled workflows are a clean alternative:

# .github/workflows/daily-pipeline.yml
name: Daily CSV Analysis

on:
  schedule:
    - cron: "30 6 * * *"  # 6:30 AM UTC daily
  workflow_dispatch:       # Allow manual runs

jobs:
  analyze:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: pip install requests python-dotenv sendgrid boto3 markdown

      - name: Run pipeline
        env:
          CSV_SOURCE: url
          CSV_URL: ${{ secrets.CSV_URL }}
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
          SENDGRID_API_KEY: ${{ secrets.SENDGRID_API_KEY }}
          EMAIL_FROM: ${{ secrets.EMAIL_FROM }}
          EMAIL_TO: ${{ secrets.EMAIL_TO }}
          STEERING_PROMPT: "Focus on day-over-day changes and outliers."
        run: python pipeline.py

Secrets are stored in the repository's Settings > Secrets and variables > Actions panel. This approach means zero server management and free execution for public repositories.

Running Multiple CSVs

If you have more than one dataset — say, one per region, one per product line, or one per client — define your pipeline runs as configuration rather than code:

import json

PIPELINES = json.loads(os.getenv("PIPELINES", "[]"))
# Example PIPELINES env var value:
# [
#   {"name": "APAC Sales", "source": "s3", "bucket": "my-bucket", "key": "apac/daily.csv",
#    "steering": "Focus on regional breakdowns.", "to": "apac-team@company.com"},
#   {"name": "EMEA Sales", "source": "s3", "bucket": "my-bucket", "key": "emea/daily.csv",
#    "steering": "Focus on currency impact.", "to": "emea-team@company.com"}
# ]

def run_all():
    for pipeline_config in PIPELINES:
        logger.info(f"Starting pipeline: {pipeline_config['name']}")
        try:
            if pipeline_config["source"] == "s3":
                csv_path = fetch_s3(pipeline_config["bucket"], pipeline_config["key"])
            elif pipeline_config["source"] == "url":
                csv_path = fetch_url(pipeline_config["url"])
            else:
                csv_path = fetch_local(pipeline_config["path"])

            result = analyze_with_retry(csv_path, steering=pipeline_config.get("steering"))

            # Assumes post_to_slack and send_email are extended to accept
            # per-pipeline overrides (the versions above read env vars only)
            if pipeline_config.get("slack_webhook"):
                post_to_slack(result, webhook_url=pipeline_config["slack_webhook"])
            if pipeline_config.get("to"):
                send_email(result, to=pipeline_config["to"].split(","))

        except Exception as e:
            logger.error(f"Pipeline {pipeline_config['name']} failed: {e}", exc_info=True)
            # Continue with remaining pipelines

Running pipelines sequentially is simpler and avoids rate limit pressure. If you need parallel execution, use concurrent.futures.ThreadPoolExecutor with a concurrency limit of 2–3 to avoid hitting DataStoryBot's per-key limits.
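If you do go parallel, a capped executor looks like this. `run_parallel` is a hypothetical wrapper (the worker you pass it would call analyze_with_retry and the delivery functions internally):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_parallel(configs, worker, max_workers=2):
    """Run pipeline configs concurrently, isolating per-pipeline failures."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, cfg): cfg["name"] for cfg in configs}
        for fut in as_completed(futures):
            name = futures[fut]
            try:
                results[name] = fut.result()
            except Exception as exc:
                # One failed pipeline shouldn't sink the others
                results[name] = exc
    return results
```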

What to Monitor

For production use, four things matter:

Run completion. If the log file hasn't been updated since the last expected run, something is wrong. A simple check: if (now - log_mtime) > 2 * schedule_interval, alert.

Exit codes. The sys.exit(1) on failure gives cron a signal. If you're using GitHub Actions, a failed step will mark the workflow run as failed — visible in the Actions tab.

API response times. Log elapsed for each stage. If analyze starts taking 5 minutes instead of 30 seconds, the data or prompt may have changed in a way that makes analysis harder.

Delivery receipts. Log Slack and SendGrid HTTP status codes. A 200 from the webhook doesn't guarantee the message was delivered, but a 4xx guarantees it wasn't.
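The run-completion check above fits in a few lines of a separate watchdog script. `log_is_stale` is a hypothetical helper implementing the 2x-interval rule:

```python
import os
import time


def log_is_stale(log_path, schedule_interval_s):
    """True if the log hasn't been written in over twice the schedule interval."""
    age = time.time() - os.path.getmtime(log_path)
    return age > 2 * schedule_interval_s
```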

Try It Interactively First

Before wiring up automation, run the pipeline interactively against your actual CSV in the DataStoryBot playground. This tells you what story angles the API finds, whether the narrative matches what you'd expect, and whether you need a steering prompt to guide it. Debugging a cron job is harder than debugging an interactive session.

What to Read Next

For the full DataStoryBot API reference including all request parameters and response fields, see getting started with the DataStoryBot API.

For production-grade retry and error handling patterns that go beyond what's covered here — including container expiry recovery and rate limit handling — read error handling and retry patterns for data analysis APIs.

For weekly rather than daily report pipelines with SendGrid email formatting details, see automating weekly data reports.

Ready to find your data story?

Upload a CSV and DataStoryBot will uncover the narrative in seconds.

Try DataStoryBot →