Slack and Email Report Bots Powered by Data Analysis API
Build a Slack bot or email automation that takes data attachments, runs them through DataStoryBot, and posts the story back.
The DataStoryBot API turns a CSV into a narrative with charts. What you do with that narrative is up to you. Two of the most practical delivery mechanisms are Slack and email — not because they're fancy, but because that's where your stakeholders already live.
This article builds both. The Slack bot listens for CSV file uploads in a channel, runs the analysis, and posts the narrative and charts back as a thread reply. The email automation watches an inbox for messages with CSV attachments, processes them, and replies with the full data story. Both use Python throughout.
Before starting, skim the DataStoryBot API quickstart to understand the three-call flow (upload → analyze → refine). This article assumes you're familiar with it.
The Core Challenge: Async Analysis in a Bot Context
DataStoryBot's /analyze endpoint takes 10–90 seconds to complete. That's fine for a script. It's a problem for bots, which have strict interaction deadlines:
- Slack requires an initial HTTP 200 acknowledgment within 3 seconds of receiving an event, otherwise it retries the delivery.
- Email has no hard deadline, but holding a thread open for 90 seconds is wasteful and fragile.
The pattern for both bots is the same: acknowledge immediately, process in the background, post results when ready.
For Slack, the Bolt SDK handles the acknowledgment automatically if you structure your handlers correctly. For email, you'll run processing in a background thread. In production, you'd push work onto a queue (Redis + RQ, Celery, SQS) and let workers consume it — but for clarity, this article uses threading.Thread directly.
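Stripped of any Slack or email specifics, the acknowledge-then-process pattern looks like this (a minimal sketch using only the standard library; slow_analysis and the 0.05-second sleep are stand-ins for the real DataStoryBot call):

```python
import threading
import time

results = []  # stand-in for "post back to the channel / reply to the email"

def slow_analysis(payload):
    time.sleep(0.05)  # stand-in for the 10-90s DataStoryBot call
    results.append(f"story for {payload}")

def handle_event(payload):
    # Acknowledge fast: spawn the worker and return immediately
    t = threading.Thread(target=slow_analysis, args=(payload,), daemon=True)
    t.start()
    return t  # the caller gets control back in microseconds

worker = handle_event("sales.csv")
worker.join()  # only for this demo; in the real bots the thread posts its own results
print(results)
```

Both bots below are elaborations of this shape: the event handler returns within Slack's 3-second window (or keeps the poll loop moving), and the thread owns everything slow.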
Part 1: The Slack Bot
Setup
Install dependencies:
pip install slack-bolt requests
You need a Slack app with these OAuth scopes:
- files:read — to download file attachments
- chat:write — to post messages
- channels:history / groups:history — to read channel events
Create the app at api.slack.com/apps, enable Socket Mode or configure a Request URL for your server, and subscribe to the file_shared event under Event Subscriptions.
Set environment variables:
export SLACK_BOT_TOKEN="xoxb-..."
export SLACK_APP_TOKEN="xapp-..." # only needed for Socket Mode
export DATASTORYBOT_BASE="https://datastory.bot/api"
The DataStoryBot Pipeline
This function is the core of both bots. It takes a local file path and returns a structured result dict:
import requests
import os
BASE_URL = os.environ["DATASTORYBOT_BASE"]
def run_datastorybot(csv_path, steering=None):
"""
Upload a CSV to DataStoryBot, analyze it, refine the top story,
and return the narrative, charts, and story title.
Returns:
{
"title": str,
"narrative": str, # Markdown
"charts": [
{"bytes": bytes, "caption": str, "file_id": str}
]
}
Raises requests.HTTPError on API failures.
"""
# Step 1: Upload
with open(csv_path, "rb") as f:
resp = requests.post(f"{BASE_URL}/upload", files={"file": f}, timeout=30)
resp.raise_for_status()
upload_data = resp.json()
container_id = upload_data["containerId"]
# Step 2: Analyze — find story angles (slow, 10–90s)
analyze_payload = {"containerId": container_id}
if steering:
analyze_payload["steeringPrompt"] = steering
resp = requests.post(f"{BASE_URL}/analyze", json=analyze_payload, timeout=120)
resp.raise_for_status()
stories = resp.json()
if not stories:
raise ValueError("DataStoryBot returned no story angles for this dataset.")
# Step 3: Refine the top story into a full narrative
resp = requests.post(
f"{BASE_URL}/refine",
json={"containerId": container_id, "selectedStoryTitle": stories[0]["title"]},
timeout=120,
)
resp.raise_for_status()
report = resp.json()
# Download chart images
charts = []
for chart in report.get("charts", []):
img_resp = requests.get(
f"{BASE_URL}/files/{container_id}/{chart['fileId']}",
timeout=30,
)
img_resp.raise_for_status()
charts.append({
"bytes": img_resp.content,
"caption": chart.get("caption", ""),
"file_id": chart["fileId"],
})
return {
"title": stories[0]["title"],
"narrative": report["narrative"],
"charts": charts,
}
The Slack Bot Handler
import threading
import tempfile
import os
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
app = App(token=os.environ["SLACK_BOT_TOKEN"])
def process_and_post(client, channel_id, thread_ts, file_info):
"""
Downloads the Slack file, runs DataStoryBot, and posts results.
Runs in a background thread so the event handler can return immediately.
"""
# Download the file from Slack
file_url = file_info.get("url_private_download")
if not file_url:
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=":warning: Could not get download URL for the file.",
)
return
headers = {"Authorization": f"Bearer {os.environ['SLACK_BOT_TOKEN']}"}
dl_resp = requests.get(file_url, headers=headers, timeout=30)
if dl_resp.status_code != 200:
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=f":warning: Failed to download file (HTTP {dl_resp.status_code}).",
)
return
# Save to a temp file
suffix = ".csv"
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
tmp.write(dl_resp.content)
tmp_path = tmp.name
try:
# Let the channel know analysis has started
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=":mag: Analyzing your data... this takes 30–90 seconds.",
)
result = run_datastorybot(tmp_path)
# Post the narrative as a thread reply
narrative_text = f"*{result['title']}*\n\n{result['narrative']}"
        # Slack caps messages at 40,000 characters, but long posts are
        # unwieldy in-channel — truncate well below that for readability
        if len(narrative_text) > 3900:
            narrative_text = narrative_text[:3900] + "\n\n_[narrative truncated]_"
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=narrative_text,
)
# Upload each chart as a file in the thread
for i, chart in enumerate(result["charts"]):
caption = chart["caption"] or f"Chart {i + 1}"
client.files_upload_v2(
channel=channel_id,
thread_ts=thread_ts,
content=chart["bytes"],
filename=f"chart_{i + 1}.png",
title=caption,
)
except requests.HTTPError as e:
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=f":x: DataStoryBot API error: {e.response.status_code} — {e.response.text[:200]}",
)
except Exception as e:
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=f":x: Analysis failed: {str(e)[:300]}",
)
finally:
os.unlink(tmp_path)
@app.event("file_shared")
def handle_file_shared(event, client, say):
"""
Fires when a file is shared in a channel the bot is in.
Acknowledge immediately; hand off processing to a background thread.
"""
file_id = event.get("file_id")
channel_id = event.get("channel_id")
# Fetch full file metadata
file_info_resp = client.files_info(file=file_id)
file_info = file_info_resp["file"]
    # Only process CSV files. Slack's filetype value for CSVs is "csv";
    # fall back to the filename extension for safety.
    if file_info.get("filetype") != "csv" and not file_info.get("name", "").endswith(".csv"):
        return
return
# The message_ts from file_shared is the thread anchor
message_ts = file_info.get("shares", {})
# Grab the first public or private channel share timestamp
thread_ts = None
for share_type in ("public", "private"):
shares = message_ts.get(share_type, {}).get(channel_id, [])
if shares:
thread_ts = shares[0]["ts"]
break
if not thread_ts:
return # Can't find the message timestamp; skip
# Spin up background processing — never block the event handler
t = threading.Thread(
target=process_and_post,
args=(client, channel_id, thread_ts, file_info),
daemon=True,
)
t.start()
if __name__ == "__main__":
handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
handler.start()
How It Behaves
When a user drops a CSV into the channel:
- The file_shared event fires. The handler acknowledges it (Slack sees a 200) and spawns a thread.
- The background thread downloads the file, posts ":mag: Analyzing your data...", and calls run_datastorybot.
- When analysis completes, the narrative posts as a thread reply, followed by each chart uploaded as an image file.
- If anything fails, an error message posts in the thread instead of silently dying.
The narrative is posted as plain text. One caveat: Slack's mrkdwn dialect is not standard Markdown — bold is single asterisks, which is why the code wraps the story title in *...* rather than **...**. The **bold** markers inside DataStoryBot's Markdown narrative will render as literal asterisks, so for a polished post you'll want to convert the narrative to mrkdwn before posting.
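A few regex substitutions cover the constructs DataStoryBot narratives actually use. This is a sketch, not a full Markdown parser, and md_to_mrkdwn is a name of my own choosing, not part of any SDK:

```python
import re

def md_to_mrkdwn(text):
    """Convert common Markdown constructs to Slack mrkdwn.

    Handles bold, links, and headings; enough for short narratives.
    """
    # **bold** -> *bold*
    text = re.sub(r"\*\*(.+?)\*\*", r"*\1*", text)
    # [label](url) -> <url|label>
    text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"<\2|\1>", text)
    # "## Heading" -> "*Heading*" (mrkdwn has no heading syntax)
    text = re.sub(r"^#{1,6}\s*(.+)$", r"*\1*", text, flags=re.MULTILINE)
    return text

print(md_to_mrkdwn("## Q3 Revenue\n**Up 12%**, see [details](https://example.com)"))
# *Q3 Revenue*
# *Up 12%*, see <https://example.com|details>
```

In process_and_post, wrap the narrative before building narrative_text: `narrative_text = f"*{result['title']}*\n\n{md_to_mrkdwn(result['narrative'])}"`.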
Handling Large Files
Slack's file size limit for uploads is 1 GB, but DataStoryBot works best with files under 50 MB. Add a size check before spawning the thread:
file_size = file_info.get("size", 0)
MAX_BYTES = 50 * 1024 * 1024 # 50 MB
if file_size > MAX_BYTES:
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
text=f":warning: File is {file_size // (1024*1024)} MB. Max supported size is 50 MB.",
)
return
Part 2: The Email Automation
Setup
The email bot uses the Python standard library for IMAP/SMTP, plus requests and markdown (used later to convert the narrative to HTML):
pip install requests markdown
Configure your mail server credentials. This example uses Gmail with an App Password, but any IMAP/SMTP server works with the same structure:
export EMAIL_ADDRESS="reports@yourcompany.com"
export EMAIL_PASSWORD="your-app-password"
export IMAP_HOST="imap.gmail.com"
export SMTP_HOST="smtp.gmail.com"
export SMTP_PORT="587"
export DATASTORYBOT_BASE="https://datastory.bot/api"
Email Parsing
import imaplib
import email
import os
import tempfile
import threading
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import time
import markdown # pip install markdown
IMAP_HOST = os.environ["IMAP_HOST"]
SMTP_HOST = os.environ["SMTP_HOST"]
SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))
EMAIL_ADDRESS = os.environ["EMAIL_ADDRESS"]
EMAIL_PASSWORD = os.environ["EMAIL_PASSWORD"]
def fetch_unread_csv_emails():
"""
Connect to IMAP, find unread emails with CSV attachments,
return a list of dicts with sender, subject, message_id, and attachment bytes.
"""
results = []
with imaplib.IMAP4_SSL(IMAP_HOST) as imap:
imap.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
imap.select("INBOX")
# Search for unread messages
status, message_ids = imap.search(None, "UNSEEN")
if status != "OK" or not message_ids[0]:
return results
for msg_id in message_ids[0].split():
status, data = imap.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
sender = msg.get("From", "")
subject = msg.get("Subject", "")
message_id = msg.get("Message-ID", "")
csv_attachments = []
for part in msg.walk():
content_type = part.get_content_type()
filename = part.get_filename() or ""
                # The .csv filename is the reliable signal; declared content
                # types vary widely across mail clients
                is_csv = filename.lower().endswith(".csv")
if is_csv:
csv_attachments.append({
"filename": filename,
"bytes": part.get_payload(decode=True),
})
if csv_attachments:
# Mark as seen so we don't reprocess
imap.store(msg_id, "+FLAGS", "\\Seen")
results.append({
"sender": sender,
"subject": subject,
"message_id": message_id,
"attachments": csv_attachments,
})
return results
def send_reply(to_address, subject, in_reply_to, narrative_html, charts):
"""
Send an HTML email reply with charts embedded inline.
"""
msg = MIMEMultipart("related")
msg["From"] = EMAIL_ADDRESS
msg["To"] = to_address
msg["Subject"] = f"Re: {subject}" if not subject.startswith("Re:") else subject
msg["In-Reply-To"] = in_reply_to
msg["References"] = in_reply_to
# Build HTML body with inline chart references
chart_img_tags = ""
for i, chart in enumerate(charts):
cid = f"chart_{i}"
caption = chart["caption"] or f"Chart {i + 1}"
chart_img_tags += (
f'<figure style="margin: 24px 0;">'
f'<img src="cid:{cid}" alt="{caption}" style="max-width:100%;border-radius:4px;">'
f'<figcaption style="font-size:13px;color:#666;margin-top:6px;">{caption}</figcaption>'
f"</figure>\n"
)
html_body = f"""
<html><body style="font-family:system-ui,sans-serif;max-width:680px;margin:0 auto;padding:24px;color:#1a1a1a;">
{narrative_html}
{chart_img_tags}
<hr style="margin-top:40px;border:none;border-top:1px solid #e0e0e0;">
<p style="font-size:12px;color:#888;">Generated by <a href="https://datastory.bot">DataStoryBot</a></p>
</body></html>
"""
alt_part = MIMEMultipart("alternative")
alt_part.attach(MIMEText("See HTML version for the data story and charts.", "plain"))
alt_part.attach(MIMEText(html_body, "html"))
msg.attach(alt_part)
# Attach chart images
for i, chart in enumerate(charts):
img = MIMEImage(chart["bytes"], _subtype="png")
img.add_header("Content-ID", f"<chart_{i}>")
img.add_header("Content-Disposition", "inline", filename=f"chart_{i + 1}.png")
msg.attach(img)
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
smtp.starttls()
smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
smtp.sendmail(EMAIL_ADDRESS, to_address, msg.as_string())
def process_email(email_data):
"""
Process a single email's first CSV attachment through DataStoryBot
and send the reply. Designed to run in a background thread.
"""
attachment = email_data["attachments"][0] # process first CSV only
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
tmp.write(attachment["bytes"])
tmp_path = tmp.name
try:
result = run_datastorybot(tmp_path)
# Convert Markdown narrative to HTML
narrative_html = markdown.markdown(
f"## {result['title']}\n\n{result['narrative']}",
extensions=["tables", "fenced_code"],
)
send_reply(
to_address=email_data["sender"],
subject=email_data["subject"],
in_reply_to=email_data["message_id"],
narrative_html=narrative_html,
charts=result["charts"],
)
print(f"Replied to {email_data['sender']} with story: {result['title']}")
except Exception as e:
# Send a failure notice rather than silently dropping the email
error_msg = MIMEText(
f"DataStoryBot could not analyze the attached file.\n\nError: {str(e)}", "plain"
)
error_msg["From"] = EMAIL_ADDRESS
error_msg["To"] = email_data["sender"]
error_msg["Subject"] = f"Re: {email_data['subject']} — analysis failed"
error_msg["In-Reply-To"] = email_data["message_id"]
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as smtp:
smtp.starttls()
smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
smtp.sendmail(EMAIL_ADDRESS, email_data["sender"], error_msg.as_string())
finally:
os.unlink(tmp_path)
def poll_inbox(interval_seconds=60):
"""
Poll the inbox every `interval_seconds`. For each unread email
with a CSV attachment, spin up a background thread to process it.
"""
print(f"Polling {EMAIL_ADDRESS} every {interval_seconds}s...")
while True:
try:
emails = fetch_unread_csv_emails()
for email_data in emails:
print(f"Processing email from {email_data['sender']}: {email_data['subject']}")
t = threading.Thread(target=process_email, args=(email_data,), daemon=True)
t.start()
except Exception as e:
print(f"Inbox poll error: {e}")
time.sleep(interval_seconds)
if __name__ == "__main__":
poll_inbox(interval_seconds=30)
How It Behaves
- The poller checks the inbox every 30 seconds.
- Any unread email with a .csv attachment is immediately marked as seen (preventing double-processing) and handed to a background thread.
- The background thread analyzes the CSV, converts the Markdown narrative to HTML, and sends a properly threaded reply with charts embedded inline.
- If analysis fails, the sender gets an error reply rather than silence.
Sender Allowlisting
You probably don't want to process every CSV that lands in the inbox. Add a simple allowlist check before spawning the thread:
ALLOWED_SENDERS = {
"alice@yourcompany.com",
"analytics-team@yourcompany.com",
}
# Extract the email address from the From header
import re
def extract_address(from_header):
match = re.search(r"<(.+?)>", from_header)
return match.group(1).lower() if match else from_header.strip().lower()
# Then, inside the poll_inbox loop, before spawning the processing thread:
sender_address = extract_address(email_data["sender"])
if sender_address not in ALLOWED_SENDERS:
    print(f"Skipping email from unlisted sender: {sender_address}")
    continue
Steering the Analysis
Both bots benefit from steering prompts when the audience has known interests. The steeringPrompt parameter guides DataStoryBot toward specific angles rather than letting it pick freely.
For the Slack bot, you can parse the file comment for a steering directive. When a user uploads a CSV and adds a comment like analyze: focus on revenue by region, extract that and pass it to run_datastorybot:
# In handle_file_shared, check the file's initial comment
initial_comment = file_info.get("initial_comment", {}).get("comment", "")
steering = None
if initial_comment.lower().startswith("analyze:"):
steering = initial_comment[len("analyze:"):].strip()
# process_and_post needs a matching steering=None parameter that it
# forwards as run_datastorybot(tmp_path, steering=steering)
threading.Thread(
    target=process_and_post,
    args=(client, channel_id, thread_ts, file_info, steering),
    daemon=True,
).start()
For the email bot, parse the email body for the same pattern:
def extract_steering_from_body(msg):
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
for line in body.splitlines():
if line.lower().startswith("analyze:"):
return line[len("analyze:"):].strip()
return None
Production Considerations
Thread pool limits. threading.Thread has no concurrency ceiling. If ten users drop CSVs simultaneously, ten threads each hold an open HTTP connection to DataStoryBot for up to 90 seconds. For low-volume internal bots this is fine. For anything public-facing, use a thread pool executor or a proper task queue.
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=5)
executor.submit(process_and_post, client, channel_id, thread_ts, file_info)
Retries. The run_datastorybot function uses raise_for_status() which surfaces HTTP errors immediately. In production, wrap the analyze call in a retry loop with exponential backoff — transient 5xx errors from the API are possible under load. See the error handling guide for a reusable retry decorator.
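The guide's decorator isn't reproduced here, but a generic wrapper along these lines illustrates the idea (with_retry is my own name; note it retries every exception, whereas a production version would inspect the response status and retry only 5xx and connection errors):

```python
import time

def with_retry(call, max_attempts=4, base_delay=1.0):
    """Run call() with exponential backoff on failure.

    Retries any exception, re-raising the last one when attempts
    run out. Delays grow as base_delay, 2x, 4x, ...
    """
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the real error
            time.sleep(base_delay * (2 ** attempt))
```

Inside run_datastorybot, the analyze call would become `with_retry(lambda: requests.post(f"{BASE_URL}/analyze", json=analyze_payload, timeout=120))`, with raise_for_status() inside the lambda if you want 5xx responses to count as failures.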
Container TTL. DataStoryBot containers expire 20 minutes after creation. The upload, analyze, and refine calls all happen within one run_datastorybot invocation, so a single request never hits the TTL. You would hit it only if you cached a containerId across multiple requests, which this code does not do.
File cleanup. Both bots write CSV bytes to a tempfile.NamedTemporaryFile and delete it in finally. On Linux, unlink removes the file immediately. If your process crashes mid-analysis, the temp file is orphaned in /tmp. A cron job that deletes /tmp/*.csv files older than an hour is good hygiene.
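If you'd rather keep cleanup inside the bot process than rely on cron, a small sweep run at startup (or periodically from the poll loop) does the same job; sweep_stale_csvs is a helper of my own devising:

```python
import glob
import os
import tempfile
import time

def sweep_stale_csvs(max_age_seconds=3600, directory=None):
    """Delete leftover .csv temp files older than max_age_seconds.

    Catches files orphaned by crashes mid-analysis. Returns the
    paths it removed.
    """
    directory = directory or tempfile.gettempdir()
    now = time.time()
    removed = []
    for path in glob.glob(os.path.join(directory, "*.csv")):
        try:
            if now - os.path.getmtime(path) > max_age_seconds:
                os.unlink(path)
                removed.append(path)
        except OSError:
            pass  # raced with another cleaner, or the file is still open
    return removed
```

This only makes sense because both bots create their temp files with suffix=".csv"; widen the glob if you change that.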
Next Steps
The bots in this article are single-shot — they refine the top story angle and post it. A natural extension is to post all three story angles as Slack buttons and let the user pick which one to refine. That requires Slack's Block Kit interactive components and a separate action handler, but the DataStoryBot calls are identical.
For scheduled rather than on-demand analysis, see Automating Weekly Data Reports, which builds a full cron pipeline with SendGrid delivery. If you're ingesting CSVs at volume from an automated source rather than human uploads, the CSV analysis pipeline guide covers batching and deduplication patterns.
Ready to find your data story?
Upload a CSV and DataStoryBot will uncover the narrative in seconds.
Try DataStoryBot →