Build a CSV Analysis Pipeline with DataStoryBot and Cron
Set up a recurring job that ingests daily CSV exports, generates data stories via DataStoryBot's API, and posts results to Slack or email.
Most data analysis workflows are still manual. Someone exports a CSV, opens a notebook, writes some pandas, produces a chart, and pastes it into Slack. This works when you have one dataset and one person. It doesn't work when you have ten datasets, five audiences, and the person is on vacation.
This article builds a complete unattended pipeline: a cron job fires on a schedule, pulls one or more CSV files from a data source, sends each through the DataStoryBot API to generate a narrative with charts, and delivers results to Slack or email. The pipeline handles retries, logs every run, and fails loudly if something goes wrong.
By the end, you'll have a production-ready Python script and a crontab entry (or GitHub Actions equivalent) to run it automatically.
Architecture
The pipeline has five stages:
[Cron] → [Fetch CSVs] → [Upload to DataStoryBot] → [Analyze + Refine] → [Deliver to Slack / Email]
Each stage is a function. Failures are isolated — a network error fetching from S3 doesn't look the same as a DataStoryBot 500, and neither looks like a Slack webhook failure. This separation makes debugging straightforward when a run fails at 6 AM.
Prerequisites
- Python 3.8+ with requests and python-dotenv installed
- A Slack incoming webhook URL (optional) — free, available from the Slack App directory
- A SendGrid API key (optional) — free tier handles 100 emails/day
- A CSV source: a local directory, an S3 bucket, a data warehouse export, or a URL
No DataStoryBot API key is required during the current open beta.
Project Layout
csv_pipeline/
├── pipeline.py # Main script
├── deliver.py # Slack and email delivery functions
├── .env # Secrets (never commit this)
├── requirements.txt
└── logs/
└── pipeline.log
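A matching requirements.txt keeps installs reproducible. The versions below are illustrative; pin whatever you have actually tested against:

```
requests>=2.31
python-dotenv>=1.0
boto3>=1.34        # only needed if fetching from S3
sendgrid>=6.11     # only needed for email delivery
markdown>=3.5      # renders the narrative as HTML for email
```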
Step 1: Fetch the CSV
In production, CSVs usually come from one of three places: a local directory where an export lands, an S3 bucket, or a database query. This example covers all three with a pluggable fetcher pattern.
import os
import requests
import boto3
def fetch_local(path):
"""Return path directly if it's a local file."""
if not os.path.exists(path):
raise FileNotFoundError(f"CSV not found: {path}")
if os.path.getsize(path) == 0:
raise ValueError(f"CSV is empty: {path}")
return path
def fetch_s3(bucket, key, local_dest="/tmp/pipeline_input.csv"):
"""Download a CSV from S3 to a local temp file."""
s3 = boto3.client("s3")
s3.download_file(bucket, key, local_dest)
return local_dest
def fetch_url(url, local_dest="/tmp/pipeline_input.csv"):
"""Download a CSV from a URL."""
resp = requests.get(url, timeout=60, stream=True)
resp.raise_for_status()
with open(local_dest, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
return local_dest
For the pipeline entry point, you configure which fetcher to use via environment variables. This keeps the script agnostic to the source:
import os
from dotenv import load_dotenv
load_dotenv()
CSV_SOURCE = os.getenv("CSV_SOURCE", "local") # "local", "s3", or "url"
CSV_PATH = os.getenv("CSV_PATH", "/data/daily_metrics.csv")
S3_BUCKET = os.getenv("S3_BUCKET")
S3_KEY = os.getenv("S3_KEY")
CSV_URL = os.getenv("CSV_URL")
def get_csv():
if CSV_SOURCE == "local":
return fetch_local(CSV_PATH)
elif CSV_SOURCE == "s3":
return fetch_s3(S3_BUCKET, S3_KEY)
elif CSV_SOURCE == "url":
return fetch_url(CSV_URL)
else:
raise ValueError(f"Unknown CSV_SOURCE: {CSV_SOURCE}")
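Whatever the source, it pays to sanity-check the export before spending an API call on it. A minimal sketch — check_columns and the required column names are illustrative, not part of the pipeline above:

```python
import csv

def check_columns(csv_path, required):
    """Fail fast if the CSV header is missing columns the report depends on."""
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f), [])
    missing = [col for col in required if col not in header]
    if missing:
        raise ValueError(f"{csv_path} is missing expected columns: {missing}")
    return header
```

Call it right after get_csv() so a malformed export fails at fetch time, not mid-analysis.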
Step 2: Upload, Analyze, and Refine
This is the DataStoryBot three-call flow described in the getting started guide, wrapped in a function that returns structured output. The function handles the full pipeline — upload, discover story angles, refine the top story, and download all charts.
import time
import logging
BASE_URL = "https://datastory.bot/api"
logger = logging.getLogger("pipeline")
def analyze_csv(csv_path, steering=None, timeout=300):
"""
Run the full DataStoryBot pipeline for a single CSV.
Returns a dict: {title, narrative, charts: [{bytes, caption}]}.
"""
# --- Upload ---
logger.info(f"Uploading {csv_path}")
with open(csv_path, "rb") as f:
upload_resp = requests.post(
f"{BASE_URL}/upload",
files={"file": (os.path.basename(csv_path), f, "text/csv")},
timeout=60,
)
upload_resp.raise_for_status()
upload_data = upload_resp.json()
container_id = upload_data["containerId"]
meta = upload_data["metadata"]
logger.info(
f"Uploaded: {meta['fileName']} — "
f"{meta['rowCount']} rows, {meta['columnCount']} cols"
)
# --- Analyze ---
analyze_payload = {"containerId": container_id}
if steering:
analyze_payload["steeringPrompt"] = steering
logger.info("Running analysis...")
analyze_resp = requests.post(
f"{BASE_URL}/analyze",
json=analyze_payload,
timeout=timeout,
)
analyze_resp.raise_for_status()
stories = analyze_resp.json()
if not stories:
raise RuntimeError("Analysis returned no stories. Check the CSV has analyzable data.")
logger.info(f"Found {len(stories)} story angles:")
for s in stories:
logger.info(f" - {s['title']}")
# --- Refine the top story ---
logger.info(f"Refining: {stories[0]['title']}")
refine_resp = requests.post(
f"{BASE_URL}/refine",
json={
"containerId": container_id,
"selectedStoryTitle": stories[0]["title"],
},
timeout=timeout,
)
refine_resp.raise_for_status()
report = refine_resp.json()
# --- Download charts ---
chart_images = []
for chart in report.get("charts", []):
img_resp = requests.get(
f"{BASE_URL}/files/{container_id}/{chart['fileId']}",
timeout=60,
)
img_resp.raise_for_status()
chart_images.append({
"bytes": img_resp.content,
"caption": chart["caption"],
"file_id": chart["fileId"],
})
logger.info(f"Downloaded chart: {chart['caption']}")
return {
"title": stories[0]["title"],
"narrative": report["narrative"],
"charts": chart_images,
"container_id": container_id,
}
Steering the Analysis
If you know what your stakeholders care about, pass a steeringPrompt to guide the analysis. For a daily sales report, that might look like:
result = analyze_csv(
csv_path,
steering=(
"Focus on day-over-day changes. Flag any metric that moved more than 10% "
"compared to yesterday. Highlight the top-performing and worst-performing segments."
),
)
For more on shaping the output, see steering prompts for controlled analysis.
Step 3: Retry Wrapper
Cron jobs fail silently. Wrapping the analysis in a retry loop means transient network errors or brief API unavailability don't kill the pipeline — and when it does fail permanently, you get a clear error to work with. Full retry logic for DataStoryBot is covered in error handling and retry patterns for data analysis APIs. Here's the focused version for this pipeline:
import time
import random
def analyze_with_retry(csv_path, steering=None, max_attempts=3):
"""
Retry analyze_csv on transient failures with exponential backoff.
Raises on the last attempt so the caller knows the run failed.
"""
last_error = None
for attempt in range(max_attempts):
try:
return analyze_csv(csv_path, steering=steering)
except requests.HTTPError as e:
            # Response.__bool__ is False for 4xx/5xx, so compare to None explicitly
            status = e.response.status_code if e.response is not None else 0
# Don't retry permanent errors
if status in (400, 401, 413, 422):
raise
last_error = e
except requests.RequestException as e:
last_error = e
if attempt < max_attempts - 1:
delay = (2 ** attempt) + random.uniform(0, 1)
logger.warning(
f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s: {last_error}"
)
time.sleep(delay)
raise RuntimeError(
f"Pipeline failed after {max_attempts} attempts: {last_error}"
)
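The backoff arithmetic is easy to misread, so here it is in isolation: the wait before retry n is 2**n seconds plus up to a second of jitter. The helper name is just for illustration:

```python
import random

def backoff_schedule(max_attempts=3, seed=None):
    """Sleep durations before each retry: 2**attempt plus 0-1s of jitter."""
    rng = random.Random(seed)
    return [(2 ** attempt) + rng.uniform(0, 1)
            for attempt in range(max_attempts - 1)]

# With max_attempts=3, the base delays are 1s and 2s before jitter.
```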
Step 4: Deliver to Slack
Slack is the fastest way to get analysis results in front of a team. Use an incoming webhook to post the narrative text and attach chart images.
import json
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
def post_to_slack(result, webhook_url=None):
    """Post the narrative to a Slack channel via webhook."""
    webhook_url = webhook_url or SLACK_WEBHOOK_URL
    if not webhook_url:
        logger.info("SLACK_WEBHOOK_URL not set — skipping Slack delivery")
        return
    # Trim the narrative for Slack's 3000-char section block limit
    narrative = result["narrative"]
    if len(narrative) > 2800:
        narrative = narrative[:2800] + "\n\n_(truncated — see email for full report)_"
    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": result["title"],
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": narrative,
                }
            },
        ]
    }
    resp = requests.post(
        webhook_url,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    resp.raise_for_status()
    logger.info("Slack notification sent")
If you want to post chart images to Slack, upload the chart bytes with the Slack Files API (the files.getUploadURLExternal and files.completeUploadExternal flow; the older files.upload method is deprecated). The webhook approach above is simpler and sufficient for narrative-only delivery. For most teams, narrative-in-Slack plus charts-in-email is the right split.
Step 5: Deliver via Email
For charts and formatted narratives, email is more appropriate than Slack. This uses SendGrid with inline chart images — the same approach covered in automating weekly data reports, adapted here for daily pipelines:
import base64
import markdown as md_lib
from datetime import date
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import (
Mail, Attachment, FileContent, FileName,
FileType, Disposition, ContentId, Content, MimeType,
)
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
EMAIL_FROM = os.getenv("EMAIL_FROM", "reports@yourcompany.com")
EMAIL_TO = [e.strip() for e in os.getenv("EMAIL_TO", "").split(",") if e.strip()]  # Comma-separated list
def send_email(result, to=None):
    """Send the narrative and charts as an HTML email via SendGrid."""
    recipients = [e.strip() for e in (to or EMAIL_TO) if e.strip()]
    if not SENDGRID_API_KEY or not recipients:
        logger.info("SendGrid config missing — skipping email delivery")
        return
    html_narrative = md_lib.markdown(
        result["narrative"],
        extensions=["tables", "fenced_code"],
    )
    charts_html = ""
    for chart in result["charts"]:
        cid = chart["file_id"]
        charts_html += f"""
        <div style="background:#141414;padding:16px;border-radius:8px;margin:16px 0;">
          <img src="cid:{cid}" alt="{chart['caption']}" width="600"
               style="max-width:100%;height:auto;" />
          <p style="color:#999;font-size:13px;margin-top:8px;">{chart['caption']}</p>
        </div>
        """
    html_body = f"""
    <html>
      <body style="font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;
                   color:#333;max-width:680px;margin:0 auto;padding:24px;">
        <h1 style="font-size:22px;color:#111;">{result['title']}</h1>
        {html_narrative}
        <h2 style="font-size:18px;color:#111;margin-top:32px;">Charts</h2>
        {charts_html}
        <hr style="border:none;border-top:1px solid #eee;margin:32px 0;" />
        <p style="font-size:12px;color:#999;">
          Generated by <a href="https://datastory.bot">DataStoryBot</a>
          on {date.today().isoformat()}.
        </p>
      </body>
    </html>
    """
    message = Mail(
        from_email=EMAIL_FROM,
        to_emails=recipients,
        subject=f"Daily Data Report: {result['title']}",
        html_content=Content(MimeType.html, html_body),
    )
    # Mail.attachment's setter appends, so assigning in a loop adds each chart
    for chart in result["charts"]:
encoded = base64.b64encode(chart["bytes"]).decode("utf-8")
message.attachment = Attachment(
FileContent(encoded),
FileName(f"{chart['file_id']}.png"),
FileType("image/png"),
Disposition("inline"),
ContentId(chart["file_id"]),
)
sg = SendGridAPIClient(SENDGRID_API_KEY)
resp = sg.send(message)
logger.info(f"Email sent: HTTP {resp.status_code}")
Step 6: The Main Entry Point
Tie everything together in pipeline.py. Logging goes to stdout (captured by cron to a log file) and to a rotating file handler:
#!/usr/bin/env python3
"""
Daily CSV analysis pipeline.
Fetches a CSV, analyzes it with DataStoryBot, and delivers results to Slack and email.
"""
import logging
import os
import sys
from datetime import datetime
from logging.handlers import RotatingFileHandler
# --- Logging setup ---
LOG_PATH = os.getenv("LOG_PATH", "logs/pipeline.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
RotatingFileHandler(LOG_PATH, maxBytes=5_000_000, backupCount=5),
],
)
logger = logging.getLogger("pipeline")
STEERING_PROMPT = os.getenv(
"STEERING_PROMPT",
"Focus on day-over-day changes and any anomalies or outliers."
)
def run():
start = datetime.utcnow()
logger.info(f"Pipeline started at {start.isoformat()}")
try:
# 1. Fetch CSV
csv_path = get_csv()
logger.info(f"CSV ready at {csv_path}")
# 2. Analyze (with retry)
result = analyze_with_retry(csv_path, steering=STEERING_PROMPT)
# 3. Deliver
post_to_slack(result)
send_email(result)
elapsed = (datetime.utcnow() - start).total_seconds()
logger.info(f"Pipeline complete in {elapsed:.1f}s")
except Exception as e:
logger.error(f"Pipeline failed: {e}", exc_info=True)
        # Exit non-zero so cron (or CI) records the failure
sys.exit(1)
if __name__ == "__main__":
run()
The sys.exit(1) on failure is important. Without it, the process exits 0 and anything watching exit codes — cron's failure mail, GitHub Actions, a wrapper script — reports the run as successful regardless of what happened.
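You can verify the exit-code behavior from a shell before scheduling anything. The one-liner below stands in for a failing pipeline.py:

```shell
# Stand-in for a failing run: exits with status 1
python3 -c "import sys; sys.exit(1)"
echo "exit code: $?"
```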
Step 7: Cron Schedule
Add a crontab entry to run the pipeline every morning at 6:30 AM UTC:
# Edit the crontab
crontab -e
# Run every day at 6:30 AM UTC; cd first so load_dotenv() finds the .env file
30 6 * * * cd /opt/pipeline && /usr/bin/python3 pipeline.py >> /var/log/pipeline.log 2>&1
Common schedule patterns:
# Every weekday at 7:00 AM UTC
0 7 * * 1-5 /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1
# Every Monday (weekly report)
0 7 * * 1 /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1
# Every hour during business hours (Mon-Fri, 8 AM to 6 PM UTC)
0 8-18 * * 1-5 /usr/bin/python3 /opt/pipeline/pipeline.py >> /var/log/pipeline.log 2>&1
Use crontab.guru to validate expressions before deploying. Getting the timezone wrong is the most common cron mistake — cron runs in the system timezone, not your local one. On most cloud VMs, that's UTC.
Alternative: GitHub Actions
If you don't control a server, GitHub Actions scheduled workflows are a clean alternative:
# .github/workflows/daily-pipeline.yml
name: Daily CSV Analysis
on:
schedule:
- cron: "30 6 * * *" # 6:30 AM UTC daily
workflow_dispatch: # Allow manual runs
jobs:
analyze:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: pip install requests python-dotenv sendgrid boto3
- name: Run pipeline
env:
CSV_SOURCE: url
CSV_URL: ${{ secrets.CSV_URL }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SENDGRID_API_KEY: ${{ secrets.SENDGRID_API_KEY }}
EMAIL_FROM: ${{ secrets.EMAIL_FROM }}
EMAIL_TO: ${{ secrets.EMAIL_TO }}
STEERING_PROMPT: "Focus on day-over-day changes and outliers."
run: python pipeline.py
Secrets are stored in the repository's Settings > Secrets and variables > Actions panel. This approach means zero server management and free execution for public repositories.
Running Multiple CSVs
If you have more than one dataset — say, one per region, one per product line, or one per client — define your pipeline runs as configuration rather than code:
import json
PIPELINES = json.loads(os.getenv("PIPELINES", "[]"))
# Example PIPELINES env var value:
# [
# {"name": "APAC Sales", "source": "s3", "bucket": "my-bucket", "key": "apac/daily.csv",
# "steering": "Focus on regional breakdowns.", "to": "apac-team@company.com"},
# {"name": "EMEA Sales", "source": "s3", "bucket": "my-bucket", "key": "emea/daily.csv",
# "steering": "Focus on currency impact.", "to": "emea-team@company.com"}
# ]
def run_all():
for pipeline_config in PIPELINES:
logger.info(f"Starting pipeline: {pipeline_config['name']}")
try:
if pipeline_config["source"] == "s3":
csv_path = fetch_s3(pipeline_config["bucket"], pipeline_config["key"])
elif pipeline_config["source"] == "url":
csv_path = fetch_url(pipeline_config["url"])
else:
csv_path = fetch_local(pipeline_config["path"])
result = analyze_with_retry(csv_path, steering=pipeline_config.get("steering"))
if pipeline_config.get("slack_webhook"):
post_to_slack(result, webhook_url=pipeline_config["slack_webhook"])
if pipeline_config.get("to"):
send_email(result, to=pipeline_config["to"].split(","))
except Exception as e:
logger.error(f"Pipeline {pipeline_config['name']} failed: {e}", exc_info=True)
# Continue with remaining pipelines
Running pipelines sequentially is simpler and avoids rate limit pressure. If you need parallel execution, use concurrent.futures.ThreadPoolExecutor with a concurrency limit of 2–3 to avoid hitting DataStoryBot's per-key limits.
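If you do go parallel, the shape looks like this. run_one is a hypothetical wrapper that fetches, analyzes, and delivers a single config; max_workers=2 keeps concurrency low to stay under per-key rate limits:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_all_parallel(configs, run_one, max_workers=2):
    """Run each pipeline config concurrently, capped at max_workers."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_one, cfg): cfg["name"] for cfg in configs}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # one failure should not sink the rest
                errors[name] = exc
    return results, errors
```

Collecting errors per pipeline preserves the same fail-loudly-but-continue behavior as the sequential loop.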
What to Monitor
For production use, four things matter:
Run completion. If the log file hasn't been updated since the last expected run, something is wrong. A simple check: if (now - log_mtime) > 2 * schedule_interval, alert.
Exit codes. The sys.exit(1) on failure gives cron a signal. If you're using GitHub Actions, a failed step will mark the workflow run as failed — visible in the Actions tab.
API response times. Log elapsed for each stage. If analyze starts taking 5 minutes instead of 30 seconds, the data or prompt may have changed in a way that makes analysis harder.
Delivery receipts. Log Slack and SendGrid HTTP status codes. A 200 from the webhook doesn't guarantee the message was delivered, but a 4xx guarantees it wasn't.
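The run-completion check fits in a few lines. The function name, log path, and schedule interval are assumptions; substitute your own:

```python
import os
import time

def log_is_stale(log_path, schedule_interval_s, now=None):
    """True if the log hasn't been touched in over twice the schedule interval."""
    now = time.time() if now is None else now
    try:
        mtime = os.path.getmtime(log_path)
    except OSError:  # a missing log file also counts as stale
        return True
    return (now - mtime) > 2 * schedule_interval_s
```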
Try It Interactively First
Before wiring up automation, run the pipeline interactively against your actual CSV in the DataStoryBot playground. This tells you what story angles the API finds, whether the narrative matches what you'd expect, and whether you need a steering prompt to guide it. Debugging a cron job is harder than debugging an interactive session.
What to Read Next
For the full DataStoryBot API reference including all request parameters and response fields, see getting started with the DataStoryBot API.
For production-grade retry and error handling patterns that go beyond what's covered here — including container expiry recovery and rate limit handling — read error handling and retry patterns for data analysis APIs.
For weekly rather than daily report pipelines with SendGrid email formatting details, see automating weekly data reports.
Ready to find your data story?
Upload a CSV and DataStoryBot will uncover the narrative in seconds.
Try DataStoryBot →