# 1236: First-class pipelines table
Status: design · Issue #1236 · Integrates into `fix/pipeline-debug` (supersedes the standalone PR #1237). Not bundled with #1234 / #1225.
## Why
A "pipeline" is currently emergent: rows in `data_ingestion_jobs` sharing a `pipeline_id` UUID, with the DAG reconstructed from `meta.parent_job_id` / `recalc_jobs_chained` / `aggregation_job_id` and the status recomputed on every read by `compute_pipeline_progress`. This missing aggregate root is the underlying cause of a recurring bug class: NULL-until-fan-out (orphans + the #1225 eager-id workaround), premature "success" (#1219), and a console (#1234) that has to `GROUP BY pipeline_id` with two-step pagination. See emission-pipeline-flow.md for the current DAG.
## Background: why this is hard (poisoning & deadlocks)
Keep this; it is the whole reason the design is shaped this way.
- A SQLAlchemy async `Session` is one Postgres transaction. Any statement error aborts the entire transaction; every later statement then fails with `InFailedSqlTransaction: current transaction is aborted …` until a `ROLLBACK` (`PendingRollbackError`). The logged error is usually the cascade, not the cause: e.g. jobs 47/74/104 log a harmless `SELECT carbon_report_modules` because it was merely the first statement to touch an already-poisoned session; the real failure was an earlier write.
- A deadlock is two transactions each holding a lock the other needs, acquired in opposite order; Postgres detects the cycle and kills one (`DeadlockDetected`). Here it is structural: one upload → `recalc_jobs_chained: 3` → 3 `emission_recalc` children run concurrently over overlapping rows, each one `DELETE`s and re-inserts `data_entry_emissions`, and each `aggregation` then rewrites ~2231 `carbon_reports`. 3× concurrent full-table rewrites = the deadlocks in the data (job 90 on `UPDATE carbon_reports`).
- Chain: amplification → concurrent overlapping writes → deadlock (trigger) → no rollback → poisoning (cascade) → job sinks (#1219/#1225).
- Two sessions (`job_session` vs `data_session`) correctly isolate job-state integrity from data poisoning: necessary, but that is the only axis they cover. They do not prevent cross-worker deadlocks, nor do they stop a poisoned `data_session` from sinking the rest of a job's data work; that is what #1225's per-entry `begin_nested()` SAVEPOINT covers. Both halves are required.
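To make the poisoning mechanics concrete, here is a toy, in-memory model of one Postgres transaction. No database or SQLAlchemy is involved, and `ToyTxn`, `ingest`, and the local `InFailedSqlTransaction` class are hypothetical illustration names. It reproduces the behaviours described above: the first failed write aborts the transaction, every later statement fails with the cascade error, `COMMIT` of an aborted transaction saves nothing, and a per-entry SAVEPOINT (the role `begin_nested()` plays in #1225) confines a failure to one entry. It says nothing about the deadlock axis.

```python
from contextlib import contextmanager


class InFailedSqlTransaction(Exception):
    """Toy stand-in for the driver error raised once a transaction is aborted."""


class ToyTxn:
    """Hypothetical toy model of one Postgres transaction (illustration only)."""

    def __init__(self):
        self.aborted = False
        self.applied = []   # effects written so far in this transaction
        self._marks = []    # SAVEPOINT stack: len(applied) at each savepoint

    def execute(self, stmt, fail=False):
        if self.aborted:
            # The cascade: any later statement fails, however harmless it is.
            raise InFailedSqlTransaction(f"current transaction is aborted: {stmt}")
        if fail:
            # The real cause: one failing write aborts the whole transaction.
            self.aborted = True
            raise RuntimeError(f"statement failed: {stmt}")
        self.applied.append(stmt)

    @contextmanager
    def savepoint(self):
        # Like begin_nested(): SAVEPOINT on entry, RELEASE on success,
        # ROLLBACK TO SAVEPOINT on error, which also clears the aborted state.
        if self.aborted:
            raise InFailedSqlTransaction("current transaction is aborted: SAVEPOINT")
        self._marks.append(len(self.applied))
        try:
            yield self
        except Exception:
            mark = self._marks.pop()
            del self.applied[mark:]   # undo only the nested work
            self.aborted = False
            raise
        else:
            self._marks.pop()

    def commit(self):
        # Postgres answers COMMIT on an aborted transaction with a ROLLBACK.
        return [] if self.aborted else list(self.applied)


def ingest(entries, per_entry_savepoint):
    """Write each entry; return (committed effects, error class names seen)."""
    txn, errors = ToyTxn(), []
    for name, bad in entries:
        try:
            if per_entry_savepoint:
                with txn.savepoint():
                    txn.execute(f"INSERT {name}", fail=bad)
            else:
                txn.execute(f"INSERT {name}", fail=bad)
        except Exception as exc:
            errors.append(type(exc).__name__)
    return txn.commit(), errors


entries = [("a", False), ("b", True), ("c", False), ("d", False)]
print(ingest(entries, per_entry_savepoint=False))
# → ([], ['RuntimeError', 'InFailedSqlTransaction', 'InFailedSqlTransaction'])
print(ingest(entries, per_entry_savepoint=True))
# → (['INSERT a', 'INSERT c', 'INSERT d'], ['RuntimeError'])
```

Without savepoints, only entry b is bad, yet c and d log the cascade error and nothing is committed; with a savepoint per entry, only b is lost.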
## Proposed model
First-class `pipelines` table; `data_ingestion_jobs.pipeline_id` becomes an FK.
```
pipelines
  id                uuid pk
  kind              text              -- = parent job_type: csv_ingest | api_ingest |
                                      --   factor_ingest | unit_sync |
                                      --   module_emission_recalc | reference_ingest
  status            text              -- NOT_STARTED|RUNNING|SUCCESS|PARTIAL|FAILED
  entity_type       enum              -- kept; NOT folded into kind
  ingestion_method  enum              -- kept; NOT folded into kind
  module_type_id    int null
  year              int null
  expected_recalc   int null          -- owned recalc count (was meta.recalc_jobs_chained)
  job_count         int
  error_count       int
  started_at        timestamptz null
  finished_at       timestamptz null
  last_error        text null
```
`kind` mirrors the parent job's `job_type`; it is deliberately not a flattened `CSV_MODULE_PER_YEAR` / `API_MODULE_UNIT_SPECIFIC` enum (see Rejected alternatives).
## DECIDED: Phase-1 status maintenance
- Recompute-and-store (not incremental). On a job terminal, read the pipeline's sibling jobs, run the existing pure `compute_pipeline_progress`, and write the single resulting `status`. Rationale: drift is the exact bug we are killing; recompute-from-truth cannot drift and self-heals a lost write, whereas an incremental accumulator under concurrency + retries + out-of-order terminals is precisely how drift returns. The per-terminal sibling SELECT is bounded and indexed by `pipeline_id`: cheap insurance.
- Option (a): only the last child writes status. No manual counting: `compute_pipeline_progress(siblings).done` is the last-child oracle. Under READ COMMITTED a terminal that reads before all siblings are terminal sees `done=false` and skips. Because each terminal reads its siblings only after its own terminal has committed, whichever terminal reads last sees every earlier commit, gets `done=true`, and writes. If two terminals race and both observe the full set, both write; that is benign, because recompute-from-truth makes the two writes identical (just brief row contention). UPSERT; add no coordination machinery for a self-resolving edge.
- Mechanism: a post-commit isolated write, NOT a SAVEPOINT inside the finish transaction. `repo.finish_job` self-commits the `job_session`; there is no enclosing transaction to nest in. So the status write is its own short transaction after `finish_job` returns `True`, fully `try`/`except`'d (log-and-skip on any DB error). This is stronger isolation than the SAVEPOINT ideal: the job's terminal is already durably committed, so a failed status write cannot poison it at all. Trade-off: a small window in which a reader sees a stale status; irrelevant in Phase 1 (no reads flipped), and absorbed in Phase 3 by the sweep + next-terminal self-heal.
- Row creation is a separate concern from status. Create the `pipelines` row at parent creation, not in the runner (the runner is a terminal actor, not a kickoff actor). One idempotent helper, `ensure_pipeline_exists(session, pipeline_id, parent_job)` (`INSERT … ON CONFLICT DO NOTHING`), is called from the 4 post-merge mint sites (`_stamp_job_type_and_meta`, the 2 recalc endpoints' `DataIngestionJob(...)`, and the `_chain.py` lazy mint). One logical chokepoint, several call sites: this keeps Phase-3 in-flight visibility without 4-way drift.
- Reconciliation sweep (recompute `status` where stored ≠ `compute_pipeline_progress`): the durable backstop for skipped writes. Phase 1 ships it as a standalone callable (manual or cron-invokable); Phase 3 schedules it before flipping reads.
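The decisions above can be sketched end to end with the standard-library `sqlite3` module standing in for Postgres. Everything here is hypothetical scaffolding: the tables are cut down to the columns the sketch needs, `expected_jobs` is a simplified stand-in for the real counters (`expected_recalc` and friends), `ensure_pipeline_exists` has a simplified signature, and `compute_pipeline_progress` is a placeholder rather than the real function (its PARTIAL/FAILED rule in particular is a guess, since that definition is still an open question). What it shows is the shape: idempotent row creation at mint time, a job terminal committed on its own, a separate log-and-skip status write gated by the last-child oracle, and a sweep that rewrites drifted rows.

```python
import logging
import sqlite3

log = logging.getLogger("pipelines")
TERMINAL = {"SUCCESS", "FAILED"}


def compute_pipeline_progress(job_statuses, expected_jobs):
    """Hypothetical placeholder for the real pure function; returns (done, status)."""
    finished = [s for s in job_statuses if s in TERMINAL]
    if len(finished) < expected_jobs:
        started = any(s != "QUEUED" for s in job_statuses)
        return False, ("RUNNING" if started else "NOT_STARTED")
    failed = finished.count("FAILED")
    if failed == 0:
        return True, "SUCCESS"
    # Guessed rule (PARTIAL vs FAILED is an open question).
    return True, ("FAILED" if failed == len(finished) else "PARTIAL")


def make_db():
    # expected_jobs is a hypothetical simplification of the real counters.
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE pipelines (id TEXT PRIMARY KEY, kind TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'NOT_STARTED', expected_jobs INTEGER NOT NULL);
        CREATE TABLE data_ingestion_jobs (id INTEGER PRIMARY KEY, pipeline_id TEXT,
            status TEXT NOT NULL DEFAULT 'QUEUED');
        CREATE INDEX ix_jobs_pipeline_id ON data_ingestion_jobs (pipeline_id);
    """)
    return db


def ensure_pipeline_exists(db, pipeline_id, kind, expected_jobs):
    # Simplified signature (the real helper takes session, pipeline_id, parent_job).
    # INSERT ... ON CONFLICT DO NOTHING: idempotent, safe from every mint site.
    with db:
        db.execute(
            "INSERT INTO pipelines (id, kind, expected_jobs) VALUES (?, ?, ?) "
            "ON CONFLICT (id) DO NOTHING",
            (pipeline_id, kind, expected_jobs),
        )


def sibling_statuses(db, pipeline_id):
    rows = db.execute(
        "SELECT status FROM data_ingestion_jobs WHERE pipeline_id = ?", (pipeline_id,))
    return [status for (status,) in rows]


def finish_job(db, job_id, status):
    """Stand-in for repo.finish_job: commits the job terminal on its own."""
    with db:
        db.execute("UPDATE data_ingestion_jobs SET status = ? WHERE id = ?", (status, job_id))
    return True


def advance_pipeline_status(db, pipeline_id):
    """Called only after finish_job returned True. Never raises: log-and-skip."""
    try:
        with db:  # separate unit of work; a rollback here cannot touch the job row
            row = db.execute(
                "SELECT expected_jobs FROM pipelines WHERE id = ?", (pipeline_id,)).fetchone()
            if row is None:
                return False
            done, status = compute_pipeline_progress(sibling_statuses(db, pipeline_id), row[0])
            if not done:
                return False  # not the last child: the last one will write
            # The row exists since mint time, so a plain UPDATE stands in for the UPSERT.
            db.execute("UPDATE pipelines SET status = ? WHERE id = ?", (status, pipeline_id))
            return True
    except sqlite3.Error:
        log.exception("pipeline %s: status write skipped", pipeline_id)
        return False


def reconcile_pipelines(db):
    """Sweep: rewrite every stored status that differs from recompute-from-truth."""
    fixed = []
    rows = db.execute("SELECT id, status, expected_jobs FROM pipelines").fetchall()
    for pid, stored, expected in rows:
        _, truth = compute_pipeline_progress(sibling_statuses(db, pid), expected)
        if truth != stored:
            with db:
                db.execute("UPDATE pipelines SET status = ? WHERE id = ?", (truth, pid))
            fixed.append(pid)
    return fixed
```

Dropping the `pipelines` table before calling `advance_pipeline_status` demonstrates the isolation property: the status write fails and is logged and skipped, while the job's already-committed terminal stays as written.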
## Concurrency & contention model (informs Phase 3+)
The contention map has two distinct problems; do not conflate them:

| Phase | Writes | Cross-pipeline conflict |
|---|---|---|
| ingest + `emission_recalc` | `data_entries` / `data_entry_emissions`, det-partitioned | none between different dets |
| aggregation | `carbon_report_modules` + the shared `carbon_reports` synthesis | always |
- Problem A: aggregation is the universal collision, and it is amplified. Fix the step, not whole pipelines (whole-pipeline priority over-serializes the independent det-partitioned phases and still does the full rewrite). Two levers: coalesce concurrent aggregation for a report/year scope into one trailing run (extend the existing `AGGREGATION_DEDUP`), and scope the rewrite to the recalc's already-computed `affected_module_ids` instead of all reports.
- VERIFY (not asserted): confirm that the aggregation handler actually full-rewrites (the constant `modules_refreshed: 2231` in the dumps strongly suggests it) and is not already scoped some other way, before designing the scoping change.
- Problem B: same-`(module_type_id, data_entry_type_id, year)` factor-vs-data ordering is a correctness bug, not just contention. A data recalc running concurrently with a factor reload for the same scope computes emissions against half-loaded factors → silently wrong numbers. It needs a scoped factor-before-data mutex (reuse the `uq_emission_recalc_active` per-scope primitive; the gap is ordering the parent ingests for that scope), not whole-pipeline priority.
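Problem B's ordering rule can be illustrated in-process with `asyncio`. This is a sketch only: the real mechanism is meant to reuse the `uq_emission_recalc_active` per-scope DB primitive, whereas `ScopeGate`, `announce_factor`, and the context managers below are hypothetical. The rule it encodes: within one `(module_type_id, data_entry_type_id, year)` scope writers never overlap, and a data recalc also waits while any factor reload for that scope has been announced but not finished, so it cannot compute against half-loaded factors. Other scopes are unaffected.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class ScopeGate:
    """Hypothetical per-scope factor-before-data gate (in-process illustration).

    A scope is a (module_type_id, data_entry_type_id, year) tuple. Contract:
    every factor_reload must be preceded by announce_factor for the same scope.
    """

    def __init__(self):
        self._conds = {}                         # scope -> asyncio.Condition
        self._factor_pending = defaultdict(int)  # announced, not yet finished
        self._busy = set()                       # scopes with a writer inside

    def _cond(self, scope):
        if scope not in self._conds:
            self._conds[scope] = asyncio.Condition()
        return self._conds[scope]

    def announce_factor(self, scope):
        # Call when a factor ingest for the scope is enqueued, before it runs.
        self._factor_pending[scope] += 1

    @asynccontextmanager
    async def _writer(self, scope, ready, on_exit):
        cond = self._cond(scope)
        async with cond:
            await cond.wait_for(ready)  # predicate re-checked after each notify_all
            self._busy.add(scope)
        try:
            yield
        finally:
            async with cond:
                self._busy.discard(scope)
                on_exit()
                cond.notify_all()

    def factor_reload(self, scope):
        def finished():
            self._factor_pending[scope] -= 1
        return self._writer(scope, lambda: scope not in self._busy, finished)

    def data_recalc(self, scope):
        def ready():
            # No writer inside the scope, and no announced factor reload pending.
            return scope not in self._busy and self._factor_pending[scope] == 0
        return self._writer(scope, ready, lambda: None)


async def demo():
    gate, scope, factors, seen = ScopeGate(), (1, 2, 2024), [], []
    gate.announce_factor(scope)  # the factor ingest is already queued

    async def data_job():
        async with gate.data_recalc(scope):
            seen.append(len(factors))  # factors the recalc computed against

    async def factor_job():
        await asyncio.sleep(0)  # deliberately starts after the data job
        async with gate.factor_reload(scope):
            factors.append("f1")
            await asyncio.sleep(0)  # half-loaded at this yield
            factors.append("f2")

    await asyncio.gather(data_job(), factor_job())
    return seen


print(asyncio.run(demo()))  # → [2]
```

The recalc starts first yet computes against both factors. Without the `announce_factor` call it would enter the scope before the reload began, which is why the announcement has to happen when the parent ingest for that scope is enqueued.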
Net direction: keep ingest/recalc concurrent (that is the wanted parallelism); make aggregation a coalesced, affected-scope, serialized-per-report-domain step (A); add a scoped factor→data ordering (B). Most of this extends primitives already built.
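The aggregation half of this direction (coalesced, affected-scope, serialized per report domain) can be sketched the same way. `AggregationCoalescer` and the `(report_domain, year)` key are hypothetical (the coalescing key is still an open question), and this is not the existing `AGGREGATION_DEDUP`. Per key, at most one run is in flight; requests arriving meanwhile are folded into a single trailing run; each run rewrites only the union of the requesters' `affected_module_ids` rather than every report.

```python
import asyncio


class AggregationCoalescer:
    """Hypothetical trailing-run coalescer with one serialized lane per key."""

    def __init__(self, run):
        self._run = run        # async callable(key, module_ids)
        self._pending = {}     # key -> module ids waiting for the next run
        self._running = set()  # keys with a run in flight

    async def request(self, key, affected_module_ids):
        self._pending.setdefault(key, set()).update(affected_module_ids)
        if key in self._running:
            return  # the in-flight lane picks these ids up in its trailing run
        self._running.add(key)
        try:
            # Keep draining until nothing new arrived during the last run.
            while self._pending.get(key):
                batch = self._pending.pop(key)
                await self._run(key, batch)
        finally:
            # If a run raises, ids still pending wait for the next request.
            self._running.discard(key)


async def demo():
    runs = []

    async def aggregate(key, module_ids):
        runs.append((key, sorted(module_ids)))
        await asyncio.sleep(0.01)  # stands in for the report rewrite

    coalescer = AggregationCoalescer(aggregate)
    key = ("report-domain-7", 2024)

    async def late(ids):
        await asyncio.sleep(0)  # arrive while the first run is in flight
        await coalescer.request(key, ids)

    await asyncio.gather(coalescer.request(key, {1, 2}), late({2, 3}), late({4}))
    return runs


print(asyncio.run(demo()))
# → [(('report-domain-7', 2024), [1, 2]), (('report-domain-7', 2024), [2, 3, 4])]
```

Three requests produce two runs: the one already in flight and a single trailing run over the merged module ids. Different keys get independent lanes, so det-disjoint work on unrelated report domains still runs in parallel.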
## Phased plan (each shippable + reversible)
v0.x has no backfill: the DB is dropped between deploys, and real backfill starts at v1.x. This collapses the old Phase 2, since there is no history to synthesise. On any fresh v0.x DB every `pipeline_id` originates from `ensure_pipeline_exists`, so the FK is not backfill-gated: it can be enforced once a clean DB is running Phase-1 code (i.e. the deploy after Phase 1 ships, which in v0.x is a DB drop). The mid-DB-life window (Phase 1 deployed onto a DB that already holds pre-Phase-1 `pipeline_id` rows) is the only reason the FK is not in the Phase-1 migration itself.
1. Add table + write-through. ✅ DONE (`fix/pipeline-debug`). The migration creates only the `pipelines` table (the column stays a plain UUID with no FK yet; see the note above). The 4 mint sites call `ensure_pipeline_exists`; the runner advances `status` after `finish_job` as an isolated log-and-skip write (last-child oracle). The sweep is a standalone callable. Verify (met): the reconciliation test proves zero drift (stored `status` == `compute_pipeline_progress`).
2. Enforce FK (v0.x, post-DB-drop). Add the `pipeline_id` FK → `pipelines(id)` once a clean DB runs Phase-1 code. No data migration. Verify: the migration applies on a fresh DB; an orphan `pipeline_id` is impossible because `ensure_pipeline_exists` runs at every mint. (v1.x: a real backfill migration replaces this step: one `pipelines` row per historical `pipeline_id`, NULL-pipeline parents → single-step, and #1219/poisoned samples land as `FAILED`/`PARTIAL`. The `last_error`-skips-"Success" fix already makes that backfill safe.)
3. Flip reads. The console (#1234), `GET /pipelines/{id}`, and progress read the table; `compute_pipeline_progress` becomes writer-side only. Verify: golden-output diff before/after on the same DB.
4. Aggregation coalesce + scope (Problem A) and scoped factor→data ordering (Problem B): separate sub-PRs, gated on the VERIFY above.
5. Retire `meta` threading once nothing reads the counters.
Phase 1 is a pure addition (revert = drop table); Phase 3 is the only behavioural flip.
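The enforce-FK step's claim (an orphan `pipeline_id` is impossible once the FK is on and every mint goes through `ensure_pipeline_exists`) can be checked with a small `sqlite3` stand-in. Assumptions: SQLite cannot add a constraint to an existing table, so here the FK is declared at `CREATE TABLE` time and enforcement is switched on with `PRAGMA foreign_keys = ON`; in Postgres the migration would instead be an `ALTER TABLE data_ingestion_jobs ADD CONSTRAINT fk_jobs_pipeline_id FOREIGN KEY (pipeline_id) REFERENCES pipelines (id)`, where the constraint name is hypothetical. `mint_job` is a hypothetical mint site and `ensure_pipeline_exists` uses a simplified signature.

```python
import sqlite3
import uuid


def make_db():
    db = sqlite3.connect(":memory:")
    db.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
    db.executescript("""
        CREATE TABLE pipelines (id TEXT PRIMARY KEY, kind TEXT NOT NULL);
        CREATE TABLE data_ingestion_jobs (
            id INTEGER PRIMARY KEY,
            job_type TEXT NOT NULL,
            pipeline_id TEXT REFERENCES pipelines (id)
        );
    """)
    return db


def ensure_pipeline_exists(db, pipeline_id, kind):
    # Simplified, hypothetical signature; for a child the row exists and this is a no-op.
    db.execute(
        "INSERT INTO pipelines (id, kind) VALUES (?, ?) ON CONFLICT (id) DO NOTHING",
        (pipeline_id, kind),
    )


def mint_job(db, job_type, pipeline_id=None):
    """Hypothetical mint site: the pipelines row always lands before the job row."""
    pipeline_id = pipeline_id or str(uuid.uuid4())
    with db:  # both inserts commit together or not at all
        ensure_pipeline_exists(db, pipeline_id, job_type)
        cur = db.execute(
            "INSERT INTO data_ingestion_jobs (job_type, pipeline_id) VALUES (?, ?)",
            (job_type, pipeline_id),
        )
    return cur.lastrowid, pipeline_id
```

Minting a parent and then a child with the same `pipeline_id` leaves exactly one `pipelines` row, while a raw insert with an unknown `pipeline_id` fails with `sqlite3.IntegrityError`.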
## Rejected alternatives
- Flat `kind` enum: the cartesian product of `ingestion_method` × `entity_type` (already columns); incomplete for `factor_ingest` / `unit_sync`, which are present in real data.
- `pipelines` as a VIEW: keeps per-read recompute, and cannot carry an authoritative `status`.
- Global whole-pipeline priority/mutex (the first instinct for side-note-2): over-serializes the independent det-partitioned phases and still leaves the full-table aggregation rewrite. Replaced by the A/B decomposition above.
- Incremental status: drift under concurrency/retries/reorder = the #1219 class.
## Open questions
- Exact definition of `PARTIAL` (some children errored, chain completed) vs `FAILED` (chain broken).
- Aggregation coalescing key: `(carbon_report scope, year)`? Interaction with `unit_sync` (year-level, also rewrites `carbon_reports`).
- Shared-synthesis edge: det-disjoint pipelines still collide on a `carbon_reports` row when their modules share a report; serialize by report-domain, not by det.
- Retry-safety of the factor→data ordering (`max_attempts=3`, mid-pipeline re-claim) and its interaction with dedup-skip.
- Backfill of legacy rows lacking meta counters: best-effort `compute_pipeline_progress`, same as today's read path.
## Convention notes
Pipeline work integrates into `fix/pipeline-debug` (not `dev`) until told otherwise. Commit messages: no `Co-Authored-By` trailer.