ADR-016: Two-Path Pipeline Principle (Interactive vs Bulk)
Status: Accepted (principle); ownership split delivered except for one legacy inline stats writer (see "Current state" below)
Date: 2026-05-05
Deciders: Backend Team
Related: ADR-010: Background Job Processing; plan docs/src/implementation-plans/310-d-pipeline-responsibility-split.md
Context
The CO2 calculator serves two user paths with fundamentally different latency expectations:
- Path 1 (Interactive UI): standard and principal users edit modules through the POST/PATCH/DELETE /v1/carbon_reports/... endpoints. Users expect instant visual feedback (<200 ms typical).
- Path 2 (Bulk operator): principal users and back-office business (métier) users upload CSVs, sync factors, and sync units. Operators expect minutes, not milliseconds; progress is streamed over SSE.
Earlier code routed both paths through the same write functions. Bulk CSV ingest computed emissions inside the ingest transaction; factor recalculation also wrote data_entry_emissions; and both then called recompute_stats, which writes carbon_reports.stats. As a result, two concurrent bulk pipelines for different modules raced on the same tables.
Decision
Codify a two-path principle with distinct write strategies per path:
| Path | Trigger | Write strategy |
|---|---|---|
| 1 — Interactive UI | UI module edit endpoints | Inline, synchronous |
| 2 — Bulk operator | /sync/dispatch, /sync/... | Async chained jobs |
Each table should have exactly one writer per path. The target ownership map:
| Table | Path 2 writer (target) | Path 1 writer (unchanged) |
|---|---|---|
| data_entries | csv_ingest / api_ingest jobs | CarbonReportModuleWorkflow |
| data_entry_emissions | emission_recalc job | CarbonReportModuleWorkflow |
| carbon_reports.stats | aggregation job | CarbonReportModuleWorkflow |
Path 2 chain (per module), once fully delivered:
csv_ingest → emission_recalc → aggregation
Aggregation jobs dedupe per (module_type_id, year) so N parallel recalcs collapse to one stats refresh.
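A minimal in-memory sketch of the (module_type_id, year) dedup rule. AggregationQueue, enqueue and mark_done are hypothetical names: the real guard (AGGREGATION_DEDUP in chain_job) works on job rows in the database, and the assumption that a key can be queued again once its aggregation has run is ours, not the source's.

```python
from dataclasses import dataclass, field


@dataclass
class AggregationQueue:
    """In-memory stand-in for queued aggregation jobs (hypothetical)."""

    # Maps (module_type_id, year) -> id of the job that requested the refresh.
    queued: dict = field(default_factory=dict)

    def enqueue(self, module_type_id: int, year: int, requested_by: int) -> bool:
        """Queue a stats refresh; return False when one is already queued for the key."""
        key = (module_type_id, year)
        if key in self.queued:
            return False  # deduped: the queued refresh already covers this key
        self.queued[key] = requested_by
        return True

    def mark_done(self, module_type_id: int, year: int) -> None:
        """Assumption: once the aggregation has run, the key may be queued again."""
        self.queued.pop((module_type_id, year), None)
```

Five recalcs requesting a refresh for (3, 2025) get True once and False four times, so only one stats refresh runs for that key.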
Path 1 keeps inline writes. Single-row request scope serializes its writes naturally; this is a deliberate UX choice, not a violation.
See docs/src/implementation-plans/310-d-pipeline-responsibility-split.md.
Current state
The single-writer split is delivered for the recalc chain, with one legacy inline writer left in CSV ingest:
- The dedicated aggregation job exists (backend/app/tasks/aggregation_tasks.py:87, @register("aggregation")) and is the bulk-path writer of carbon_reports.stats. Both recalc handlers chain it (coalesced to one trailing job per pipeline; see the code-flow diagram below), and EmissionRecalculationWorkflow no longer calls recompute_stats itself.
- Bulk CSV ingest still invokes _recompute_module_stats() inline (backend/app/services/data_ingestion/base_csv_provider.py:1279) before the recalc chain takes over. This is the remaining second writer on carbon_reports.stats in Path 2. It is harmless, because the trailing aggregation overwrites it, but it is redundant work.
Aggregation jobs are identified by job_type="aggregation" (runner registry), not a TargetType value — TargetType has no AGGREGATION member and doesn't need one.
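The coalescing to one trailing aggregation (only the last emission_recalc sibling of a pipeline chains it, via _is_last_recalc_sibling) can be sketched as follows. RecalcJob, is_last_recalc_sibling and finish_recalc are hypothetical, and the sketch assumes each sibling runs the check while it is still RUNNING and only then writes FINISHED, which may not match the real ordering. If a trailing aggregation is ever missed, the reconciler's _recover_orphan_aggregations re-chains it.

```python
from dataclasses import dataclass


@dataclass
class RecalcJob:
    job_id: int
    state: str  # "NOT_STARTED" | "RUNNING" | "FINISHED"


def is_last_recalc_sibling(siblings: list, me: RecalcJob) -> bool:
    """True when every other emission_recalc sibling in the pipeline has FINISHED."""
    return all(s.state == "FINISHED" for s in siblings if s.job_id != me.job_id)


def finish_recalc(siblings: list, me: RecalcJob, chained: list) -> None:
    """Hypothetical end of a recalc handler: check siblings, then mark this job FINISHED."""
    if is_last_recalc_sibling(siblings, me):
        chained.append(me.job_id)  # only this job chains the trailing aggregation
    me.state = "FINISHED"
```

With three siblings finishing one after another, only the third sees both others FINISHED, so exactly one aggregation is chained.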
Code flow
Path 2: CSV upload, file by file
```mermaid
flowchart TD
A["POST /v1/sync/dispatch<br/><code>api/v1/data_sync.py:716</code><br/>sync_module_data_entries"] -->|"creates DataIngestionJob (csv_ingest)<br/>+ Pipeline row, then<br/>fire_and_forget(run_job) :924"| B
B["<b>run_job</b><br/><code>tasks/runner.py:49</code><br/>claim → heartbeat → handler →<br/>preemption check → FINISHED write"] --> C
C["csv_ingest_handler<br/><code>tasks/ingestion_tasks.py:42</code>"] -->|"_run_ingest :227"| D["CSV providers<br/><code>services/data_ingestion/base_csv_provider.py</code><br/>writes <b>data_entries</b><br/>(+ legacy inline stats :1082)"]
C -->|"_chain_emission_recalc_for_data_ingest :423<br/>one child per stale (det, year)"| E["chain_job<br/><code>tasks/_chain.py:153</code><br/>creates child row, dedup-guarded;<br/>dispatch deferred until parent's<br/>FINISHED commit (drain in runner :283)"]
E -->|"fire_and_forget(run_job)"| F
F["emission_recalc_handler<br/><code>tasks/emission_recalculation_tasks.py:284</code><br/>factor-recalc advisory lock"] --> G["EmissionRecalculationWorkflow<br/><code>workflows/emission_recalculation.py:30</code><br/>bulk factor rematch + per-entry SAVEPOINT<br/>writes <b>data_entry_emissions</b>"]
F -->|"_is_last_recalc_sibling :103<br/>only the LAST sibling chains"| H["chain_job → aggregation<br/>(AGGREGATION_DEDUP)"]
H -->|"fire_and_forget(run_job)"| I["aggregation_handler<br/><code>tasks/aggregation_tasks.py:87</code><br/>CarbonReportModuleService.recompute_stats<br/>writes <b>carbon_reports.stats</b>"]
B -.->|"after FINISHED write :338<br/>(every job, incl. C, F, I)"| J["recompute_pipeline_status<br/><code>repositories/data_ingestion.py:595</code><br/>resolves the Pipeline row"]
K["reconcile_pipeline_statuses_loop<br/><code>tasks/_pipeline_reconciler.py:103</code><br/>+ _recover_orphan_aggregations :37"] -.->|"periodic backstop"| J
L["poll_pending_jobs<br/><code>tasks/_poller.py:89</code>"] -.->|"re-dispatches orphaned<br/>QUEUED jobs (pod crash)"| B
```

Every job (ingest, recalc, aggregation) runs through the same run_job runner; handlers are looked up via @register("<job_type>") in tasks/registry.py. The runner, not the handlers, answers "who resolves the pipeline": after each FINISHED write it calls recompute_pipeline_status, so the pipeline flips to done when its last child (normally the trailing aggregation) finishes. The reconciler loop and the poller are crash/race backstops only.
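A compressed, in-memory sketch of that runner contract, assuming a single process and a plain queue in place of fire_and_forget. Job, Runner, HANDLERS and the trivial handlers are hypothetical; claim locking, heartbeats, the preemption check and the per-(det, year) fan-out are left out. What it does show is why chain_job's deferred dispatch keeps the pipeline from resolving early: the child row already exists when the parent's FINISHED write triggers recompute_pipeline_status, so the pipeline stays RUNNING until the trailing aggregation finishes.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Optional

# Hypothetical stand-in for the @register("<job_type>") registry in tasks/registry.py.
HANDLERS: dict = {}


def register(job_type: str):
    def decorator(fn: Callable) -> Callable:
        HANDLERS[job_type] = fn
        return fn
    return decorator


@dataclass
class Job:
    job_id: int
    job_type: str
    pipeline_id: str
    state: str = "NOT_STARTED"
    result: Optional[str] = None
    deferred: list = field(default_factory=list)  # children dispatched after FINISHED


@dataclass
class Runner:
    jobs: list = field(default_factory=list)            # stand-in for the job table
    ready: deque = field(default_factory=deque)         # stand-in for fire_and_forget
    pipeline_status: dict = field(default_factory=dict)

    def chain_job(self, parent: Job, job_type: str) -> Job:
        # The child row exists immediately; only its dispatch waits for the parent.
        child = Job(len(self.jobs) + 1, job_type, parent.pipeline_id)
        self.jobs.append(child)
        parent.deferred.append(child)
        return child

    def run_job(self, job: Job) -> None:
        if job.state != "NOT_STARTED":
            return  # claim failed: another runner already owns this job
        job.state = "RUNNING"
        try:
            HANDLERS[job.job_type](job, self)
            job.result = "SUCCESS"
        except Exception:
            job.result = "ERROR"
        job.state = "FINISHED"
        # The runner, not the handler, resolves the pipeline after every FINISHED write.
        self.recompute_pipeline_status(job.pipeline_id)
        self.ready.extend(job.deferred)  # drain deferred child dispatches
        job.deferred.clear()

    def recompute_pipeline_status(self, pipeline_id: str) -> None:
        members = [j for j in self.jobs if j.pipeline_id == pipeline_id]
        done = all(j.state == "FINISHED" for j in members)
        self.pipeline_status[pipeline_id] = "DONE" if done else "RUNNING"

    def drain(self) -> None:
        while self.ready:
            self.run_job(self.ready.popleft())


@register("csv_ingest")
def csv_ingest(job: Job, runner: Runner) -> None:
    runner.chain_job(job, "emission_recalc")


@register("emission_recalc")
def emission_recalc(job: Job, runner: Runner) -> None:
    runner.chain_job(job, "aggregation")


@register("aggregation")
def aggregation(job: Job, runner: Runner) -> None:
    pass  # would recompute carbon_reports.stats
```

Running one csv_ingest job and then drain() walks csv_ingest → emission_recalc → aggregation; right after the ingest job finishes the pipeline is still RUNNING, and it flips to DONE only after the aggregation.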
Progress reaches the UI via SSE: GET /v1/sync/jobs/{job_id}/stream (api/v1/data_sync.py:1318) streams job state; per-pipeline progress is derived read-side by compute_pipeline_progress (services/pipeline_progress.py:120) from the job rows.
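Deriving progress read-side means nothing about progress is stored on the pipeline; each SSE update can recompute it from the job rows. The sketch below is not compute_pipeline_progress itself: pipeline_progress_sketch and its finished/total formula are assumptions, and to_sse_event only shows the standard text/event-stream framing (a data: line terminated by a blank line).

```python
import json


def pipeline_progress_sketch(jobs: list) -> dict:
    """Hypothetical read-side progress: count job rows by state and result."""
    total = len(jobs)
    finished = sum(1 for j in jobs if j["state"] == "FINISHED")
    failed = sum(1 for j in jobs if j.get("result") == "ERROR")
    percent = round(100 * finished / total) if total else 0
    return {"total": total, "finished": finished, "failed": failed, "percent": percent}


def to_sse_event(payload: dict) -> str:
    """Frame one Server-Sent Events message: a data line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"
```

Because chained children are created while their parents run, total grows during a pipeline, so a naive finished/total ratio can move backwards; the real function may weight job types differently to avoid that.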
Background loops: poller, reconciler, heartbeat
Path 2's primary dispatch is in-process fire_and_forget(run_job) — no loop involved. Three lifespan-managed loops (started in main.py's lifespan context, each gated by a RUN_* setting) cover the failure modes:
```mermaid
flowchart TD
M["FastAPI lifespan startup<br/><code>app/main.py:61-95</code>"] --> P & R & H
P["<b>Safety poller</b><br/><code>tasks/_poller.py:87</code> poll_pending_jobs<br/>every <b>2 s</b> (POLLER_INTERVAL_SECONDS)"]
P --> P1["Sweep 1: sweep_stuck_running_jobs<br/><code>repositories/data_ingestion.py:1126</code><br/>RUNNING + locked_at older than<br/><b>60 min</b> (STALE_JOB_TIMEOUT_MINUTES)<br/>→ NOT_STARTED, or FINISHED+ERROR<br/>when attempts ≥ max_attempts"]
P --> P2["Sweep 2: _pending_runner_jobs_query :60<br/>NOT_STARTED, job_type NOT NULL,<br/>run_after due, unlocked, attempts < max<br/>FOR UPDATE SKIP LOCKED,<br/><b>limit 100</b> (POLLER_BATCH_LIMIT)"]
P2 -->|"schedule_job → fire_and_forget"| RUN["run_job<br/><code>tasks/runner.py:49</code>"]
R["<b>Pipeline reconciler</b><br/><code>tasks/_pipeline_reconciler.py:103</code><br/>every <b>60 s</b> (PIPELINE_RECONCILER_INTERVAL_SECONDS)"]
R --> R1["recompute_pipeline_status<br/>for non-terminal pipelines"]
R --> R2["_recover_orphan_aggregations :37<br/>re-chains a missing trailing aggregation"]
H["<b>Pod heartbeat</b><br/><code>tasks/_pod_heartbeat.py</code><br/>every <b>30 s</b> (POD_HEARTBEAT_INTERVAL_SECONDS)<br/>refreshes <code>pods.last_heartbeat_at</code><br/>backs GET /v1/sync/workers"]
```

Intervals and limits (defaults from app/core/config.py):
| Loop | Interval | Limit / threshold | Off switch |
|---|---|---|---|
| Safety poller | 2 s (POLLER_INTERVAL_SECONDS, ge=1) | 100 jobs per sweep (POLLER_BATCH_LIMIT); stale-RUNNING at 60 min | RUN_BACKGROUND_POLLER |
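| Pipeline reconciler | 60 s (PIPELINE_RECONCILER_INTERVAL_SECONDS, ge=10) | commits per pipeline | RUN_PIPELINE_RECONCILER |
| Pod heartbeat | 30 s (POD_HEARTBEAT_INTERVAL_SECONDS, ge=5) | a pod counts as live if its last heartbeat is within 2× the interval | RUN_POD_HEARTBEAT |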
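A plain-asyncio sketch of the RUN_*-gated startup, assuming each loop is a sleep-between-ticks coroutine that is cancelled on shutdown. Settings, every, lifespan and is_pod_live are hypothetical names: FastAPI's real lifespan hook receives the app rather than a settings object, and the RUN_* defaults of True are assumptions. The intervals match the defaults in the table, and is_pod_live encodes the 2× heartbeat rule.

```python
import asyncio
import contextlib
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Settings:
    # Interval defaults follow the table; the RUN_* defaults of True are assumptions.
    RUN_BACKGROUND_POLLER: bool = True
    RUN_PIPELINE_RECONCILER: bool = True
    RUN_POD_HEARTBEAT: bool = True
    POLLER_INTERVAL_SECONDS: float = 2
    PIPELINE_RECONCILER_INTERVAL_SECONDS: float = 60
    POD_HEARTBEAT_INTERVAL_SECONDS: float = 30


async def every(interval: float, tick) -> None:
    """Call the async tick() forever, sleeping `interval` seconds between calls."""
    while True:
        await tick()
        await asyncio.sleep(interval)


@contextlib.asynccontextmanager
async def lifespan(settings: Settings, ticks: dict):
    """Start only the loops whose RUN_* flag is on; cancel them all on shutdown."""
    plan = [
        ("poller", settings.RUN_BACKGROUND_POLLER, settings.POLLER_INTERVAL_SECONDS),
        ("reconciler", settings.RUN_PIPELINE_RECONCILER,
         settings.PIPELINE_RECONCILER_INTERVAL_SECONDS),
        ("heartbeat", settings.RUN_POD_HEARTBEAT, settings.POD_HEARTBEAT_INTERVAL_SECONDS),
    ]
    tasks = {
        name: asyncio.create_task(every(interval, ticks[name]))
        for name, enabled, interval in plan
        if enabled
    }
    try:
        yield tasks
    finally:
        for task in tasks.values():
            task.cancel()
        # return_exceptions=True collects the CancelledErrors instead of raising them.
        await asyncio.gather(*tasks.values(), return_exceptions=True)


def is_pod_live(last_heartbeat_at: datetime, now: datetime,
                interval_seconds: float = 30) -> bool:
    """Live if the last heartbeat is within 2x the interval (inclusive bound assumed)."""
    return now - last_heartbeat_at <= timedelta(seconds=2 * interval_seconds)
```

Turning one switch off simply leaves that loop unscheduled; the other loops are unaffected.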
Why a 2 s cadence is safe: each sweep is one SELECT … FOR UPDATE SKIP LOCKED LIMIT 100. Because locked rows are skipped, multi-pod deployments don't double-dispatch, and an idle system costs one cheap indexed query per pod per interval. The 60-min stale threshold must stay above the longest plausible job runtime: if it were lower, Sweep 1 would reset a job that a live pod is still processing, and the work would run twice.
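The two poller sweeps reduce to simple predicates. This sketch models them over in-memory rows: JobRow, sweep_stuck_running, pending_runner_jobs and the max_attempts default of 3 are hypothetical, clearing locked_at on reset is an assumption, and the FOR UPDATE SKIP LOCKED row locking that prevents double dispatch across pods exists only in the SQL version, not here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

STALE_JOB_TIMEOUT = timedelta(minutes=60)  # STALE_JOB_TIMEOUT_MINUTES default
POLLER_BATCH_LIMIT = 100


@dataclass
class JobRow:
    job_id: int
    state: str                       # "NOT_STARTED" | "RUNNING" | "FINISHED"
    job_type: Optional[str]
    attempts: int = 0
    max_attempts: int = 3            # hypothetical default
    locked_at: Optional[datetime] = None
    run_after: Optional[datetime] = None
    result: Optional[str] = None


def sweep_stuck_running(job: JobRow, now: datetime) -> None:
    """Sweep 1: reset or fail a RUNNING job whose lock is older than the threshold."""
    if job.state != "RUNNING" or job.locked_at is None:
        return
    if now - job.locked_at <= STALE_JOB_TIMEOUT:
        return  # still within its allowed runtime; the owning pod may be working on it
    if job.attempts >= job.max_attempts:
        job.state, job.result = "FINISHED", "ERROR"
    else:
        job.state, job.locked_at = "NOT_STARTED", None  # assumption: lock released


def pending_runner_jobs(rows: list, now: datetime,
                        limit: int = POLLER_BATCH_LIMIT) -> list:
    """Sweep 2 predicate, without the row locking that SKIP LOCKED adds in SQL."""
    due = [
        r for r in rows
        if r.state == "NOT_STARTED"
        and r.job_type is not None
        and (r.run_after is None or r.run_after <= now)
        and r.locked_at is None
        and r.attempts < r.max_attempts
    ]
    return due[:limit]
```

A job reset by Sweep 1 becomes eligible for Sweep 2 on the same or a later poll, which is how a crashed pod's work is eventually re-dispatched.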
The poller never drives the happy path. A healthy upload flows entirely through endpoint → fire_and_forget → run_job → chain_job deferred dispatches; the poller only catches jobs whose in-process Task died between the row commit and the runner claim (pod crash, restart mid-deploy).
Path 1: interactive module edit
```mermaid
flowchart LR
A["POST/PATCH/DELETE<br/>/v1/carbon_reports/...<br/>module edit endpoints"] --> B["CarbonReportModuleWorkflow<br/><code>workflows/carbon_report_module.py</code>"]
B --> C["writes <b>data_entries</b>"]
B --> D["DataEntryEmissionService<br/>writes <b>data_entry_emissions</b>"]
B --> E["recompute_stats (inline)<br/>writes <b>carbon_reports.stats</b><br/>:108 :229 :262"]
```

One request, one transaction: all three tables are written synchronously, with no jobs, no pipeline row, and no SSE. The user's spinner is the progress indicator.
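The single-transaction property can be illustrated with SQLite standing in for the real database. Everything here is hypothetical: the three-table schema, make_db, edit_module_entry, and the negative-factor check (a stand-in for any failure during emission compute). It only mirrors the shape of CarbonReportModuleWorkflow writing data_entries, data_entry_emissions and carbon_reports.stats in one transaction, where a failure in any step rolls back all three.

```python
import json
import sqlite3


def make_db() -> sqlite3.Connection:
    """Hypothetical three-table schema, created in memory."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE data_entries (id INTEGER PRIMARY KEY, report_id INTEGER, kg REAL);
        CREATE TABLE data_entry_emissions (entry_id INTEGER PRIMARY KEY, co2e REAL);
        CREATE TABLE carbon_reports (id INTEGER PRIMARY KEY, stats TEXT);
        INSERT INTO carbon_reports (id, stats) VALUES (1, '{}');
        """
    )
    return conn


def edit_module_entry(conn: sqlite3.Connection, report_id: int,
                      kg: float, factor: float) -> None:
    """Write the entry, its emission and the report stats in a single transaction."""
    with conn:  # commits if the block succeeds, rolls back if it raises
        cur = conn.execute(
            "INSERT INTO data_entries (report_id, kg) VALUES (?, ?)", (report_id, kg)
        )
        if factor < 0:
            # Stand-in for any failure during emission compute.
            raise ValueError("invalid emission factor")
        conn.execute(
            "INSERT INTO data_entry_emissions (entry_id, co2e) VALUES (?, ?)",
            (cur.lastrowid, kg * factor),
        )
        (total,) = conn.execute(
            "SELECT COALESCE(SUM(e.co2e), 0) FROM data_entry_emissions AS e "
            "JOIN data_entries AS d ON d.id = e.entry_id WHERE d.report_id = ?",
            (report_id,),
        ).fetchone()
        conn.execute(
            "UPDATE carbon_reports SET stats = ? WHERE id = ?",
            (json.dumps({"total_co2e": total}), report_id),
        )
```

A failing edit leaves no half-written entry behind: the data_entries insert is rolled back together with everything after it, so the stats stay consistent with the entries.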
Consequences
Positive:
- Bulk-path race conditions on data_entry_emissions and carbon_reports.stats are eliminated by ownership, not locking.
- Long-running emission compute no longer holds ingest transaction locks; ingest commits fast and chains the recalc.
- The frontend UX is explicit: a per-module "Recalculating..." badge shows while Path 2 chains run.
Negative:
- Two write paths to maintain. Tests must cover both.
- New contributors must learn which path their change belongs to. One question decides it: is the user staring at a spinner? Yes means Path 1; no means Path 2.
Future work: batched ingest (1k–5k rows) is deferred until Path 2's job-split lands and lock duration becomes the bottleneck.
References
- docs/src/implementation-plans/310-d-pipeline-responsibility-split.md
- docs/src/implementation-plans/310-overview.md