Aggregates
waku supports two approaches to modeling event-sourced aggregates: OOP aggregates (mutable, class-based)
and functional deciders (immutable, function-based).
Domain Events
Both approaches share the same event definitions — frozen dataclasses implementing INotification:
| from dataclasses import dataclass

from waku.cqrs import INotification


@dataclass(frozen=True, kw_only=True)
class AccountOpened(INotification):
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class MoneyDeposited(INotification):
    account_id: str
    amount: int


@dataclass(frozen=True, kw_only=True)
class MoneyWithdrawn(INotification):
    account_id: str
    amount: int
|
OOP Aggregates
The classic approach — extend EventSourcedAggregate, raise events through command methods,
and apply them to mutate internal state. This walkthrough builds a complete bank account
example from aggregate to running application.
Defining the Aggregate
| from typing_extensions import override

from waku.cqrs import INotification
from waku.eventsourcing import EventSourcedAggregate

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn


class BankAccount(EventSourcedAggregate):
    def __init__(self) -> None:
        super().__init__()
        self.account_id: str = ''
        self.owner: str = ''
        self.balance: int = 0

    def open(self, account_id: str, owner: str) -> None:
        self._raise_event(AccountOpened(account_id=account_id, owner=owner))

    def deposit(self, account_id: str, amount: int) -> None:
        if amount <= 0:
            msg = 'Deposit amount must be positive'
            raise ValueError(msg)
        self._raise_event(MoneyDeposited(account_id=account_id, amount=amount))

    def withdraw(self, account_id: str, amount: int) -> None:
        if amount > self.balance:
            msg = f'Insufficient funds: balance={self.balance}, requested={amount}'
            raise ValueError(msg)
        self._raise_event(MoneyWithdrawn(account_id=account_id, amount=amount))

    @override
    def _apply(self, event: INotification) -> None:
        match event:
            case AccountOpened(account_id=account_id, owner=owner):
                self.account_id = account_id
                self.owner = owner
            case MoneyDeposited(amount=amount):
                self.balance += amount
            case MoneyWithdrawn(amount=amount):
                self.balance -= amount
|
Key points:
- _raise_event() first applies the event (state mutation), then queues it for persistence
- _apply() must handle every event type the aggregate can produce
- Use match statements for clean event routing
Repository
Subclass EventSourcedRepository with the aggregate type parameter:
| from waku.eventsourcing import EventSourcedRepository

from app.aggregate import BankAccount


class BankAccountRepository(EventSourcedRepository[BankAccount]):
    pass
|
The repository derives aggregate_name from the type parameter by default. This name
determines the event stream prefix (e.g., BankAccount-acc-1). You can override it
by setting aggregate_name explicitly as a class variable.
Command Handler
EventSourcedCommandHandler coordinates loading, executing, saving, and publishing:
| from dataclasses import dataclass

from typing_extensions import override

from waku.cqrs import Request, Response
from waku.eventsourcing import EventSourcedCommandHandler

from app.aggregate import BankAccount


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult(Response):
    account_id: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountCommand(Request[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, OpenAccountResult, BankAccount]):
    @override
    def _aggregate_id(self, request: OpenAccountCommand) -> str:
        return request.account_id

    @override
    def _is_creation_command(self, request: OpenAccountCommand) -> bool:
        return True

    @override
    async def _execute(self, request: OpenAccountCommand, aggregate: BankAccount) -> None:
        aggregate.open(request.account_id, request.owner)

    @override
    def _to_response(self, aggregate: BankAccount) -> OpenAccountResult:
        return OpenAccountResult(account_id=aggregate.account_id)


@dataclass(frozen=True, kw_only=True)
class DepositResult(Response):
    balance: int


@dataclass(frozen=True, kw_only=True)
class DepositCommand(Request[DepositResult]):
    account_id: str
    amount: int


class DepositHandler(EventSourcedCommandHandler[DepositCommand, DepositResult, BankAccount]):
    @override
    def _aggregate_id(self, request: DepositCommand) -> str:
        return request.account_id

    @override
    async def _execute(self, request: DepositCommand, aggregate: BankAccount) -> None:
        aggregate.deposit(request.account_id, request.amount)

    @override
    def _to_response(self, aggregate: BankAccount) -> DepositResult:
        return DepositResult(balance=aggregate.balance)
|
Override _is_creation_command() to return True for commands that create new aggregates.
For all other commands, the handler loads the aggregate from the store.
EventSourcedVoidCommandHandler is available for commands that don't return a response.
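The load, execute, save, publish sequence can be sketched without the framework. The example below is a simplified illustration under stated assumptions: events are plain integers, the store is a dict, and `Counter`, `handle`, `store`, and `published` are hypothetical names, not part of waku.

```python
from collections.abc import Callable


class Counter:
    """Toy aggregate: state is rebuilt from events, new events are queued."""

    def __init__(self) -> None:
        self.value = 0
        self.pending: list[int] = []

    def apply(self, delta: int) -> None:
        self.value += delta

    def add(self, delta: int) -> None:
        self.apply(delta)
        self.pending.append(delta)


def handle(
    store: dict[str, list[int]],
    published: list[int],
    aggregate_id: str,
    execute: Callable[[Counter], None],
    *,
    is_creation: bool = False,
) -> Counter:
    aggregate = Counter()
    if not is_creation:
        # Load: replay the stored stream (a missing stream raises KeyError here).
        for event in store[aggregate_id]:
            aggregate.apply(event)
    execute(aggregate)  # Execute: the command method raises new events.
    store.setdefault(aggregate_id, []).extend(aggregate.pending)  # Save.
    published.extend(aggregate.pending)  # Publish.
    return aggregate


store: dict[str, list[int]] = {}
published: list[int] = []
handle(store, published, 'c-1', lambda c: c.add(1), is_creation=True)
result = handle(store, published, 'c-1', lambda c: c.add(4))
```

The creation flag only decides whether the load step runs; everything after it is identical for both kinds of command.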
Module Wiring
Register aggregates, event types, and command handlers with the module system:
| from waku import module
from waku.cqrs import MediatorExtension, MediatorModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule

from app.commands import (
    DepositCommand,
    DepositHandler,
    OpenAccountCommand,
    OpenAccountHandler,
)
from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn
from app.repository import BankAccountRepository


@module(
    extensions=[
        EventSourcingExtension().bind_aggregate(
            repository=BankAccountRepository,
            event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
        ),
        MediatorExtension()
        .bind_request(OpenAccountCommand, OpenAccountHandler)
        .bind_request(DepositCommand, DepositHandler),
    ],
)
class BankModule:
    pass


@module(
    imports=[
        BankModule,
        EventSourcingModule.register(EventSourcingConfig()),
        MediatorModule.register(),
    ],
)
class AppModule:
    pass
|
Run
Wire everything together and send commands through the mediator:
| import asyncio

from waku import WakuFactory
from waku.cqrs import IMediator

from app.commands import DepositCommand, OpenAccountCommand
from app.modules import AppModule


async def main() -> None:
    app = WakuFactory(AppModule).create()

    async with app, app.container() as container:
        mediator = await container.get(IMediator)

        await mediator.send(OpenAccountCommand(account_id='acc-1', owner='dex'))
        result = await mediator.send(DepositCommand(account_id='acc-1', amount=500))
        print(f'Balance: {result.balance}')


if __name__ == '__main__':
    asyncio.run(main())
|
Tip
The default EventSourcingConfig() uses an in-memory event store — perfect for
prototyping. See Event Store for PostgreSQL setup.
Functional Deciders
The decider pattern separates state, decisions, and evolution into pure functions.
State is immutable — each event produces a new state value.
Defining State
| from dataclasses import dataclass


@dataclass(frozen=True)
class BankAccountState:
    owner: str = ''
    balance: int = 0
|
Defining the Decider
A decider implements three methods from the IDecider protocol:
- initial_state(): returns the starting state
- decide(command, state): validates and returns new events
- evolve(state, event): applies an event to produce new state
| from __future__ import annotations

from dataclasses import dataclass, replace
from typing import TYPE_CHECKING

from waku.eventsourcing import IDecider

from app.events import AccountOpened, MoneyDeposited
from app.state import BankAccountState

if TYPE_CHECKING:
    from waku.cqrs import INotification


@dataclass(frozen=True, kw_only=True)
class OpenAccount:
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class DepositMoney:
    account_id: str
    amount: int


BankCommand = OpenAccount | DepositMoney
BankEvent = AccountOpened | MoneyDeposited


class BankAccountDecider(IDecider[BankAccountState, BankCommand, BankEvent]):
    def initial_state(self) -> BankAccountState:
        return BankAccountState()

    def decide(self, command: BankCommand, state: BankAccountState) -> list[BankEvent]:
        match command:
            case OpenAccount(account_id=aid, owner=owner):
                return [AccountOpened(account_id=aid, owner=owner)]
            case DepositMoney(account_id=aid, amount=amount):
                if amount <= 0:
                    msg = 'Deposit amount must be positive'
                    raise ValueError(msg)
                return [MoneyDeposited(account_id=aid, amount=amount)]

    def evolve(self, state: BankAccountState, event: INotification) -> BankAccountState:
        match event:
            case AccountOpened(owner=owner):
                return replace(state, owner=owner)
            case MoneyDeposited(amount=amount):
                return replace(state, balance=state.balance + amount)
        return state
|
Repository
| from waku.cqrs import INotification
from waku.eventsourcing import DeciderRepository

from app.decider import BankCommand
from app.state import BankAccountState


class BankAccountDeciderRepository(DeciderRepository[BankAccountState, BankCommand, INotification]):
    pass
|
DeciderRepository requires three type parameters: [State, Command, Event].
Like OOP repositories, aggregate_name is auto-resolved from the state type parameter
(e.g., BankAccountState → "BankAccount"). You can override it with an explicit class variable.
Command Handler
DeciderCommandHandler adds a _to_command() step that converts the CQRS request into a domain command:
| from dataclasses import dataclass

from typing_extensions import override

from waku.cqrs import INotification, Request, Response
from waku.eventsourcing import DeciderCommandHandler

from app.decider import BankCommand, OpenAccount
from app.state import BankAccountState


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult(Response):
    owner: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountRequest(Request[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountDeciderHandler(
    DeciderCommandHandler[
        OpenAccountRequest,
        OpenAccountResult,
        BankAccountState,
        BankCommand,
        INotification,
    ],
):
    @override
    def _aggregate_id(self, request: OpenAccountRequest) -> str:
        return request.account_id

    @override
    def _to_command(self, request: OpenAccountRequest) -> BankCommand:
        return OpenAccount(account_id=request.account_id, owner=request.owner)

    @override
    def _is_creation_command(self, request: OpenAccountRequest) -> bool:
        return True

    @override
    def _to_response(self, state: BankAccountState, version: int) -> OpenAccountResult:
        return OpenAccountResult(owner=state.owner)
|
DeciderVoidCommandHandler is available for commands that don't return a response.
Module Wiring
Use bind_decider() instead of bind_aggregate():
| from waku import module
from waku.cqrs import MediatorExtension, MediatorModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule

from app.decider import BankAccountDecider
from app.events import AccountOpened, MoneyDeposited
from app.handler import OpenAccountDeciderHandler, OpenAccountRequest
from app.repository import BankAccountDeciderRepository


@module(
    extensions=[
        EventSourcingExtension().bind_decider(
            repository=BankAccountDeciderRepository,
            decider=BankAccountDecider,
            event_types=[AccountOpened, MoneyDeposited],
        ),
        MediatorExtension().bind_request(OpenAccountRequest, OpenAccountDeciderHandler),
    ],
)
class BankDeciderModule:
    pass


@module(
    imports=[
        BankDeciderModule,
        EventSourcingModule.register(EventSourcingConfig()),
        MediatorModule.register(),
    ],
)
class AppModule:
    pass
|
Idempotency
Command handlers support idempotent event appends through the _idempotency_key() hook.
Override it to extract a deduplication token from the incoming request.
When an idempotency_key is provided, the repository generates per-event keys in the format
{idempotency_key}:0, {idempotency_key}:1, etc. Retrying the same command with the same key
is safe — the event store returns the existing stream version without duplicating events.
See Event Store — Idempotency for deduplication semantics and error handling.
Stream Length Guard
Repositories can enforce a maximum stream length to prevent unbounded event replay. Set the
max_stream_length class variable on your repository.
When a stream exceeds the configured limit, load() raises StreamTooLargeError with a message
guiding you to configure snapshots.
Tip
The default is None (no limit). Use this as a safety valve for aggregates that
might accumulate many events — it catches unbounded growth before it impacts performance.
Note
Snapshot-aware repositories (SnapshotEventSourcedRepository, SnapshotDeciderRepository)
inherit the guard but only apply it during full replay. When a valid snapshot exists, the
repository replays only the events after the snapshot, which naturally stays within bounds.
Concurrency Control
Both repository types use ExpectedVersion for optimistic concurrency:
| Variant | Behavior |
| --- | --- |
| NoStream() | Stream must not exist (creation) |
| Exact(version=N) | Stream version must match exactly |
| StreamExists() | Stream must exist (any version) |
| AnyVersion() | No version check |
The repositories handle this automatically — NoStream for new aggregates,
Exact for existing ones. A ConcurrencyConflictError is raised on mismatch.
Choosing an Approach
| | OOP Aggregate | Functional Decider |
| --- | --- | --- |
| State | Mutable object | Immutable value |
| Testing | Assert aggregate properties | Given/When/Then DSL |
| Complexity | Simpler for basic CRUD | Better for complex decision logic |
| Snapshots | SnapshotEventSourcedRepository | SnapshotDeciderRepository |
Tip
Start with OOP aggregates for straightforward domains. Move to deciders when you need
easily testable business rules or immutable state guarantees.
Aggregate Naming
Both repository types auto-resolve aggregate_name from their type parameters. This name
determines the event stream prefix (e.g., BankAccount-acc-1).
Resolution rules
| Pattern | Source | Example |
| --- | --- | --- |
| OOP | Aggregate class name, as-is | EventSourcedRepository[BankAccount] → "BankAccount" |
| Decider | State class name, State suffix stripped | DeciderRepository[BankAccountState, ...] → "BankAccount" |
For deciders, the canonical naming convention is {AggregateName}State (e.g., CounterState,
BankAccountState). The State suffix is automatically removed to derive the stream prefix.
If the state class has no State suffix, the full name is used as-is.
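The two resolution rules can be expressed as one small function. This is a sketch of the rules as stated above, not waku's code; `resolve_aggregate_name` and its `is_decider_state` flag are hypothetical.

```python
def resolve_aggregate_name(class_name: str, *, is_decider_state: bool) -> str:
    if is_decider_state:
        # Strips a trailing "State" only; names without the suffix pass through.
        return class_name.removesuffix('State')
    return class_name  # OOP aggregates use the class name as-is


oop_name = resolve_aggregate_name('BankAccount', is_decider_state=False)
decider_name = resolve_aggregate_name('BankAccountState', is_decider_state=True)
no_suffix_name = resolve_aggregate_name('Counter', is_decider_state=True)
```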
Explicit override
Set aggregate_name as a class variable to override auto-resolution:
class LegacyAccountRepo(EventSourcedRepository[BankAccount]):
    aggregate_name = 'Account'
Uniqueness
aggregate_name must be unique across all repositories in the application.
Duplicate names are detected at startup and raise DuplicateAggregateNameError.
Two repositories with the same aggregate_name would write to the same event streams,
causing data corruption.
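A startup check of this kind could look roughly like the following. The function name, the message, and the locally defined error class are assumptions for illustration; only the behavior (fail fast on a repeated name) comes from the text above.

```python
from collections import Counter


class DuplicateAggregateNameError(Exception):
    """Local stand-in for the error named above, not imported from waku."""


def ensure_unique_names(aggregate_names: list[str]) -> None:
    duplicates = sorted(name for name, count in Counter(aggregate_names).items() if count > 1)
    if duplicates:
        msg = f'Duplicate aggregate_name values: {", ".join(duplicates)}'
        raise DuplicateAggregateNameError(msg)


ensure_unique_names(['BankAccount', 'Account'])  # distinct names pass

try:
    ensure_unique_names(['BankAccount', 'Account', 'BankAccount'])
except DuplicateAggregateNameError as exc:
    duplicate_message = str(exc)
```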
Warning
The stream ID format uses a hyphen separator (BankAccount-acc-1), so aggregate_name
must not contain hyphens. This is validated at StreamId construction time.
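One plausible reason for the rule is that a stream ID can only be split back into its parts unambiguously when the name is hyphen-free. The sketch below demonstrates this with a hypothetical `SketchStreamId`; the real StreamId class, its fields, and the exception it raises may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchStreamId:
    """Hypothetical stand-in for a stream ID value object."""

    aggregate_name: str
    aggregate_id: str

    def __post_init__(self) -> None:
        # Validation happens at construction time.
        if '-' in self.aggregate_name:
            msg = f'aggregate_name must not contain hyphens: {self.aggregate_name!r}'
            raise ValueError(msg)

    def __str__(self) -> str:
        return f'{self.aggregate_name}-{self.aggregate_id}'

    @classmethod
    def parse(cls, value: str) -> 'SketchStreamId':
        # Split on the first hyphen only: the aggregate ID may contain hyphens.
        name, _, aggregate_id = value.partition('-')
        return cls(name, aggregate_id)


stream_id = SketchStreamId('BankAccount', 'acc-1')
round_trip = SketchStreamId.parse(str(stream_id))

try:
    SketchStreamId('Bank-Account', 'acc-1')
    name_rejected = False
except ValueError:
    name_rejected = True
```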
Further reading