Snapshots

Snapshots periodically capture aggregate state so that loading doesn't require replaying every event from the beginning of the stream. Use them when aggregates accumulate many events and replay time becomes noticeable.

When to Use Snapshots

Snapshots add complexity — an extra store, serialization logic, and a strategy to decide when to take them. Only introduce snapshots when event replay time becomes a measurable bottleneck.

Tip

Profile first. Most aggregates don't need snapshots until they exceed hundreds of events.

Snapshot Strategy

The ISnapshotStrategy interface defines when a snapshot should be taken:

class ISnapshotStrategy(abc.ABC):
    @abc.abstractmethod
    def should_snapshot(self, version: int, events_since_snapshot: int) -> bool: ...

waku ships with EventCountStrategy(threshold=N), which takes a new snapshot once N events have been appended since the previous one. The default threshold is 100.
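
To make the interface concrete, here is a minimal sketch of a count-based strategy. It redeclares the interface locally so it runs without waku installed. CountThresholdStrategy is a hypothetical name, and triggering at events_since_snapshot >= threshold is an assumption about the boundary, not a description of how EventCountStrategy is implemented.

```python
import abc


class ISnapshotStrategy(abc.ABC):
    """Local copy of the interface shown above, so the sketch runs standalone."""

    @abc.abstractmethod
    def should_snapshot(self, version: int, events_since_snapshot: int) -> bool: ...


class CountThresholdStrategy(ISnapshotStrategy):
    """Hypothetical strategy: snapshot once enough events have accumulated."""

    def __init__(self, threshold: int = 100) -> None:
        if threshold < 1:
            raise ValueError('threshold must be at least 1')
        self._threshold = threshold

    def should_snapshot(self, version: int, events_since_snapshot: int) -> bool:
        # `version` is unused here; a custom strategy could also key off it.
        return events_since_snapshot >= self._threshold


strategy = CountThresholdStrategy(threshold=50)
print(strategy.should_snapshot(version=120, events_since_snapshot=49))  # False
print(strategy.should_snapshot(version=121, events_since_snapshot=50))  # True
```

A custom strategy only needs to subclass the interface and return a boolean, so time-based or size-based rules can follow the same shape.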

Snapshot Repository

Both aggregate styles have a snapshot-aware repository variant:

| Style | Base Repository | Snapshot Repository |
| --- | --- | --- |
| OOP | EventSourcedRepository | SnapshotEventSourcedRepository |
| Functional Decider | DeciderRepository | SnapshotDeciderRepository |

SnapshotEventSourcedRepository requires you to implement two additional methods for state serialization, _snapshot_state and _restore_from_snapshot:

from waku.eventsourcing.snapshot import Snapshot, SnapshotEventSourcedRepository

from app.aggregate import BankAccount


class BankAccountSnapshotRepository(SnapshotEventSourcedRepository[BankAccount]):
    def _snapshot_state(self, aggregate: BankAccount) -> object:
        return {'owner': aggregate.owner, 'balance': aggregate.balance}

    def _restore_from_snapshot(self, snapshot: Snapshot) -> BankAccount:
        account = BankAccount()
        account.owner = snapshot.state['owner']
        account.balance = snapshot.state['balance']
        return account

SnapshotDeciderRepository works the same way as DeciderRepository — state serialization is handled automatically since the state is already a dataclass:

from waku.cqrs import INotification
from waku.eventsourcing import SnapshotDeciderRepository

from app.decider import BankCommand
from app.state import BankAccountState


class BankAccountSnapshotRepository(SnapshotDeciderRepository[BankAccountState, BankCommand, INotification]):
    aggregate_name = 'BankAccount'

When loading, the snapshot repository first checks for a stored snapshot. If one exists, it verifies the schema version — applying migrations if needed or falling back to full replay if no migration path is available (see Schema Versioning). It then deserializes the state and replays only the events recorded after the snapshot version. If no snapshot is found, it falls back to full replay.

graph TD
    L[Load aggregate] --> CS{Snapshot exists?}
    CS -->|Yes| SV{Schema version matches?}
    SV -->|Yes| DS[Deserialize snapshot]
    SV -->|No| MG{Migration path?}
    MG -->|Yes| AP[Apply migrations] --> DS
    MG -->|No| FR
    DS --> RE[Replay events after snapshot version]
    RE --> A[Aggregate ready]
    CS -->|No| FR[Full replay from event 0]
    FR --> A
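
The snapshot-or-full-replay part of this flow can be sketched in a few lines. This is a toy model, not waku's implementation: StoredSnapshot, apply, and load are hypothetical names, the schema version check is left out, and it assumes stream positions are 0-based with the snapshot version equal to the position of the last event it includes.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

Event = dict[str, Any]
State = dict[str, Any]


@dataclass(frozen=True)
class StoredSnapshot:
    """Hypothetical stand-in for a stored snapshot."""

    state: State
    version: int  # assumed: position of the last event folded into `state`


def apply(state: State, event: Event) -> State:
    """Fold one event into the state (toy bank-account rules)."""
    if event['type'] == 'AccountOpened':
        return {'owner': event['owner'], 'balance': 0}
    if event['type'] == 'MoneyDeposited':
        return {**state, 'balance': state['balance'] + event['amount']}
    return state  # unknown events are ignored in this toy model


def load(events: list[Event], snapshot: StoredSnapshot | None) -> tuple[State, int]:
    """Return the rebuilt state and the number of events replayed."""
    if snapshot is None:
        state, tail = {}, events  # full replay from event 0
    else:
        state, tail = dict(snapshot.state), events[snapshot.version + 1:]
    for event in tail:
        state = apply(state, event)
    return state, len(tail)


EVENTS = [
    {'type': 'AccountOpened', 'owner': 'alice'},
    {'type': 'MoneyDeposited', 'amount': 100},
    {'type': 'MoneyDeposited', 'amount': 50},
    {'type': 'MoneyDeposited', 'amount': 25},
]
snapshot = StoredSnapshot(state={'owner': 'alice', 'balance': 150}, version=2)

print(load(EVENTS, None))      # ({'owner': 'alice', 'balance': 175}, 4)
print(load(EVENTS, snapshot))  # ({'owner': 'alice', 'balance': 175}, 1)
```

Both paths produce the same state; the snapshot path simply replays one event instead of four.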

Module Wiring

Pass snapshot=SnapshotOptions(...) to bind_aggregate() or bind_decider():

from waku import module
from waku.eventsourcing import EventSourcingExtension, SnapshotOptions
from waku.eventsourcing.snapshot.strategy import EventCountStrategy

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn
from app.repository import BankAccountSnapshotRepository


@module(
    extensions=[
        EventSourcingExtension().bind_aggregate(
            repository=BankAccountSnapshotRepository,
            event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
            snapshot=SnapshotOptions(strategy=EventCountStrategy(threshold=50)),
        ),
    ],
)
class BankSnapshotModule:
    pass

For a functional decider, the equivalent wiring uses bind_decider():

from waku import module
from waku.eventsourcing import EventSourcingExtension, SnapshotOptions
from waku.eventsourcing.snapshot.strategy import EventCountStrategy

from app.decider import BankAccountDecider
from app.events import AccountOpened, MoneyDeposited
from app.repository import BankAccountSnapshotRepository


@module(
    extensions=[
        EventSourcingExtension().bind_decider(
            repository=BankAccountSnapshotRepository,
            decider=BankAccountDecider,
            event_types=[AccountOpened, MoneyDeposited],
            snapshot=SnapshotOptions(strategy=EventCountStrategy(threshold=50)),
        ),
    ],
)
class BankSnapshotModule:
    pass

The extension automatically registers the strategy in the DI container when snapshot is provided.

Warning

Snapshot support requires ISnapshotStore and ISnapshotStateSerializer to be registered in EventSourcingConfig. Without them, the snapshot repository will fail to resolve at runtime.

Snapshot Store

ISnapshotStore defines persistence for snapshots:

class ISnapshotStore(abc.ABC):
    async def load(self, stream_id: StreamId, /) -> Snapshot | None: ...
    async def save(self, snapshot: Snapshot, /) -> None: ...

The Snapshot dataclass carries the serialized state:

| Field | Type | Description |
| --- | --- | --- |
| stream_id | StreamId | Stream identifier (e.g., StreamId.for_aggregate('BankAccount', 'acc-1')) |
| state | dict[str, Any] | Serialized aggregate state |
| version | int | Stream version at snapshot time |
| state_type | str | State class name (for type safety on load) |
| schema_version | int | Schema version (defaults to 1) |

Built-in implementations:

  • InMemorySnapshotStore — dictionary-backed, suitable for testing
  • SqlAlchemySnapshotStore — PostgreSQL-backed via SQLAlchemy async session
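
For intuition, a dictionary-backed store can be sketched as follows. This is an illustrative stand-in, not the source of InMemorySnapshotStore: DictSnapshotStore is a hypothetical name, the Snapshot dataclass is redeclared locally, and stream_id is a plain string rather than a StreamId so the example runs without waku.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Snapshot:
    """Local stand-in mirroring the fields listed above."""

    stream_id: str  # the real field is a StreamId; a string keeps this standalone
    state: dict[str, Any]
    version: int
    state_type: str
    schema_version: int = 1


class DictSnapshotStore:
    """Hypothetical dictionary-backed store keeping one snapshot per stream."""

    def __init__(self) -> None:
        self._snapshots: dict[str, Snapshot] = {}

    async def load(self, stream_id: str, /) -> Snapshot | None:
        return self._snapshots.get(stream_id)

    async def save(self, snapshot: Snapshot, /) -> None:
        # Saving overwrites any earlier snapshot for the same stream.
        self._snapshots[snapshot.stream_id] = snapshot


async def main() -> Snapshot | None:
    store = DictSnapshotStore()
    await store.save(Snapshot('BankAccount-acc-1', {'balance': 100}, 50, 'BankAccount'))
    await store.save(Snapshot('BankAccount-acc-1', {'balance': 175}, 100, 'BankAccount'))
    return await store.load('BankAccount-acc-1')


latest = asyncio.run(main())
print(latest.version)  # 100
```

Only the most recent snapshot per stream is kept, which matches the one-row-per-stream layout of the SQL table described later.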

Schema Versioning

Aggregate state structures evolve over time — fields get added, renamed, or removed. Without versioning, old snapshots become undeserializable. waku solves this with snapshot schema versioning and a migration chain that transforms old snapshots to the current schema.

Declaring Schema Versions

Set schema_version in the SnapshotOptions passed to bind_aggregate() or bind_decider() to track the current state schema:

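from waku.eventsourcing import EventSourcingExtension, SnapshotOptions
from waku.eventsourcing.snapshot.strategy import EventCountStrategy

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn
from app.repository import BankAccountSnapshotRepository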

EventSourcingExtension().bind_aggregate(
    repository=BankAccountSnapshotRepository,
    event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    snapshot=SnapshotOptions(
        strategy=EventCountStrategy(threshold=50),
        schema_version=2,  # bump when state structure changes
    ),
)

All new snapshots are saved with this version. On load, the repository checks whether the stored snapshot's schema_version matches the configured schema_version in SnapshotOptions.

Writing Migrations

Implement ISnapshotMigration for each schema version transition:

from typing import Any

from waku.eventsourcing.snapshot import ISnapshotMigration, Snapshot, SnapshotEventSourcedRepository

from app.aggregate import BankAccount


class AddEmailField(ISnapshotMigration):
    from_version = 1
    to_version = 2

    def migrate(self, state: dict[str, Any], /) -> dict[str, Any]:
        return {**state, 'email': ''}


class RenameOwnerToName(ISnapshotMigration):
    from_version = 2
    to_version = 3

    def migrate(self, state: dict[str, Any], /) -> dict[str, Any]:
        owner = state.pop('owner', '')
        return {**state, 'name': owner}


class BankAccountSnapshotRepository(SnapshotEventSourcedRepository[BankAccount]):
    def _snapshot_state(self, aggregate: BankAccount) -> object:
        return {'name': aggregate.owner, 'email': '', 'balance': aggregate.balance}

    def _restore_from_snapshot(self, snapshot: Snapshot) -> BankAccount:
        account = BankAccount()
        account.owner = snapshot.state['name']
        account.balance = snapshot.state['balance']
        return account

Each migration specifies from_version and to_version and transforms the state dictionary. The SnapshotMigrationChain applies them in sequence.
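
As a worked example of what "in sequence" means for the two migrations above, the sketch below runs a v1 state through both steps. It uses plain classes instead of ISnapshotMigration so it runs standalone; apply_in_sequence is a hypothetical helper that assumes a contiguous chain, and it is not the SnapshotMigrationChain implementation.

```python
from typing import Any


class AddEmailField:
    from_version = 1
    to_version = 2

    def migrate(self, state: dict[str, Any], /) -> dict[str, Any]:
        return {**state, 'email': ''}


class RenameOwnerToName:
    from_version = 2
    to_version = 3

    def migrate(self, state: dict[str, Any], /) -> dict[str, Any]:
        # Build a new dict rather than mutating the caller's state.
        rest = {key: value for key, value in state.items() if key != 'owner'}
        return {**rest, 'name': state.get('owner', '')}


def apply_in_sequence(state: dict[str, Any], stored_version: int, migrations: list[Any]) -> dict[str, Any]:
    """Run each migration at or above stored_version, oldest first."""
    for migration in sorted(migrations, key=lambda m: m.from_version):
        if migration.from_version >= stored_version:
            state = migration.migrate(state)
    return state


v1_state = {'owner': 'alice', 'balance': 100}
# Registration order does not matter here because steps are sorted first.
v3_state = apply_in_sequence(v1_state, 1, [RenameOwnerToName(), AddEmailField()])
print(v3_state)  # {'balance': 100, 'email': '', 'name': 'alice'}
```

A snapshot stored at version 2 would skip AddEmailField and only go through the rename.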

Pass migrations alongside the schema version in SnapshotOptions:

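from waku.eventsourcing import EventSourcingExtension, SnapshotOptions
from waku.eventsourcing.snapshot.strategy import EventCountStrategy

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn

# AddEmailField, RenameOwnerToName, and BankAccountSnapshotRepository
# are the classes defined in the previous example.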

EventSourcingExtension().bind_aggregate(
    repository=BankAccountSnapshotRepository,
    event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    snapshot=SnapshotOptions(
        strategy=EventCountStrategy(threshold=50),
        schema_version=3,
        migrations=[AddEmailField(), RenameOwnerToName()],
    ),
)

Migration Chain Validation

SnapshotMigrationChain validates migrations at construction time (during module registration). It rejects:

  • from_version less than 1
  • to_version not greater than from_version
  • Duplicate from_version values
  • Gaps in the chain (e.g., v1→v2 followed by v3→v4 — missing v2→v3)

Validation failures raise SnapshotMigrationChainError.
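
The four rules can be expressed as a short standalone check. This is a sketch of the rules as listed, not waku's code: Step, ChainError, and validate are hypothetical names standing in for the migration objects, SnapshotMigrationChainError, and the chain's constructor logic.

```python
from dataclasses import dataclass


class ChainError(ValueError):
    """Hypothetical stand-in for SnapshotMigrationChainError."""


@dataclass(frozen=True)
class Step:
    from_version: int
    to_version: int


def validate(steps: list[Step]) -> None:
    """Raise ChainError if the steps break any of the rules listed above."""
    seen: set[int] = set()
    for step in steps:
        if step.from_version < 1:
            raise ChainError(f'from_version must be >= 1, got {step.from_version}')
        if step.to_version <= step.from_version:
            raise ChainError(f'to_version must exceed from_version in {step}')
        if step.from_version in seen:
            raise ChainError(f'duplicate from_version {step.from_version}')
        seen.add(step.from_version)

    # Each step must start where the previous one ended.
    ordered = sorted(steps, key=lambda s: s.from_version)
    for previous, following in zip(ordered, ordered[1:]):
        if previous.to_version != following.from_version:
            raise ChainError(
                f'gap between v{previous.to_version} and v{following.from_version}'
            )


validate([Step(1, 2), Step(2, 3)])  # contiguous chain: passes silently

try:
    validate([Step(1, 2), Step(3, 4)])
except ChainError as error:
    print(error)  # gap between v2 and v3
```

Failing at construction time means a broken chain surfaces when the module is registered rather than on the first load of an old snapshot.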

Graceful Degradation

When a stored snapshot has a different schema_version than the configured value:

graph TD
    L[Load snapshot] --> V{Version matches?}
    V -->|Yes| U[Use snapshot]
    V -->|No| M{Migration path exists?}
    M -->|Yes| A[Apply migrations]
    A --> U
    M -->|No| D[Discard snapshot + log warning]
    D --> R[Full replay from events]

Missing migrations never crash the system. The repository discards the outdated snapshot, logs a warning, and falls back to full event replay. This trades performance for correctness — the aggregate loads correctly, just without the snapshot optimization.
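
The fallback decision can be sketched as a function that either returns an upgraded state or signals that the snapshot should be dropped. upgrade_or_discard and the steps mapping are hypothetical; the sketch assumes a validated chain in which every step moves to a higher version, and it returns None where the real repository would fall back to full replay.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger('snapshot_sketch')

Migrate = Callable[[dict[str, Any]], dict[str, Any]]


def upgrade_or_discard(
    state: dict[str, Any],
    stored_version: int,
    current_version: int,
    steps: dict[int, tuple[int, Migrate]],  # from_version -> (to_version, migrate)
) -> dict[str, Any] | None:
    """Return state at current_version, or None if the snapshot must be discarded."""
    version = stored_version
    while version < current_version:
        step = steps.get(version)
        if step is None:
            break  # no migration starts at this version
        to_version, migrate = step
        state = migrate(state)
        version = to_version

    if version != current_version:
        logger.warning(
            'Discarding snapshot: no migration path from v%s to v%s',
            stored_version,
            current_version,
        )
        return None  # caller falls back to full event replay
    return state


steps = {1: (2, lambda state: {**state, 'email': ''})}

print(upgrade_or_discard({'owner': 'alice'}, 1, 2, steps))  # {'owner': 'alice', 'email': ''}
print(upgrade_or_discard({'owner': 'alice'}, 1, 3, steps))  # None
```

A snapshot written by a newer schema than the running code (stored version above the configured one) also ends up discarded in this sketch, since no step can bring it back down.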

State Serialization

ISnapshotStateSerializer handles converting state objects to and from dictionaries:

class ISnapshotStateSerializer(abc.ABC):
    def serialize(self, state: object, /) -> dict[str, Any]: ...
    def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT: ...

JsonSnapshotStateSerializer is the built-in implementation. It uses an adaptix Retort under the hood and works with any dataclass state out of the box. The same default_retort and .extend() pattern used for custom event serializers applies here.
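
To show the shape of the contract, here is a standard-library-only sketch of a serializer with the same two methods. It is deliberately not the adaptix-based built-in: DataclassStateSerializer is a hypothetical name, and it only handles flat dataclasses whose fields are already JSON-friendly.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any, TypeVar

StateT = TypeVar('StateT')


@dataclass(frozen=True)
class BankAccountState:
    owner: str
    balance: int


class DataclassStateSerializer:
    """Hypothetical stdlib-only serializer for flat dataclass states."""

    def serialize(self, state: object, /) -> dict[str, Any]:
        # asdict raises TypeError if `state` is not a dataclass instance.
        return asdict(state)

    def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT:
        # Nested or non-primitive field types would need real conversion logic.
        return state_type(**data)


serializer = DataclassStateSerializer()
data = serializer.serialize(BankAccountState(owner='alice', balance=175))
print(data)  # {'owner': 'alice', 'balance': 175}
print(serializer.deserialize(data, BankAccountState))
```

The dictionary produced by serialize is what ends up in the snapshot's state field, and it is also the value that migrations receive and return.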

Configuration

Register the snapshot store and serializer through EventSourcingConfig:

EventSourcingConfig(
    snapshot_store=SqlAlchemySnapshotStore,  # class or factory callable
    snapshot_state_serializer=JsonSnapshotStateSerializer,
)

You can pass a factory callable instead of a class when the store requires additional constructor arguments (e.g., snapshot_store=make_sqlalchemy_snapshot_store(table)).

Table Schema Reference

es_snapshots

| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| stream_id | Text | PK | Stream identifier (one snapshot per stream) |
| state | JSONB | NOT NULL | Serialized aggregate state |
| version | Integer | NOT NULL | Stream version at snapshot time |
| state_type | Text | NOT NULL | State class name (for type safety on load) |
| schema_version | Integer | NOT NULL, default 1 | Schema version for snapshot migrations |
| created_at | TIMESTAMP WITH TIME ZONE | default now() | First snapshot time |
| updated_at | TIMESTAMP WITH TIME ZONE | default now(), auto-update | Last snapshot update time |

Bind with bind_snapshot_tables(metadata) from waku.eventsourcing.snapshot.sqlalchemy.

Further reading

  • Event Store — event persistence and stream mechanics
  • Schema Evolution — handling state schema changes over time
  • Aggregates — OOP aggregates and functional deciders
  • Testing — in-memory stores for integration tests