Core DynamoDB Stream Worker

This worker consumes DynamoDB Stream events and propagates datastore changes to three downstream subsystems: search indexing, default-image reconciliation, and FAA matcher maintenance. Its event-driven fan-out keeps those systems eventually consistent with DynamoDB.


What This Worker Is Responsible For
  • Listen to DynamoDB Streams and process each stream record in the batch.
  • Deserialize DynamoDB stream images (NewImage, OldImage) into plain Python dicts using boto3.dynamodb.types.TypeDeserializer.
  • Route changes to subsystem handlers based on event type: INSERT/MODIFY triggers upsert-style behavior, REMOVE triggers cleanup.
  • Enforce eventual consistency across:
    • Search documents (Typesense) via an SQS action queue
    • Default image pointer rows (DefaultEntityImage contract)
    • ICAO entity matcher statistics derived from FAA link rows
  • Fail loudly (no swallowing exceptions) so AWS retry + DLQ behavior can protect against silent drift.

End-to-End Processing Flow
  1. DynamoDB emits a stream record with eventName and a dynamodb payload containing Keys plus NewImage and/or OldImage (INSERT carries only NewImage, REMOVE only OldImage).
  2. The Lambda converts images to Python dicts (so downstream code can use normal types).
  3. The worker logs a compact record summary (eventName, PK, SK, item size).
  4. It dispatches to:
    • typesense_sync (search document synchronization)
    • image_sync (default-image enforcement)
    • faa_matcher_sync (matcher counts / signals)

1) Stream Ingestion + Deserialization

DynamoDB Streams use an attribute-value encoding (e.g. {"S":"..."}, {"N":"..."}). This worker converts that structure into native Python types via:

  • ddb_to_python(image) → { field: python_value }
  • Supports both NewImage and OldImage

This conversion is what allows downstream handlers to be written like normal domain logic rather than “DynamoDB stream parsing” code.
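To illustrate what the conversion does, here is a minimal pure-Python sketch of the attribute-value decoding. The worker itself uses boto3.dynamodb.types.TypeDeserializer; the helper decode_value below is hypothetical, covers only a subset of type tags, and assumes that a missing image should become an empty dict.

```python
from decimal import Decimal
from typing import Any, Optional


def decode_value(av: dict) -> Any:
    """Decode one attribute value such as {"S": "abc"} or {"N": "3"}."""
    (tag, raw), = av.items()  # each attribute value carries exactly one type tag
    if tag == "S":
        return raw
    if tag == "N":
        return Decimal(raw)  # numbers arrive as strings; Decimal avoids float rounding
    if tag == "BOOL":
        return bool(raw)
    if tag == "NULL":
        return None
    if tag == "L":
        return [decode_value(v) for v in raw]
    if tag == "M":
        return {k: decode_value(v) for k, v in raw.items()}
    if tag == "SS":
        return set(raw)
    if tag == "NS":
        return {Decimal(n) for n in raw}
    raise ValueError(f"unsupported attribute type: {tag}")


def ddb_to_python(image: Optional[dict]) -> dict:
    """Convert a NewImage/OldImage mapping into a plain dict (assumed empty if absent)."""
    return {field: decode_value(av) for field, av in (image or {}).items()}
```

Numbers decode to Decimal rather than int or float, which matches what TypeDeserializer returns, so downstream handlers should expect Decimal values.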

2) Operational Guarantees (Retries / DLQ)

Each record is processed inside a try/except. If anything fails, the worker:

  • Logs the failure with PK/SK context and stack trace (exc_info=True)
  • Re-raises the exception (explicitly not swallowed)

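That “fail fast” behavior is intentional: it delegates resilience to AWS Lambda and the event source mapping's retry policy, and prevents silent divergence between DynamoDB and downstream systems. Because Lambda retries a failed invocation (by default as a whole batch), the subsystem handlers may see the same record more than once.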
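The log-then-re-raise pattern can be sketched as follows. This is an illustration under assumptions, not the worker's actual code: process_batch and the handlers argument are hypothetical names, and PK/SK are assumed to be string-typed key attributes.

```python
import logging
from typing import Callable, Iterable

logger = logging.getLogger("stream_worker")

Handler = Callable[[dict], None]  # hypothetical handler signature


def process_batch(records: Iterable[dict], handlers: Iterable[Handler]) -> int:
    """Run every handler on every record and re-raise the first failure."""
    handlers = list(handlers)
    processed = 0
    for record in records:
        keys = record.get("dynamodb", {}).get("Keys", {})
        pk = keys.get("PK", {}).get("S")  # assumes string-typed keys
        sk = keys.get("SK", {}).get("S")
        try:
            for handle in handlers:
                handle(record)
        except Exception:
            # Log with key context and stack trace, then let the error escape
            # so the Lambda invocation fails and AWS retry/DLQ handling applies.
            logger.error("record failed PK=%s SK=%s", pk, sk, exc_info=True)
            raise
        processed += 1
    return processed
```

The bare raise preserves the original traceback, so the Lambda error report points at the failing handler rather than at the logging code.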

Subsystem A — Typesense Search Sync (via SQS)

This subsystem translates specific DynamoDB rows into Typesense documents and queues indexing actions. The key design choice: the stream processor does not talk to Typesense directly. Instead it pushes upsert/delete actions into an SQS queue (TYPESENSE_QUEUE_URL), allowing search indexing to scale independently and retry safely.

Model Resolution

A row is eligible for Typesense sync only if its PK/SK matches one of the STREAM_MODELS specs (regex-based). Each spec declares: model class, PK regex, SK regex, and the target Typesense collection.
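A regex-based spec table of this kind could look like the sketch below. The StreamModelSpec dataclass, the resolve_spec helper, the string model names, and the exact patterns are assumptions made for illustration; only two of the documented entities are shown.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StreamModelSpec:
    model: str                 # stand-in for the model class
    pk_pattern: re.Pattern
    sk_pattern: re.Pattern
    collection: Optional[str]  # None means "matched, but not indexed here"


# Illustrative entries only; the real patterns live in STREAM_MODELS.
STREAM_MODELS = [
    StreamModelSpec(
        "FaaRegistry",
        re.compile(r"^AIRCRAFT#REGISTRY#FAA$"),
        re.compile(r"^DETAIL#.+$"),
        "registry",
    ),
    StreamModelSpec(
        "IcaoAircraft",
        re.compile(r"^ICAO#AIRCRAFT$"),
        re.compile(r"^TYPE#.+$"),
        "icao_aircraft",
    ),
]


def resolve_spec(pk: str, sk: str) -> Optional[StreamModelSpec]:
    """Return the first spec whose PK and SK patterns both match, else None."""
    for spec in STREAM_MODELS:
        if spec.pk_pattern.match(pk) and spec.sk_pattern.match(sk):
            return spec
    return None
```

Returning None for unmatched rows lets the caller skip them without treating the miss as an error.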

Upsert vs Delete Logic
  • If old_doc exists and new_doc does not → enqueue delete
  • If new_doc exists and differs from old_doc → enqueue upsert
  • If no spec match → skip quietly (debug/info logs)
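The decision rules above can be sketched as a small pure function. The plan_action name and the message shape (action, collection, id, document) are hypothetical, as is the assumption that every document carries an id field; the real queue payload may differ.

```python
import json
from typing import Optional


def plan_action(collection: str, old_doc: Optional[dict], new_doc: Optional[dict]) -> Optional[str]:
    """Return a JSON message body for the action queue, or None to send nothing."""
    if old_doc is not None and new_doc is None:
        # The row no longer maps to a document: remove it from the index.
        return json.dumps({"action": "delete", "collection": collection, "id": old_doc["id"]})
    if new_doc is not None and new_doc != old_doc:
        # New or changed document: index the latest version.
        return json.dumps({"action": "upsert", "collection": collection, "document": new_doc})
    return None  # unchanged, or nothing to index
```

Comparing new_doc with old_doc before enqueuing avoids sending an upsert for stream events that do not change the indexed fields.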

Entities Covered by STREAM_MODELS

The specs define which single-table “rows” are considered indexable and what collection they map to.

  • AIRCRAFT#REGISTRY#FAA + DETAIL#<REG> → registry
  • AIRCRAFT#REGISTRY#FOREIGN + DETAIL#<REG> → registry
  • IMAGES#ORG#SYSTEM#ENTITY#REGISTRATION#ID#... + IMG#... → aircraft_images
  • ICAO#AIRCRAFT + TYPE#... → icao_aircraft
  • ICAO#MANUFACTURERS + MFR#... → icao_manufacturers
  • ICAO#VARIANTS rows are special: collection=None (used for autosuggestion workflows, not indexed here)
Subsystem B — Image Sync (DefaultEntityImage Reconciliation)

This subsystem enforces a platform invariant: for every entity that has images, there is a consistent “default image” pointer stored as a DefaultEntityImage row. The stream processor triggers reconciliation whenever an image row is inserted/updated/deleted.

Trigger Condition

Only rows with PK starting with IMAGES# and SK starting with IMG# are processed by this subsystem. Everything else is ignored.

Entity Resolution

The image row contains org_id, entity_type, entity_id. The handler selects the correct image model (ICAO vs Registration) and then queries for the newest image (limit=1).

Reconciliation Cases
  • Case 1: No images remain → delete the default row (if present), then repair
  • Case 2: Images exist but there is no default row → create a default pointing to the newest image, then repair
  • Case 3: Default exists but points to a missing image → switch it to the newest image, then repair
  • Case 4: Default is valid → repair anyway (ensures downstream integrity)

Net effect: any image churn is automatically normalized into a consistent, query-friendly “default image” contract, which keeps UI and downstream pipelines stable.
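The four reconciliation cases can be expressed as a pure decision function, sketched below under simplifying assumptions: plan_default and its action names are hypothetical, and the function receives the full list of remaining image ids (newest first) rather than issuing the limit=1 query the handler uses. In every case the caller would still run the repair step afterwards.

```python
from typing import Optional, Sequence, Tuple


def plan_default(image_ids: Sequence[str], default_id: Optional[str]) -> Tuple[str, Optional[str]]:
    """Decide what to do with the default row.

    image_ids is ordered newest first; returns (action, target_image_id).
    """
    if not image_ids:
        # Case 1: nothing left to point at.
        return ("delete_default" if default_id is not None else "noop", None)
    newest = image_ids[0]
    if default_id is None:
        return ("create_default", newest)   # Case 2
    if default_id not in image_ids:
        return ("switch_default", newest)   # Case 3
    return ("keep_default", default_id)     # Case 4
```

Keeping the decision separate from the writes makes each case easy to unit test without touching DynamoDB.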

Subsystem C — FAA Matcher Sync (ICAOEntityMatcherModel)

This subsystem keeps a derived matcher structure up to date based on FAA link rows. In effect, it maintains a count of how often each (ICAO type designator, manufacturer, model) tuple appears, a signal that can power autosuggestions, matching heuristics, or ranking.

Trigger Condition

Only FAA link rows matching PK=FAA#LINKS and SK=MFR_MDL_CODE#...#SN#... are processed. Anything else is ignored.

INSERT

If the new link row has valid matcher fields (type_designator, manufacturer, model), the worker calls increment() on ICAOEntityMatcherModel.

MODIFY

If matcher fields change, the worker calls move(old, new) to decrement the old tuple and increment the new tuple. If fields don’t change, it logs and exits.

REMOVE

When a link row is deleted (and has matcher fields), the worker calls decrement() to remove the derived signal.
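The three event cases can be sketched with an in-memory stand-in for the matcher. InMemoryMatcher, matcher_key, and apply_event are hypothetical names, and the Counter only imitates the increment/move/decrement methods of ICAOEntityMatcherModel. MODIFY events where only one side has matcher fields are left out because the behavior for them is not described here.

```python
from collections import Counter
from typing import Optional, Tuple

MatcherKey = Tuple[str, str, str]  # (type_designator, manufacturer, model)


class InMemoryMatcher:
    """Toy stand-in for ICAOEntityMatcherModel, backed by a Counter."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def increment(self, key: MatcherKey) -> None:
        self.counts[key] += 1

    def decrement(self, key: MatcherKey) -> None:
        self.counts[key] -= 1
        if self.counts[key] <= 0:
            del self.counts[key]  # drop tuples that are no longer observed

    def move(self, old: MatcherKey, new: MatcherKey) -> None:
        self.decrement(old)
        self.increment(new)


def matcher_key(item: Optional[dict]) -> Optional[MatcherKey]:
    """Return the tuple only when all three matcher fields are present and non-empty."""
    if not item:
        return None
    fields = (item.get("type_designator"), item.get("manufacturer"), item.get("model"))
    return fields if all(fields) else None


def apply_event(matcher: InMemoryMatcher, event_name: str,
                old: Optional[dict], new: Optional[dict]) -> None:
    old_key, new_key = matcher_key(old), matcher_key(new)
    if event_name == "INSERT" and new_key:
        matcher.increment(new_key)
    elif event_name == "MODIFY" and old_key and new_key and old_key != new_key:
        matcher.move(old_key, new_key)
    elif event_name == "REMOVE" and old_key:
        matcher.decrement(old_key)
```

Treating MODIFY as a single move keeps the total count stable when a link row is re-pointed from one tuple to another.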

Configuration & Integration Points
  • LOG_LEVEL controls logger verbosity (logger.py attaches a StreamHandler if none exists).
  • TYPESENSE_QUEUE_URL is required and is used to enqueue upsert/delete actions.
  • The worker relies on repository methods such as objects_sync.get, list_images, repair_default, increment, move, and decrement to keep side effects consistent and encapsulated.