Projections

Projections build read models from event streams. waku provides two types:

  • Inline projections run synchronously during append_to_stream, guaranteeing immediate consistency between writes and reads.
  • Catch-up projections process events asynchronously in a background loop, eventually catching up with the event store.
graph LR
    subgraph Inline
        direction LR
        W[append_to_stream] --> P1[Projection]
        P1 --> RM1[Read Model]
    end
    subgraph Catch-Up
        direction LR
        ES[Event Store] -->|poll| R[Runner]
        R -->|batch| P2[Projection]
        P2 --> RM2[Read Model]
        P2 -->|checkpoint| CP[(Checkpoint Store)]
    end

Inline Projections

Implement IProjection to create an inline projection. It runs inside the same scope as append_to_stream, so the read model is always consistent with the write model.

Every projection must define a projection_name class attribute.

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING

from waku.eventsourcing.contracts.event import StoredEvent
from waku.eventsourcing.projection.interfaces import IProjection

from app.events import MoneyDeposited, MoneyWithdrawn

if TYPE_CHECKING:
    from waku.eventsourcing.contracts.stream import StreamId


class AccountBalanceProjection(IProjection):
    projection_name = 'account_balance'

    def __init__(self) -> None:
        self.balances: dict[StreamId, int] = {}

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            match event.data:
                case MoneyDeposited(amount=amount):
                    self.balances[event.stream_id] = self.balances.get(event.stream_id, 0) + amount
                case MoneyWithdrawn(amount=amount):
                    self.balances[event.stream_id] = self.balances.get(event.stream_id, 0) - amount

Register inline projections through bind_aggregate() (or bind_decider()):

EventSourcingExtension().bind_aggregate(
    repository=BankAccountRepository,
    event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    projections=[AccountBalanceProjection],
)

Warning

Inline projections add latency to every write because they execute within the same operation. Keep them lightweight or use catch-up projections for expensive read model updates.
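To make the inline flow concrete, here is a stdlib-only sketch of the idea — MiniEventStore and its method shapes are illustrative stand-ins, not waku's actual internals. Because projections run before append_to_stream returns, a read issued immediately afterwards already sees the projected state:

```python
import asyncio
from collections.abc import Sequence
from dataclasses import dataclass, field


@dataclass
class StoredEvent:
    stream_id: str
    data: int  # a plain deposit/withdrawal delta, for brevity


class BalanceProjection:
    def __init__(self) -> None:
        self.balances: dict[str, int] = {}

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            self.balances[event.stream_id] = self.balances.get(event.stream_id, 0) + event.data


@dataclass
class MiniEventStore:
    projections: list[BalanceProjection] = field(default_factory=list)
    events: list[StoredEvent] = field(default_factory=list)

    async def append_to_stream(self, stream_id: str, deltas: Sequence[int]) -> None:
        stored = [StoredEvent(stream_id, d) for d in deltas]
        self.events.extend(stored)
        # Inline: every projection runs before append returns,
        # so the read model can never lag behind the write model.
        for projection in self.projections:
            await projection.project(stored)


async def main() -> None:
    projection = BalanceProjection()
    store = MiniEventStore(projections=[projection])
    await store.append_to_stream('acc-1', [100, -30])
    assert projection.balances['acc-1'] == 70  # visible immediately after append


asyncio.run(main())
```

This is also why the warning above matters: every projection in that loop adds to the write's latency.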

Catch-Up Projections

Implement ICatchUpProjection for projections that run asynchronously in a background process. Catch-up projections poll the event store, process events in batches, and checkpoint their progress.

Warning

Catch-up projections must be idempotent — reprocessing the same events (e.g., after a crash before commit) must produce the same result.
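One common way to get idempotency — a sketch, not a pattern waku prescribes — is to track the highest position already applied and skip anything at or below it, so a redelivered batch is a no-op (the global_position field here is illustrative):

```python
import asyncio
from collections.abc import Sequence
from dataclasses import dataclass


@dataclass
class StoredEvent:
    global_position: int  # monotonically increasing store-wide position
    amount: int


class IdempotentBalanceProjection:
    def __init__(self) -> None:
        self.balance = 0
        self.last_position = 0  # highest global position already applied

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            if event.global_position <= self.last_position:
                continue  # already applied: reprocessing changes nothing
            self.balance += event.amount
            self.last_position = event.global_position


async def main() -> None:
    projection = IdempotentBalanceProjection()
    batch = [StoredEvent(1, 100), StoredEvent(2, -30)]
    await projection.project(batch)
    await projection.project(batch)  # redelivery after a simulated crash
    assert projection.balance == 70  # unchanged by the second pass


asyncio.run(main())
```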

Each catch-up projection has an error_policy class attribute (defaults to ErrorPolicy.RETRY) and an optional teardown() method called during rebuilds to clean up existing state.

from collections.abc import Sequence
from dataclasses import dataclass

from waku.eventsourcing.contracts.event import StoredEvent
from waku.eventsourcing.projection.interfaces import ErrorPolicy, ICatchUpProjection

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn


@dataclass
class AccountSummary:
    owner: str = ''
    balance: int = 0
    transaction_count: int = 0


class AccountSummaryProjection(ICatchUpProjection):
    projection_name = 'account_summary'
    error_policy = ErrorPolicy.RETRY

    def __init__(self) -> None:
        self.summaries: dict[str, AccountSummary] = {}

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            stream_key = event.stream_id.stream_key
            summary = self.summaries.setdefault(stream_key, AccountSummary())

            match event.data:
                case AccountOpened(owner=owner):
                    summary.owner = owner
                case MoneyDeposited(amount=amount):
                    summary.balance += amount
                    summary.transaction_count += 1
                case MoneyWithdrawn(amount=amount):
                    summary.balance -= amount
                    summary.transaction_count += 1

    async def teardown(self) -> None:
        self.summaries.clear()

Register catch-up projections via bind_catch_up_projections():

(
    EventSourcingExtension()
    .bind_aggregate(
        repository=BankAccountRepository,
        event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    )
    .bind_catch_up_projections([AccountSummaryProjection])
)

Error Policies

Policy             Behavior
ErrorPolicy.RETRY  Retry the failed batch with exponential backoff (default)
ErrorPolicy.SKIP   Skip the failed batch and continue from the next checkpoint
ErrorPolicy.STOP   Stop the projection permanently until manually restarted

CatchUpProjectionRunner

CatchUpProjectionRunner polls the event store and dispatches event batches to registered catch-up projections. Each projection runs in its own concurrent task.

from waku.di import AsyncContainer
from waku.eventsourcing.projection.config import CatchUpProjectionConfig
from waku.eventsourcing.projection.interfaces import ICatchUpProjection
from waku.eventsourcing.projection.lock.in_memory import InMemoryProjectionLock
from waku.eventsourcing.projection.runner import CatchUpProjectionRunner


async def run_projections(
    container: AsyncContainer,
    projection_types: list[type[ICatchUpProjection]],
) -> None:
    runner = CatchUpProjectionRunner(
        container=container,
        lock=InMemoryProjectionLock(),
        projection_types=projection_types,
        config=CatchUpProjectionConfig(
            batch_size=100,
            max_attempts=3,
        ),
    )
    await runner.run()

The runner listens for SIGTERM and SIGINT to shut down gracefully, finishing any in-progress batch before exiting.

Use the runner's rebuild(projection_name) method to reprocess all events from the beginning. It calls teardown() on the projection, resets the checkpoint to zero, and replays every stored event through the projection.
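The rebuild flow can be sketched end to end with in-memory stand-ins (all names here are illustrative): tear down the read model, reset the checkpoint, then replay:

```python
import asyncio


class CountProjection:
    def __init__(self) -> None:
        self.count = 0

    async def project(self, events: list[str], /) -> None:
        self.count += len(events)

    async def teardown(self) -> None:
        self.count = 0  # wipe existing read-model state


async def rebuild(
    projection: CountProjection,
    checkpoints: dict[str, int],
    name: str,
    all_events: list[str],
) -> None:
    await projection.teardown()           # 1. clean up existing state
    checkpoints[name] = 0                 # 2. reset checkpoint to the beginning
    await projection.project(all_events)  # 3. replay every stored event
    checkpoints[name] = len(all_events)   # 4. record the new position


async def main() -> None:
    projection = CountProjection()
    projection.count = 99  # stale state from a buggy earlier version
    checkpoints = {'counts': 2}
    await rebuild(projection, checkpoints, 'counts', ['e1', 'e2', 'e3'])
    assert projection.count == 3 and checkpoints['counts'] == 3


asyncio.run(main())
```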

Tip

Run the projection runner as a separate process (e.g., a dedicated worker or sidecar) so it does not block your main application.

Configuration

CatchUpProjectionConfig controls the runner's polling and retry behavior:

Field                        Default  Description
batch_size                   100      Maximum events per batch
max_attempts                 3        Retry attempts before applying the error policy
base_retry_delay_seconds     10.0     Initial delay between retries
max_retry_delay_seconds      300.0    Maximum delay between retries
poll_interval_min_seconds    0.5      Minimum polling interval when events are available
poll_interval_max_seconds    5.0      Maximum polling interval when idle
poll_interval_step_seconds   1.0      Increment per idle cycle
poll_interval_jitter_factor  0.1      Random jitter factor applied to the interval
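As an illustration of how these fields interact — the exact formulas below are assumptions, not taken from waku's source — a doubling backoff and an adaptive poll interval could look like:

```python
import random


def retry_delay(attempt: int, base: float = 10.0, cap: float = 300.0) -> float:
    # Exponential backoff: base, 2*base, 4*base, ... capped at `cap`.
    return min(base * 2 ** (attempt - 1), cap)


def next_poll_interval(
    current: float,
    had_events: bool,
    *,
    minimum: float = 0.5,
    maximum: float = 5.0,
    step: float = 1.0,
    jitter: float = 0.1,
) -> float:
    # Busy: snap back to the minimum. Idle: back off by `step` up to the maximum.
    interval = minimum if had_events else min(current + step, maximum)
    # Spread concurrent runners out by +/- `jitter` fraction of the interval.
    return interval * (1 + random.uniform(-jitter, jitter))
```

With the defaults this yields retry delays of 10, 20, 40, 80, 160, then 300 seconds, and an idle runner that slows from polling every 0.5 s up to every 5 s.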

Distributed Locking

IProjectionLock ensures only one instance of each catch-up projection runs at a time across multiple processes. This prevents duplicate processing and checkpoint conflicts.

class IProjectionLock(abc.ABC):
    @contextlib.asynccontextmanager
    async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
        """Yields True if the lock was acquired, False if held by another instance."""

Choosing a Lock

graph TD
    Q1{Single process?}
    Q1 -->|Yes| IM[InMemoryProjectionLock]
    Q1 -->|No| Q2{Using PgBouncer<br/>in transaction mode?}
    Q2 -->|Yes| LB[PostgresLeaseProjectionLock]
    Q2 -->|No| Q3{Long-running projections<br/>with many connections?}
    Q3 -->|Yes| LB
    Q3 -->|No| ADV[PostgresAdvisoryProjectionLock]

                      InMemoryProjectionLock   PostgresLeaseProjectionLock  PostgresAdvisoryProjectionLock
Use case              Single process, testing  Multi-process production     Multi-process, simple setups
Connection held       None                     Only during heartbeats       Entire lock duration
PgBouncer compatible  N/A                      Yes                          No (session-bound)
Extra table required  No                       Yes (es_projection_leases)   No
Lock granularity      Per process              Per lease TTL                Per DB session
Failure detection     Instant                  Heartbeat interval           Connection drop

InMemoryProjectionLock

Always acquires the lock immediately. Tracks held lock names for testing. Use this for single-process deployments and in tests.

PostgresLeaseProjectionLock

Uses a database table with TTL-based leases for multi-process coordination. A background heartbeat task renews the lease periodically. If the heartbeat detects the lease was stolen (e.g., by another instance after TTL expiry), it cancels the projection task.

Configured via LeaseConfig:

Field                   Default    Description
ttl_seconds             30.0       How long the lease is valid before expiring
renew_interval_factor   1/3        Fraction of the TTL at which the lease is renewed
renew_interval_seconds  (derived)  ttl_seconds * renew_interval_factor — read-only property

With the defaults, the lease renews every 10 seconds and expires after 30 seconds of silence.
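That derivation can be sketched as a small frozen dataclass — consistent with the table above, though the field layout here is an assumption about LeaseConfig's shape, not its actual source:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LeaseConfigSketch:
    ttl_seconds: float = 30.0
    renew_interval_factor: float = 1 / 3

    @property
    def renew_interval_seconds(self) -> float:
        # Renew well before expiry: a 30 s TTL means a heartbeat every 10 s,
        # leaving two missed heartbeats of slack before the lease lapses.
        return self.ttl_seconds * self.renew_interval_factor
```

Keeping the renew interval a fraction of the TTL (rather than an independent knob) guarantees the heartbeat always fires several times per lease lifetime, whatever TTL you choose.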

Consistency guarantees

There is no fencing token mechanism — a stalled holder (e.g., GC pause) can briefly overlap with a new holder until its next heartbeat fires.

In practice this is safe because the runner resolves the projection, event reader, and checkpoint store from a single DI scope. When using SqlAlchemyCheckpointStore with a scoped AsyncSession, the projection writes and checkpoint save share the same transaction (the checkpoint store calls flush(), not commit()). This means either both succeed atomically or both roll back — duplicate processing from a brief overlap will not corrupt the read model.

PostgresAdvisoryProjectionLock

Uses PostgreSQL advisory locks via pg_try_advisory_lock(hashtext(name)). The lock is session-bound — it holds a database connection for the entire duration.

Warning

Advisory locks are not compatible with PgBouncer in transaction-pooling mode because the lock is tied to the database session, not the transaction. Releasing the connection back to the pool releases the lock. Use PostgresLeaseProjectionLock instead.

Checkpoint Store

ICheckpointStore tracks each catch-up projection's last processed position so it resumes from where it left off after restarts.

class ICheckpointStore(abc.ABC):
    async def load(self, projection_name: str, /) -> Checkpoint | None: ...
    async def save(self, checkpoint: Checkpoint, /) -> None: ...

The Checkpoint dataclass carries the projection name, last processed global position, and timestamp.

Built-in implementations:

  • InMemoryCheckpointStore — dictionary-backed, suitable for single-process deployments and testing
  • SqlAlchemyCheckpointStore — PostgreSQL-backed via SQLAlchemy async session

Configure the checkpoint store through EventSourcingConfig:

from waku.eventsourcing import EventSourcingConfig
from waku.eventsourcing.projection.sqlalchemy import make_sqlalchemy_checkpoint_store

es_config = EventSourcingConfig(
    checkpoint_store=make_sqlalchemy_checkpoint_store(checkpoints_table),
)

make_sqlalchemy_checkpoint_store() works the same way as make_sqlalchemy_event_store() — it returns a factory that Dishka uses to construct the store with its AsyncSession dependency injected automatically.

Table Schema Reference

es_checkpoints

Column           Type                      Constraints    Description
projection_name  Text                      PK             Unique projection identifier
position         BigInteger                NOT NULL       Last processed global position
updated_at       TIMESTAMP WITH TIME ZONE  NOT NULL       Last checkpoint update time
created_at       TIMESTAMP WITH TIME ZONE  default now()  First checkpoint time

Bind with bind_checkpoint_tables(metadata) from waku.eventsourcing.projection.sqlalchemy.

es_projection_leases

Only required when using PostgresLeaseProjectionLock.

Column           Type                      Constraints    Description
projection_name  Text                      PK             Projection being locked
holder_id        Text                      NOT NULL       UUID of the lock holder instance
acquired_at      TIMESTAMP WITH TIME ZONE  default now()  When the lease was first acquired
renewed_at       TIMESTAMP WITH TIME ZONE  default now()  Last heartbeat renewal time
expires_at       TIMESTAMP WITH TIME ZONE  NOT NULL       When the lease expires if not renewed

Bind with bind_lease_tables(metadata) from waku.eventsourcing.projection.lock.sqlalchemy.

Further reading

  • Event Store — event persistence and metadata enrichment
  • Schema Evolution — handling evolved events in projections
  • Snapshots — aggregate snapshots and checkpoint strategies
  • Testing — in-memory stores for integration tests