14 min read

Multi-Dataset Reports: Combining Stories from Multiple CSVs

Pattern for analyzing multiple related CSVs separately and assembling the narratives into a cohesive multi-section report.

By DataStoryBot Team

Monthly business reviews involve data from everywhere: sales closed this month, marketing spend and pipeline by channel, support ticket volume and resolution times. These datasets live in different systems, have different schemas, and tell different parts of the same story.

The naive approach is to upload all the data into one giant CSV and hope the analysis holds together. It rarely does. Mixing revenue transactions with support tickets in a single file creates a mess of unrelated columns, forces the model to infer which metrics relate to which business function, and produces vague narratives that try to bridge incommensurable things.

A better pattern: analyze each CSV separately, extract the best narrative from each, then assemble the narratives into a single report with an editorial layer that connects them. This article walks through that pattern in full — from parallel API calls to the executive summary.

The Architecture

sales.csv     → DataStoryBot API → sales narrative + charts
marketing.csv → DataStoryBot API → marketing narrative + charts  ─→ combined report
support.csv   → DataStoryBot API → support narrative + charts

Each dataset gets its own upload, analyze, and refine cycle. The results are then stitched together in a deterministic assembly step. The API calls for different datasets are independent, which means you can run them in parallel.

Step 1: The Reusable Analysis Function

Start with a function that runs the full three-step DataStoryBot pipeline for a single CSV and returns a normalized result dict:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

BASE_URL = "https://datastory.bot/api"

def analyze_dataset(name: str, csv_path: str, steering: str | None = None) -> dict:
    """
    Run the full DataStoryBot pipeline on a single CSV.
    Returns a dict with name, title, narrative, and chart paths.
    """
    # Upload
    with open(csv_path, "rb") as f:
        upload_resp = requests.post(f"{BASE_URL}/upload", files={"file": f})
        upload_resp.raise_for_status()

    container_id = upload_resp.json()["containerId"]

    # Analyze — discover story angles
    analyze_payload = {"containerId": container_id}
    if steering:
        analyze_payload["steeringPrompt"] = steering

    analyze_resp = requests.post(f"{BASE_URL}/analyze", json=analyze_payload)
    analyze_resp.raise_for_status()
    angles = analyze_resp.json()

    # Refine the top story
    refine_resp = requests.post(f"{BASE_URL}/refine", json={
        "containerId": container_id,
        "selectedStoryTitle": angles[0]["title"]
    })
    refine_resp.raise_for_status()
    result = refine_resp.json()

    # Download charts to disk
    charts = []
    for i, chart in enumerate(result.get("charts", [])):
        img_resp = requests.get(f"{BASE_URL}/files/{container_id}/{chart['fileId']}")
        img_resp.raise_for_status()
        path = f"/tmp/{name}_chart_{i+1}.png"
        with open(path, "wb") as f:
            f.write(img_resp.content)
        charts.append({"path": path, "caption": chart["caption"]})

    return {
        "name": name,
        "title": angles[0]["title"],
        "narrative": result["narrative"],
        "charts": charts,
    }

The name parameter is a logical label — "sales", "marketing", "support" — used later for ordering and HTML anchors. The function is self-contained and produces the same output shape regardless of the underlying CSV schema.

Step 2: Parallel API Calls

Each dataset requires three sequential API calls internally, but the calls across different datasets are fully independent. Run them in parallel using ThreadPoolExecutor:

# Define the datasets and their steering prompts
DATASETS = [
    {
        "name": "sales",
        "path": "/data/exports/sales_march_2026.csv",
        "steering": (
            "Focus on revenue attainment vs. target, deal velocity, "
            "and top-performing reps or segments. Month-over-month comparisons if available."
        )
    },
    {
        "name": "marketing",
        "path": "/data/exports/marketing_march_2026.csv",
        "steering": (
            "Focus on pipeline generated by channel, cost per lead, "
            "and conversion rates through the funnel. Identify the highest-ROI channel."
        )
    },
    {
        "name": "support",
        "path": "/data/exports/support_march_2026.csv",
        "steering": (
            "Focus on ticket volume trends, resolution time by category, "
            "and any emerging issue types. Flag anything that changed significantly this month."
        )
    },
]

def run_parallel_analyses(datasets: list) -> dict:
    """
    Analyze all datasets in parallel.
    Returns a dict keyed by dataset name.
    """
    results = {}

    with ThreadPoolExecutor(max_workers=len(datasets)) as executor:
        future_to_name = {
            executor.submit(
                analyze_dataset,
                ds["name"],
                ds["path"],
                ds.get("steering")
            ): ds["name"]
            for ds in datasets
        }

        for future in as_completed(future_to_name):
            name = future_to_name[future]
            try:
                results[name] = future.result()
                print(f"[{name}] Done: {results[name]['title']}")
            except Exception as e:
                print(f"[{name}] Failed: {e}")
                raise

    return results

# Run it
sections = run_parallel_analyses(DATASETS)

With three datasets, parallel execution cuts wall-clock time by roughly 60-70% compared to running them sequentially. Each DataStoryBot pipeline takes 15-30 seconds; in parallel they all finish in roughly one pipeline's worth of time.

One important constraint: the 20-minute container TTL applies per upload. Since each dataset gets its own container, there are no TTL conflicts across parallel runs. Download all chart files during the analyze_dataset call — don't wait until after all futures complete.

Step 3: Ordering and Editorial Logic

Raw outputs from parallel runs come back in completion order, not the order you want. Establish a canonical section order and use it to sequence the assembled report:

# Canonical order for a monthly business review
SECTION_ORDER = ["sales", "marketing", "support"]

def order_sections(results: dict, order: list) -> list:
    """Return sections in the specified order, skipping any that failed."""
    ordered = []
    for name in order:
        if name in results:
            ordered.append(results[name])
        else:
            print(f"Warning: section '{name}' missing from results, skipping.")
    return ordered

ordered_sections = order_sections(sections, SECTION_ORDER)

For a monthly business review, the standard editorial logic is: sales first (the outcome), marketing second (the leading indicators and pipeline that drives future sales), support last (the operational health and product signal). This sequencing creates a natural narrative arc — results, then causes, then implications.

Different report types call for different sequencing:

  • Product review: feature adoption → retention → support tickets
  • Financial review: revenue → cost centers → margin analysis
  • Operations review: throughput → error rates → SLA compliance

The sequencing decision is editorial, not technical. Make it explicit in your configuration rather than relying on alphabetical ordering or API response timing.
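One way to make that editorial decision explicit is a small configuration table keyed by report type. The report-type keys and section names below are illustrative; adjust them to match the labels in your own DATASETS config:

```python
# Canonical section orders per report type. The keys and section names
# here are illustrative placeholders, not part of any API.
REPORT_ORDERS = {
    "monthly_business_review": ["sales", "marketing", "support"],
    "product_review": ["adoption", "retention", "support"],
    "financial_review": ["revenue", "cost_centers", "margin"],
}

def get_section_order(report_type: str) -> list:
    """Look up the editorial order for a report type, failing loudly on typos."""
    try:
        return REPORT_ORDERS[report_type]
    except KeyError:
        raise ValueError(f"No section order defined for report type '{report_type}'")
```

Failing loudly on an unknown report type is deliberate: a silent fallback to alphabetical ordering would reintroduce exactly the accidental sequencing this table exists to prevent.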

Step 4: Generating Transition Text

Each section's narrative is standalone — it doesn't reference the other sections. You need editorial connective tissue between them. The simplest approach looks up transitions by the pair of section names:

def make_transition(from_section: dict, to_section: dict) -> str:
    """Generate a brief transition paragraph between two sections."""
    transitions = {
        ("sales", "marketing"): (
            "The sales results above reflect deal activity that has already closed. "
            "The marketing section below shows what's in the pipeline — the deals that will close "
            "in the coming months."
        ),
        ("marketing", "support"): (
            "Pipeline health tells you where revenue is coming from. "
            "Support data tells you what's happening with the customers already there."
        ),
    }
    key = (from_section["name"], to_section["name"])
    return transitions.get(key, "")

For more dynamic transitions, you can generate them programmatically using the section titles:

def make_dynamic_transition(from_section: dict, to_section: dict) -> str:
    """Build a generic transition using section metadata."""
    return (
        f"*{from_section['title']}* establishes the context for what follows. "
        f"The next section examines the {to_section['name']} picture."
    )

Hardcoded transitions are more polished. Dynamic fallbacks are better than no transitions at all. In practice, use hardcoded transitions for report types you run regularly, and fall back to dynamic generation for ad-hoc combinations.
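The two approaches compose naturally: check the hardcoded table first, and fall back to the generic template for any pairing it doesn't cover. A minimal sketch combining them:

```python
# Hardcoded transitions for pairings you curate by hand; anything else
# falls back to a generic template built from section metadata.
TRANSITIONS = {
    ("sales", "marketing"): (
        "The sales results above reflect deal activity that has already closed. "
        "The marketing section below shows what's in the pipeline."
    ),
}

def transition_for(from_section: dict, to_section: dict) -> str:
    """Return a curated transition if one exists, else a generic one."""
    key = (from_section["name"], to_section["name"])
    if key in TRANSITIONS:
        return TRANSITIONS[key]
    return (
        f"*{from_section['title']}* establishes the context for what follows. "
        f"The next section examines the {to_section['name']} picture."
    )
```

Drop this in as a replacement for make_transition in the assembly step and every section boundary gets at least a serviceable transition.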

Step 5: Building the Executive Summary

An executive summary is a separate artifact from the section narratives. It synthesizes across sections — something that requires knowing what each section found. The simplest reliable approach: extract the first paragraph of each narrative (which DataStoryBot writes as a lede that captures the key finding) and combine them under a summary header.

import re

def extract_lede(narrative: str) -> str:
    """Extract the first non-heading paragraph from a markdown narrative."""
    paragraphs = narrative.split("\n\n")
    for para in paragraphs:
        stripped = para.strip()
        # Skip headings and empty paragraphs
        if stripped and not stripped.startswith("#"):
            return stripped
    return ""

def build_executive_summary(sections: list, report_period: str) -> str:
    """
    Assemble an executive summary from section ledes.
    """
    lines = [
        f"## Executive Summary — {report_period}\n",
    ]

    for section in sections:
        lede = extract_lede(section["narrative"])
        if lede:
            lines.append(f"**{section['name'].title()}**: {lede}\n")

    return "\n".join(lines)

exec_summary = build_executive_summary(ordered_sections, "March 2026")

The result is a one-page summary where each line is sourced directly from the AI-generated narrative for that section. It reads consistently because each lede follows the same structural pattern DataStoryBot uses: lead with the key finding, quantify it, provide context.

Step 6: Assembling the Full Report

With ordered sections, transitions, and an executive summary, assembly is a straightforward pass over the data:

import markdown
import base64
from pathlib import Path

def assemble_html_report(
    sections: list,
    exec_summary: str,
    report_title: str,
    report_period: str,
) -> str:
    """
    Combine all sections into a single HTML document.
    Charts are embedded as base64 inline images.
    """

    def section_anchor(name: str) -> str:
        return f"section-{name}"

    # Table of contents
    toc_items = "\n".join(
        f'<li><a href="#{section_anchor(s["name"])}">{s["name"].title()}: {s["title"]}</a></li>'
        for s in sections
    )
    toc_html = f"<ul>{toc_items}</ul>"

    # Convert exec summary to HTML
    exec_html = markdown.markdown(exec_summary, extensions=["extra"])

    # Build section HTML
    section_parts = []
    for i, section in enumerate(sections):
        narrative_html = markdown.markdown(section["narrative"], extensions=["extra"])

        # Embed charts as base64
        charts_html = ""
        for chart in section["charts"]:
            img_bytes = Path(chart["path"]).read_bytes()
            b64 = base64.b64encode(img_bytes).decode()
            charts_html += f"""
            <figure>
                <img src="data:image/png;base64,{b64}" alt="{chart['caption']}">
                <figcaption>{chart['caption']}</figcaption>
            </figure>
            """

        section_parts.append(f"""
        <section id="{section_anchor(section['name'])}">
            <h2>{section['name'].title()}: {section['title']}</h2>
            {narrative_html}
            {charts_html}
        </section>
        """)

        # Add transition if there's a next section
        if i < len(sections) - 1:
            transition = make_transition(sections[i], sections[i + 1])
            if transition:
                section_parts.append(f'<p class="transition">{transition}</p>')

    sections_html = "\n".join(section_parts)

    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
  body {{
    font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
    font-size: 11pt;
    line-height: 1.6;
    color: #1a1a1a;
    max-width: 860px;
    margin: 0 auto;
    padding: 40px 24px;
  }}
  .report-header {{
    border-bottom: 3px solid #2563eb;
    padding-bottom: 20px;
    margin-bottom: 32px;
  }}
  .report-header h1 {{ font-size: 26pt; margin: 0 0 8px; }}
  .report-header .period {{ color: #666; font-size: 12pt; }}
  .toc {{ background: #f8fafc; border-radius: 6px; padding: 20px 24px; margin-bottom: 40px; }}
  .toc h3 {{ margin: 0 0 12px; font-size: 12pt; color: #444; }}
  .toc ul {{ margin: 0; padding-left: 20px; }}
  .toc li {{ margin-bottom: 6px; }}
  .exec-summary {{
    background: #f0f4ff;
    border-left: 4px solid #2563eb;
    padding: 20px 24px;
    border-radius: 0 6px 6px 0;
    margin-bottom: 48px;
  }}
  .exec-summary h2 {{ margin-top: 0; font-size: 14pt; color: #1e40af; }}
  section {{ margin-bottom: 56px; padding-bottom: 40px; border-bottom: 1px solid #e5e7eb; }}
  section h2 {{ font-size: 18pt; color: #111; border-bottom: 2px solid #e5e7eb; padding-bottom: 8px; }}
  figure {{ margin: 24px 0; text-align: center; page-break-inside: avoid; }}
  figure img {{ max-width: 100%; border-radius: 4px; }}
  figcaption {{ font-size: 10pt; color: #666; margin-top: 8px; font-style: italic; }}
  .transition {{
    background: #fafafa;
    border-left: 3px solid #d1d5db;
    padding: 12px 16px;
    color: #555;
    font-style: italic;
    margin: 32px 0;
    border-radius: 0 4px 4px 0;
  }}
  blockquote {{
    border-left: 3px solid #2563eb;
    margin-left: 0;
    padding: 12px 16px;
    background: #f8fafc;
    color: #333;
  }}
</style>
</head>
<body>
  <div class="report-header">
    <h1>{report_title}</h1>
    <div class="period">{report_period}</div>
  </div>

  <div class="toc">
    <h3>Contents</h3>
    {toc_html}
  </div>

  <div class="exec-summary">
    {exec_html}
  </div>

  {sections_html}
</body>
</html>"""

Complete Working Example

Putting it all together into a single runnable script:

import requests
import markdown
import base64
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date

BASE_URL = "https://datastory.bot/api"

DATASETS = [
    {
        "name": "sales",
        "path": "/data/exports/sales_march_2026.csv",
        "steering": "Focus on revenue attainment vs. target, deal velocity, and top-performing segments.",
    },
    {
        "name": "marketing",
        "path": "/data/exports/marketing_march_2026.csv",
        "steering": "Focus on pipeline generated by channel, cost per lead, and funnel conversion rates.",
    },
    {
        "name": "support",
        "path": "/data/exports/support_march_2026.csv",
        "steering": "Focus on ticket volume trends, resolution time, and any emerging issue categories.",
    },
]

SECTION_ORDER = ["sales", "marketing", "support"]

TRANSITIONS = {
    ("sales", "marketing"): (
        "The sales figures above show what closed this month. "
        "The marketing section shows what's building in the pipeline."
    ),
    ("marketing", "support"): (
        "Pipeline and acquisition tell one side of the growth story. "
        "Support data tells the other: how existing customers are experiencing the product."
    ),
}


def analyze_dataset(name, csv_path, steering=None):
    with open(csv_path, "rb") as f:
        upload = requests.post(f"{BASE_URL}/upload", files={"file": f})
        upload.raise_for_status()
    container_id = upload.json()["containerId"]

    payload = {"containerId": container_id}
    if steering:
        payload["steeringPrompt"] = steering
    analyze = requests.post(f"{BASE_URL}/analyze", json=payload)
    analyze.raise_for_status()
    angles = analyze.json()

    refine = requests.post(f"{BASE_URL}/refine", json={
        "containerId": container_id,
        "selectedStoryTitle": angles[0]["title"],
    })
    refine.raise_for_status()
    result = refine.json()

    charts = []
    for i, chart in enumerate(result.get("charts", [])):
        img = requests.get(f"{BASE_URL}/files/{container_id}/{chart['fileId']}")
        img.raise_for_status()
        path = f"/tmp/{name}_chart_{i+1}.png"
        Path(path).write_bytes(img.content)
        charts.append({"path": path, "caption": chart["caption"]})

    return {"name": name, "title": angles[0]["title"], "narrative": result["narrative"], "charts": charts}


def extract_lede(narrative):
    for para in narrative.split("\n\n"):
        stripped = para.strip()
        if stripped and not stripped.startswith("#"):
            return stripped
    return ""


def build_report(report_title, report_period):
    # Parallel analysis
    raw_results = {}
    with ThreadPoolExecutor(max_workers=len(DATASETS)) as executor:
        futures = {
            executor.submit(analyze_dataset, ds["name"], ds["path"], ds.get("steering")): ds["name"]
            for ds in DATASETS
        }
        for future in as_completed(futures):
            name = futures[future]
            raw_results[name] = future.result()
            print(f"[{name}] {raw_results[name]['title']}")

    # Order sections
    sections = [raw_results[n] for n in SECTION_ORDER if n in raw_results]

    # Executive summary
    summary_lines = [f"## Executive Summary — {report_period}\n"]
    for s in sections:
        lede = extract_lede(s["narrative"])
        if lede:
            summary_lines.append(f"**{s['name'].title()}**: {lede}\n")
    exec_summary = "\n".join(summary_lines)
    exec_html = markdown.markdown(exec_summary, extensions=["extra"])

    # TOC
    toc = "\n".join(
        f'<li><a href="#section-{s["name"]}">{s["name"].title()}: {s["title"]}</a></li>'
        for s in sections
    )

    # Sections
    section_html_parts = []
    for i, section in enumerate(sections):
        narrative_html = markdown.markdown(section["narrative"], extensions=["extra"])
        charts_html = ""
        for chart in section["charts"]:
            b64 = base64.b64encode(Path(chart["path"]).read_bytes()).decode()
            charts_html += (
                f'<figure><img src="data:image/png;base64,{b64}" alt="{chart["caption"]}">'
                f'<figcaption>{chart["caption"]}</figcaption></figure>'
            )
        section_html_parts.append(
            f'<section id="section-{section["name"]}">'
            f'<h2>{section["name"].title()}: {section["title"]}</h2>'
            f'{narrative_html}{charts_html}</section>'
        )
        if i < len(sections) - 1:
            key = (sections[i]["name"], sections[i + 1]["name"])
            transition = TRANSITIONS.get(key, "")
            if transition:
                section_html_parts.append(f'<p class="transition">{transition}</p>')

    # Write HTML
    output_path = f"/tmp/report_{date.today().isoformat()}.html"
    Path(output_path).write_text(
        f"""<!DOCTYPE html><html><head><meta charset="utf-8">
        <title>{report_title}</title></head><body>
        <h1>{report_title}</h1><p>{report_period}</p>
        <ul>{toc}</ul>
        <div class="exec-summary">{exec_html}</div>
        {"".join(section_html_parts)}
        </body></html>"""
    )
    print(f"Report written: {output_path}")
    return output_path


if __name__ == "__main__":
    build_report("Monthly Business Review", "March 2026")

To convert the HTML to PDF, pipe it through WeasyPrint or Puppeteer — both approaches are covered in detail in PDF data reports from AI: generate, format, distribute.

Handling Failures Gracefully

Parallel pipelines fail independently, which is an advantage. If the support CSV is malformed, the sales and marketing sections still succeed. Build the assembly step to tolerate missing sections:

def order_sections(results: dict, order: list, required: list = None) -> list:
    missing = [n for n in (required or []) if n not in results]
    if missing:
        raise ValueError(f"Required sections missing: {missing}")
    return [results[n] for n in order if n in results]

For a monthly business review, you might declare required=["sales"] — the report doesn't make sense without revenue data — while treating marketing and support as optional enrichment.

For retry logic on transient API failures, see error handling and retry patterns for data APIs.
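The core of that pattern is a retry wrapper with exponential backoff around each API call. A minimal sketch — the attempt count and delays are illustrative knobs, not DataStoryBot requirements:

```python
import time

def with_retry(fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Call fn(), retrying on any exception with exponential backoff.

    Delays grow as base_delay * 2**attempt. Tune both knobs to match
    the API's rate limits and your tolerance for slow failures.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts; surface the last error
            time.sleep(base_delay * (2 ** attempt))
```

Wrap individual calls inside analyze_dataset, e.g. `with_retry(lambda: requests.post(f"{BASE_URL}/analyze", json=payload))`, rather than retrying the whole pipeline — re-uploading a CSV that already succeeded wastes a container.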

Scheduling the Multi-Dataset Report

The build function above is self-contained. Schedule it the same way you'd schedule any single-CSV report:

# First day of each month at 7:00 AM UTC
0 7 1 * * /usr/bin/python3 /opt/reports/monthly_review.py

The main operational difference from a single-CSV report is that all three CSV exports need to be available before the script runs. If your exports land at different times — sales closes at midnight, support data refreshes at 6 AM — set the cron trigger after the latest one arrives.
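If the export timing is unreliable, a guard at the top of the script is safer than a carefully tuned cron offset. A sketch that polls for all export files before starting — timeout and poll interval are illustrative:

```python
import time
from pathlib import Path

def wait_for_exports(paths: list, timeout_s: float = 3600, poll_s: float = 60) -> bool:
    """Block until every export file exists and is non-empty, or timeout.

    Returns True once all files are present, False if the deadline passes.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if all(Path(p).exists() and Path(p).stat().st_size > 0 for p in paths):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
```

Call it before build_report with the paths from DATASETS, and exit with a non-zero status on timeout so your scheduler flags the run instead of producing a partial report silently.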

For complete scheduling patterns including export coordination, see automating weekly data reports.

When to Use This Pattern vs. a Single Combined CSV

Use separate CSVs when:

  • The datasets have different row-level granularity (transactions vs. tickets vs. campaigns)
  • Each dataset tells a story that stands on its own
  • You want distinct narrative sections rather than a single blended analysis
  • You need steering prompts specific to each domain

Use a single combined CSV when:

  • You're genuinely looking for cross-dataset correlations (e.g., does marketing spend directly predict deal velocity this week?)
  • The join key is clean and the result is a flat, coherent table
  • The question is inherently cross-functional ("what's the relationship between support load and churn?")

The pattern in this article is for composing independently meaningful stories. Cross-dataset correlation analysis is a different problem — that requires a deliberate data join before upload, not separate analyses with narrative assembly.
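For the correlation case, the join happens before upload. A sketch with pandas, assuming both exports share a week column as the join key — the column names and values here are illustrative:

```python
import pandas as pd

# Join weekly marketing spend onto weekly deal velocity, producing one
# flat table suitable for a single cross-functional upload.
# Column names are illustrative, not from any real export schema.
spend = pd.DataFrame({
    "week": ["2026-W09", "2026-W10"],
    "marketing_spend": [42000, 51000],
})
velocity = pd.DataFrame({
    "week": ["2026-W09", "2026-W10"],
    "avg_days_to_close": [31.5, 28.2],
})

# Inner join keeps only weeks present in both exports, so the
# correlation analysis never sees half-populated rows.
combined = spend.merge(velocity, on="week", how="inner")
combined.to_csv("/tmp/spend_vs_velocity.csv", index=False)
```

The resulting CSV is one row per week with both metrics side by side — exactly the flat, coherent table the combined-CSV approach needs.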

What to Read Next

For the foundational three-step pipeline that each section analysis uses, see how to generate a data report from CSV in one API call.

To convert the assembled HTML report to a distributable PDF, read PDF data reports from AI: generate, format, distribute.

For scheduling the complete pipeline to run automatically on a monthly cadence, see automating weekly data reports.

Ready to find your data story?

Upload a CSV and DataStoryBot will uncover the narrative in seconds.

Try DataStoryBot →