Projections¶
Projections build read models from event streams. waku provides two types:
- Inline projections run synchronously during append_to_stream, guaranteeing immediate consistency between writes and reads.
- Catch-up projections process events asynchronously in a background loop, eventually catching up with the event store.
```mermaid
graph LR
    subgraph Inline
        direction LR
        W[append_to_stream] --> P1[Projection]
        P1 --> RM1[Read Model]
    end
    subgraph Catch-Up
        direction LR
        ES[Event Store] -->|poll| R[Runner]
        R -->|batch| P2[Projection]
        P2 --> RM2[Read Model]
        P2 -->|checkpoint| CP[(Checkpoint Store)]
    end
```
Inline Projections¶
Implement IProjection to create an inline projection. It runs inside the same scope as
append_to_stream, so the read model is always consistent with the write model.
Every projection must define a projection_name class attribute.
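As an illustration, a projection maintaining a toy in-memory balance read model might look like the sketch below. Only the projection_name requirement and the event types from the registration examples come from this page; the import path, the handle() method name and signature, and the event field names are assumptions, so check the IProjection interface for the exact contract.

```python
from waku.eventsourcing import IProjection  # import path is an assumption


class AccountBalanceProjection(IProjection):
    projection_name = 'account_balance'  # required class attribute

    def __init__(self) -> None:
        self._balances: dict[str, int] = {}  # toy in-memory read model

    # Hypothetical handler; the real IProjection interface defines the exact
    # method(s) to implement and how events are dispatched to them.
    async def handle(self, event: object) -> None:
        match event:
            case AccountOpened(account_id=account_id):
                self._balances[account_id] = 0
            case MoneyDeposited(account_id=account_id, amount=amount):
                self._balances[account_id] += amount
            case MoneyWithdrawn(account_id=account_id, amount=amount):
                self._balances[account_id] -= amount
```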
Register inline projections through bind_aggregate() (or bind_decider()):
```python
EventSourcingExtension().bind_aggregate(
    repository=BankAccountRepository,
    event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    projections=[AccountBalanceProjection],
)
```
Warning
Inline projections add latency to every write because they execute within the same operation. Keep them lightweight or use catch-up projections for expensive read model updates.
Catch-Up Projections¶
Implement ICatchUpProjection for projections that run asynchronously in a background process.
Catch-up projections poll the event store, process events in batches, and checkpoint their progress.
Warning
Catch-up projections must be idempotent — reprocessing the same events (e.g., after a crash before commit) must produce the same result.
Each catch-up projection has an error_policy class attribute (defaults to ErrorPolicy.RETRY)
and an optional teardown() method called during rebuilds to clean up existing state.
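A minimal sketch putting those pieces together might look like this; the import paths and the batch-handling method name and signature are assumptions, while projection_name, error_policy, and teardown() are the attributes described above.

```python
from waku.eventsourcing import ErrorPolicy, ICatchUpProjection  # import paths are assumptions


class AccountSummaryProjection(ICatchUpProjection):
    projection_name = 'account_summary'
    error_policy = ErrorPolicy.RETRY  # the default, spelled out for clarity

    # Hypothetical batch handler; consult ICatchUpProjection for the real signature.
    async def apply(self, events: list[object]) -> None:
        for event in events:
            ...  # idempotent upserts into the summary read model

    async def teardown(self) -> None:
        # Invoked by rebuild() before replaying all events from position zero.
        ...  # e.g. delete all rows from the summary table
```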
Register catch-up projections via bind_catch_up_projections():
```python
extension = (
    EventSourcingExtension()
    .bind_aggregate(
        repository=BankAccountRepository,
        event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    )
    .bind_catch_up_projections([AccountSummaryProjection])
)
```
Error Policies¶
| Policy | Behavior |
|---|---|
| ErrorPolicy.RETRY | Retry the failed batch with exponential backoff (default) |
| ErrorPolicy.SKIP | Skip the failed batch and continue from the next checkpoint |
| ErrorPolicy.STOP | Stop the projection permanently until manually restarted |
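Switching policy is just a class attribute override, as in this sketch (building on the projection interface shown above):

```python
class AuditTrailProjection(ICatchUpProjection):
    projection_name = 'audit_trail'
    # A poison batch should not stall the whole pipeline here: skip it and move on.
    error_policy = ErrorPolicy.SKIP
```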
CatchUpProjectionRunner¶
CatchUpProjectionRunner polls the event store and dispatches event batches to registered
catch-up projections. Each projection runs in its own concurrent task.
The runner listens for SIGTERM and SIGINT to shut down gracefully, finishing any
in-progress batch before exiting.
Use rebuild(projection_name) to reprocess all events from the beginning. This calls
teardown() on the projection, resets the checkpoint to zero, and replays every event
through the projection.
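For example, a one-off maintenance task could trigger a rebuild roughly like this. Only rebuild() itself is documented above; obtaining the runner from the DI container is an assumption about how your application is wired.

```python
async def rebuild_account_summary(container) -> None:
    # Resolving the runner from the container is illustrative; adapt to
    # however your application provides CatchUpProjectionRunner.
    runner = await container.get(CatchUpProjectionRunner)
    await runner.rebuild('account_summary')
```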
Tip
Run the projection runner as a separate process (e.g., a dedicated worker or sidecar) so it does not block your main application.
Configuration¶
CatchUpProjectionConfig controls the runner's polling and retry behavior:
| Field | Default | Description |
|---|---|---|
| batch_size | 100 | Maximum events per batch |
| max_attempts | 3 | Retry attempts before applying the error policy |
| base_retry_delay_seconds | 10.0 | Initial delay between retries |
| max_retry_delay_seconds | 300.0 | Maximum delay between retries |
| poll_interval_min_seconds | 0.5 | Minimum polling interval when events are available |
| poll_interval_max_seconds | 5.0 | Maximum polling interval when idle |
| poll_interval_step_seconds | 1.0 | Increment per idle cycle |
| poll_interval_jitter_factor | 0.1 | Random jitter factor applied to the interval |
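A sketch of a tuned configuration using the fields above; the values are arbitrary, and how the config reaches the runner is not shown here.

```python
config = CatchUpProjectionConfig(
    batch_size=500,                 # larger batches for a busy store
    max_attempts=5,                 # retry a failing batch five times
    base_retry_delay_seconds=5.0,   # first retry after 5s, backing off...
    max_retry_delay_seconds=120.0,  # ...to at most two minutes
)
```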
Distributed Locking¶
IProjectionLock ensures only one instance of each catch-up projection runs at a time
across multiple processes. This prevents duplicate processing and checkpoint conflicts.
```python
class IProjectionLock(abc.ABC):
    @contextlib.asynccontextmanager
    async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
        """Yields True if the lock was acquired, False if held by another instance."""
```
Choosing a Lock¶
```mermaid
graph TD
    Q1{Single process?}
    Q1 -->|Yes| IM[InMemoryProjectionLock]
    Q1 -->|No| Q2{Using PgBouncer<br/>in transaction mode?}
    Q2 -->|Yes| LB[PostgresLeaseProjectionLock]
    Q2 -->|No| Q3{Long-running projections<br/>with many connections?}
    Q3 -->|Yes| LB
    Q3 -->|No| ADV[PostgresAdvisoryProjectionLock]
```
| | InMemoryProjectionLock | PostgresLeaseProjectionLock | PostgresAdvisoryProjectionLock |
|---|---|---|---|
| Use case | Single process, testing | Multi-process production | Multi-process, simple setups |
| Connection held | None | Only during heartbeats | Entire lock duration |
| PgBouncer compatible | N/A | Yes | No (session-bound) |
| Extra table required | No | Yes (es_projection_leases) | No |
| Lock granularity | Per process | Per lease TTL | Per DB session |
| Failure detection | Instant | Heartbeat interval | Connection drop |
InMemoryProjectionLock¶
Always acquires the lock immediately. Tracks held lock names for testing. Use this for single-process deployments and in tests.
PostgresLeaseProjectionLock¶
Uses a database table with TTL-based leases for multi-process coordination. A background heartbeat task renews the lease periodically. If the heartbeat detects the lease was stolen (e.g., by another instance after TTL expiry), it cancels the projection task.
Configured via LeaseConfig:
| Field | Default | Description |
|---|---|---|
| ttl_seconds | 30.0 | How long the lease is valid before expiring |
| renew_interval_factor | 1/3 | Fraction of TTL at which the lease is renewed |
| renew_interval_seconds | (derived) | ttl_seconds * renew_interval_factor — read-only property |
With the defaults, the lease renews every 10 seconds and expires after 30 seconds of silence.
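A quick sketch with the defaults, just to show the derived property:

```python
config = LeaseConfig()                # ttl_seconds=30.0, renew_interval_factor=1/3
print(config.renew_interval_seconds)  # ~10.0, read-only derived property
```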
Consistency guarantees
There is no fencing token mechanism — a stalled holder (e.g., GC pause) can briefly overlap with a new holder until its next heartbeat fires.
In practice this is safe because the runner resolves the projection, event reader, and
checkpoint store from a single DI scope. When using SqlAlchemyCheckpointStore with a
scoped AsyncSession, the projection writes and checkpoint save share the same transaction
(the checkpoint store calls flush(), not commit()). This means either both succeed
atomically or both roll back — duplicate processing from a brief overlap will not corrupt
the read model.
PostgresAdvisoryProjectionLock¶
Uses PostgreSQL advisory locks
via pg_try_advisory_lock(hashtext(name)). The lock is session-bound — it holds a database
connection for the entire duration.
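Conceptually, acquisition boils down to a single statement, sketched here with a SQLAlchemy async connection; this is illustrative only, not the library's actual implementation.

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection


async def try_advisory_lock(conn: AsyncConnection, name: str) -> bool:
    # Non-blocking variant: returns True/False immediately instead of waiting.
    result = await conn.execute(
        text('SELECT pg_try_advisory_lock(hashtext(:name))'),
        {'name': name},
    )
    return bool(result.scalar_one())
```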
Warning
Advisory locks are not compatible with PgBouncer in transaction-pooling mode because
the lock is tied to the database session, not the transaction. Releasing the connection
back to the pool releases the lock. Use PostgresLeaseProjectionLock instead.
Checkpoint Store¶
ICheckpointStore tracks each catch-up projection's last processed position so it resumes
from where it left off after restarts.
```python
class ICheckpointStore(abc.ABC):
    async def load(self, projection_name: str, /) -> Checkpoint | None: ...
    async def save(self, checkpoint: Checkpoint, /) -> None: ...
```
The Checkpoint dataclass carries the projection name, last processed global position, and timestamp.
Built-in implementations:
- InMemoryCheckpointStore — dictionary-backed, suitable for single-process deployments and testing
- SqlAlchemyCheckpointStore — PostgreSQL-backed via SQLAlchemy async session
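A sketch of how a consumer drives the contract (it works against either built-in implementation); the Checkpoint field names follow the es_checkpoints columns below and may not match the real constructor exactly.

```python
async def resume_and_save(store: ICheckpointStore) -> None:
    checkpoint = await store.load('account_summary')  # None on the very first run
    last_position = checkpoint.position if checkpoint else 0

    # ...process events with a global position greater than last_position, then:
    await store.save(
        Checkpoint(projection_name='account_summary', position=last_position + 100)
    )
```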
Configure the checkpoint store through EventSourcingConfig:
```python
from waku.eventsourcing import EventSourcingConfig
from waku.eventsourcing.projection.sqlalchemy import make_sqlalchemy_checkpoint_store

es_config = EventSourcingConfig(
    checkpoint_store=make_sqlalchemy_checkpoint_store(checkpoints_table),
)
```
make_sqlalchemy_checkpoint_store() works the same way as make_sqlalchemy_event_store() — it
returns a factory that Dishka uses to construct the store with its AsyncSession dependency
injected automatically.
Table Schema Reference¶
es_checkpoints¶
| Column | Type | Constraints | Description |
|---|---|---|---|
| projection_name | Text | PK | Unique projection identifier |
| position | BigInteger | NOT NULL | Last processed global position |
| updated_at | TIMESTAMP WITH TIME ZONE | NOT NULL | Last checkpoint update time |
| created_at | TIMESTAMP WITH TIME ZONE | default now() | First checkpoint time |
Bind with bind_checkpoint_tables(metadata) from waku.eventsourcing.projection.sqlalchemy.
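For example, registering the table on your application's metadata might look like this (the variable names are illustrative):

```python
from sqlalchemy import MetaData
from waku.eventsourcing.projection.sqlalchemy import bind_checkpoint_tables

metadata = MetaData()
bind_checkpoint_tables(metadata)  # binds the es_checkpoints table to this metadata
```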
es_projection_leases¶
Only required when using PostgresLeaseProjectionLock.
| Column | Type | Constraints | Description |
|---|---|---|---|
| projection_name | Text | PK | Projection being locked |
| holder_id | Text | NOT NULL | UUID of the lock holder instance |
| acquired_at | TIMESTAMP WITH TIME ZONE | default now() | When the lease was first acquired |
| renewed_at | TIMESTAMP WITH TIME ZONE | default now() | Last heartbeat renewal time |
| expires_at | TIMESTAMP WITH TIME ZONE | NOT NULL | When the lease expires if not renewed |
Bind with bind_lease_tables(metadata) from waku.eventsourcing.projection.lock.sqlalchemy.
Further reading¶
- Event Store — event persistence and metadata enrichment
- Schema Evolution — handling evolved events in projections
- Snapshots — aggregate snapshots and checkpoint strategies
- Testing — in-memory stores for integration tests