Plan: Manual emission recalculation endpoint with "needs recalculation" status
TL;DR: Three additions. (1) Expose `GET /sync/recalculation-status?year=YYYY`, returning per-module status (with per-data-entry-type detail nested inside) derived from existing `DataIngestionJob` rows. (2) Expose `POST /sync/recalculate-emissions/{module_type_id}/{data_entry_type_id}?year=YYYY` for single-type recalculation. (3) Expose `POST /sync/recalculate-emissions/{module_type_id}?year=YYYY` for module-level bulk recalculation with an `only_stale` flag (selectable via a frontend dialog). No auto-trigger on factor ingestion. No new DB table: status is derived from `DataIngestionJob.id` ordering.
Phase 1 (Repository): cross-module DataEntry query
add 1 method
- Add `list_by_data_entry_type_and_year(data_entry_type_id, year) -> list[DataEntry]` to `backend/app/repositories/data_entry_repo.py`
  - JOIN: `DataEntry ⋈ CarbonReportModule ⋈ CarbonReport WHERE data_entry_type_id = X AND CarbonReport.year = Y`
  - The required models are already imported in the repo file
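A minimal sketch of the JOIN, run against an in-memory SQLite database so it needs only the standard library. The table and column names (`data_entry`, `carbon_report_module`, `carbon_report`, `carbon_report_module_id`, `carbon_report_id`) are assumptions standing in for the real SQLAlchemy models; the repository method would express the same two joins through the ORM.

```python
import sqlite3

# Hypothetical schema mirroring DataEntry -> CarbonReportModule -> CarbonReport.
SCHEMA = """
CREATE TABLE carbon_report (id INTEGER PRIMARY KEY, year INTEGER NOT NULL);
CREATE TABLE carbon_report_module (
    id INTEGER PRIMARY KEY,
    carbon_report_id INTEGER NOT NULL REFERENCES carbon_report(id),
    module_type_id INTEGER NOT NULL
);
CREATE TABLE data_entry (
    id INTEGER PRIMARY KEY,
    carbon_report_module_id INTEGER NOT NULL REFERENCES carbon_report_module(id),
    data_entry_type_id INTEGER NOT NULL
);
"""

# Filter on the entry type, and on the year, which lives two joins away.
QUERY = """
SELECT de.id, de.carbon_report_module_id
FROM data_entry AS de
JOIN carbon_report_module AS crm ON crm.id = de.carbon_report_module_id
JOIN carbon_report AS cr ON cr.id = crm.carbon_report_id
WHERE de.data_entry_type_id = ? AND cr.year = ?
ORDER BY de.id
"""


def list_by_data_entry_type_and_year(conn, data_entry_type_id, year):
    return conn.execute(QUERY, (data_entry_type_id, year)).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO carbon_report VALUES (?, ?)", [(1, 2024), (2, 2025)])
conn.executemany(
    "INSERT INTO carbon_report_module VALUES (?, ?, ?)", [(10, 1, 3), (20, 2, 3)]
)
conn.executemany(
    "INSERT INTO data_entry VALUES (?, ?, ?)",
    [(100, 10, 7), (200, 20, 7), (201, 20, 8)],
)
print(list_by_data_entry_type_and_year(conn, 7, 2025))  # [(200, 20)]
```

The three lookups in the test mirror the planned Phase 6 cases: matching year, non-matching year, and empty result.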
Phase 1.5 (Schema + Repository): recalculation status derivation
no new DB table; status is derived from existing DataIngestionJob rows
- Add two Pydantic models to `backend/app/api/v1/data_sync.py` (alongside the existing response models):
```python
from typing import Optional

from pydantic import BaseModel

# IngestionResult: the existing enum already used for DataIngestionJob.result.


class RecalculationStatus(BaseModel):
    """Per-(module_type_id, data_entry_type_id) recalculation status."""

    module_type_id: int
    data_entry_type_id: int
    year: int
    needs_recalculation: bool
    last_factor_job_id: Optional[int]
    last_factor_job_result: Optional[IngestionResult]
    last_recalculation_job_id: Optional[int]
    last_recalculation_job_result: Optional[IngestionResult]


class ModuleRecalculationStatus(BaseModel):
    """Per-module rollup status: true if any data_entry_type needs recalculation."""

    module_type_id: int
    year: int
    needs_recalculation: bool  # any(det.needs_recalculation for det in data_entry_types)
    data_entry_types: list[RecalculationStatus]
```
The `ModuleRecalculationStatus` aggregation is computed in the API layer (no extra query): group the flat rows returned by the repo method (mapped to `RecalculationStatus`) by `module_type_id`, then set `needs_recalculation = any(row.needs_recalculation for row in group)`.
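The API-layer rollup can be sketched with plain dataclasses standing in for the Pydantic models (the `last_*` fields are omitted). `group_by_module` is a hypothetical helper name. Sorting before `itertools.groupby` matters, because `groupby` only merges adjacent rows.

```python
from dataclasses import dataclass, field
from itertools import groupby
from operator import attrgetter


@dataclass
class RecalculationStatus:  # stand-in for the Pydantic model (result fields omitted)
    module_type_id: int
    data_entry_type_id: int
    year: int
    needs_recalculation: bool


@dataclass
class ModuleRecalculationStatus:  # stand-in for the Pydantic rollup model
    module_type_id: int
    year: int
    needs_recalculation: bool
    data_entry_types: list[RecalculationStatus] = field(default_factory=list)


def group_by_module(rows, year):
    """Roll flat per-type rows up into one status object per module."""
    key = attrgetter("module_type_id")
    modules = []
    # groupby only merges adjacent rows, so sort by module first (stable sort).
    for module_type_id, group in groupby(sorted(rows, key=key), key=key):
        types = list(group)
        modules.append(
            ModuleRecalculationStatus(
                module_type_id=module_type_id,
                year=year,
                needs_recalculation=any(t.needs_recalculation for t in types),
                data_entry_types=types,
            )
        )
    return modules
```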
- Add `get_recalculation_status_by_year(year: int) -> list[RecalculationStatusRow]` to `backend/app/repositories/data_ingestion.py`:
Derivation logic, for each `(module_type_id, data_entry_type_id, year)`:
- Factor jobs in scope: `is_current=True`, `year=Y`, `state=FINISHED`, `target_type=FACTORS`, `result != ERROR`, grouped by `(module_type_id, data_entry_type_id)`, keeping the row with `MAX(id)` (the latest factor sync across all ingestion methods)
- Recalculation jobs in scope: `is_current=True`, `year=Y`, `target_type=DATA_ENTRIES`, `ingestion_method=computed`; at most one per `(module_type_id, data_entry_type_id)`, guaranteed by the `is_current` partial-unique index
- LEFT JOIN the factor groups with the recalculation jobs on `(module_type_id, data_entry_type_id)`
- `needs_recalculation = recalc_job IS NULL OR latest_factor_job.id > recalc_job.id`
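Rationale: `DataIngestionJob.id` is a serial primary key, so a higher id means a job created later. Comparing ids therefore detects whether a factor sync was started after the last emission recalculation, without any additional timestamp column. The ordering reflects job creation, not completion: a factor job created before a recalculation job but finishing after it would not be flagged.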
The method returns a lightweight dataclass/TypedDict (not the full ORM model) to avoid loading the meta JSON for every job.
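A sketch of the derivation as a single SQL statement, again using SQLite and a hypothetical flattened `data_ingestion_job` table with enum values stored as text. Two CTEs mirror the two "in scope" job sets, and the `LEFT JOIN` plus the `IS NULL OR >` test implement `needs_recalculation`. The sample rows also illustrate a consequence of the plan as written: a module-level recalculation job (stored with `data_entry_type_id = NULL`, see Phase 4.5) never satisfies the join condition, so the derivation would need to account for such jobs separately, for example via their `meta.config.data_entry_type_ids`.

```python
import sqlite3

SCHEMA = """
CREATE TABLE data_ingestion_job (
    id INTEGER PRIMARY KEY, module_type_id INTEGER, data_entry_type_id INTEGER,
    year INTEGER, is_current INTEGER, state TEXT, target_type TEXT,
    ingestion_method TEXT, result TEXT
)
"""

STATUS_QUERY = """
WITH latest_factor AS (
    -- Latest successful factor sync per (module, type), any ingestion method.
    SELECT module_type_id, data_entry_type_id, MAX(id) AS factor_job_id
    FROM data_ingestion_job
    WHERE is_current = 1 AND year = :year AND state = 'FINISHED'
      AND target_type = 'FACTORS' AND result != 'ERROR'
    GROUP BY module_type_id, data_entry_type_id
),
recalc AS (
    -- At most one current computed recalculation job per (module, type).
    SELECT module_type_id, data_entry_type_id, id AS recalc_job_id
    FROM data_ingestion_job
    WHERE is_current = 1 AND year = :year
      AND target_type = 'DATA_ENTRIES' AND ingestion_method = 'computed'
)
SELECT f.module_type_id, f.data_entry_type_id, f.factor_job_id, r.recalc_job_id,
       (r.recalc_job_id IS NULL OR f.factor_job_id > r.recalc_job_id)
           AS needs_recalculation
FROM latest_factor AS f
LEFT JOIN recalc AS r
  ON r.module_type_id = f.module_type_id
 AND r.data_entry_type_id = f.data_entry_type_id
ORDER BY f.module_type_id, f.data_entry_type_id
"""


def get_recalculation_status_by_year(conn, year):
    return conn.execute(STATUS_QUERY, {"year": year}).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.executemany(
    "INSERT INTO data_ingestion_job VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
        # id, module, type, year, is_current, state, target_type, method, result
        (1, 1, 10, 2025, 1, "FINISHED", "FACTORS", "csv", "SUCCESS"),
        (2, 1, 10, 2025, 1, "FINISHED", "DATA_ENTRIES", "computed", "SUCCESS"),
        (3, 1, 11, 2025, 1, "FINISHED", "FACTORS", "csv", "SUCCESS"),
        (4, 1, 12, 2025, 1, "FINISHED", "DATA_ENTRIES", "computed", "SUCCESS"),
        (5, 1, 12, 2025, 1, "FINISHED", "FACTORS", "computed", "SUCCESS"),
        (6, 1, 13, 2025, 1, "FINISHED", "FACTORS", "csv", "ERROR"),
        # Module-level recalculation job: data_entry_type_id is NULL.
        (7, 1, None, 2025, 1, "FINISHED", "DATA_ENTRIES", "computed", "SUCCESS"),
    ],
)
for row in get_recalculation_status_by_year(conn, 2025):
    print(row)
```

Type 10 is up to date (recalculation 2 after factor 1), type 11 was never recalculated, type 12 has a newer factor job, and type 13 is dropped because its only factor job failed.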
Phase 2 (Workflow): EmissionRecalculationWorkflow
new file + implement existing stub
- Create `backend/app/workflows/emission_recalculation.py` with method `recalculate_for_data_entry_type(data_entry_type_id, year) -> dict`:
  - Fetch entries via the Phase 1 repo method
  - For each entry: `DataEntryEmissionService.upsert_by_data_entry(DataEntryResponse.model_validate(entry))`. `DataEntryResponse` maps 1:1 to `DataEntry` fields, so a direct `model_validate` works (validating an ORM object requires `from_attributes=True`, either in the model config or in the call)
  - Per-entry errors: `except Exception` → log and accumulate in `error_details`; never abort
  - Collect the affected `carbon_report_module_id`s; after the loop, call `CarbonReportModuleService.recompute_stats()` once per distinct module
  - Return `{recalculated: N, modules_refreshed: M, errors: K, error_details: [...]}`
- Implement the `FactorService.find_modules_for_recalculation()` stub at `backend/app/services/factor_service.py` (L252) using the same repo method (parallel with step 2)
Phase 3 (Task): background recalculation runner
new file; mirrors the run_sync_task session pattern
- Create `backend/app/tasks/emission_recalculation_tasks.py` with:

Single-type variant (used by the per-data-entry-type endpoint):
```python
async def run_recalculation_task(
    module_type_id: int,
    data_entry_type_id: int,
    year: int,
    job_id: int,
) -> None: ...
```
- Opens two `SessionLocal()` contexts following the same pattern as `run_sync_task`: `job_session` for status updates (commits immediately, visible to SSE) and `data_session` for emission writes (single atomic commit at the end)
- Phase sequence via `job_session` updates:
  - `RUNNING` / `"Starting emission recalculation..."`
  - After the repo query: `"Found {N} data entries to recalculate"`
  - `"Recalculating emissions..."`
- Calls `EmissionRecalculationWorkflow(data_session).recalculate_for_data_entry_type(data_entry_type_id, year)`
- On success: `await data_session.commit()`, then updates the job → `FINISHED / SUCCESS` with `meta.recalculation` stats
- On error: `await data_session.rollback()`, then updates the job → `FINISHED / ERROR` with `status_message`
```python
def run_recalculation(module_type_id: int, data_entry_type_id: int, year: int, job_id: int) -> None: ...
```
- Sync wrapper (mirrors `run_ingestion`): calls `asyncio.run(run_recalculation_task(...))`
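A sketch of the dual-session sequencing. The sessions, the job record (a plain dict) and the workflow call are injected so the example runs without a database; `FakeSession` is hypothetical, and the real task would open both sessions with `SessionLocal()` and receive ids from the endpoint. What it shows is the ordering: every job update is committed at once on `job_session`, while emission writes are committed or rolled back exactly once on `data_session`.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeSession:
    """Hypothetical stand-in for an AsyncSession that records commits/rollbacks."""

    name: str
    log: list = field(default_factory=list)

    async def commit(self):
        self.log.append(f"{self.name}.commit")

    async def rollback(self):
        self.log.append(f"{self.name}.rollback")


async def run_recalculation_task(job, job_session, data_session, recalculate):
    """Sketch of the phase sequence; real ids and sessions come from the endpoint."""

    async def update_job(**changes):
        job.update(changes)
        await job_session.commit()  # committed immediately so SSE readers see it

    await update_job(state="RUNNING", status_message="Starting emission recalculation...")
    try:
        stats = await recalculate(data_session)
        await data_session.commit()  # one atomic commit for all emission writes
    except Exception as exc:
        await data_session.rollback()
        await update_job(state="FINISHED", result="ERROR", status_message=str(exc))
        return
    await update_job(state="FINISHED", result="SUCCESS", meta={"recalculation": stats})


def run_recalculation(job, job_session, data_session, recalculate):
    """Sync wrapper for BackgroundTasks, mirroring run_ingestion."""
    asyncio.run(run_recalculation_task(job, job_session, data_session, recalculate))
```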
Module-level (multi-type) variant (used by the per-module endpoint):
```python
async def run_module_recalculation_task(
    module_type_id: int,
    data_entry_type_ids: list[int],
    year: int,
    job_id: int,
) -> None: ...
```
- Same dual-session pattern (`job_session` / `data_session`)
- Iterates over `data_entry_type_ids` in sequence:
  - Per type: updates the job `status_message = "Recalculating {data_entry_type} ({i}/{N})..."` via `job_session`
  - Calls `EmissionRecalculationWorkflow(data_session).recalculate_for_data_entry_type(type_id, year)`
  - Accumulates stats per type into `meta.recalculation = {type_id: stats, ...}`
- A per-type error does not abort the remaining types; it is accumulated in the stats
- `data_session.commit()` is called once after all types are done (all-or-nothing for the whole module)
- Final job result: `SUCCESS` if no errors across all types; `WARNING` if any type had partial errors; `ERROR` only if all types failed
```python
def run_module_recalculation(module_type_id: int, data_entry_type_ids: list[int], year: int, job_id: int) -> None: ...
```
- Sync wrapper: calls `asyncio.run(run_module_recalculation_task(...))`
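A sketch of the module-level loop and the final-result rule. Here a type counts as *failed* when its workflow call raised, and as having *partial errors* when its stats report `errors > 0`; that split is an assumption about how "failed" is defined. The progress message uses the type id where the real task would use the data entry type name, and `recalculate`, `report_progress` and `commit` are injected stand-ins for the workflow call, the `job_session` update and `data_session.commit()`.

```python
import asyncio


def final_result(per_type):
    """Map per-type outcomes to the job result: SUCCESS, WARNING or ERROR."""
    if not per_type:
        return "SUCCESS"
    failed = [stats for stats in per_type.values() if stats.get("failed")]
    if len(failed) == len(per_type):
        return "ERROR"  # every type failed
    if failed or any(stats.get("errors", 0) for stats in per_type.values()):
        return "WARNING"  # some types failed or had per-entry errors
    return "SUCCESS"


async def run_module_recalculation_task(type_ids, recalculate, report_progress, commit):
    """Recalculate each type in turn; one failing type never stops the rest."""
    per_type = {}
    for i, type_id in enumerate(type_ids, start=1):
        await report_progress(f"Recalculating {type_id} ({i}/{len(type_ids)})...")
        try:
            per_type[type_id] = await recalculate(type_id)
        except Exception as exc:
            per_type[type_id] = {"failed": True, "error": str(exc)}
    await commit()  # single commit for the whole module
    return {"result": final_result(per_type), "meta": {"recalculation": per_type}}


def run_module_recalculation(type_ids, recalculate, report_progress, commit):
    """Sync wrapper, mirroring run_recalculation."""
    return asyncio.run(
        run_module_recalculation_task(type_ids, recalculate, report_progress, commit)
    )
```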
Phase 4 (Endpoint): POST /sync/recalculate-emissions/{module_type_id}/{data_entry_type_id}
add to backend/app/api/v1/data_sync.py
- New endpoint with path params `module_type_id: ModuleTypeEnum` and `data_entry_type_id: DataEntryTypeEnum`, and required query param `year: int`:
  `POST /sync/recalculate-emissions/{module_type_id}/{data_entry_type_id}?year=2025`
- Permission: `require_permission("backoffice.data_management", "sync")`
- Validates that `year` is provided (400 if missing; FastAPI by default answers a missing required query parameter with 422, so returning 400 requires declaring `year` as optional and checking it explicitly)
- Creates a `DataIngestionJob` directly via `DataIngestionRepository(db).create_ingestion_job()`; no provider is needed:
  - `module_type_id`: from path
  - `data_entry_type_id`: from path
  - `year`: from query
  - `ingestion_method = IngestionMethod.computed`
  - `target_type = TargetType.DATA_ENTRIES`
  - `entity_type = EntityType.MODULE_PER_YEAR`
  - `state = IngestionState.NOT_STARTED`
  - `meta = {"config": {"year": year, "data_entry_type_id": data_entry_type_id.value}}`
- Commits the job creation, then schedules `run_recalculation` via `background_tasks.add_task`
- Returns `SyncStatusResponse(job_id=job_id, state=NOT_STARTED, message="Emission recalculation scheduled")`
- The client streams progress via the existing `GET /sync/jobs/{job_id}/stream`; no new SSE endpoint is needed
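The endpoint body can be sketched without FastAPI by injecting a fake repository and a fake task queue (`FakeJobRepo`, `FakeBackgroundTasks` and `recalculate_emissions_for_type` are hypothetical; enum members are written as strings with the intended enum in a comment). The detail worth preserving is the order: commit the job row before calling `add_task`, since the background task opens its own sessions and must be able to read the job.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeJobRepo:
    """Hypothetical stand-in for DataIngestionRepository plus its db session."""

    jobs: list = field(default_factory=list)
    committed: bool = False

    async def create_ingestion_job(self, fields):
        job = {"id": len(self.jobs) + 1, **fields}
        self.jobs.append(job)
        return job

    async def commit(self):
        self.committed = True


@dataclass
class FakeBackgroundTasks:
    """Records add_task(func, *args) calls, as FastAPI's BackgroundTasks queues them."""

    queued: list = field(default_factory=list)

    def add_task(self, func, *args):
        self.queued.append((func, args))


def run_recalculation(module_type_id, data_entry_type_id, year, job_id):
    """Placeholder for the Phase 3 sync wrapper."""


async def recalculate_emissions_for_type(repo, background_tasks, module_type_id, data_entry_type_id, year):
    job = await repo.create_ingestion_job(
        {
            "module_type_id": module_type_id,
            "data_entry_type_id": data_entry_type_id,
            "year": year,
            "ingestion_method": "computed",  # IngestionMethod.computed
            "target_type": "DATA_ENTRIES",  # TargetType.DATA_ENTRIES
            "entity_type": "MODULE_PER_YEAR",  # EntityType.MODULE_PER_YEAR
            "state": "NOT_STARTED",  # IngestionState.NOT_STARTED
            "meta": {"config": {"year": year, "data_entry_type_id": data_entry_type_id}},
        }
    )
    # Commit before scheduling: the task reads the job through its own sessions.
    await repo.commit()
    background_tasks.add_task(
        run_recalculation, module_type_id, data_entry_type_id, year, job["id"]
    )
    return {
        "job_id": job["id"],
        "state": "NOT_STARTED",
        "message": "Emission recalculation scheduled",
    }


print(asyncio.run(recalculate_emissions_for_type(FakeJobRepo(), FakeBackgroundTasks(), 3, 7, 2025)))
```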
Phase 4.5 (Endpoint): POST /sync/recalculate-emissions/{module_type_id} (module-level bulk trigger)
add to backend/app/api/v1/data_sync.py
- New module-level endpoint:
  `POST /sync/recalculate-emissions/{module_type_id}?year=2025&only_stale=true`
- Permission: `require_permission("backoffice.data_management", "sync")`
- Path param `module_type_id: ModuleTypeEnum`; required query param `year: int`; optional `only_stale: bool = True`
- Resolves the candidate `data_entry_type_ids` from `MODULE_TYPE_TO_DATA_ENTRY_TYPES[module_type_id]`
- If `only_stale=True`: calls `DataIngestionRepository(db).get_recalculation_status_by_year(year)` and keeps the types with `needs_recalculation=True` that also belong to this module; returns 400 with `"No data entry types require recalculation for this module"` if none qualify
- If `only_stale=False`: uses all data_entry_types for the module
- Creates one `DataIngestionJob` directly via `DataIngestionRepository`:
  - `module_type_id`: from path
  - `data_entry_type_id = None` (multi-type job, not scoped to a single type)
  - `year`: from query
  - `ingestion_method = IngestionMethod.computed`
  - `target_type = TargetType.DATA_ENTRIES`
  - `entity_type = EntityType.MODULE_PER_YEAR`
  - `state = IngestionState.NOT_STARTED`
  - `meta = {"config": {"year": year, "data_entry_type_ids": [...], "only_stale": only_stale}}`
- Commits the job creation, then schedules `run_module_recalculation` via `background_tasks.add_task`
- Returns `SyncStatusResponse(job_id=job_id, state=NOT_STARTED, message="Module emission recalculation scheduled for {N} data entry types")`
- The client streams progress via the existing `GET /sync/jobs/{job_id}/stream`
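The `only_stale` resolution can be sketched as a pure function over the status rows. `resolve_data_entry_type_ids`, `StatusRow` and `NoStaleTypesError` are hypothetical names; `candidates` plays the role of `MODULE_TYPE_TO_DATA_ENTRY_TYPES[module_type_id]`, and the endpoint would translate the exception into the 400 response. A type with no completed FACTORS job produces no status row, so it is never selected when `only_stale=True`.

```python
from dataclasses import dataclass


class NoStaleTypesError(Exception):
    """Hypothetical error the endpoint would translate into HTTP 400."""


@dataclass
class StatusRow:  # stand-in for one RecalculationStatusRow
    module_type_id: int
    data_entry_type_id: int
    needs_recalculation: bool


def resolve_data_entry_type_ids(module_type_id, candidates, status_rows, only_stale=True):
    """Pick which data_entry_type_ids the module-level job should recalculate."""
    if not only_stale:
        return list(candidates)
    stale = {
        row.data_entry_type_id
        for row in status_rows
        if row.module_type_id == module_type_id and row.needs_recalculation
    }
    # Keep the module's own ordering so "(i/N)" progress messages are stable.
    selected = [type_id for type_id in candidates if type_id in stale]
    if not selected:
        raise NoStaleTypesError("No data entry types require recalculation for this module")
    return selected
```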
Phase 4.6 (Endpoint): GET /sync/recalculation-status
add to backend/app/api/v1/data_sync.py
- New read endpoint:
  `GET /sync/recalculation-status?year=2025`
- Permission: `require_permission("backoffice.data_management", "view")`
- Required query param `year: int` (400 if missing; FastAPI's default response for a missing required query parameter is 422)
- Calls `DataIngestionRepository(db).get_recalculation_status_by_year(year)` → flat rows, mapped to `list[RecalculationStatus]`
- Groups by `module_type_id` in the API layer → `list[ModuleRecalculationStatus]`
- Returns `[]` if no completed FACTORS jobs exist for the year
Phase 5 (Frontend): status display + recalculation actions
frontend/src/pages/back-office/DataManagementPage.vue
- On page load (and after any factor sync or recalculation job completes), call `GET /sync/recalculation-status?year=YYYY`:
  - Store the results as `list[ModuleRecalculationStatus]` in reactive state, keyed by `module_type_id`
  - Module-level row/card: show a "Recalculation needed" warning badge when `module.needs_recalculation=true`; show a success/warning chip when `false` (using the `last_recalculation_job_result` of the most recently recalculated data_entry_type)
  - Per-data-entry-type sub-row: show the individual `needs_recalculation` badge and a `last_recalculation_job_result` chip using `module.data_entry_types`
- Module-level "Recalculate Emissions" button (shown on each module card regardless of `needs_recalculation`; disabled while a module-level recalculation is in progress):
  - On click: opens a Quasar `q-dialog` with the choice:
    - "Recalculate only data entry types that need it" (default, maps to `only_stale=true`)
    - "Recalculate all data entry types" (maps to `only_stale=false`)
  - The dialog shows which data_entry_types are stale (from the reactive status) to help the operator decide
  - On confirm: `POST /sync/recalculate-emissions/{module_type_id}?year=YYYY&only_stale={bool}` → receive `job_id`
  - Subscribe to SSE on `GET /sync/jobs/{job_id}/stream`; show an inline module-level progress spinner and the `status_message`
  - On `stream_closed`: refresh the recalculation status so all badges update reactively
  - On `FINISHED`: show the module-level result badge and per-type stats from `meta.recalculation`
- Per-data-entry-type "Recalculate Emissions" button (existing, unchanged from the previous design):
  - On click: `POST /sync/recalculate-emissions/{module_type_id}/{data_entry_type_id}?year=YYYY` → receive `job_id`
  - Subscribe to SSE; show inline progress; on `stream_closed`, refresh the status
Phase 6: Tests
parallel with implementation phases
- `backend/tests/unit/repositories/test_data_entry_repo.py`: test `list_by_data_entry_type_and_year()` with a matching year, a non-matching year, and an empty result
- `backend/tests/unit/repositories/test_data_ingestion_repo.py` (extend): test `get_recalculation_status_by_year()`:
  - FACTORS job only (no recalculation job) → `needs_recalculation=True`
  - FACTORS job + recalculation job where `recalc.id > factor.id` → `needs_recalculation=False`
  - FACTORS job + recalculation job where `factor.id > recalc.id` → `needs_recalculation=True`
  - FACTORS job with `result=ERROR` → excluded, so no status row is returned
  - No FACTORS jobs at all → empty list
- `backend/tests/unit/workflows/test_emission_recalculation.py` (NEW): all-success, partial error (one entry fails), empty set
- `backend/tests/unit/tasks/test_emission_recalculation_tasks.py` (NEW):
  - `run_recalculation_task`: updates the job RUNNING → FINISHED/SUCCESS; error path → FINISHED/ERROR
  - `run_module_recalculation_task`: iterates all given types; one type error → WARNING result; all types error → ERROR result; verifies `data_session.commit()` is called once; verifies the per-type progress `status_message` updates
Relevant files
- `backend/app/repositories/data_entry_repo.py`: Phase 1 addition
- `backend/app/repositories/data_ingestion.py`: Phase 1.5 addition (`get_recalculation_status_by_year`)
- `backend/app/api/v1/data_sync.py`: Phase 1.5 (`RecalculationStatus` + `ModuleRecalculationStatus` models), Phase 4 (per-type POST), Phase 4.5 (module-level POST), Phase 4.6 (GET status)
- `backend/app/workflows/emission_recalculation.py`: Phase 2 (new)
- `backend/app/services/factor_service.py` L252: Phase 2 stub implementation
- `backend/app/tasks/emission_recalculation_tasks.py`: Phase 3 (new, both single-type and module-level tasks)
- `frontend/src/pages/back-office/DataManagementPage.vue`: Phase 5
- `backend/app/tasks/ingestion_tasks.py`: no changes
Verification
- After a successful FACTORS job for `(module X, data_entry_type Y, year 2025)`, call `GET /sync/recalculation-status?year=2025` → module X appears with `needs_recalculation=true`, and its `data_entry_types` array contains Y with `needs_recalculation=true`
- The module-level dialog opens; select "only stale" → `POST /sync/recalculate-emissions/{module_type_id}?year=2025&only_stale=true` → a single `job_id` is returned
- SSE on `GET /sync/jobs/{job_id}/stream` shows `"Recalculating {data_entry_type} (1/N)..."` per type, then the final stats in `meta.recalculation`
- After completion, `GET /sync/recalculation-status?year=2025` → module X has `needs_recalculation=false`, and all its data_entry_types have `needs_recalculation=false`
- Run a new FACTORS job → the status flips back to `needs_recalculation=true` for that type/module automatically
- Select "all data entry types" in the dialog (`only_stale=false`) → all types are recalculated, even if not stale
- `only_stale=true` when no types are stale → 400 with a clear error message
- Break one DataEntry → the module job finishes with `WARNING`; the per-type stats in `meta.recalculation` show `errors=1` for that type, with the others unaffected
- Run the unit tests:
  - `pytest backend/tests/unit/repositories/test_data_ingestion_repo.py -v`
  - `pytest backend/tests/unit/workflows/test_emission_recalculation.py -v`
  - `pytest backend/tests/unit/tasks/test_emission_recalculation_tasks.py -v`
Further Considerations
- Scale / batching: if a unit has thousands of DataEntries, the recalculation loop may be slow. A `BATCH_SIZE`-limited iterator with intermediate `flush()` calls and progress updates (reusing the pattern from `process_csv_in_batches`) would prevent memory spikes. This can be deferred if current data volumes are small.
- Idempotency: the endpoint can be called multiple times safely, because `upsert_by_data_entry()` already deletes existing emissions before re-inserting. The `is_current` flag on `DataIngestionJob` will naturally track the latest recalculation job for a given `(module_type_id, data_entry_type_id, year)` combination.
- Multi-method factor jobs: a `(module_type_id, data_entry_type_id, year)` may have both a CSV and a computed FACTORS `is_current` job. The status derivation takes `MAX(id)` across all ingestion methods for that combination, so the most recent factor sync (regardless of method) is what matters.
- No migration needed: `needs_recalculation` is fully derived from existing `DataIngestionJob` rows. The `is_current` partial-unique index already ensures at most one current job per `(module_type_id, data_entry_type_id, target_type, ingestion_method, year)` combination.
- Module-level job `data_entry_type_id=None`: the `is_current` partial unique index on `DataIngestionJob` covers `(module_type_id, data_entry_type_id, target_type, ingestion_method, year)`. With `data_entry_type_id=NULL`, PostgreSQL treats NULLs as distinct in unique indexes, so the index will not prevent several module-level jobs from being marked current at the same time. The `mark_job_as_current` method will need to handle this case explicitly (`WHERE data_entry_type_id IS NULL`) to unset previous module-level jobs.
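The NULL behaviour can be reproduced with SQLite, which (like PostgreSQL by default) treats NULLs as distinct in a unique index. The sketch uses a hypothetical `job` table shaped like the index described above and a hypothetical `mark_module_job_as_current` helper that unsets earlier module-level jobs with `IS NULL`, since `= NULL` never matches. On PostgreSQL 15 and later, declaring the index `NULLS NOT DISTINCT` is an alternative that would let the index itself reject a second current module-level job.

```python
import sqlite3

SCHEMA = """
CREATE TABLE job (
    id INTEGER PRIMARY KEY, module_type_id INTEGER, data_entry_type_id INTEGER,
    target_type TEXT, ingestion_method TEXT, year INTEGER, is_current INTEGER
);
-- Partial unique index shaped like the one described for DataIngestionJob.
CREATE UNIQUE INDEX ux_job_current
    ON job (module_type_id, data_entry_type_id, target_type, ingestion_method, year)
    WHERE is_current = 1;
"""

INSERT = "INSERT INTO job VALUES (?, ?, ?, 'DATA_ENTRIES', 'computed', ?, 1)"


def current_ids(conn):
    rows = conn.execute("SELECT id FROM job WHERE is_current = 1 ORDER BY id")
    return [row[0] for row in rows]


def mark_module_job_as_current(conn, job_id, module_type_id, year):
    """Unset earlier module-level jobs; '= NULL' would never match, so use IS NULL."""
    conn.execute(
        """
        UPDATE job SET is_current = 0
        WHERE module_type_id = ? AND year = ? AND data_entry_type_id IS NULL
          AND target_type = 'DATA_ENTRIES' AND ingestion_method = 'computed'
          AND id != ?
        """,
        (module_type_id, year, job_id),
    )
    conn.execute("UPDATE job SET is_current = 1 WHERE id = ?", (job_id,))


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute(INSERT, (1, 3, None, 2025))
conn.execute(INSERT, (2, 3, None, 2025))  # accepted: the two NULLs are distinct
print(current_ids(conn))  # [1, 2]
mark_module_job_as_current(conn, 2, 3, 2025)
print(current_ids(conn))  # [2]
```

For comparison, the test inserts two current jobs with the same non-NULL `data_entry_type_id`, which the same index rejects.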