310 — Plan Review¶
Verified plans against actual code in backend/app/models/data_ingestion.py, backend/app/repositories/data_ingestion.py, backend/app/tasks/*, backend/app/api/v1/data_sync.py, backend/app/models/factor.py.
Three showstoppers, several real-but-smaller issues, and one strategic question.
Showstoppers — need answers before any code is written¶
1. BackgroundTasks chaining is fragile, not "needs threading through"¶
Plans B (_enqueue_stale_recalculations), C (DAG chaining via next_job), and D (chained ingest → recalc → aggregation) all assume that calling background_tasks.add_task(...) from inside a running background task schedules the next task. This is implementation-defined Starlette behavior, not a documented API: BackgroundTasks runs queued tasks sequentially after the response; adding to the list during iteration may or may not be picked up depending on internal iteration mechanics, and the BackgroundTasks instance is per-request and goes out of scope after its loop ends.
This is the foundational decision for everything downstream. Pick one before writing more plan content on top:
- (a)
asyncio.create_task(run_recalculation_task(...))from within the running task — tied to the same event loop, no Starlette involvement, but the pod can't crash mid-chain - (b) Drop in-task chaining entirely; let the next job sit in
state=NOT_STARTEDand add a tiny poller (cron-like loop, separate thread, or/sync/jobs/run-pendingendpoint) that picks them up - (c) Add a real worker process that polls
SELECT … FOR UPDATE SKIP LOCKED
Option (b) is the cleanest with what we have. Option (a) is what we implicitly relied on already (existing run_module_recalculation_task doesn't chain anywhere). Option (c) is what the reflexion doc actually advocated.
2. is_current concurrency hole is bigger than Plan A claims to fix¶
Today's flow:
POST /recalculate-emissions/... → create job (state=NOT_STARTED, is_current=False)
→ commit
→ background_tasks.add_task(...)
[later, in BG task] → update_ingestion_job(state=RUNNING)
→ mark_job_as_current(job) ← is_current=True only here
Two operators clicking the same button simultaneously create two NOT_STARTED, is_current=False rows. The partial unique index ix_data_ingestion_jobs_is_current_unique (only enforces uniqueness WHERE is_current=true) does not trip because neither row is current yet. Both BG tasks then run; first one to call mark_job_as_current "wins" — but the loser keeps running too.
Plan A's claim_job only sets state=RUNNING. It does not set is_current=True. The race window between job creation and claim stays wide open. Two pods can both successfully claim_job against two different job_id values for the same (module, det, year) and both proceed.
Fix: claim_job must atomically set both state=RUNNING AND is_current=TRUE, and integrate the unset-previous-current logic from mark_job_as_current. The unique index then trips on the second claimer's UPDATE, and only one survives. Plan A as written needs revision.
3. Factor classification ordering is a footgun, not a design¶
Plan B claims "classification JSON structure per handler is already stable… ::text cast is therefore stable." Verified: factors.classification is Column(JSON) (factor.py:25), not JSONB. Postgres JSON preserves insertion order verbatim from the original text; ::text reflects whatever order Python's json.dumps produced.
Today this works because Python dicts since 3.7 preserve insertion order and every handler happens to insert keys consistently. The first PR that builds a classification dict differently — e.g. {"sub_class": "X", "class": "Y"} instead of {"class": "Y", "sub_class": "X"} — silently creates a duplicate row. The unique index will allow the dup because the text representation differs.
Fix options (Plan B should pick one):
- Migrate
classificationtoJSONB— Postgres sorts keys alphabetically,::textbecomes deterministic regardless of insertion order - Add a generated column
classification_key TEXT GENERATED ALWAYS AS (...) STOREDand index that - Add a normalize helper in code and trust convention — but document loudly that any new caller must use it
JSONB migration is the smallest change that eliminates the class of bug entirely.
Smaller issues to fold into plan revisions¶
Plan A¶
- "Remove existing state=RUNNING line in tasks" is wrong for
run_sync_task(the provider sets state, not the task body) but right forrun_recalculation_task(line 58-63) andrun_module_recalculation_task(line 172-176). Plan needs to specify per-task what changes. max_attemptsandrun_afterare added but no logic uses them. Be honest these are scaffolding for future retry logic, not active.- The recovery 30-min timeout is a magic number — make it a settings constant.
- Recovery doesn't reset
attempts, so a job recovered after exceedingmax_attemptswill fail to claim again. Probably want recovery to also decrement or resetattempts.
Plan B¶
mark_job_as_currentalready handlesdata_entry_type_id IS NULL(lines 184-189 of repo). Plan B's NULL fix is needed only formodule_type_id IS NULL. Tighten the wording._enqueue_stale_recalculationsbecomes throwaway code once Plan C lands (replaced bynext_jobpayload). Acknowledge this so we don't over-invest.- Missing: factors that exist in DB but aren't in the new CSV are silently kept. Today's
bulk_delete + insertremoves them. Need a "delete-not-in-set" step or accept stale rows. This is a real semantic regression not addressed in the plan. EntityTypeusesSAEnum(..., native_enum=True)— addingGLOBAL_PER_YEAR=1requiresALTER TYPE entity_type_enum ADD VALUE 'GLOBAL_PER_YEAR'. Plan B doesn't mention this migration step.
Plan C¶
runner.pycallsget_handler(job.job_type)after claim — but jobs created before Plan C lands havejob_type=NULL. Need a fallback or a backfill migration. Currently the plan assumes all jobs havejob_type.next_jobis singular but factor ingest fans out to N recalcs (one per stale type). Eithernext_jobs: listor fan-out happens at the handler level; plan needs to choose.started_atvslocked_atsemantics aren't disambiguated:locked_at: updates on every claim/retrystarted_at: stays at first claim (for total-duration tracking)- Plan C says "set together" — that's wrong on retry. Specify both clearly.
pipeline_idlifecycle is unspecified across A→B→C. Define: endpoint generates the UUID for multi-step flows and stores in the first job; downstream jobs inherit vianext_jobpayload.
Plan D¶
- N aggregation jobs problem: factor ingest fans out to N recalcs, each chains to aggregation = N aggregation jobs for one module. Need dedup ("skip if already queued for this module/year") or a coalesce mechanism.
- "Frontend shows spinner instead of 0 emissions" is hand-wavy. Today the UI shows
carbon_reports.statsaggregated values. After CSV upload these are stale, not zero. The async path means stale → eventually correct. Spec the actual UX (show "Recalculating..." badge? show stale + warning? block module view?). - The "manual data entry stays inline" carve-out violates the one-writer-per-table rule the plan claims to enforce. Both manual workflow and
emission_recalcjob writedata_entry_emissions. Acknowledge this explicitly. - Migration story: how do we ship D? Provider-by-provider? Big-bang? Plan doesn't specify.
Overview¶
- "Atomic
claim_job()method using FOR UPDATE SKIP LOCKED" is wrong. Plan A actually uses conditional UPDATE WHERE locked_by IS NULL. These are different patterns — SKIP LOCKED is for queue scanning, conditional UPDATE is for known-id claims. The conditional UPDATE is the right pattern; the overview text just needs fixing. - "Plans A and B can be implemented in parallel" — actually B uses
job_type,pipeline_id,attemptsfrom A. They can ship in one PR but B can't start until A's migration is at least drafted.
What we missed entirely¶
Concurrency testing strategy¶
Plan A's test list (test_data_ingestion_repo.py — claim_job success/duplicate/wrong-state) is unit tests with mocked DB. They won't exercise the actual atomic-UPDATE-with-WHERE race. We need an integration test that fires two concurrent claim_job calls against a real Postgres and asserts exactly one wins. Without this, we ship pod-safety code without proving it works under contention.
Plan D should probably be deferred, not pre-committed¶
The reflexion doc framed "one writer per table" as the principled root-cause fix. But A's claim+is_current and B's auto-recalc may close 80% of the actual bug surface. Plan D is the most disruptive (frontend UX, batching, table-ownership refactor across all providers, aggregation job dedup) and deserves real production data on whether the remaining concurrency pain is worth that cost. The overview presents it as certain-to-ship; better to call it "deferred — measure first."
Worker-poll loop is absent¶
The reflexion doc explicitly mentioned SELECT … FOR UPDATE SKIP LOCKED workers. Our plans assume BackgroundTasks forever. With run_after and attempts added to the model (Plan A), we have the data shape for a real queue but no consumer. If we ever want retry-with-backoff or scheduled jobs, we need a poller. Worth at least a sentence in the overview about why we're deferring this.
Batching¶
Reflexion doc: 1k–5k row batches for CSV, 50–200 entry batches for emissions. None of A/B/C/D delivers this. As data volume grows, single-transaction iterations will hold long locks. Should be at least mentioned as Plan D scope or a separate plan.
Recommended next step¶
- Make the chaining decision (showstopper #1) — picks (a)/(b)/(c)
- Revise Plan A to address showstopper #2 (is_current atomic) and the Plan-A smaller items
- Revise Plan B to address showstopper #3 (JSONB migration) and the missing-classification-rows semantic
- Decide whether Plan D ships at all or gets deferred
- Revise Plan C to align with the chaining decision and clarify
pipeline_id/job_typelifecycle