
Schema Evolution

As your domain evolves, event schemas change. waku provides an event type registry for serialization and an upcasting chain for migrating old events to current schemas.

Event Serialization

JsonEventSerializer serializes and deserializes events using the adaptix Retort. It requires an EventTypeRegistry to map type names back to Python classes during deserialization.

Configure it through EventSourcingConfig:

from waku.eventsourcing import EventSourcingConfig
from waku.eventsourcing.serialization import JsonEventSerializer

config = EventSourcingConfig(event_serializer=JsonEventSerializer)

Tip

Serialization is only needed for persistent stores (e.g., PostgreSQL with SQLAlchemy). The in-memory store keeps Python objects directly, so no serializer is required.

Custom serializers

IEventSerializer Interface

IEventSerializer defines two methods:

import abc
from typing import Any

from waku.cqrs.contracts.notification import INotification


class IEventSerializer(abc.ABC):
    def serialize(self, event: INotification, /) -> dict[str, Any]: ...
    def deserialize(self, data: dict[str, Any], event_type: str, /) -> INotification: ...

| Method | Parameters | Returns | Description |
|---|---|---|---|
| serialize | event: INotification | dict[str, Any] | Convert a domain event to a JSON-compatible dictionary |
| deserialize | data: dict[str, Any], event_type: str | INotification | Reconstruct a domain event from stored data and its registered type name |

The event_type parameter in deserialize is the string name from the Event Type Registry.

Example: Unix timestamp serializer

default_retort is the pre-configured adaptix Retort used by JsonEventSerializer. It handles StreamId on top of the built-in support for datetime, UUID, Decimal, Enum, and many other types. Use .extend(recipe=[...]) to override specific type handling — here, serializing datetime as Unix timestamps for cross-language interop:

from datetime import UTC, datetime
from typing import Any, cast

from adaptix import dumper, loader
from typing_extensions import override

from waku.cqrs.contracts.notification import INotification
from waku.eventsourcing.serialization import EventTypeRegistry, IEventSerializer, default_retort


class UnixTimestampEventSerializer(IEventSerializer):
    def __init__(self, registry: EventTypeRegistry) -> None:
        self._registry = registry
        self._retort = default_retort.extend(
            recipe=[
                loader(datetime, lambda v: datetime.fromtimestamp(v, tz=UTC)),
                dumper(datetime, lambda v: int(v.timestamp())),
            ],
        )

    @override
    def serialize(self, event: INotification, /) -> dict[str, Any]:
        return cast('dict[str, Any]', self._retort.dump(event, type(event)))

    @override
    def deserialize(self, data: dict[str, Any], event_type: str, /) -> INotification:
        cls = self._registry.resolve(event_type)
        return self._retort.load(data, cls)

Register it the same way:

config = EventSourcingConfig(event_serializer=UnixTimestampEventSerializer)

Dishka injects the EventTypeRegistry dependency automatically.

See the adaptix Retort configuration guide for the full list of built-in recipes and customization options.

Event Type Registry

EventTypeRegistry maintains a bidirectional mapping between event classes and string names, and tracks schema versions. The registry is built automatically from the event_types passed to bind_aggregate() or bind_decider() — you do not create it manually.

Simple usage — pass event classes directly:

es_ext = EventSourcingExtension()
es_ext.bind_aggregate(
    repository=AccountRepository,
    event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
)

Each class is registered under its __name__ at version 1.

Advanced usage — wrap classes in EventType for custom names, versions, aliases, and upcasters:

es_ext.bind_aggregate(
    repository=AccountRepository,
    event_types=[
        EventType(AccountOpened, name='AccountOpened', version=3, upcasters=[...]),
        MoneyDeposited,
    ],
)

EventType

EventType controls how an event class is registered and how old versions are migrated.

| Field | Type | Default | Description |
|---|---|---|---|
| event_type | type[INotification] | (required, positional) | The Python event class |
| name | str \| None | None (uses class name) | Custom serialization name |
| aliases | Sequence[str] | () | Alternative names accepted during deserialization |
| version | int | 1 | Current schema version |
| upcasters | Sequence[IEventUpcaster] | () | Upcasters for migrating old versions |

Type Aliases

When you rename an event class, old events stored under the previous name still need to deserialize. Use aliases to register alternative names that map to the current class:

EventType(
    AccountOpened,
    name='AccountOpened',
    aliases=['AccountCreated'],  # old events stored as "AccountCreated" still resolve
)

New events are always written under the primary name. Aliases are read-only — they only affect deserialization lookup.
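A toy model makes the lookup behavior concrete. This is an illustration only, not waku's implementation — the real EventTypeRegistry is built for you from bind_aggregate(); all names here are invented for the sketch:

```python
class ToyRegistry:
    """Illustrative name <-> class mapping with read-only aliases."""

    def __init__(self) -> None:
        self._by_name: dict[str, type] = {}
        self._primary: dict[type, str] = {}

    def register(self, cls: type, name: str, aliases: tuple[str, ...] = ()) -> None:
        self._primary[cls] = name       # new events are always written under this name
        self._by_name[name] = cls
        for alias in aliases:           # old names only affect deserialization lookup
            self._by_name[alias] = cls

    def name_for(self, cls: type) -> str:
        return self._primary[cls]

    def resolve(self, name: str) -> type:
        return self._by_name[name]


class AccountOpened:  # stand-in for the real event class
    pass


registry = ToyRegistry()
registry.register(AccountOpened, 'AccountOpened', aliases=('AccountCreated',))

assert registry.resolve('AccountCreated') is AccountOpened  # old stored name still reads
assert registry.name_for(AccountOpened) == 'AccountOpened'  # writes use the primary name
```

Note that both names resolve to the same class on read, while only the primary name is ever used on write.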

Upcasting

Upcasters transform old event data (raw dict payloads) to match the current schema before deserialization into the Python class. Each upcaster declares a from_version indicating which version it upgrades.

When reading an event stored at version N, the UpcasterChain applies every upcaster whose from_version >= N in order, producing data compatible with the current version.
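The ordering rule above can be sketched as a small self-contained model (an illustration of the selection logic, not waku's actual UpcasterChain):

```python
from typing import Any, Callable

Payload = dict[str, Any]

# Each upcaster is modeled as (from_version, transform).
chain: list[tuple[int, Callable[[Payload], Payload]]] = [
    (1, lambda d: {'b': d['a']}),   # v1 -> v2: rename a -> b
    (2, lambda d: {**d, 'c': 0}),   # v2 -> v3: add c with a default
]


def upcast_to_current(data: Payload, stored_version: int) -> Payload:
    # Apply every upcaster whose from_version >= stored version, in order.
    for from_version, step in chain:
        if from_version >= stored_version:
            data = step(data)
    return data


assert upcast_to_current({'a': 1}, stored_version=1) == {'b': 1, 'c': 0}
assert upcast_to_current({'b': 1}, stored_version=2) == {'b': 1, 'c': 0}  # rename skipped
```

A v1 payload passes through both steps; a v2 payload skips the rename and only gains the new field.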

Built-in Helpers

| Helper | Signature | Description |
|---|---|---|
| rename_field | rename_field(from_version, old=..., new=...) | Rename a field |
| add_field | add_field(from_version, field=..., default=...) | Add a field with a default value |
| remove_field | remove_field(from_version, field=...) | Remove a field |
| noop | noop(from_version) | No-op placeholder for version bumps without data changes |
| upcast | upcast(from_version, fn) | Custom function (dict) -> dict |

All helpers return an IEventUpcaster instance and are imported from waku.eventsourcing.
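upcast accepts any (dict) -> dict function for migrations the other helpers cannot express. As a hypothetical example (field names invented for illustration), a function you might pass to upcast could split one stored field into two:

```python
from typing import Any


def split_full_name(data: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical migration: v1 stored 'full_name'; the current schema
    wants 'first_name' and 'last_name'. Suitable for upcast(from_version=1, fn=...)."""
    data = dict(data)  # never mutate the stored payload in place
    first, _, last = data.pop('full_name').partition(' ')
    return {**data, 'first_name': first, 'last_name': last}


assert split_full_name({'id': '1', 'full_name': 'Ada Lovelace'}) == {
    'id': '1',
    'first_name': 'Ada',
    'last_name': 'Lovelace',
}
```

The function receives the raw stored dict and returns a new dict; it should be pure and side-effect free so replaying old events stays deterministic.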

Upcasting Pipeline

graph LR
    S[(Stored event<br/>v1)] -->|read| UC[UpcasterChain]
    UC --> U1["rename_field<br/>(from_version=1)"]
    U1 -->|v1 → v2| U2["add_field<br/>(from_version=2)"]
    U2 -->|v2 → v3| D[Deserialize into<br/>current class]

Evolution Example

Consider an AccountOpened event that has gone through three versions:

  1. v1 — had an owner field
  2. v2 — renamed owner to owner_name
  3. v3 — added a currency field

from dataclasses import dataclass

from waku.cqrs import INotification
from waku.eventsourcing import EventType, add_field, rename_field


@dataclass(frozen=True, kw_only=True)
class AccountOpened(INotification):
    account_id: str
    owner_name: str
    currency: str


account_opened_type = EventType(
    AccountOpened,
    name='AccountOpened',
    version=3,
    upcasters=[
        rename_field(from_version=1, old='owner', new='owner_name'),
        add_field(from_version=2, field='currency', default='USD'),
    ],
)

When the store reads a v1 event, the upcaster chain applies two transformations:

  1. rename_field(from_version=1) — renames owner to owner_name (v1 -> v2)
  2. add_field(from_version=2) — adds currency with default 'USD' (v2 -> v3)

The resulting dict matches the current AccountOpened schema and deserializes cleanly.
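The two steps can be traced with plain dict functions — stand-ins for what rename_field and add_field do to the payload, shown only to make the transformation concrete (not waku's code):

```python
from typing import Any


def rename_owner(d: dict[str, Any]) -> dict[str, Any]:
    # Mirrors rename_field(from_version=1, old='owner', new='owner_name')
    d = dict(d)
    d['owner_name'] = d.pop('owner')
    return d


def add_currency(d: dict[str, Any]) -> dict[str, Any]:
    # Mirrors add_field(from_version=2, field='currency', default='USD')
    return {**d, 'currency': 'USD'}


v1 = {'account_id': 'acc-1', 'owner': 'Alice'}
v2 = rename_owner(v1)    # v1 -> v2
v3 = add_currency(v2)    # v2 -> v3

assert v3 == {'account_id': 'acc-1', 'owner_name': 'Alice', 'currency': 'USD'}
```

The final dict has exactly the fields of the current AccountOpened dataclass, so deserialization succeeds without touching the stored v1 row.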

Warning

Every upcaster's from_version must be less than the event's current version. waku validates this at startup and raises UpcasterChainError if the constraint is violated.

Further reading

  • Event Store — where upcasting happens during deserialization
  • Aggregates — aggregate patterns that produce versioned events
  • Projections — read models that consume upcasted events
  • Testing — testing upcasters and event evolution