DynamoDB Object Mapper and Repository Framework
Core base model used for all DynamoDB-backed entities within the platform. It provides deterministic primary-key generation, timestamp management, repository binding, and standardized serialization for the single-table datastore architecture. The model layer is distributed as a shared runtime dependency to backend services and Lambda workers, ensuring consistent persistence semantics across ingestion pipelines, APIs, and stream-processing components.
Architecture Overview
Domain Model (Pydantic)
↓
DynamoBaseModel Serialization
↓
Repository Layer (Sync / Async)
↓
DynamoDB Single-Table Storage
This layered design ensures deterministic key construction, centralized write semantics, and reusable persistence logic across ingestion pipelines, background workers, and API services.
DynamoBaseModel
DynamoBaseModel is the base class for all DynamoDB-backed domain
entities. It extends Pydantic’s BaseModel and provides deterministic
primary-key generation, timestamp handling, and serialization utilities
required for single-table modeling.
Responsibilities
- Define partition and sort key behavior via pk() and sk()
- Automatically manage created_at and updated_at timestamps
- Serialize entities into DynamoDB-compatible item structures
- Attach repository managers automatically at subclass creation
Key Requirements
def pk(self) -> str
def sk(self) -> str
All subclasses must implement deterministic key construction methods to ensure consistent single-table partitioning behavior.
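As a minimal sketch of this contract (the `Order` entity, its fields, and the key formats are illustrative, not part of the framework), a subclass implements both methods so that the same entity always maps to the same item location:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


class DynamoKeyMixin(ABC):
    """Illustrative stand-in for DynamoBaseModel's key contract."""

    @abstractmethod
    def pk(self) -> str: ...

    @abstractmethod
    def sk(self) -> str: ...


@dataclass
class Order(DynamoKeyMixin):
    tenant_id: str
    order_id: str

    def pk(self) -> str:
        # Partition all orders for a tenant together.
        return f"TENANT#{self.tenant_id}"

    def sk(self) -> str:
        # The sort key uniquely identifies the order within the partition.
        return f"ORDER#{self.order_id}"


order = Order(tenant_id="t-1", order_id="o-42")
print(order.pk(), order.sk())  # TENANT#t-1 ORDER#o-42
```

Because the keys are pure functions of the entity's identifying fields, re-serializing the same entity always targets the same item, which is what makes upserts safe.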
Serialization Behavior
- Adds timestamps automatically during writes
- Writes PK / SK attributes into the persisted item
- Optionally attaches GSI keys (GSI1 / GSI2)
- Removes null attributes
- Produces a normalized persistence payload
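The steps above can be sketched as a single serialization pass; the `Device` entity, the `to_item` helper, and the attribute names other than PK/SK/GSI1 are illustrative assumptions, not the framework's actual implementation:

```python
from datetime import datetime, timezone


class Device:
    """Hypothetical entity; real models inherit from DynamoBaseModel."""

    def __init__(self, device_id, site=None):
        self.device_id = device_id
        self.site = site  # optional attribute; dropped from the item when None

    def pk(self):
        return f"DEVICE#{self.device_id}"

    def sk(self):
        return "METADATA"


def to_item(model, gsi1_pk=None, gsi1_sk=None):
    """Sketch of the write-time serialization steps listed above."""
    now = datetime.now(timezone.utc).isoformat()
    item = {k: v for k, v in vars(model).items() if v is not None}  # strip nulls
    item.setdefault("created_at", now)   # set once on first write
    item["updated_at"] = now             # refreshed on every write
    item["PK"], item["SK"] = model.pk(), model.sk()
    if gsi1_pk is not None:              # GSI keys are optional
        item["GSI1PK"], item["GSI1SK"] = gsi1_pk, gsi1_sk
    return item


item = to_item(Device("d-1"))
print(sorted(item))  # ['PK', 'SK', 'created_at', 'device_id', 'updated_at']
```

Stripping null attributes keeps items compact and avoids persisting empty placeholders into sparse GSIs.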
Automatic Repository Binding
When a subclass is declared, repository managers are automatically attached:
Model.objects_sync → synchronous repository
Model.objects → asynchronous repository
If a subclass defines a custom Objects manager, it is dynamically
merged with the base repository implementation to allow model-specific query
extensions while retaining persistence operations.
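One way this binding can work is via `__init_subclass__`, dynamically composing the declared Objects class with the base repository; the sketch below is an assumption about the mechanism, and all class and method names (`DynamoRepoSketch`, `by_email`, and so on) are illustrative:

```python
class DynamoRepoSketch:
    """Minimal stand-in for the base synchronous repository."""

    def __init__(self, model_cls):
        self.model_cls = model_cls

    def save(self, obj):
        return f"saved {type(obj).__name__}"


class DynamoBaseModelSketch:
    """Sketch of automatic repository binding at subclass creation."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # If the subclass declares a custom Objects manager, merge it with
        # the base repository so model-specific queries coexist with the
        # standard persistence operations.
        custom = cls.__dict__.get("Objects")
        bases = (custom, DynamoRepoSketch) if custom else (DynamoRepoSketch,)
        cls.objects_sync = type(f"{cls.__name__}Repo", bases, {})(cls)


class User(DynamoBaseModelSketch):
    class Objects:
        def by_email(self, email):
            # Hypothetical model-specific query extension.
            return f"query User by email={email}"


print(User.objects_sync.by_email("a@example.com"))  # query User by email=a@example.com
print(User.objects_sync.save(User()))               # saved User
```

The merged repository exposes both the custom query method and the inherited persistence operations on the same `objects_sync` attribute.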
DynamoRepo (Synchronous Repository)
DynamoRepo provides the synchronous persistence interface used by
ingestion workers, background jobs, and batch-processing pipelines.
Core Operations
- save() — insert or upsert individual records
- delete() — remove records using PK/SK composite keys
- upsert_many() — batch write operations in 25-item chunks
- delete_many() — batch delete operations
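The 25-item chunking used by upsert_many() mirrors DynamoDB's BatchWriteItem limit of 25 put/delete requests per call. A minimal chunking helper might look like this (`chunked` is an illustrative name, not the repository's internal API):

```python
def chunked(items, size=25):
    """Yield fixed-size batches; DynamoDB's BatchWriteItem caps a single
    request at 25 items, which is why batch writes run in 25-item chunks."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


batch_sizes = [len(batch) for batch in chunked(list(range(60)))]
print(batch_sizes)  # [25, 25, 10]
```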
Write Behavior
- Validates model type before persistence
- Automatically applies timestamp updates
- Serializes payloads into DynamoDB attribute format
- Supports conditional writes for create-only operations
Batch Processing and Retry Strategy
- Batch operations execute in 25-record groups
- Automatically retries unprocessed items returned by DynamoDB
- Uses exponential backoff with randomized jitter
- Protects ingestion pipelines from partition-level throttling failures
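The retry loop above can be sketched as follows; `batch_write_with_retry` and its parameters are hypothetical names, and `write_batch` stands in for a DynamoDB call that returns the unprocessed subset of its input:

```python
import random
import time


def batch_write_with_retry(write_batch, items, max_attempts=5, base_delay=0.05):
    """Re-submit unprocessed items with exponential backoff and full jitter."""
    pending = list(items)
    for attempt in range(max_attempts):
        pending = write_batch(pending)
        if not pending:
            return []
        # Sleep a random duration up to the exponential cap ("full jitter"),
        # spreading retries out to ease partition-level throttling.
        time.sleep(random.uniform(0, base_delay * (2 ** attempt)))
    return pending  # surface anything still unprocessed to the caller


# Simulate a throttled table that succeeds on the third attempt.
attempts = {"count": 0}


def flaky_write(batch):
    attempts["count"] += 1
    return [] if attempts["count"] >= 3 else batch


leftover = batch_write_with_retry(flaky_write, ["a", "b"], base_delay=0.001)
print(leftover, attempts["count"])  # [] 3
```

Randomized jitter matters here: without it, many workers throttled at the same moment would retry in lockstep and hit the same hot partition again.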
AsyncDynamoRepo
AsyncDynamoRepo provides asynchronous compatibility for FastAPI and
other async service layers without duplicating repository logic.
Instead of re-implementing persistence methods, synchronous repository calls are executed using thread offloading:
await Model.objects.save(...)
await Model.objects.upsert_many(...)
- Automatically wraps all synchronous repository methods
- Uses asyncio.to_thread() for non-blocking execution
- Allows ingestion workers and APIs to share identical persistence logic
- Eliminates duplicated datastore access implementations
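The thread-offloading pattern can be sketched as a thin proxy that forwards every synchronous method through asyncio.to_thread(); the class names here (`SyncRepo`, `AsyncRepoProxy`) are illustrative, not the framework's actual implementation:

```python
import asyncio


class SyncRepo:
    """Stand-in for the synchronous repository."""

    def save(self, item):
        return f"saved:{item}"


class AsyncRepoProxy:
    """Wraps each sync repository method in asyncio.to_thread()."""

    def __init__(self, sync_repo):
        self._sync = sync_repo

    def __getattr__(self, name):
        attr = getattr(self._sync, name)
        if callable(attr):
            async def call(*args, **kwargs):
                # Run the blocking call in a worker thread so the
                # event loop stays free to serve other requests.
                return await asyncio.to_thread(attr, *args, **kwargs)
            return call
        return attr


result = asyncio.run(AsyncRepoProxy(SyncRepo()).save("order-1"))
print(result)  # saved:order-1
```

Because the proxy resolves methods dynamically, new repository operations become awaitable automatically, with no per-method wrapper code.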
Design Characteristics
- Schema-driven persistence using strongly-typed Pydantic models
- Automatic repository attachment per entity
- Deterministic PK/SK generation required at model level
- Batch-optimized write patterns aligned with DynamoDB throughput behavior
- Async compatibility without repository duplication
- Native support for DynamoDB single-table architectures
Operational Benefits
- Centralized persistence semantics across services
- High-throughput ingestion compatibility via batched writes
- Safe retry handling for large ingestion workloads
- Unified sync and async persistence interfaces
- Strong consistency between domain models and datastore structure