Skip to content

Aggregates

waku supports two approaches to modeling event-sourced aggregates: OOP aggregates (mutable, class-based) and functional deciders (immutable, function-based).

Domain Events

Both approaches share the same event definitions — frozen dataclasses implementing INotification:

from dataclasses import dataclass

from waku.cqrs import INotification


@dataclass(frozen=True, kw_only=True)
class AccountOpened(INotification):
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class MoneyDeposited(INotification):
    account_id: str
    amount: int


@dataclass(frozen=True, kw_only=True)
class MoneyWithdrawn(INotification):
    account_id: str
    amount: int

OOP Aggregates

The classic approach — extend EventSourcedAggregate, raise events through command methods, and apply them to mutate internal state. This walkthrough builds a complete bank account example from aggregate to running application.

Defining the Aggregate

from typing_extensions import override

from waku.cqrs import INotification
from waku.eventsourcing import EventSourcedAggregate

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn


class BankAccount(EventSourcedAggregate):
    def __init__(self) -> None:
        super().__init__()
        self.account_id: str = ''
        self.owner: str = ''
        self.balance: int = 0

    def open(self, account_id: str, owner: str) -> None:
        self._raise_event(AccountOpened(account_id=account_id, owner=owner))

    def deposit(self, account_id: str, amount: int) -> None:
        if amount <= 0:
            msg = 'Deposit amount must be positive'
            raise ValueError(msg)
        self._raise_event(MoneyDeposited(account_id=account_id, amount=amount))

    def withdraw(self, account_id: str, amount: int) -> None:
        if amount > self.balance:
            msg = f'Insufficient funds: balance={self.balance}, requested={amount}'
            raise ValueError(msg)
        self._raise_event(MoneyWithdrawn(account_id=account_id, amount=amount))

    @override
    def _apply(self, event: INotification) -> None:
        match event:
            case AccountOpened(account_id=account_id, owner=owner):
                self.account_id = account_id
                self.owner = owner
            case MoneyDeposited(amount=amount):
                self.balance += amount
            case MoneyWithdrawn(amount=amount):
                self.balance -= amount

Key points:

  • _raise_event() first applies the event (state mutation), then queues it for persistence
  • _apply() must handle every event type the aggregate can produce
  • Use match statements for clean event routing

Repository

Subclass EventSourcedRepository with the aggregate type parameter:

1
2
3
4
5
6
7
from waku.eventsourcing import EventSourcedRepository

from app.aggregate import BankAccount


class BankAccountRepository(EventSourcedRepository[BankAccount]):
    pass

The repository derives aggregate_name from the type parameter by default. This name determines the event stream prefix (e.g., BankAccount-acc-1). You can override it by setting aggregate_name explicitly as a class variable.

Command Handler

EventSourcedCommandHandler coordinates loading, executing, saving, and publishing:

from dataclasses import dataclass

from typing_extensions import override

from waku.cqrs import Request, Response
from waku.eventsourcing import EventSourcedCommandHandler

from app.aggregate import BankAccount


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult(Response):
    account_id: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountCommand(Request[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, OpenAccountResult, BankAccount]):
    @override
    def _aggregate_id(self, request: OpenAccountCommand) -> str:
        return request.account_id

    @override
    def _is_creation_command(self, request: OpenAccountCommand) -> bool:
        return True

    @override
    async def _execute(self, request: OpenAccountCommand, aggregate: BankAccount) -> None:
        aggregate.open(request.account_id, request.owner)

    @override
    def _to_response(self, aggregate: BankAccount) -> OpenAccountResult:
        return OpenAccountResult(account_id=aggregate.account_id)


@dataclass(frozen=True, kw_only=True)
class DepositResult(Response):
    balance: int


@dataclass(frozen=True, kw_only=True)
class DepositCommand(Request[DepositResult]):
    account_id: str
    amount: int


class DepositHandler(EventSourcedCommandHandler[DepositCommand, DepositResult, BankAccount]):
    @override
    def _aggregate_id(self, request: DepositCommand) -> str:
        return request.account_id

    @override
    async def _execute(self, request: DepositCommand, aggregate: BankAccount) -> None:
        aggregate.deposit(request.account_id, request.amount)

    @override
    def _to_response(self, aggregate: BankAccount) -> DepositResult:
        return DepositResult(balance=aggregate.balance)

Override _is_creation_command() to return True for commands that create new aggregates. For all other commands, the handler loads the aggregate from the store.

EventSourcedVoidCommandHandler is available for commands that don't return a response.

Module Wiring

Register aggregates, event types, and command handlers with the module system:

from waku import module
from waku.cqrs import MediatorExtension, MediatorModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule

from app.commands import (
    DepositCommand,
    DepositHandler,
    OpenAccountCommand,
    OpenAccountHandler,
)
from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn
from app.repository import BankAccountRepository


@module(
    extensions=[
        EventSourcingExtension().bind_aggregate(
            repository=BankAccountRepository,
            event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
        ),
        MediatorExtension()
        .bind_request(OpenAccountCommand, OpenAccountHandler)
        .bind_request(DepositCommand, DepositHandler),
    ],
)
class BankModule:
    pass


@module(
    imports=[
        BankModule,
        EventSourcingModule.register(EventSourcingConfig()),
        MediatorModule.register(),
    ],
)
class AppModule:
    pass

Run

Wire everything together and send commands through the mediator:

import asyncio

from waku import WakuFactory
from waku.cqrs import IMediator

from app.commands import DepositCommand, OpenAccountCommand
from app.modules import AppModule


async def main() -> None:
    app = WakuFactory(AppModule).create()

    async with app, app.container() as container:
        mediator = await container.get(IMediator)

        await mediator.send(OpenAccountCommand(account_id='acc-1', owner='dex'))
        result = await mediator.send(DepositCommand(account_id='acc-1', amount=500))
        print(f'Balance: {result.balance}')


if __name__ == '__main__':
    asyncio.run(main())

Tip

The default EventSourcingConfig() uses an in-memory event store — perfect for prototyping. See Event Store for PostgreSQL setup.

Functional Deciders

The decider pattern separates state, decisions, and evolution into pure functions. State is immutable — each event produces a new state value.

Defining State

1
2
3
4
5
6
7
from dataclasses import dataclass


@dataclass(frozen=True)
class BankAccountState:
    owner: str = ''
    balance: int = 0

Defining the Decider

A decider implements three methods from the IDecider protocol:

  • initial_state() — returns the starting state
  • decide(command, state) — validates and returns new events
  • evolve(state, event) — applies an event to produce new state
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import TYPE_CHECKING

from waku.eventsourcing import IDecider

from app.events import AccountOpened, MoneyDeposited
from app.state import BankAccountState

if TYPE_CHECKING:
    from waku.cqrs import INotification


@dataclass(frozen=True, kw_only=True)
class OpenAccount:
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class DepositMoney:
    account_id: str
    amount: int


BankCommand = OpenAccount | DepositMoney
BankEvent = AccountOpened | MoneyDeposited


class BankAccountDecider(IDecider[BankAccountState, BankCommand, BankEvent]):
    def initial_state(self) -> BankAccountState:
        return BankAccountState()

    def decide(self, command: BankCommand, state: BankAccountState) -> list[BankEvent]:
        match command:
            case OpenAccount(account_id=aid, owner=owner):
                return [AccountOpened(account_id=aid, owner=owner)]
            case DepositMoney(account_id=aid, amount=amount):
                if amount <= 0:
                    msg = 'Deposit amount must be positive'
                    raise ValueError(msg)
                return [MoneyDeposited(account_id=aid, amount=amount)]

    def evolve(self, state: BankAccountState, event: INotification) -> BankAccountState:
        match event:
            case AccountOpened(owner=owner):
                return replace(state, owner=owner)
            case MoneyDeposited(amount=amount):
                return replace(state, balance=state.balance + amount)
        return state

Repository

1
2
3
4
5
6
7
8
9
from waku.cqrs import INotification
from waku.eventsourcing import DeciderRepository

from app.decider import BankCommand
from app.state import BankAccountState


class BankAccountDeciderRepository(DeciderRepository[BankAccountState, BankCommand, INotification]):
    pass

DeciderRepository requires three type parameters: [State, Command, Event]. Like OOP repositories, aggregate_name is auto-resolved from the state type parameter (e.g., BankAccountState"BankAccount"). You can override it with an explicit class variable.

Command Handler

DeciderCommandHandler adds a _to_command() step that converts the CQRS request into a domain command:

from dataclasses import dataclass

from typing_extensions import override

from waku.cqrs import INotification, Request, Response
from waku.eventsourcing import DeciderCommandHandler

from app.decider import BankCommand, OpenAccount
from app.state import BankAccountState


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult(Response):
    owner: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountRequest(Request[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountDeciderHandler(
    DeciderCommandHandler[
        OpenAccountRequest,
        OpenAccountResult,
        BankAccountState,
        BankCommand,
        INotification,
    ],
):
    @override
    def _aggregate_id(self, request: OpenAccountRequest) -> str:
        return request.account_id

    @override
    def _to_command(self, request: OpenAccountRequest) -> BankCommand:
        return OpenAccount(account_id=request.account_id, owner=request.owner)

    @override
    def _is_creation_command(self, request: OpenAccountRequest) -> bool:
        return True

    @override
    def _to_response(self, state: BankAccountState, version: int) -> OpenAccountResult:
        return OpenAccountResult(owner=state.owner)

DeciderVoidCommandHandler is available for commands that don't return a response.

Module Wiring

Use bind_decider() instead of bind_aggregate():

from waku import module
from waku.cqrs import MediatorExtension, MediatorModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule

from app.decider import BankAccountDecider
from app.events import AccountOpened, MoneyDeposited
from app.handler import OpenAccountDeciderHandler, OpenAccountRequest
from app.repository import BankAccountDeciderRepository


@module(
    extensions=[
        EventSourcingExtension().bind_decider(
            repository=BankAccountDeciderRepository,
            decider=BankAccountDecider,
            event_types=[AccountOpened, MoneyDeposited],
        ),
        MediatorExtension().bind_request(OpenAccountRequest, OpenAccountDeciderHandler),
    ],
)
class BankDeciderModule:
    pass


@module(
    imports=[
        BankDeciderModule,
        EventSourcingModule.register(EventSourcingConfig()),
        MediatorModule.register(),
    ],
)
class AppModule:
    pass

Idempotency

Command handlers support idempotent event appends through the _idempotency_key() hook. Override it to extract a deduplication token from the incoming request:

class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, OpenAccountResult, BankAccount]):
    def _idempotency_key(self, request: OpenAccountCommand) -> str | None:
        return request.idempotency_key  # (1)

    # ... other methods ...
  1. Return None (the default) to skip deduplication and use random UUIDs.
class OpenAccountDeciderHandler(
    DeciderCommandHandler[OpenAccountRequest, OpenAccountResult, BankAccountState, BankCommand, INotification],
):
    def _idempotency_key(self, request: OpenAccountRequest) -> str | None:
        return request.idempotency_key

    # ... other methods ...

When an idempotency_key is provided, the repository generates per-event keys in the format {idempotency_key}:0, {idempotency_key}:1, etc. Retrying the same command with the same key is safe — the event store returns the existing stream version without duplicating events.

See Event Store — Idempotency for deduplication semantics and error handling.

Stream Length Guard

Repositories can enforce a maximum stream length to prevent unbounded event replay. Set the max_stream_length class variable on your repository:

class BankAccountRepository(EventSourcedRepository[BankAccount]):
    max_stream_length = 500
class BankAccountDeciderRepository(DeciderRepository[BankAccountState, BankCommand, INotification]):
    max_stream_length = 500

When a stream exceeds the configured limit, load() raises StreamTooLargeError with a message guiding you to configure snapshots.

Tip

The default is None (no limit). Use this as a safety valve for aggregates that might accumulate many events — it catches unbounded growth before it impacts performance.

Note

Snapshot-aware repositories (SnapshotEventSourcedRepository, SnapshotDeciderRepository) inherit the guard but only apply it during full replay. When a valid snapshot exists, the repository replays only the events after the snapshot, which naturally stays within bounds.

Concurrency Control

Both repository types use ExpectedVersion for optimistic concurrency:

Variant Behavior
NoStream() Stream must not exist (creation)
Exact(version=N) Stream version must match exactly
StreamExists() Stream must exist (any version)
AnyVersion() No version check

The repositories handle this automatically — NoStream for new aggregates, Exact for existing ones. A ConcurrencyConflictError is raised on mismatch.

Choosing an Approach

OOP Aggregate Functional Decider
State Mutable object Immutable value
Testing Assert aggregate properties Given/When/Then DSL
Complexity Simpler for basic CRUD Better for complex decision logic
Snapshots SnapshotEventSourcedRepository SnapshotDeciderRepository

Tip

Start with OOP aggregates for straightforward domains. Move to deciders when you need easily testable business rules or immutable state guarantees.

Aggregate Naming

Both repository types auto-resolve aggregate_name from their type parameters. This name determines the event stream prefix (e.g., BankAccount-acc-1).

Resolution rules

Pattern Source Example
OOP Aggregate class name, as-is EventSourcedRepository[BankAccount]"BankAccount"
Decider State class name, State suffix stripped DeciderRepository[BankAccountState, ...]"BankAccount"

For deciders, the canonical naming convention is {AggregateName}State (e.g., CounterState, BankAccountState). The State suffix is automatically removed to derive the stream prefix. If the state class has no State suffix, the full name is used as-is.

Explicit override

Set aggregate_name as a class variable to override auto-resolution:

class LegacyAccountRepo(EventSourcedRepository[BankAccount]):
    aggregate_name = 'Account'

Uniqueness

aggregate_name must be unique across all repositories in the application. Duplicate names are detected at startup and raise DuplicateAggregateNameError. Two repositories with the same aggregate_name would write to the same event streams, causing data corruption.

Warning

The stream ID format uses a hyphen separator (BankAccount-acc-1), so aggregate_name must not contain hyphens. This is validated at StreamId construction time.

Further reading

  • Event Sourcing — overview, architecture, and installation
  • Event Store — in-memory and PostgreSQL event persistence
  • Snapshots — optimize loading for long-lived aggregates
  • Testing — Given/When/Then DSL and OOP aggregate testing
  • Schema Evolution — upcasting and event type registries