Core DynamoDB Stream Worker
Core reactive consistency worker that consumes DynamoDB Stream events and propagates datastore changes to downstream subsystems including search indexing, image default reconciliation, and FAA matcher maintenance. Ensures cross-system eventual consistency through event-driven fan-out processing.
- Listen to DynamoDB Streams and process each stream record in the batch.
- Deserialize DynamoDB stream images (`NewImage`, `OldImage`) into plain Python dicts using `boto3.dynamodb.types.TypeDeserializer`.
- Route changes to subsystem handlers based on event type: `INSERT`/`MODIFY` triggers upsert-style behavior, `REMOVE` triggers cleanup.
- Enforce eventual consistency across:
  - Search documents (Typesense) via an SQS action queue
  - Default image pointer rows (`DefaultEntityImage` contract)
  - ICAO entity matcher statistics derived from FAA link rows
- Fail loudly (no swallowed exceptions) so AWS retry + DLQ behavior can protect against silent drift.
- DynamoDB emits a stream record with `eventName` and a `dynamodb` payload containing `Keys`, `NewImage`, and `OldImage`.
- The Lambda converts images to Python dicts (so downstream code can use normal types).
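The record shape described above can be illustrated as follows (the key and attribute values here are invented for illustration; only the field names come from the AWS stream event format):

```python
# One DynamoDB Stream record as the Lambda receives it. "eventName" says what
# happened; "dynamodb" carries the keys plus the old/new attribute-value images.
record = {
    "eventName": "MODIFY",
    "dynamodb": {
        "Keys": {"PK": {"S": "ICAO#AIRCRAFT"}, "SK": {"S": "TYPE#A320"}},
        "OldImage": {
            "PK": {"S": "ICAO#AIRCRAFT"},
            "SK": {"S": "TYPE#A320"},
        },
        "NewImage": {
            "PK": {"S": "ICAO#AIRCRAFT"},
            "SK": {"S": "TYPE#A320"},
            "engine_count": {"N": "2"},
        },
    },
}
```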
- The worker logs a compact record summary (`eventName`, `PK`, `SK`, item size).
- It dispatches to:
  - `typesense_sync` (search document synchronization)
  - `image_sync` (default-image enforcement)
  - `faa_matcher_sync` (matcher counts / signals)
DynamoDB Streams use an attribute-value encoding (e.g. `{"S": "..."}`, `{"N": "..."}`).
This worker converts that structure into native Python types via:
- `ddb_to_python(image)` → `{ field: python_value }`
- Supports both `NewImage` and `OldImage`
This conversion is what allows downstream handlers to be written like normal domain logic rather than “DynamoDB stream parsing” code.
Each record is processed inside a try/except. If anything fails, the worker:
- Logs the failure with `PK`/`SK` context and a stack trace (`exc_info=True`)
- Re-raises the exception (explicitly not swallowed)
That “fail fast” behavior is intentional: it delegates resilience to AWS Lambda + event source retry policies, and prevents silent divergence between DynamoDB and downstream systems.
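A sketch of that fail-fast loop, assuming a `handle_records` entry point and injectable dispatcher callables (both names are placeholders; only the log-with-context-then-re-raise behavior comes from this doc):

```python
import logging

logger = logging.getLogger("stream_worker")

def handle_records(event, dispatchers):
    """Process each stream record; dispatchers are callables taking the raw record."""
    for record in event["Records"]:
        keys = record.get("dynamodb", {}).get("Keys", {})
        pk = keys.get("PK", {}).get("S")
        sk = keys.get("SK", {}).get("S")
        try:
            for dispatch in dispatchers:
                dispatch(record)
        except Exception:
            # Log with PK/SK context and the full traceback, then re-raise so
            # the Lambda retry / DLQ policy takes over instead of hiding drift.
            logger.error("record processing failed PK=%s SK=%s", pk, sk, exc_info=True)
            raise
```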
This subsystem translates specific DynamoDB rows into Typesense documents and queues indexing actions.
The key design choice: the stream processor does not talk to Typesense directly.
Instead it pushes `upsert`/`delete` actions into an SQS queue
(`TYPESENSE_QUEUE_URL`), allowing search indexing to scale independently and retry safely.
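The enqueue step can be sketched like this; the message shape and the `enqueue_typesense_action` helper are assumptions, while the queue URL env var and the `upsert`/`delete` action names come from this doc:

```python
import json
import os

def enqueue_typesense_action(sqs_client, action, collection, document=None, doc_id=None):
    """Serialize a search-index action and push it onto the Typesense SQS queue.

    sqs_client is a boto3 SQS client (injected so it can be stubbed in tests).
    """
    body = {
        "action": action,          # "upsert" or "delete"
        "collection": collection,  # target Typesense collection
        "document": document,      # full doc for upserts
        "id": doc_id,              # doc id for deletes
    }
    sqs_client.send_message(
        QueueUrl=os.environ["TYPESENSE_QUEUE_URL"],
        MessageBody=json.dumps(body),
    )
```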
A row is eligible for Typesense sync only if its PK/SK matches one of the
`STREAM_MODELS` specs (regex-based). Each spec declares a model class, a PK regex,
an SK regex, and the target Typesense collection.
- If `old_doc` existed and `new_doc` disappears → enqueue `delete`
- If `new_doc` exists and differs from `old_doc` → enqueue `upsert`
- If no spec match → skip quietly (debug/info logs)
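The first two rules above reduce to a small pure function; `decide_action` is a hypothetical name, and `old_doc`/`new_doc` are the already-rendered Typesense documents (or `None`):

```python
def decide_action(old_doc, new_doc):
    """Map an (old_doc, new_doc) pair to "delete", "upsert", or None (skip)."""
    if old_doc is not None and new_doc is None:
        return "delete"   # row vanished: remove its search document
    if new_doc is not None and new_doc != old_doc:
        return "upsert"   # row appeared or changed: (re)index it
    return None           # unchanged: nothing to enqueue
```

Keeping the decision pure makes it trivially testable, with the SQS side effect handled separately.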
The specs define which single-table “rows” are considered indexable and what collection they map to.
- `AIRCRAFT#REGISTRY#FAA` + `DETAIL#<REG>` → `registry`
- `AIRCRAFT#REGISTRY#FOREIGN` + `DETAIL#<REG>` → `registry`
- `IMAGES#ORG#SYSTEM#ENTITY#REGISTRATION#ID#...` + `IMG#...` → `aircraft_images`
- `ICAO#AIRCRAFT` + `TYPE#...` → `icao_aircraft`
- `ICAO#MANUFACTURERS` + `MFR#...` → `icao_manufacturers`
- `ICAO#VARIANTS` rows are special: `collection=None` (used for autosuggestion workflows, not indexed here)
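An illustrative shape for those specs (the exact regexes and the `find_spec` helper are assumptions, and the model-class field is omitted for brevity):

```python
import re
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class StreamSpec:
    pk_re: str
    sk_re: str
    collection: Optional[str]  # None => matched but not indexed (ICAO#VARIANTS)

STREAM_MODELS = [
    StreamSpec(r"^AIRCRAFT#REGISTRY#(FAA|FOREIGN)$", r"^DETAIL#", "registry"),
    StreamSpec(r"^IMAGES#", r"^IMG#", "aircraft_images"),
    StreamSpec(r"^ICAO#AIRCRAFT$", r"^TYPE#", "icao_aircraft"),
    StreamSpec(r"^ICAO#MANUFACTURERS$", r"^MFR#", "icao_manufacturers"),
    StreamSpec(r"^ICAO#VARIANTS$", r".", None),
]

def find_spec(pk, sk):
    """Return the first spec whose PK and SK regexes both match, else None."""
    for spec in STREAM_MODELS:
        if re.match(spec.pk_re, pk) and re.match(spec.sk_re, sk):
            return spec
    return None
```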
This subsystem enforces a platform invariant: for every entity that has images, there is a consistent
“default image” pointer stored as a `DefaultEntityImage` row.
The stream processor triggers reconciliation whenever an image row is inserted/updated/deleted.
Only image rows whose PK starts with `IMAGES#` and whose SK starts with `IMG#`
are processed by this subsystem. Everything else is ignored.
The image row contains `org_id`, `entity_type`, and `entity_id`.
The handler selects the correct image model (ICAO vs Registration) and then queries
for the newest image (`limit=1`).
- Case 1: No images remain → delete default row (if present) + repair
- Case 2: Images exist but no default row → create default to newest + repair
- Case 3: Default exists but points to missing image → switch to newest + repair
- Case 4: Default is valid → repair anyway (ensures downstream integrity)
Net effect: any image churn is automatically normalized into a consistent, query-friendly “default image” contract, which keeps UI and downstream pipelines stable.
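The four cases can be sketched against a hypothetical repository interface (`newest_image`, `get_default`, `image_exists`, `delete_default`, and `set_default` are placeholders modeled on the repository methods this doc names; `repair_default` is named in the doc itself):

```python
def reconcile_default(repo, entity_key):
    """Normalize the default-image pointer for one entity after image churn."""
    newest = repo.newest_image(entity_key)   # limit=1 query, newest first
    default = repo.get_default(entity_key)   # current DefaultEntityImage row
    if newest is None:
        if default is not None:
            repo.delete_default(entity_key)              # case 1: no images remain
    elif default is None or not repo.image_exists(entity_key, default):
        repo.set_default(entity_key, newest)             # cases 2 & 3: create or re-point
    repo.repair_default(entity_key)                      # every case ends with a repair
```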
This subsystem keeps a derived matcher structure up to date based on FAA link rows. It’s effectively maintaining a “how often do we see this (ICAO, manufacturer, model) tuple” signal that can power autosuggestions, matching heuristics, or ranking.
Only rows with `PK=FAA#LINKS` and `SK=MFR_MDL_CODE#...#SN#...`
are processed. Anything else is ignored.
If a link row gains valid matcher fields (`type_designator`, `manufacturer`, `model`),
the worker calls `increment()` on `ICAOEntityMatcherModel`.
If matcher fields change, the worker calls `move(old, new)` to decrement the old tuple and
increment the new tuple. If the fields don't change, it logs and exits.
When a link row is deleted (and has matcher fields), the worker calls `decrement()`
to remove the derived signal.
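The routing between those three calls can be sketched as follows; `matcher_fields` and `sync_matcher` are hypothetical helpers, while `increment`/`move`/`decrement` mirror the `ICAOEntityMatcherModel` methods this doc names:

```python
REQUIRED = ("type_designator", "manufacturer", "model")

def matcher_fields(doc):
    """Return the (icao, manufacturer, model) tuple, or None if incomplete."""
    if doc is None:
        return None
    values = tuple(doc.get(field) for field in REQUIRED)
    return values if all(values) else None

def sync_matcher(matcher, old_doc, new_doc):
    """Route one link-row change to increment / move / decrement."""
    old_t, new_t = matcher_fields(old_doc), matcher_fields(new_doc)
    if old_t is None and new_t is not None:
        matcher.increment(new_t)      # row gained valid matcher fields
    elif old_t is not None and new_t is None:
        matcher.decrement(old_t)      # row deleted (or fields became invalid)
    elif old_t is not None and old_t != new_t:
        matcher.move(old_t, new_t)    # fields changed: shift the tally
    # old_t == new_t: nothing to do
```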
- `LOG_LEVEL` controls logger verbosity (`logger.py` attaches a `StreamHandler` if none exists).
- `TYPESENSE_QUEUE_URL` is required and is used to enqueue `upsert`/`delete` actions.
- The worker relies on repository methods like `objects_sync.get`, `list_images`, `repair_default`, `increment`, `move`, and `decrement` to keep side effects consistent and encapsulated.