Implementation Plan: Elasticsearch Integration for Audit Table¶
Overview¶
We've implemented pattern 7 as requested, which involves adding a sync_status column to track the synchronization state of audit records with Elasticsearch. This approach provides reliability by tracking the sync status of each audit record and allowing for retry mechanisms.
Implemented Components¶
- Database Schema Changes
- Added
sync_statuscolumn toaudit_documentstable with enum values:pending,syncing,synced,failed - Added
sync_errorcolumn to store error messages for failed sync attempts -
Added
synced_atcolumn to track when the record was successfully synced -
Model Updates
- Updated the
AuditDocumentmodel to include the new fields -
Defined the
SyncStatusEnumenum -
Elasticsearch Integration
- Created an Elasticsearch client configuration
- Implemented functions to send audit records to Elasticsearch following the OPDo contract
-
Handle secure connection with the provided certificate
-
Background Sync Process
- Implemented a background task system to handle sync operations
- Created a periodic task to retry failed syncs
-
Implemented bulk sync for better performance
-
Configuration
- Added Elasticsearch configuration to environment variables
Detailed Implementation¶
1. Database Migration¶
We created a new migration file to add the required columns:
sync_status(ENUM: pending, syncing, synced, failed)sync_error(TEXT, nullable)synced_at(TIMESTAMP, nullable)
2. Model Updates¶
Updated backend/app/models/audit.py to include:
- New
SyncStatusEnumclass - New fields in
AuditDocumentBaseclass
3. Elasticsearch Client¶
Created a new module backend/app/elasticsearch/client.py that:
- Configures the Elasticsearch client with the provided connection details
- Handles secure connections using the certificate file
- Implements authentication with the API key
- Transforms audit records to OPDo schema for compliance with ISO 27701
4. Sync Service¶
Created a new service backend/app/services/audit_sync_service.py that:
- Contains methods to sync individual and bulk audit records
- Implements error handling and status updates
- Uses the pattern of updating status to
syncing, attempting sync, then updating tosyncedorfailed
5. Background Tasks¶
Implemented background task processing using FastAPI's BackgroundTasks.
6. Periodic Retry Mechanism¶
Set up a scheduled task to periodically retry failed syncs.
OPDo Contract Compliance¶
We've implemented strict compliance with the OPDo contract for ISO 27701 traceability:
| Field | Type | Implementation Details |
|---|---|---|
@timestamp | date | Required, formatted with Europe/Zurich timezone |
handler_id | string | Nullable, preserved as-is |
handled_id | string | Required, never null, comma-separated list |
crudt | string | Enum (C/R/U/D) mapped from change_type |
source | IP | Required, validated to be valid IP |
payload | string | Required, converted to compact JSON string |
Critical Implementation Details¶
1. handled_id Never Null Enforcement¶
Implementation ensures handled_id is never null:
def resolve_handled_id(audit_record: dict) -> str:
handled_ids = audit_record.get("handled_ids") or []
handler_id = audit_record.get("handler_id")
# If we have handled_ids, join them
if handled_ids:
return ",".join(str(x) for x in handled_ids)
# If we have handler_id, use it as the implicit handled_id
if handler_id:
return str(handler_id)
# Neither handled_ids nor handler_id available - this violates OPDo contract
raise ValueError("OPDo violation: handled_id cannot be null")
2. IP Address Validation¶
Source field must be a valid IP address:
def validate_ip(ip: str) -> str:
try:
ip_address(ip)
return ip
except Exception:
raise ValueError(f"Invalid IP for OPDo: {ip}")
3. Payload Stringification¶
Payload is converted to a compact JSON string:
def stringify_payload(payload) -> str:
if isinstance(payload, str):
return payload
# Convert dict to compact JSON string
return json.dumps(payload, separators=(",", ":"))
4. Complete OPDo Schema Mapper¶
Production-ready mapper that enforces all OPDo constraints:
def map_to_opdo_schema(audit_record: dict) -> dict:
# Create composite payload with all required fields
composite_payload = {
"route_path": audit_record.get("route_path"),
"route_payload": audit_record.get("route_payload"),
"change_reason": audit_record.get("change_reason"),
"entity_type": audit_record.get("entity_type"),
"entity_id": audit_record.get("entity_id"),
}
return {
"@timestamp": format_timestamp(audit_record["changed_at"]),
"handler_id": (
str(audit_record["handler_id"])
if audit_record.get("handler_id") is not None
else None
),
"handled_id": resolve_handled_id(audit_record),
"crudt": CRUDT_MAP[audit_record["change_type"]],
"source": validate_ip(audit_record["ip_address"]),
"payload": stringify_payload(composite_payload),
}
Testing¶
Comprehensive unit tests have been implemented covering:
- All utility functions (format_timestamp, validate_ip, resolve_handled_id, stringify_payload, map_to_opdo_schema)
- All ElasticsearchClient methods (initialization, sync_audit_record, bulk_sync_audit_records, get_audit_record)
- Edge cases and error conditions
- OPDo contract compliance validation
All 31 tests are currently passing, confirming our implementation correctly follows the OPDo contract.
detail mapping¶
Excellent. Now weβre in real spec-driven territory.
This changes a few important things. Letβs lock it down properly.
π OPDo Contract (Based on What You Just Said)¶
| Field | Type | Constraint |
|---|---|---|
@timestamp | date | required |
handler_id | string | nullable |
handled_id | string | required, never null, comma-separated list |
crudt | string | enum (C/R/U/D/β¦) |
source | IP | required, must be valid IP |
payload | string | required |
π¨ Critical Corrections You Must Implement¶
1οΈβ£ handled_id Must NEVER Be Null¶
Your current data:
"[]"
That means no handled ids.
Spec says:
if handler_id is null β handled_id is unique item and is handler_id (implicit)
So logic becomes:
def resolve_handled_id(audit_record: dict) -> str:
handled_ids = audit_record.get("handled_ids") or []
handler_id = audit_record.get("handler_id")
if handled_ids:
return ",".join(str(x) for x in handled_ids)
if handler_id:
# implicit case
return str(handler_id)
raise ValueError("OPDo violation: handled_id cannot be null")
You must NEVER send null.
2οΈβ£ handler_id Nullable¶
So:
handler_id = audit_record.get("handler_id")
if handler_id is not None:
handler_id = str(handler_id)
Do NOT auto-fill it unless spec says so.
3οΈβ£ source Must Be Valid IP¶
If you send:
"unknown"
You are non-compliant.
And yes β for ISO 27701 traceability, unknown IP is useless.
You should validate:
import ipaddress
def validate_ip(ip: str) -> str:
try:
ipaddress.ip_address(ip)
return ip
except Exception:
raise ValueError(f"Invalid IP for OPDo: {ip}")
If IP is missing β FAIL sync.
Do NOT silently send garbage.
4οΈβ£ payload Must Be String¶
You said:
payload is a string
So DO NOT send JSON object.
Send either:
Option A β Condensed JSON string (recommended)¶
import json
def stringify_payload(payload):
if isinstance(payload, str):
return payload
return json.dumps(payload, separators=(",", ":"))
That produces compact JSON without spaces.
Example:¶
{"module_type_id":1,"year":null,...}
This is clean and deterministic.
π§ Now Letβs Build The Proper Mapper¶
This is what your production-safe mapper should look like:
import json
import ipaddress
from datetime import datetime
from zoneinfo import ZoneInfo
CRUDT_MAP = {
"CREATE": "C",
"READ": "R",
"UPDATE": "U",
"DELETE": "D",
}
def format_timestamp(ts_str: str) -> str:
dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S.%f")
dt = dt.replace(tzinfo=ZoneInfo("Europe/Zurich"))
return dt.isoformat()
def validate_ip(ip: str) -> str:
ipaddress.ip_address(ip)
return ip
def resolve_handled_id(audit_record: dict) -> str:
handled_ids = audit_record.get("handled_ids") or []
handler_id = audit_record.get("handler_id")
if handled_ids:
return ",".join(str(x) for x in handled_ids)
if handler_id:
return str(handler_id)
raise ValueError("OPDo violation: handled_id cannot be null")
def stringify_payload(payload):
if isinstance(payload, str):
return payload
return json.dumps(payload, separators=(",", ":"))
def map_to_opdo_schema(audit_record: dict) -> dict:
return {
"@timestamp": format_timestamp(audit_record["changed_at"]),
"handler_id": (
str(audit_record["handler_id"])
if audit_record.get("handler_id") is not None
else None
),
"handled_id": resolve_handled_id(audit_record),
"crudt": CRUDT_MAP[audit_record["change_type"]],
"source": validate_ip(audit_record["ip_address"]),
"payload": stringify_payload(audit_record["route_payload"]),
}
π― Important Architectural Advice¶
Do NOT:
- Auto-fix bad IPs
- Auto-fill missing handled_id silently
- Allow malformed timestamps
Because:
If OPDo is ISO 27701 driven, audit logs are legal artifacts.
Fail fast. Log error. Do not index invalid compliance data.
π₯ What Would Break In Your Current Data¶
From your sample:
"unknown" β invalid IP β
"[]" β empty handled_ids β
timestamp without timezone β
"CREATE" instead of "C" β
payload double-encoded JSON β
So yes β you are currently non-compliant.
π§ͺ I Strongly Recommend¶
Add a unit test like:
def test_opdo_contract_compliance():
doc = map_to_opdo_schema(sample_record)
assert doc["@timestamp"]
assert doc["handled_id"]
assert doc["crudt"] in ["C","R","U","D"]
- IPv6 allowed a priori (we should validate IP)
- Is localhost (127.0.0.1) a priori ?
- a priori never happen ? (10.x, 192.168.x)
- no timezone required example of valid: β2024-10-10T11:34:05+02:00"