Skip to content

waku

WakuApplication

WakuApplication(
    *, container, registry, lifespan, extension_registry
)
Source code in src/waku/application.py
def __init__(
    self,
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
) -> None:
    """Initialize the application.

    Args:
        container: Async DI container used to resolve providers.
        registry: Registry of the application's modules.
        lifespan: Lifespan callbacks; bare functions are wrapped in
            ``LifespanWrapper``.
        extension_registry: Registry of application extensions.
    """
    self._container = container
    self._registry = registry
    # Normalize every entry to a LifespanWrapper so downstream code can
    # treat all lifespan callbacks uniformly.
    self._lifespan = tuple(
        LifespanWrapper(lifespan_func) if not isinstance(lifespan_func, LifespanWrapper) else lifespan_func
        for lifespan_func in lifespan
    )
    self._extension_registry = extension_registry

    self._exit_stack = AsyncExitStack()
    self._initialized = False  # set to True by initialize(), back to False by close()

container property

container

registry property

registry

initialize async

initialize()
Source code in src/waku/application.py
async def initialize(self) -> None:
    """Initialize the application; idempotent (repeat calls are no-ops).

    Runs on-init extensions, sets the initialized flag, then runs
    after-init extensions.
    """
    if self._initialized:
        return
    await self._call_on_init_extensions()
    # Flag is set between the two extension phases, so after-init hooks
    # run with the application already marked initialized.
    self._initialized = True
    await self._call_after_init_extensions()

close async

close()
Source code in src/waku/application.py
async def close(self) -> None:
    """Shut the application down; no-op if it was never initialized.

    Runs on-shutdown extensions and clears the initialized flag so a
    later initialize() would run again.
    """
    if not self._initialized:
        return
    await self._call_on_shutdown_extensions()
    self._initialized = False

WakuFactory

WakuFactory(
    root_module_type,
    /,
    context=None,
    lifespan=(),
    extensions=DEFAULT_EXTENSIONS,
    container_config=None,
)
Source code in src/waku/factory.py
def __init__(
    self,
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
) -> None:
    """Store factory inputs used later by create().

    Args:
        root_module_type: The application's root module class.
        context: Optional context passed to the module registry builder.
        lifespan: Lifespan callbacks forwarded to the application.
        extensions: Application-level extensions (defaults to DEFAULT_EXTENSIONS).
        container_config: DI container configuration; a default
            ContainerConfig is created when omitted.
    """
    self._root_module_type = root_module_type

    self._context = context
    self._lifespan = lifespan
    self._extensions = extensions
    self._container_config = container_config or ContainerConfig()

create

create()
Source code in src/waku/factory.py
def create(self) -> WakuApplication:
    """Build and return a WakuApplication from the stored factory inputs.

    Builds the module registry from the root module, then a DI container
    from the registry's providers, and wires both into the application.
    """
    registry = ModuleRegistryBuilder(
        self._root_module_type,
        context=self._context,
        app_extensions=self._extensions,
    ).build()

    # Container is built from the providers collected across all modules.
    container = self._build_container(registry.providers)
    return WakuApplication(
        container=container,
        registry=registry,
        lifespan=self._lifespan,
        extension_registry=self._build_extension_registry(registry.modules),
    )

DynamicModule dataclass

DynamicModule(
    *,
    providers=list(),
    imports=list(),
    exports=list(),
    extensions=list(),
    is_global=False,
    id=uuid4(),
    parent_module,
)

Bases: ModuleMetadata

providers class-attribute instance-attribute

providers = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports = field(default_factory=list)

List of modules imported by this module.

exports class-attribute instance-attribute

exports = field(default_factory=list)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions = field(default_factory=list)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global = False

Whether this module is global or not.

id class-attribute instance-attribute

id = field(default_factory=uuid4)

parent_module instance-attribute

parent_module

Module

Module(module_type, metadata)
Source code in src/waku/modules/_module.py
def __init__(self, module_type: ModuleType, metadata: ModuleMetadata) -> None:
    """Snapshot module metadata into immutable (Final) attributes.

    Args:
        module_type: The decorated module class this instance represents.
        metadata: Metadata previously attached by the @module decorator.
    """
    self.id: Final[UUID] = metadata.id
    self.target: Final[ModuleType] = module_type

    self.providers: Final[Sequence[Provider]] = metadata.providers
    self.imports: Final[Sequence[ModuleType | DynamicModule]] = metadata.imports
    self.exports: Final[Sequence[type[object] | ModuleType | DynamicModule]] = metadata.exports
    self.extensions: Final[Sequence[ModuleExtension]] = metadata.extensions
    self.is_global: Final[bool] = metadata.is_global

    # Lazily populated by create_provider(); None until then.
    self._provider: BaseProvider | None = None

id instance-attribute

id = id

target instance-attribute

target = module_type

providers instance-attribute

providers = providers

imports instance-attribute

imports = imports

exports instance-attribute

exports = exports

extensions instance-attribute

extensions = extensions

is_global instance-attribute

is_global = is_global

name property

name

provider property

provider

create_provider

create_provider()
Source code in src/waku/modules/_module.py
def create_provider(self) -> BaseProvider:
    """Create, cache, and return a DI provider for this module."""
    # Dynamically subclass _ModuleProvider so the generated class carries
    # the module's name (e.g. 'FooModuleProvider') — presumably for
    # clearer diagnostics; confirm against the DI framework's needs.
    cls = cast('type[_ModuleProvider]', type(f'{self.name}Provider', (_ModuleProvider,), {}))
    self._provider = cls(self.providers)
    return self._provider

module

module(
    *,
    providers=(),
    imports=(),
    exports=(),
    extensions=(),
    is_global=False,
)

Decorator to define a module.

PARAMETER DESCRIPTION
providers

Sequence of providers for dependency injection.

TYPE: Sequence[Provider] DEFAULT: ()

imports

Sequence of modules imported by this module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

exports

Sequence of types or modules exported by this module.

TYPE: Sequence[type[object] | ModuleType | DynamicModule] DEFAULT: ()

extensions

Sequence of module extensions for lifecycle hooks.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

is_global

Whether this module is global or not.

TYPE: bool DEFAULT: False

Source code in src/waku/modules/_metadata.py
def module(
    *,
    providers: Sequence[Provider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[type[object] | ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    """Decorator to define a module.

    Args:
        providers: Sequence of providers for dependency injection.
        imports: Sequence of modules imported by this module.
        exports: Sequence of types or modules exported by this module.
        extensions: Sequence of module extensions for lifecycle hooks.
        is_global: Whether this module is global or not.
    """

    def decorator(cls: type[_T]) -> type[_T]:
        # Copy the input sequences into lists so each decorated class gets
        # its own mutable metadata, independent of the caller's sequences.
        metadata = ModuleMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            extensions=list(extensions),
            is_global=is_global,
        )
        # Give OnModuleConfigure extensions a chance to mutate the
        # metadata before it is attached to the class.
        for extension in metadata.extensions:
            if isinstance(extension, OnModuleConfigure):
                extension.on_module_configure(metadata)

        setattr(cls, _MODULE_METADATA_KEY, metadata)
        return cls

    return decorator

application

WakuApplication

WakuApplication(
    *, container, registry, lifespan, extension_registry
)
Source code in src/waku/application.py
def __init__(
    self,
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
) -> None:
    self._container = container
    self._registry = registry
    self._lifespan = tuple(
        LifespanWrapper(lifespan_func) if not isinstance(lifespan_func, LifespanWrapper) else lifespan_func
        for lifespan_func in lifespan
    )
    self._extension_registry = extension_registry

    self._exit_stack = AsyncExitStack()
    self._initialized = False

container property

container

registry property

registry

initialize async

initialize()
Source code in src/waku/application.py
async def initialize(self) -> None:
    if self._initialized:
        return
    await self._call_on_init_extensions()
    self._initialized = True
    await self._call_after_init_extensions()

close async

close()
Source code in src/waku/application.py
async def close(self) -> None:
    if not self._initialized:
        return
    await self._call_on_shutdown_extensions()
    self._initialized = False

cqrs

NextHandlerType module-attribute

NextHandlerType = Callable[[RequestT], Awaitable[ResponseT]]

Event dataclass

Event(*, event_id=uuid4())

Bases: INotification

Convenience base class for events with auto-generated ID.

Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.

Example::

@dataclass(frozen=True, kw_only=True)
class UserCreated(Event):
    user_id: str
    email: str

event_id class-attribute instance-attribute

event_id = field(default_factory=uuid4)

INotification

Bases: Protocol

Marker interface for notification-type objects (events).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.

MediatR equivalent: INotification

Example::

@dataclass(frozen=True)
class OrderPlaced(INotification):
    order_id: str
    customer_id: str


# Or with custom identification:
@dataclass(frozen=True)
class DomainEvent(INotification):
    aggregate_id: str
    occurred_at: datetime


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str

IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async

handle(request, /, next_handler)

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, /, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

IRequest

Bases: Protocol[ResponseT]

Marker interface for request-type objects (commands/queries).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.

MediatR equivalent: IRequest

Example::

@dataclass(frozen=True)
class GetUserQuery(IRequest[UserDTO]):
    user_id: str


@dataclass(frozen=True)
class CreateOrderCommand(IRequest[OrderId]):
    customer_id: str
    items: list[OrderItem]

Request dataclass

Request(*, request_id=uuid4())

Bases: IRequest[ResponseT], Generic[ResponseT]

Convenience base class for requests with auto-generated ID.

Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.

Example::

@dataclass(frozen=True, kw_only=True)
class GetUserQuery(Request[UserDTO]):
    user_id: str

request_id class-attribute instance-attribute

request_id = field(default_factory=uuid4)

Response dataclass

Response()

Base class for response type objects.

EventHandler

Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]

Abstract base class for event handlers.

Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.

Example::

class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, event: UserJoinedEvent, /) -> None:
        await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')

handle abstractmethod async

handle(event)
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: NotificationT, /) -> None:
    raise NotImplementedError

INotificationHandler

Bases: Protocol[NotificationT]

Protocol for notification/event handlers.

MediatR equivalent: INotificationHandler

This protocol allows structural subtyping - any class with a matching handle method signature is compatible.

Example::

class OrderPlacedHandler(INotificationHandler[OrderPlaced]):
    async def handle(self, event: OrderPlaced, /) -> None:
        await self._send_confirmation_email(event.order_id)

handle async

handle(event)

Handle the notification/event.

Source code in src/waku/cqrs/events/handler.py
async def handle(self, event: NotificationT, /) -> None:
    """Handle the notification/event."""
    ...

Mediator

Mediator(container, event_publisher, registry)

Bases: IMediator

Default CQRS implementation.

Source code in src/waku/cqrs/impl.py
def __init__(
    self,
    container: AsyncContainer,
    event_publisher: EventPublisher,
    registry: MediatorRegistry,
) -> None:
    """Initialize the mediator.

    Args:
        container: Async DI container used to resolve handlers.
        event_publisher: Strategy used to dispatch events to handlers.
        registry: Registry mapping requests/events to handler types.
    """
    self._container = container
    self._event_publisher = event_publisher
    self._registry = registry

send async

send(request: IRequest[None]) -> None
send(request: IRequest[ResponseT]) -> ResponseT
send(request)
Source code in src/waku/cqrs/impl.py
@override
async def send(self, request: IRequest[Any], /) -> Any:
    """Resolve the handler registered for this request's type and run it."""
    request_type = type(request)
    handler = await self._resolve_request_handler(request_type)  # pyrefly: ignore[bad-argument-type]
    return await self._handle_request(handler, request)

publish async

publish(notification)
Source code in src/waku/cqrs/impl.py
@override
async def publish(self, notification: INotification, /) -> None:
    """Resolve all handlers for this notification's type and publish to them."""
    event_type = type(notification)
    handlers = await self._resolve_event_handlers(event_type)
    # Dispatch strategy (sequential, grouped, ...) is delegated to the
    # injected event publisher.
    await self._event_publisher(handlers, notification)

IMediator

Bases: ISender, IPublisher, ABC

Defines a mediator to encapsulate request/response and publishing interaction patterns.

publish abstractmethod async

publish(notification)

Asynchronously send notification to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, notification: INotification, /) -> None:
    """Asynchronously send notification to multiple handlers."""

send abstractmethod async

send(request: IRequest[None]) -> None
send(request: IRequest[ResponseT]) -> ResponseT
send(request)

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: IRequest[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler."""

IPublisher

Bases: ABC

Publish a notification through the mediator to be handled by multiple handlers.

publish abstractmethod async

publish(notification)

Asynchronously send notification to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, notification: INotification, /) -> None:
    """Asynchronously send notification to multiple handlers."""

ISender

Bases: ABC

Send a request through the mediator's middleware chain to be handled by a single handler.

send abstractmethod async

send(request: IRequest[None]) -> None
send(request: IRequest[ResponseT]) -> ResponseT
send(request)

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: IRequest[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler."""

MediatorConfig dataclass

MediatorConfig(
    *,
    mediator_implementation_type=Mediator,
    event_publisher=SequentialEventPublisher,
    pipeline_behaviors=(),
)

Configuration for the Mediator extension.

This class defines the configuration options for setting up the CQRS pattern implementation in the application.

ATTRIBUTE DESCRIPTION
mediator_implementation_type

The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class.

TYPE: type[IMediator]

event_publisher

The implementation class for publishing events. Defaults to SequentialEventPublisher.

TYPE: type[EventPublisher]

pipeline_behaviors

A sequence of pipeline behavior configurations that will be applied to the mediator pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence.

TYPE: Sequence[type[IPipelineBehavior[Any, Any]]]

Example
config = MediatorConfig(
    pipeline_behaviors=[
        LoggingBehavior,
        ValidationBehavior,
    ]
)

mediator_implementation_type class-attribute instance-attribute

mediator_implementation_type = Mediator

event_publisher class-attribute instance-attribute

event_publisher = SequentialEventPublisher

pipeline_behaviors class-attribute instance-attribute

pipeline_behaviors = ()

MediatorExtension

MediatorExtension()

Bases: OnModuleConfigure

Source code in src/waku/cqrs/modules.py
def __init__(self) -> None:
    self._registry = MediatorRegistry()

registry property

registry

on_module_configure

on_module_configure(metadata)
Source code in src/waku/cqrs/modules.py
@override
def on_module_configure(self, metadata: 'ModuleMetadata') -> None:
    pass

bind_request

bind_request(request_type, handler_type, *, behaviors=None)
Source code in src/waku/cqrs/modules.py
def bind_request(
    self,
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, Any]],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, Any]]] | None = None,
) -> Self:
    """Register a request handler (and optional pipeline behaviors).

    Args:
        request_type: Request (command/query) type to bind.
        handler_type: Handler class responsible for the request type.
        behaviors: Optional per-request pipeline behaviors.

    Returns:
        Self, to allow chained bind calls.
    """
    self._registry.request_map.bind(request_type, handler_type)
    if behaviors:
        self._registry.behavior_map.bind(request_type, behaviors)
    return self

bind_event

bind_event(event_type, handler_types)
Source code in src/waku/cqrs/modules.py
def bind_event(
    self,
    event_type: type[NotificationT],
    handler_types: list[type[EventHandler[NotificationT]]],
) -> Self:
    """Register handlers for an event type; returns self for chaining.

    Args:
        event_type: Notification/event type to bind.
        handler_types: Handler classes to invoke for the event.
    """
    self._registry.event_map.bind(event_type, handler_types)
    return self

MediatorModule

register classmethod

register(config=None)

Application-level module for Mediator setup.

PARAMETER DESCRIPTION
config

Configuration for the Mediator extension.

TYPE: MediatorConfig | None DEFAULT: None

Source code in src/waku/cqrs/modules.py
@classmethod
def register(cls, config: MediatorConfig | None = None, /) -> DynamicModule:
    """Application-level module for Mediator setup.

    Args:
        config: Configuration for the Mediator extension.
    """
    # Fall back to the default configuration when none is supplied.
    config_ = config or MediatorConfig()
    return DynamicModule(
        parent_module=cls,
        providers=[
            *cls._create_mediator_providers(config_),
            *cls._create_pipeline_behavior_providers(config_),
        ],
        extensions=[MediatorRegistryAggregator()],
        # Global module: mediator providers are registered application-wide.
        is_global=True,
    )

IRequestHandler

Bases: Protocol[RequestT, ResponseT]

Protocol for request handlers (commands/queries).

MediatR equivalent: IRequestHandler

This protocol allows structural subtyping - any class with a matching handle method signature is compatible.

Example::

class GetUserQueryHandler(IRequestHandler[GetUserQuery, UserDTO]):
    async def handle(self, request: GetUserQuery, /) -> UserDTO:
        return await self._repository.get(request.user_id)

handle async

handle(request)

Handle the request and return a response.

Source code in src/waku/cqrs/requests/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    """Handle the request and return a response."""
    ...

RequestHandler

Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]

Abstract base class for request handlers.

Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.

Command handler example::

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: JoinMeetingCommand, /) -> None:
        await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)

handle abstractmethod async

handle(request)
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT, /) -> ResponseT:
    raise NotImplementedError

contracts

NotificationT module-attribute

NotificationT = TypeVar(
    'NotificationT', bound=INotification, contravariant=True
)

NextHandlerType module-attribute

NextHandlerType = Callable[[RequestT], Awaitable[ResponseT]]

RequestT module-attribute

RequestT = TypeVar(
    'RequestT', bound=IRequest[Any], contravariant=True
)

ResponseT module-attribute

ResponseT = TypeVar(
    'ResponseT',
    bound='Response | None',
    default=None,
    covariant=True,
)

Event dataclass

Event(*, event_id=uuid4())

Bases: INotification

Convenience base class for events with auto-generated ID.

Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.

Example::

@dataclass(frozen=True, kw_only=True)
class UserCreated(Event):
    user_id: str
    email: str
event_id class-attribute instance-attribute
event_id = field(default_factory=uuid4)

INotification

Bases: Protocol

Marker interface for notification-type objects (events).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.

MediatR equivalent: INotification

Example::

@dataclass(frozen=True)
class OrderPlaced(INotification):
    order_id: str
    customer_id: str


# Or with custom identification:
@dataclass(frozen=True)
class DomainEvent(INotification):
    aggregate_id: str
    occurred_at: datetime


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str

IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async
handle(request, /, next_handler)

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, /, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

IRequest

Bases: Protocol[ResponseT]

Marker interface for request-type objects (commands/queries).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.

MediatR equivalent: IRequest

Example::

@dataclass(frozen=True)
class GetUserQuery(IRequest[UserDTO]):
    user_id: str


@dataclass(frozen=True)
class CreateOrderCommand(IRequest[OrderId]):
    customer_id: str
    items: list[OrderItem]

Request dataclass

Request(*, request_id=uuid4())

Bases: IRequest[ResponseT], Generic[ResponseT]

Convenience base class for requests with auto-generated ID.

Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.

Example::

@dataclass(frozen=True, kw_only=True)
class GetUserQuery(Request[UserDTO]):
    user_id: str
request_id class-attribute instance-attribute
request_id = field(default_factory=uuid4)

Response dataclass

Response()

Base class for response type objects.

event

NotificationT module-attribute
NotificationT = TypeVar(
    'NotificationT', bound=INotification, contravariant=True
)
INotification

Bases: Protocol

Marker interface for notification-type objects (events).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.

MediatR equivalent: INotification

Example::

@dataclass(frozen=True)
class OrderPlaced(INotification):
    order_id: str
    customer_id: str


# Or with custom identification:
@dataclass(frozen=True)
class DomainEvent(INotification):
    aggregate_id: str
    occurred_at: datetime


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str
Event dataclass
Event(*, event_id=uuid4())

Bases: INotification

Convenience base class for events with auto-generated ID.

Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.

Example::

@dataclass(frozen=True, kw_only=True)
class UserCreated(Event):
    user_id: str
    email: str
event_id class-attribute instance-attribute
event_id = field(default_factory=uuid4)

notification

NotificationT module-attribute
NotificationT = TypeVar(
    'NotificationT', bound=INotification, contravariant=True
)
INotification

Bases: Protocol

Marker interface for notification-type objects (events).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.

MediatR equivalent: INotification

Example::

@dataclass(frozen=True)
class OrderPlaced(INotification):
    order_id: str
    customer_id: str


# Or with custom identification:
@dataclass(frozen=True)
class DomainEvent(INotification):
    aggregate_id: str
    occurred_at: datetime


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str

pipeline

NextHandlerType module-attribute
NextHandlerType = Callable[[RequestT], Awaitable[ResponseT]]
IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async
handle(request, /, next_handler)

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, /, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

request

ResponseT module-attribute
ResponseT = TypeVar(
    'ResponseT',
    bound='Response | None',
    default=None,
    covariant=True,
)
RequestT module-attribute
RequestT = TypeVar(
    'RequestT', bound=IRequest[Any], contravariant=True
)
IRequest

Bases: Protocol[ResponseT]

Marker interface for request-type objects (commands/queries).

This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.

MediatR equivalent: IRequest

Example::

@dataclass(frozen=True)
class GetUserQuery(IRequest[UserDTO]):
    user_id: str


@dataclass(frozen=True)
class CreateOrderCommand(IRequest[OrderId]):
    customer_id: str
    items: list[OrderItem]
Request dataclass
Request(*, request_id=uuid4())

Bases: IRequest[ResponseT], Generic[ResponseT]

Convenience base class for requests with auto-generated ID.

Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.

Example::

@dataclass(frozen=True, kw_only=True)
class GetUserQuery(Request[UserDTO]):
    user_id: str
request_id class-attribute instance-attribute
request_id = field(default_factory=uuid4)
Response dataclass
Response()

Base class for response type objects.

events

EventHandler

Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]

Abstract base class for event handlers.

Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.

Example::

class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, event: UserJoinedEvent, /) -> None:
        await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
handle abstractmethod async
handle(event)
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: NotificationT, /) -> None:
    raise NotImplementedError

EventPublisher

Bases: Protocol

GroupEventPublisher

SequentialEventPublisher

handler

INotificationHandler

Bases: Protocol[NotificationT]

Protocol for notification/event handlers.

MediatR equivalent: INotificationHandler

This protocol allows structural subtyping - any class with a matching handle method signature is compatible.

Example::

class OrderPlacedHandler(INotificationHandler[OrderPlaced]):
    async def handle(self, event: OrderPlaced, /) -> None:
        await self._send_confirmation_email(event.order_id)
handle async
handle(event)

Handle the notification/event.

Source code in src/waku/cqrs/events/handler.py
async def handle(self, event: NotificationT, /) -> None:
    """Handle the notification/event."""
    ...
EventHandler

Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]

Abstract base class for event handlers.

Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.

Example::

class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, event: UserJoinedEvent, /) -> None:
        await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
handle abstractmethod async
handle(event)
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: NotificationT, /) -> None:
    raise NotImplementedError

map

EventMapRegistry module-attribute
EventMapEntry dataclass
EventMapEntry(
    event_type, di_lookup_type, handler_types=list()
)

Bases: Generic[_EventT]

event_type instance-attribute
event_type
di_lookup_type instance-attribute
di_lookup_type
handler_types class-attribute instance-attribute
handler_types = field(default_factory=list)
for_event classmethod
for_event(event_type)
Source code in src/waku/cqrs/events/map.py
@classmethod
def for_event(cls, event_type: type[INotification]) -> Self:
    di_lookup_type = EventHandler[event_type]  # type: ignore[valid-type]
    return cls(event_type=event_type, di_lookup_type=di_lookup_type)  # type: ignore[type-abstract]
add
add(handler_type)
Source code in src/waku/cqrs/events/map.py
def add(self, handler_type: type[EventHandler[_EventT]]) -> None:
    if handler_type in self.handler_types:
        raise EventHandlerAlreadyRegistered(self.event_type, handler_type)
    self.handler_types.append(handler_type)
EventMap
EventMap()
Source code in src/waku/cqrs/events/map.py
def __init__(self) -> None:
    self._registry: EventMapRegistry = {}
    self._frozen = False
is_frozen property
is_frozen
registry property
registry
freeze
freeze()
Source code in src/waku/cqrs/events/map.py
def freeze(self) -> None:
    self._frozen = True
bind
bind(event_type, handler_types)
Source code in src/waku/cqrs/events/map.py
def bind(self, event_type: type[NotificationT], handler_types: list[type[EventHandler[NotificationT]]]) -> Self:
    if self._frozen:
        raise MapFrozenError
    if event_type not in self._registry:
        self._registry[event_type] = EventMapEntry.for_event(event_type)

    entry = self._registry[event_type]
    for handler_type in handler_types:
        entry.add(handler_type)  # type: ignore[arg-type]
    return self
merge
merge(other)
Source code in src/waku/cqrs/events/map.py
def merge(self, other: EventMap) -> Self:
    if self._frozen:
        raise MapFrozenError
    for event_type, entry in other._registry.items():
        self.bind(event_type, entry.handler_types)
    return self
has_handlers
has_handlers(event_type)
Source code in src/waku/cqrs/events/map.py
def has_handlers(self, event_type: type[INotification]) -> bool:
    return event_type in self._registry and len(self._registry[event_type].handler_types) > 0
get_handler_type
get_handler_type(event_type)
Source code in src/waku/cqrs/events/map.py
def get_handler_type(self, event_type: type[INotification]) -> type[EventHandler[INotification]]:
    return self._registry[event_type].di_lookup_type

publish

EventPublisher

Bases: Protocol

SequentialEventPublisher
GroupEventPublisher

exceptions

MediatorError

Bases: WakuError

Base exception for all cqrs-related errors.

MapFrozenError

MapFrozenError()

Bases: MediatorError

Source code in src/waku/cqrs/exceptions.py
def __init__(self) -> None:
    super().__init__('Cannot modify map after it is frozen')

ImproperlyConfiguredError

Bases: MediatorError

Raised when cqrs configuration is invalid.

RequestHandlerAlreadyRegistered

RequestHandlerAlreadyRegistered(request_type, handler_type)

Bases: MediatorError, KeyError

Raised when a request handler is already registered.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

handler_type

The type of handler that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[IRequest[Any]], handler_type: type[IRequestHandler[Any, Any]]) -> None:
    self.request_type = request_type
    self.handler_type = handler_type
request_type instance-attribute
request_type = request_type
handler_type instance-attribute
handler_type = handler_type

RequestHandlerNotFound

RequestHandlerNotFound(request_type)

Bases: MediatorError, TypeError

Raised when a request handler is not found.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[IRequest[Any]]) -> None:
    self.request_type = request_type
request_type instance-attribute
request_type = request_type

EventHandlerAlreadyRegistered

EventHandlerAlreadyRegistered(event_type, handler_type)

Bases: MediatorError, KeyError

Raised when a notification handler is already registered.

ATTRIBUTE DESCRIPTION
event_type

The type of notification that caused the error.

handler_type

The type of handler that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, event_type: type[INotification], handler_type: type[INotificationHandler[Any]]) -> None:
    self.event_type = event_type
    self.handler_type = handler_type
event_type instance-attribute
event_type = event_type
handler_type instance-attribute
handler_type = handler_type

PipelineBehaviorAlreadyRegistered

PipelineBehaviorAlreadyRegistered(
    request_type, behavior_type
)

Bases: MediatorError, KeyError

Raised when a pipeline behavior is already registered.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

behavior_type

The type of behavior that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[IRequest[Any]], behavior_type: type[IPipelineBehavior[Any, Any]]) -> None:
    self.request_type = request_type
    self.behavior_type = behavior_type
request_type instance-attribute
request_type = request_type
behavior_type instance-attribute
behavior_type = behavior_type

impl

Mediator

Mediator(container, event_publisher, registry)

Bases: IMediator

Default CQRS implementation.

Source code in src/waku/cqrs/impl.py
def __init__(
    self,
    container: AsyncContainer,
    event_publisher: EventPublisher,
    registry: MediatorRegistry,
) -> None:
    self._container = container
    self._event_publisher = event_publisher
    self._registry = registry
send async
send(request: IRequest[None]) -> None
send(request: IRequest[ResponseT]) -> ResponseT
send(request)
Source code in src/waku/cqrs/impl.py
@override
async def send(self, request: IRequest[Any], /) -> Any:
    request_type = type(request)
    handler = await self._resolve_request_handler(request_type)  # pyrefly: ignore[bad-argument-type]
    return await self._handle_request(handler, request)
publish async
publish(notification)
Source code in src/waku/cqrs/impl.py
@override
async def publish(self, notification: INotification, /) -> None:
    event_type = type(notification)
    handlers = await self._resolve_event_handlers(event_type)
    await self._event_publisher(handlers, notification)

interfaces

ISender

Bases: ABC

Send a request through the cqrs middleware chain to be handled by a single handler.

send abstractmethod async
send(request: IRequest[None]) -> None
send(request: IRequest[ResponseT]) -> ResponseT
send(request)

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: IRequest[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler."""

IPublisher

Bases: ABC

Publish a notification through the mediator to be handled by multiple handlers.

publish abstractmethod async
publish(notification)

Asynchronously send notification to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, notification: INotification, /) -> None:
    """Asynchronously send notification to multiple handlers."""

IMediator

Bases: ISender, IPublisher, ABC

Defines a mediator to encapsulate request/response and publishing interaction patterns.

publish abstractmethod async
publish(notification)

Asynchronously send notification to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, notification: INotification, /) -> None:
    """Asynchronously send notification to multiple handlers."""
send abstractmethod async
send(request: IRequest[None]) -> None
send(request: IRequest[ResponseT]) -> ResponseT
send(request)

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: IRequest[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler."""

modules

MediatorConfig dataclass

MediatorConfig(
    *,
    mediator_implementation_type=Mediator,
    event_publisher=SequentialEventPublisher,
    pipeline_behaviors=(),
)

Configuration for the Mediator extension.

This class defines the configuration options for setting up the cqrs pattern implementation in the application.

ATTRIBUTE DESCRIPTION
mediator_implementation_type

The concrete implementation class for the cqrs interface (IMediator). Defaults to the standard Mediator class.

TYPE: type[IMediator]

event_publisher

The implementation class for publishing events. Defaults to SequentialEventPublisher.

TYPE: type[EventPublisher]

pipeline_behaviors

A sequence of pipeline behavior configurations that will be applied to the cqrs pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence.

TYPE: Sequence[type[IPipelineBehavior[Any, Any]]]

Example
config = MediatorConfig(
    pipeline_behaviors=[
        LoggingBehavior,
        ValidationBehavior,
    ]
)
mediator_implementation_type class-attribute instance-attribute
mediator_implementation_type = Mediator
event_publisher class-attribute instance-attribute
event_publisher = SequentialEventPublisher
pipeline_behaviors class-attribute instance-attribute
pipeline_behaviors = ()

MediatorModule

register classmethod
register(config=None)

Application-level module for Mediator setup.

PARAMETER DESCRIPTION
config

Configuration for the Mediator extension.

TYPE: MediatorConfig | None DEFAULT: None

Source code in src/waku/cqrs/modules.py
@classmethod
def register(cls, config: MediatorConfig | None = None, /) -> DynamicModule:
    """Application-level module for Mediator setup.

    Args:
        config: Configuration for the Mediator extension.
    """
    config_ = config or MediatorConfig()
    return DynamicModule(
        parent_module=cls,
        providers=[
            *cls._create_mediator_providers(config_),
            *cls._create_pipeline_behavior_providers(config_),
        ],
        extensions=[MediatorRegistryAggregator()],
        is_global=True,
    )

MediatorExtension

MediatorExtension()

Bases: OnModuleConfigure

Source code in src/waku/cqrs/modules.py
def __init__(self) -> None:
    self._registry = MediatorRegistry()
registry property
registry
on_module_configure
on_module_configure(metadata)
Source code in src/waku/cqrs/modules.py
@override
def on_module_configure(self, metadata: 'ModuleMetadata') -> None:
    pass
bind_request
bind_request(request_type, handler_type, *, behaviors=None)
Source code in src/waku/cqrs/modules.py
def bind_request(
    self,
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, Any]],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, Any]]] | None = None,
) -> Self:
    self._registry.request_map.bind(request_type, handler_type)
    if behaviors:
        self._registry.behavior_map.bind(request_type, behaviors)
    return self
bind_event
bind_event(event_type, handler_types)
Source code in src/waku/cqrs/modules.py
def bind_event(
    self,
    event_type: type[NotificationT],
    handler_types: list[type[EventHandler[NotificationT]]],
) -> Self:
    self._registry.event_map.bind(event_type, handler_types)
    return self

MediatorRegistryAggregator

Bases: OnModuleRegistration

on_module_registration
on_module_registration(registry, owning_module, context)
Source code in src/waku/cqrs/modules.py
@override
def on_module_registration(
    self,
    registry: ModuleMetadataRegistry,
    owning_module: 'ModuleType',
    context: Mapping[Any, Any] | None,
) -> None:
    aggregated = MediatorRegistry()

    for module_type, ext in registry.find_extensions(MediatorExtension):
        aggregated.merge(ext.registry)
        for provider in ext.registry.handler_providers():
            registry.add_provider(module_type, provider)

    for provider in aggregated.collector_providers():
        registry.add_provider(owning_module, provider)

    aggregated.freeze()
    registry.add_provider(owning_module, object_(aggregated))

pipeline

PipelineBehaviorWrapper

PipelineBehaviorWrapper(behaviors)

Bases: Generic[RequestT, ResponseT]

Composes pipeline behaviors into a processing chain.

Source code in src/waku/cqrs/pipeline/chain.py
def __init__(self, behaviors: Sequence[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    self._behaviors = tuple(behaviors)
wrap
wrap(handle)

Create a pipeline that wraps the handler function with behaviors.

Pipeline behaviors are executed in the order they are provided.

PARAMETER DESCRIPTION
handle

The handler function to wrap with behaviors

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
NextHandlerType[RequestT, ResponseT]

A function that executes the entire pipeline

Source code in src/waku/cqrs/pipeline/chain.py
def wrap(self, handle: NextHandlerType[RequestT, ResponseT]) -> NextHandlerType[RequestT, ResponseT]:
    """Create a pipeline that wraps the handler function with behaviors.

    Pipeline behaviors are executed in the order they are provided.

    Args:
        handle: The handler function to wrap with behaviors

    Returns:
        A function that executes the entire pipeline
    """
    if not self._behaviors:
        return handle

    behaviors = self._behaviors

    async def pipeline(request: RequestT) -> ResponseT:
        async def execute(req: RequestT, idx: int) -> ResponseT:
            if idx >= len(behaviors):
                return await handle(req)
            return await behaviors[idx].handle(req, next_handler=lambda r: execute(r, idx + 1))

        return await execute(request, 0)

    return pipeline

chain

PipelineBehaviorWrapper
PipelineBehaviorWrapper(behaviors)

Bases: Generic[RequestT, ResponseT]

Composes pipeline behaviors into a processing chain.

Source code in src/waku/cqrs/pipeline/chain.py
def __init__(self, behaviors: Sequence[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    self._behaviors = tuple(behaviors)
wrap
wrap(handle)

Create a pipeline that wraps the handler function with behaviors.

Pipeline behaviors are executed in the order they are provided.

PARAMETER DESCRIPTION
handle

The handler function to wrap with behaviors

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
NextHandlerType[RequestT, ResponseT]

A function that executes the entire pipeline

Source code in src/waku/cqrs/pipeline/chain.py
def wrap(self, handle: NextHandlerType[RequestT, ResponseT]) -> NextHandlerType[RequestT, ResponseT]:
    """Create a pipeline that wraps the handler function with behaviors.

    Pipeline behaviors are executed in the order they are provided.

    Args:
        handle: The handler function to wrap with behaviors

    Returns:
        A function that executes the entire pipeline
    """
    if not self._behaviors:
        return handle

    behaviors = self._behaviors

    async def pipeline(request: RequestT) -> ResponseT:
        async def execute(req: RequestT, idx: int) -> ResponseT:
            if idx >= len(behaviors):
                return await handle(req)
            return await behaviors[idx].handle(req, next_handler=lambda r: execute(r, idx + 1))

        return await execute(request, 0)

    return pipeline

map

PipelineBehaviorMapRegistry module-attribute
PipelineBehaviorMapEntry dataclass
PipelineBehaviorMapEntry(
    request_type, di_lookup_type, behavior_types=list()
)

Bases: Generic[RequestT, ResponseT]

request_type instance-attribute
request_type
di_lookup_type instance-attribute
di_lookup_type
behavior_types class-attribute instance-attribute
behavior_types = field(default_factory=list)
for_request classmethod
for_request(request_type)
Source code in src/waku/cqrs/pipeline/map.py
@classmethod
def for_request(cls, request_type: type[IRequest[ResponseT]]) -> Self:
    response_type = get_request_response_type(request_type)
    di_lookup_type = IPipelineBehavior[request_type, response_type]  # type: ignore[valid-type]
    return cls(request_type=request_type, di_lookup_type=di_lookup_type)  # type: ignore[type-abstract]
add
add(behavior_type)
Source code in src/waku/cqrs/pipeline/map.py
def add(self, behavior_type: type[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    if behavior_type in self.behavior_types:
        raise PipelineBehaviorAlreadyRegistered(self.request_type, behavior_type)
    self.behavior_types.append(behavior_type)
PipelineBehaviorMap
PipelineBehaviorMap()
Source code in src/waku/cqrs/pipeline/map.py
def __init__(self) -> None:
    self._registry: PipelineBehaviorMapRegistry[Any, Any] = {}
    self._frozen = False
is_frozen property
is_frozen
registry property
registry
freeze
freeze()
Source code in src/waku/cqrs/pipeline/map.py
def freeze(self) -> None:
    self._frozen = True
bind
bind(request_type, behavior_types)
Source code in src/waku/cqrs/pipeline/map.py
def bind(
    self,
    request_type: type[RequestT],
    behavior_types: list[type[IPipelineBehavior[RequestT, ResponseT]]],
) -> Self:
    if self._frozen:
        raise MapFrozenError
    if request_type not in self._registry:
        self._registry[request_type] = PipelineBehaviorMapEntry.for_request(request_type)

    entry = self._registry[request_type]
    for behavior_type in behavior_types:
        entry.add(behavior_type)
    return self
merge
merge(other)
Source code in src/waku/cqrs/pipeline/map.py
def merge(self, other: PipelineBehaviorMap) -> Self:
    if self._frozen:
        raise MapFrozenError
    for request_type, entry in other._registry.items():
        self.bind(request_type, entry.behavior_types)
    return self
has_behaviors
has_behaviors(request_type)
Source code in src/waku/cqrs/pipeline/map.py
def has_behaviors(self, request_type: type[RequestT]) -> bool:
    return request_type in self._registry and len(self._registry[request_type].behavior_types) > 0
get_lookup_type
get_lookup_type(request_type)
Source code in src/waku/cqrs/pipeline/map.py
def get_lookup_type(self, request_type: type[RequestT]) -> type[IPipelineBehavior[Any, Any]]:
    return self._registry[request_type].di_lookup_type

registry

MediatorRegistry dataclass

MediatorRegistry(
    *,
    request_map=RequestMap(),
    event_map=EventMap(),
    behavior_map=PipelineBehaviorMap(),
)
request_map class-attribute instance-attribute
request_map = field(default_factory=RequestMap)
event_map class-attribute instance-attribute
event_map = field(default_factory=EventMap)
behavior_map class-attribute instance-attribute
behavior_map = field(default_factory=PipelineBehaviorMap)
merge
merge(other)
Source code in src/waku/cqrs/registry.py
def merge(self, other: MediatorRegistry) -> None:
    self.request_map.merge(other.request_map)
    self.event_map.merge(other.event_map)
    self.behavior_map.merge(other.behavior_map)
freeze
freeze()
Source code in src/waku/cqrs/registry.py
def freeze(self) -> None:
    self.request_map.freeze()
    self.event_map.freeze()
    self.behavior_map.freeze()
handler_providers
handler_providers()
Source code in src/waku/cqrs/registry.py
def handler_providers(self) -> Iterator[Provider]:
    for entry in self.request_map.registry.values():
        yield scoped(entry.di_lookup_type, entry.handler_type)
    for entry in self.event_map.registry.values():
        yield many(entry.di_lookup_type, *entry.handler_types, collect=False)
    for entry in self.behavior_map.registry.values():
        yield many(entry.di_lookup_type, *entry.behavior_types, collect=False)
collector_providers
collector_providers()
Source code in src/waku/cqrs/registry.py
def collector_providers(self) -> Iterator[Provider]:
    for entry in self.event_map.registry.values():
        yield many(entry.di_lookup_type, collect=True)
    for entry in self.behavior_map.registry.values():
        yield many(entry.di_lookup_type, collect=True)

requests

RequestHandler

Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]

Abstract base class for request handlers.

Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.

Command handler example::

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: JoinMeetingCommand, /) -> None:
        await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handle abstractmethod async
handle(request)
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT, /) -> ResponseT:
    raise NotImplementedError

handler

IRequestHandler

Bases: Protocol[RequestT, ResponseT]

Protocol for request handlers (commands/queries).

MediatR equivalent: IRequestHandler

This protocol allows structural subtyping - any class with a matching handle method signature is compatible.

Example::

class GetUserQueryHandler(IRequestHandler[GetUserQuery, UserDTO]):
    async def handle(self, request: GetUserQuery, /) -> UserDTO:
        return await self._repository.get(request.user_id)
handle async
handle(request)

Handle the request and return a response.

Source code in src/waku/cqrs/requests/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    """Handle the request and return a response."""
    ...
RequestHandler

Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]

Abstract base class for request handlers.

Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.

Command handler example::

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: JoinMeetingCommand, /) -> None:
        await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handle abstractmethod async
handle(request)
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT, /) -> ResponseT:
    raise NotImplementedError

map

RequestMapRegistry module-attribute
RequestMapRegistry = MutableMapping[
    type[IRequest[Response | None]],
    RequestMapEntry[
        IRequest[Response | None], Response | None
    ],
]
RequestMapEntry dataclass
RequestMapEntry(handler_type, di_lookup_type)

Bases: Generic[_MapReqT, _MapResT]

handler_type instance-attribute
handler_type
di_lookup_type instance-attribute
di_lookup_type
RequestMap
RequestMap()
Source code in src/waku/cqrs/requests/map.py
def __init__(self) -> None:
    self._registry: RequestMapRegistry = {}
    self._frozen = False
is_frozen property
is_frozen
registry property
registry
freeze
freeze()
Source code in src/waku/cqrs/requests/map.py
def freeze(self) -> None:
    self._frozen = True
bind
bind(request_type, handler_type)
Source code in src/waku/cqrs/requests/map.py
def bind(
    self,
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, ResponseT]],
) -> Self:
    if self._frozen:
        raise MapFrozenError
    if request_type in self._registry:
        raise RequestHandlerAlreadyRegistered(request_type, handler_type)
    response_type = get_request_response_type(request_type)
    di_lookup_type = RequestHandler[request_type, response_type]  # type: ignore[valid-type]
    self._registry[request_type] = RequestMapEntry(handler_type, di_lookup_type)  # type: ignore[type-abstract, arg-type]
    return self
merge
merge(other)
Source code in src/waku/cqrs/requests/map.py
def merge(self, other: RequestMap) -> Self:
    if self._frozen:
        raise MapFrozenError
    for request_type, entry in other._registry.items():
        self.bind(request_type, entry.handler_type)  # ty: ignore[invalid-argument-type]
    return self
has_handler
has_handler(request_type)
Source code in src/waku/cqrs/requests/map.py
def has_handler(self, request_type: type[RequestT]) -> bool:
    return request_type in self._registry
get_handler_type
get_handler_type(request_type)
Source code in src/waku/cqrs/requests/map.py
def get_handler_type(self, request_type: type[RequestT]) -> type[RequestHandler[RequestT, Response | None]]:
    return self._registry[request_type].di_lookup_type

utils

get_request_response_type cached

get_request_response_type(request_type)

Extract the response type from an IRequest implementation.

Searches through the class hierarchy to find IRequest or its subclasses, supporting direct implementations, Request subclasses, and nested inheritance.

RAISES DESCRIPTION
TypeError

if response type cannot be extracted from the request type.

Source code in src/waku/cqrs/utils.py
@functools.cache
def get_request_response_type(request_type: type[IRequest[ResponseT]]) -> type[ResponseT]:
    """Extract the response type from an IRequest implementation.

    Searches through the class hierarchy to find IRequest or its subclasses,
    supporting direct implementations, Request subclasses, and nested inheritance.

    Raises:
        TypeError: if response type cannot be extracted from the request type.
    """
    for cls in request_type.__mro__:
        if cls is object:
            break
        if response_type := _extract_response_from_bases(cls):
            return response_type  # type: ignore[return-value]

    msg = f'Could not extract response type from {request_type.__name__}'
    raise TypeError(msg)

di

activator

activator(fn, *markers)

Create a Provider with an activator for simple cases.

PARAMETER DESCRIPTION
fn

Callable that returns bool to determine marker activation.

TYPE: Callable[..., bool]

*markers

Marker instances or types to activate.

TYPE: Any DEFAULT: ()

RETURNS DESCRIPTION
Provider

Provider with the activator registered.

Source code in src/waku/di/_providers.py
def activator(fn: Callable[..., bool], *markers: Any) -> Provider:
    """Create a Provider with an activator for simple cases.

    Args:
        fn: Callable that returns bool to determine marker activation.
        *markers: Marker instances or types to activate.

    Returns:
        Provider with the activator registered.
    """
    p = Provider()
    p.activate(fn, *markers)
    return p

contextual

contextual(provided_type, *, scope=REQUEST)

Provide a dependency from the current context (e.g., app/request).

PARAMETER DESCRIPTION
provided_type

The type to resolve from context.

TYPE: Any

scope

Scope of the context variable (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

RETURNS DESCRIPTION
Provider

Provider configured for context resolution.

Source code in src/waku/di/_providers.py
def contextual(
    provided_type: Any,
    *,
    scope: Scope = Scope.REQUEST,
) -> Provider:
    """Provide a dependency from the current context (e.g., app/request).

    Args:
        provided_type: The type to resolve from context.
        scope: Scope of the context variable (default: Scope.REQUEST).

    Returns:
        Provider configured for context resolution.
    """
    provider_ = Provider()
    provider_.from_context(provided_type, scope=scope)
    return provider_

many

many(
    interface,
    *implementations,
    scope=REQUEST,
    cache=True,
    when=None,
    collect=True,
)

Register multiple implementations as a collection.

PARAMETER DESCRIPTION
interface

Interface type for the collection.

TYPE: Any

*implementations

Implementation types or factory functions to include in collection.

TYPE: Any DEFAULT: ()

scope

Scope of the collection (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

cache

Whether to cache the resolve results within scope.

TYPE: bool DEFAULT: True

when

Optional marker to conditionally activate the provider.

TYPE: BaseMarker | None DEFAULT: None

collect

Whether to include collect+alias for Sequence/list resolution. Set to False to only register implementations without the collector.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Provider

Provider configured for collection resolution.

RAISES DESCRIPTION
ValueError

If no implementations and collect is False.

Source code in src/waku/di/_providers.py
def many(
    interface: Any,
    *implementations: Any,
    scope: Scope = Scope.REQUEST,
    cache: bool = True,
    when: BaseMarker | None = None,
    collect: bool = True,
) -> Provider:
    """Register multiple implementations as a collection.

    Args:
        interface: Interface type for the collection.
        *implementations: Implementation types or factory functions to include in collection.
        scope: Scope of the collection (default: Scope.REQUEST).
        cache: Whether to cache the resolve results within scope.
        when: Optional marker to conditionally activate the provider.
        collect: Whether to include collect+alias for Sequence/list resolution.
            Set to False to only register implementations without the collector.

    Returns:
        Provider configured for collection resolution.

    Raises:
        ValueError: If no implementations and collect is False.
    """
    if not implementations and not collect:
        msg = 'At least one implementation must be provided when collect=False'
        raise ValueError(msg)

    provider_ = Provider(scope=scope)
    for impl in implementations:
        provider_.provide(impl, provides=interface, cache=cache, when=when)
    if collect:
        provider_.collect(interface, scope=scope, cache=cache, provides=Sequence[interface])
        provider_.alias(Sequence[interface], provides=list[interface], cache=cache)
    return provider_

object_

object_(obj, *, provided_type=None, when=None)

Provide the exact object passed at creation time as a singleton dependency.

PARAMETER DESCRIPTION
obj

The instance to provide as-is.

TYPE: Any

provided_type

Explicit type to provide (default: inferred).

TYPE: Any | None DEFAULT: None

when

Optional marker to conditionally activate the provider.

TYPE: BaseMarker | None DEFAULT: None

RETURNS DESCRIPTION
Provider

Provider configured to return the given object.

Source code in src/waku/di/_providers.py
def object_(
    obj: Any,
    *,
    provided_type: Any | None = None,
    when: BaseMarker | None = None,
) -> Provider:
    """Provide the exact object passed at creation time as a singleton dependency.

    Args:
        obj: The instance to provide as-is.
        provided_type: Explicit type to provide (default: inferred).
        when: Optional marker to conditionally activate the provider.

    Returns:
        Provider configured to return the given object.
    """
    actual_type = provided_type if provided_type is not None else type(obj)
    return provider(lambda: obj, scope=Scope.APP, provided_type=actual_type, cache=True, when=when)

provider

provider(
    source,
    *,
    scope=REQUEST,
    provided_type=None,
    cache=True,
    when=None,
)
Source code in src/waku/di/_providers.py
def provider(
    source: Callable[..., Any] | type[Any],
    *,
    scope: Scope = Scope.REQUEST,
    provided_type: Any | None = None,
    cache: bool = True,
    when: BaseMarker | None = None,
) -> Provider:
    provider_ = Provider(scope=scope)
    provider_.provide(source, provides=provided_type, cache=cache, when=when)
    return provider_

scoped

scoped(
    interface_or_source,
    implementation=None,
    /,
    *,
    when=None,
)

Create a scoped provider (lifetime: request).

PARAMETER DESCRIPTION
interface_or_source

Interface type or source if no separate implementation.

TYPE: type[Any] | Callable[..., Any]

implementation

Implementation type if interface is provided.

TYPE: type[Any] | Callable[..., Any] | None DEFAULT: None

when

Optional marker to conditionally activate the provider.

TYPE: BaseMarker | None DEFAULT: None

RETURNS DESCRIPTION
Provider

Provider configured for request scope.

Source code in src/waku/di/_providers.py
def scoped(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any] | Callable[..., Any] | None = None,
    /,
    *,
    when: BaseMarker | None = None,
) -> Provider:
    """Create a request-scoped provider.

    Args:
        interface_or_source: Interface type, or the source itself when no
            separate implementation is given.
        implementation: Implementation bound to the interface, if any.
        when: Optional marker to conditionally activate the provider.

    Returns:
        Provider configured for request scope.
    """
    if implementation is None:
        target, iface = interface_or_source, None
    else:
        target, iface = implementation, interface_or_source
    return provider(target, scope=Scope.REQUEST, provided_type=iface, when=when)

singleton

singleton(
    interface_or_source,
    implementation=None,
    /,
    *,
    when=None,
)

Create a singleton provider (lifetime: app).

PARAMETER DESCRIPTION
interface_or_source

Interface type or source if no separate implementation.

TYPE: type[Any] | Callable[..., Any]

implementation

Implementation type if interface is provided.

TYPE: type[Any] | Callable[..., Any] | None DEFAULT: None

when

Optional marker to conditionally activate the provider.

TYPE: BaseMarker | None DEFAULT: None

RETURNS DESCRIPTION
Provider

Provider configured for singleton scope.

Source code in src/waku/di/_providers.py
def singleton(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any] | Callable[..., Any] | None = None,
    /,
    *,
    when: BaseMarker | None = None,
) -> Provider:
    """Create an app-scoped (singleton) provider.

    Args:
        interface_or_source: Interface type, or the source itself when no
            separate implementation is given.
        implementation: Implementation bound to the interface, if any.
        when: Optional marker to conditionally activate the provider.

    Returns:
        Provider configured for singleton (app) scope.
    """
    if implementation is None:
        target, iface = interface_or_source, None
    else:
        target, iface = implementation, interface_or_source
    return provider(target, scope=Scope.APP, provided_type=iface, when=when)

transient

transient(
    interface_or_source,
    implementation=None,
    /,
    *,
    when=None,
)

Create a transient provider (new instance per injection).

PARAMETER DESCRIPTION
interface_or_source

Interface type or source if no separate implementation.

TYPE: type[Any] | Callable[..., Any]

implementation

Implementation type if interface is provided.

TYPE: type[Any] | Callable[..., Any] | None DEFAULT: None

when

Optional marker to conditionally activate the provider.

TYPE: BaseMarker | None DEFAULT: None

RETURNS DESCRIPTION
Provider

Provider configured for transient (no cache) scope.

Source code in src/waku/di/_providers.py
def transient(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any] | Callable[..., Any] | None = None,
    /,
    *,
    when: BaseMarker | None = None,
) -> Provider:
    """Create a transient provider: a fresh instance on every injection.

    Args:
        interface_or_source: Interface type, or the source itself when no
            separate implementation is given.
        implementation: Implementation bound to the interface, if any.
        when: Optional marker to conditionally activate the provider.

    Returns:
        Provider in request scope with caching disabled.
    """
    if implementation is None:
        target, iface = interface_or_source, None
    else:
        target, iface = implementation, interface_or_source
    return provider(target, scope=Scope.REQUEST, provided_type=iface, cache=False, when=when)

eventsourcing

ExpectedVersion module-attribute

ExpectedVersion = (
    Exact | NoStream | StreamExists | AnyVersion
)

EventTypeSpec module-attribute

EventTypeSpec = 'type[INotification] | EventType'

EventSourcedAggregate

EventSourcedAggregate()

Bases: ABC

Source code in src/waku/eventsourcing/contracts/aggregate.py
def __init__(self) -> None:
    """Initialize a fresh, never-persisted aggregate."""
    # -1 marks an aggregate that has never been persisted; load_from_history /
    # mark_persisted set the real stream version later.
    self._version = -1
    # Events awaiting persistence; drained by collect_events().
    self._pending_events = []

version property

version

collect_events

collect_events()
Source code in src/waku/eventsourcing/contracts/aggregate.py
def collect_events(self) -> list[INotification]:
    """Return all pending events and empty the pending buffer in place."""
    drained = self._pending_events[:]
    # del-slice (like clear()) keeps the list object's identity intact.
    del self._pending_events[:]
    return drained

mark_persisted

mark_persisted(version)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def mark_persisted(self, version: int) -> None:
    """Record *version* as the aggregate's current stream version."""
    self._version = version

load_from_history

load_from_history(events, version)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def load_from_history(self, events: Sequence[INotification], version: int) -> None:
    """Rebuild state by replaying *events*, then record the stream *version*."""
    for historical_event in events:
        self._apply(historical_event)
    self._version = version

IDecider

Bases: Protocol[StateT, CommandT, EventT]

initial_state

initial_state()
Source code in src/waku/eventsourcing/contracts/aggregate.py
def initial_state(self) -> StateT:
    """Return the decider's starting state (used before any events are applied)."""
    ...

decide

decide(command, state)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def decide(self, command: CommandT, state: StateT) -> Sequence[EventT]:
    """Return the events produced by applying *command* to *state*."""
    ...

evolve

evolve(state, event)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def evolve(self, state: StateT, event: EventT) -> StateT:
    """Return the next state after folding *event* into *state*."""
    ...

EventEnvelope dataclass

EventEnvelope(
    *,
    domain_event,
    idempotency_key,
    metadata=EventMetadata(),
)

domain_event instance-attribute

domain_event

idempotency_key instance-attribute

idempotency_key

metadata class-attribute instance-attribute

metadata = field(default_factory=EventMetadata)

EventMetadata dataclass

EventMetadata(
    *, correlation_id=None, causation_id=None, extra=dict()
)

correlation_id class-attribute instance-attribute

correlation_id = None

causation_id class-attribute instance-attribute

causation_id = None

extra class-attribute instance-attribute

extra = field(default_factory=dict)

IMetadataEnricher

Bases: ABC

Enriches event metadata before persistence.

enrich abstractmethod

enrich(metadata)
Source code in src/waku/eventsourcing/contracts/event.py
@abc.abstractmethod
def enrich(self, metadata: EventMetadata, /) -> EventMetadata:
    """Return event metadata derived from *metadata* (e.g. with extra fields added)."""
    ...

StoredEvent dataclass

StoredEvent(
    *,
    event_id,
    stream_id,
    event_type,
    position,
    global_position,
    timestamp,
    data,
    metadata,
    idempotency_key,
    schema_version=1,
)

event_id instance-attribute

event_id

stream_id instance-attribute

stream_id

event_type instance-attribute

event_type

position instance-attribute

position

global_position instance-attribute

global_position

timestamp instance-attribute

timestamp

data instance-attribute

data

metadata instance-attribute

metadata

idempotency_key instance-attribute

idempotency_key

schema_version class-attribute instance-attribute

schema_version = 1

AnyVersion dataclass

AnyVersion()

Exact dataclass

Exact(version)

version instance-attribute

version

NoStream dataclass

NoStream()

StreamExists dataclass

StreamExists()

StreamId dataclass

StreamId(stream_type, stream_key)

stream_type instance-attribute

stream_type

stream_key instance-attribute

stream_key

value property

value

for_aggregate classmethod

for_aggregate(aggregate_type, aggregate_id)
Source code in src/waku/eventsourcing/contracts/stream.py
@classmethod
def for_aggregate(cls, aggregate_type: str, aggregate_id: str) -> StreamId:
    """Build a StreamId using the aggregate type as stream type and its id as key."""
    return cls(stream_type=aggregate_type, stream_key=aggregate_id)

from_value classmethod

from_value(value)
Source code in src/waku/eventsourcing/contracts/stream.py
@classmethod
def from_value(cls, value: str) -> StreamId:
    """Parse a ``'{stream_type}-{stream_key}'`` string into a StreamId.

    Splits on the first ``'-'`` only, so the key part may itself contain
    dashes; the type part may not.

    Raises:
        ValueError: If *value* has no dash, or an empty type or key part.
    """
    head, dash, tail = value.partition('-')
    if not (head and dash and tail):
        msg = f"Invalid stream ID format: {value!r}. Expected '{{stream_type}}-{{stream_key}}'"
        raise ValueError(msg)
    return cls(stream_type=head, stream_key=tail)

StreamPosition

Bases: Enum

START class-attribute instance-attribute

START = 'start'

END class-attribute instance-attribute

END = 'end'

DeciderCommandHandler

DeciderCommandHandler(repository, decider, publisher)

Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/handler.py
def __init__(
    self,
    repository: DeciderRepository[StateT, CommandT, EventT],
    decider: IDecider[StateT, CommandT, EventT],
    publisher: IPublisher,
) -> None:
    """Store the repository, decider, and publisher collaborators used by handle()."""
    self._repository = repository
    self._decider = decider
    self._publisher = publisher

handle async

handle(request)
Source code in src/waku/eventsourcing/decider/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    """Run the decide/persist/publish pipeline for one command request.

    Steps: resolve the aggregate id and command from *request*; start from
    the decider's initial state (version -1) for creation commands,
    otherwise load persisted state; let the decider emit events; fold them
    into the state; save with the loaded version for optimistic
    concurrency; publish each event; build the response from the final
    state and new version.
    """
    aggregate_id = self._aggregate_id(request)
    command = self._to_command(request)

    if self._is_creation_command(request):
        state, version = self._decider.initial_state(), -1
    else:
        state, version = await self._repository.load(aggregate_id)

    produced_events = self._decider.decide(command, state)
    for produced in produced_events:
        state = self._decider.evolve(state, produced)

    new_version = await self._repository.save(
        aggregate_id,
        produced_events,
        version,
        current_state=state,
        idempotency_key=self._idempotency_key(request),
    )

    for produced in produced_events:
        await self._publisher.publish(produced)

    return self._to_response(state, new_version)

DeciderRepository

DeciderRepository(decider, event_store)

Bases: ABC, Generic[StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/repository.py
def __init__(
    self,
    decider: IDecider[StateT, CommandT, EventT],
    event_store: IEventStore,
) -> None:
    """Store the decider and event store used for replaying and appending events."""
    self._decider = decider
    self._event_store = event_store

aggregate_name class-attribute

aggregate_name

max_stream_length class-attribute

max_stream_length = None

load async

load(aggregate_id)
Source code in src/waku/eventsourcing/decider/repository.py
async def load(self, aggregate_id: str) -> tuple[StateT, int]:
    """Replay the aggregate's event stream into decider state.

    When ``max_stream_length`` is set, reads one extra event so an
    oversized stream can be detected. Returns the folded state together
    with the zero-based stream version (-1 for an empty stream).

    Raises:
        AggregateNotFoundError: If the stream does not exist.
        StreamTooLargeError: If the stream exceeds ``max_stream_length``.
    """
    stream_id = self._stream_id(aggregate_id)
    limit = self.max_stream_length
    read_count = None if limit is None else limit + 1
    try:
        stored_events = await self._event_store.read_stream(stream_id, count=read_count)
    except StreamNotFoundError:
        raise AggregateNotFoundError(
            aggregate_type=self.aggregate_name,
            aggregate_id=aggregate_id,
        ) from None
    if limit is not None and len(stored_events) > limit:
        raise StreamTooLargeError(stream_id, limit)
    state = self._decider.initial_state()
    for record in stored_events:
        state = self._decider.evolve(state, record.data)  # type: ignore[arg-type]
    return state, len(stored_events) - 1

save async

save(
    aggregate_id,
    events,
    expected_version,
    *,
    current_state=None,
    idempotency_key=None,
)
Source code in src/waku/eventsourcing/decider/repository.py
async def save(
    self,
    aggregate_id: str,
    events: typing.Sequence[EventT],
    expected_version: int,
    *,
    current_state: StateT | None = None,  # noqa: ARG002
    idempotency_key: str | None = None,
) -> int:
    """Append *events* to the aggregate's stream with optimistic concurrency.

    Returns *expected_version* unchanged when there are no events. Each
    event is wrapped in an envelope keyed ``'{idempotency_key}:{index}'``
    when a key is given, otherwise with a fresh UUID per event.

    Returns:
        The stream version after the append.
    """
    if not events:
        return expected_version
    stream_id = self._stream_id(aggregate_id)
    envelopes = []
    for index, event in enumerate(events):
        key = f'{idempotency_key}:{index}' if idempotency_key else str(uuid.uuid4())
        envelopes.append(EventEnvelope(domain_event=event, idempotency_key=key))
    expected = NoStream() if expected_version < 0 else Exact(version=expected_version)
    return await self._event_store.append_to_stream(stream_id, envelopes, expected_version=expected)

DeciderVoidCommandHandler

DeciderVoidCommandHandler(repository, decider, publisher)

Bases: DeciderCommandHandler[RequestT, None, StateT, CommandT, EventT], ABC, Generic[RequestT, StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/handler.py
def __init__(
    self,
    repository: DeciderRepository[StateT, CommandT, EventT],
    decider: IDecider[StateT, CommandT, EventT],
    publisher: IPublisher,
) -> None:
    """Store the repository, decider, and publisher collaborators.

    NOTE(review): this body duplicates DeciderCommandHandler.__init__
    verbatim; the override could likely be removed — confirm.
    """
    self._repository = repository
    self._decider = decider
    self._publisher = publisher

handle async

handle(request)
Source code in src/waku/eventsourcing/decider/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    """Decide/persist/publish pipeline; body is identical to DeciderCommandHandler.handle.

    NOTE(review): despite the void-handler class name this still returns
    self._to_response(...) — presumably _to_response yields None here;
    confirm against the subclass contract.
    """
    aggregate_id = self._aggregate_id(request)
    command = self._to_command(request)

    if self._is_creation_command(request):
        state = self._decider.initial_state()
        version = -1
    else:
        state, version = await self._repository.load(aggregate_id)

    events = self._decider.decide(command, state)

    for event in events:
        state = self._decider.evolve(state, event)

    new_version = await self._repository.save(
        aggregate_id,
        events,
        version,
        current_state=state,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(state, new_version)

SnapshotDeciderRepository

SnapshotDeciderRepository(
    decider,
    event_store,
    snapshot_store,
    snapshot_config_registry,
    state_serializer,
)

Bases: DeciderRepository[StateT, CommandT, EventT], ABC

Source code in src/waku/eventsourcing/decider/repository.py
def __init__(
    self,
    decider: IDecider[StateT, CommandT, EventT],
    event_store: IEventStore,
    snapshot_store: ISnapshotStore,
    snapshot_config_registry: SnapshotConfigRegistry,
    state_serializer: ISnapshotStateSerializer,
) -> None:
    """Set up snapshot-aware loading/saving on top of DeciderRepository.

    Derives the concrete state type from the decider's initial state and
    pulls strategy, schema version, and migration chain from the registry
    entry for this aggregate.
    """
    super().__init__(decider, event_store)
    self._snapshot_store = snapshot_store
    self._state_serializer = state_serializer
    # aggregate_id -> version of the last snapshot seen/written (-1 = none).
    self._last_snapshot_versions: dict[str, int] = {}
    # Concrete state class, used to validate and deserialize snapshots.
    self._state_type: type[StateT] = type(self._decider.initial_state())
    snapshot_config = snapshot_config_registry.get(self.aggregate_name)
    self._snapshot_strategy = snapshot_config.strategy
    self._snapshot_schema_version = snapshot_config.schema_version
    self._migration_chain = snapshot_config.migration_chain

aggregate_name class-attribute

aggregate_name

max_stream_length class-attribute

max_stream_length = None

load async

load(aggregate_id)
Source code in src/waku/eventsourcing/decider/repository.py
async def load(self, aggregate_id: str) -> tuple[StateT, int]:
    """Load decider state, starting from a snapshot when a usable one exists.

    Falls back to full stream replay (the parent implementation) when no
    snapshot exists or a stale snapshot cannot be migrated to the current
    schema version.

    Raises:
        SnapshotTypeMismatchError: If the stored state type does not match
            this repository's state type.
    """
    stream_id = self._stream_id(aggregate_id)
    snapshot = await self._snapshot_store.load(stream_id)

    if snapshot is not None:
        if snapshot.state_type != self._state_type.__name__:
            raise SnapshotTypeMismatchError(stream_id, self._state_type.__name__, snapshot.state_type)

        if snapshot.schema_version != self._snapshot_schema_version:
            snapshot = migrate_snapshot_or_discard(
                self._migration_chain,
                snapshot,
                self._snapshot_schema_version,
                stream_id,
            )
            # Migration discarded the snapshot: replay the whole stream instead.
            if snapshot is None:
                self._last_snapshot_versions[aggregate_id] = -1
                return await super().load(aggregate_id)

        self._last_snapshot_versions[aggregate_id] = snapshot.version
        state = self._state_serializer.deserialize(snapshot.state, self._state_type)
        # Replay only the events appended after the snapshot was taken.
        try:
            stored_events = await self._event_store.read_stream(stream_id, start=snapshot.version + 1)
        except StreamNotFoundError:
            stored_events = []
        for stored in stored_events:
            state = self._decider.evolve(state, stored.data)  # type: ignore[arg-type]
        version = snapshot.version + len(stored_events)
        return state, version

    self._last_snapshot_versions[aggregate_id] = -1
    return await super().load(aggregate_id)

save async

save(
    aggregate_id,
    events,
    expected_version,
    *,
    current_state=None,
    idempotency_key=None,
)
Source code in src/waku/eventsourcing/decider/repository.py
async def save(
    self,
    aggregate_id: str,
    events: typing.Sequence[EventT],
    expected_version: int,
    *,
    current_state: StateT | None = None,
    idempotency_key: str | None = None,
) -> int:
    """Persist events via the parent, then write a snapshot when the strategy fires.

    Uses *current_state* when supplied so a snapshot can be serialized
    without re-loading the aggregate; otherwise re-loads it.

    Returns:
        The stream version after the append.
    """
    new_version = await super().save(
        aggregate_id,
        events,
        expected_version,
        current_state=current_state,
        idempotency_key=idempotency_key,
    )

    if events:
        last_snapshot_version = self._last_snapshot_versions.get(aggregate_id, -1)
        events_since_snapshot = new_version - last_snapshot_version
        if self._snapshot_strategy.should_snapshot(new_version, events_since_snapshot):
            # Prefer the caller-provided in-memory state; re-load only when absent.
            if current_state is not None:
                state = current_state
            else:
                state, _ = await self.load(aggregate_id)
            state_data = self._state_serializer.serialize(state)
            new_snapshot = Snapshot(
                stream_id=self._stream_id(aggregate_id),
                state=state_data,
                version=new_version,
                state_type=self._state_type.__name__,
                schema_version=self._snapshot_schema_version,
            )
            await self._snapshot_store.save(new_snapshot)
            self._last_snapshot_versions[aggregate_id] = new_version

    return new_version

AggregateNotFoundError

AggregateNotFoundError(aggregate_type, aggregate_id)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, aggregate_type: str, aggregate_id: str) -> None:
    """Record the missing aggregate's type and id and build the error message."""
    self.aggregate_type = aggregate_type
    self.aggregate_id = aggregate_id
    super().__init__(f'{aggregate_type} with id {aggregate_id!r} not found')

aggregate_type instance-attribute

aggregate_type = aggregate_type

aggregate_id instance-attribute

aggregate_id = aggregate_id

ConcurrencyConflictError

ConcurrencyConflictError(
    stream_id, expected_version, actual_version
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, expected_version: int, actual_version: int) -> None:
    """Record the stream and the expected/actual versions of the failed append."""
    self.stream_id = stream_id
    self.expected_version = expected_version
    self.actual_version = actual_version
    super().__init__(
        f'Concurrency conflict on stream {stream_id}: expected version {expected_version}, actual {actual_version}'
    )

stream_id instance-attribute

stream_id = stream_id

expected_version instance-attribute

expected_version = expected_version

actual_version instance-attribute

actual_version = actual_version

ConflictingEventTypeError

ConflictingEventTypeError(
    event_type,
    existing_name,
    existing_version,
    attempted_name,
    attempted_version,
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(
    self,
    event_type: type,
    existing_name: str,
    existing_version: int,
    attempted_name: str,
    attempted_version: int,
) -> None:
    """Describe a re-registration of *event_type* under a conflicting name or version."""
    self.event_type = event_type
    self.existing_name = existing_name
    self.existing_version = existing_version
    self.attempted_name = attempted_name
    self.attempted_version = attempted_version
    # Name conflicts take precedence in the message; otherwise report versions.
    detail = (
        f'name {existing_name!r} → {attempted_name!r}'
        if existing_name != attempted_name
        else f'version v{existing_version} → v{attempted_version}'
    )
    super().__init__(f'Conflicting registration for event type {event_type.__name__!r}: {detail}')

event_type instance-attribute

event_type = event_type

existing_name instance-attribute

existing_name = existing_name

existing_version instance-attribute

existing_version = existing_version

attempted_name instance-attribute

attempted_name = attempted_name

attempted_version instance-attribute

attempted_version = attempted_version

DuplicateAggregateNameError

DuplicateAggregateNameError(aggregate_name, repositories)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, aggregate_name: str, repositories: list[type]) -> None:
    """Record the clashing aggregate name and the repositories that use it."""
    self.aggregate_name = aggregate_name
    self.repositories = repositories
    repo_names = ', '.join(r.__name__ for r in repositories)
    super().__init__(f'Duplicate aggregate name {aggregate_name!r} used by multiple repositories: {repo_names}')

aggregate_name instance-attribute

aggregate_name = aggregate_name

repositories instance-attribute

repositories = repositories

DuplicateEventTypeError

DuplicateEventTypeError(event_type_name)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, event_type_name: str) -> None:
    """Record the event type name that was registered twice."""
    self.event_type_name = event_type_name
    super().__init__(f'Event type {event_type_name!r} is already registered')

event_type_name instance-attribute

event_type_name = event_type_name

DuplicateIdempotencyKeyError

DuplicateIdempotencyKeyError(stream_id, *, reason)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, *, reason: str) -> None:
    """Record the stream and a human-readable reason for the duplicate keys."""
    self.stream_id = stream_id
    self.reason = reason
    super().__init__(f'Duplicate idempotency keys ({reason}) on stream {stream_id}')

stream_id instance-attribute

stream_id = stream_id

reason instance-attribute

reason = reason

EventSourcingError

Bases: WakuError

PartialDuplicateAppendError

PartialDuplicateAppendError(
    stream_id, existing_count, total_count
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, existing_count: int, total_count: int) -> None:
    """Record how many of the appended idempotency keys already existed."""
    self.stream_id = stream_id
    self.existing_count = existing_count
    self.total_count = total_count
    super().__init__(
        f'Partial duplicate append on stream {stream_id}: '
        f'{existing_count} of {total_count} idempotency keys already exist'
    )

stream_id instance-attribute

stream_id = stream_id

existing_count instance-attribute

existing_count = existing_count

total_count instance-attribute

total_count = total_count

ProjectionError

ProjectionStoppedError

ProjectionStoppedError(projection_name, cause)

Bases: ProjectionError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, projection_name: str, cause: Exception) -> None:
    """Record the stopped projection and the exception that stopped it."""
    self.projection_name = projection_name
    self.cause = cause
    super().__init__(f'Projection {projection_name!r} stopped due to error: {cause}')

projection_name instance-attribute

projection_name = projection_name

cause instance-attribute

cause = cause

RegistryFrozenError

RegistryFrozenError()

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self) -> None:
    """Signal an attempt to register event types after the registry was frozen."""
    super().__init__('Cannot register event types after registry is frozen')

RetryExhaustedError

RetryExhaustedError(projection_name, attempts, cause)

Bases: ProjectionError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, projection_name: str, attempts: int, cause: Exception) -> None:
    """Record the projection, the number of retry attempts, and the final cause."""
    self.projection_name = projection_name
    self.attempts = attempts
    self.cause = cause
    super().__init__(f'Projection {projection_name!r} exhausted {attempts} retry attempts: {cause}')

projection_name instance-attribute

projection_name = projection_name

attempts instance-attribute

attempts = attempts

cause instance-attribute

cause = cause

SnapshotConfigNotFoundError

SnapshotConfigNotFoundError(aggregate_name)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, aggregate_name: str) -> None:
    """Record the aggregate lacking snapshot configuration; message hints the fix."""
    self.aggregate_name = aggregate_name
    super().__init__(
        f'No snapshot config found for aggregate {aggregate_name!r}. '
        f'Provide snapshot=SnapshotOptions(...) via bind_aggregate() or bind_decider().'
    )

aggregate_name instance-attribute

aggregate_name = aggregate_name

SnapshotMigrationChainError

SnapshotTypeMismatchError

SnapshotTypeMismatchError(
    stream_id, expected_type, actual_type
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, expected_type: str, actual_type: str) -> None:
    """Record the stream and the expected vs stored snapshot state-type names."""
    self.stream_id = stream_id
    self.expected_type = expected_type
    self.actual_type = actual_type
    super().__init__(
        f'Snapshot type mismatch on stream {stream_id}: expected {expected_type!r}, got {actual_type!r}'
    )

stream_id instance-attribute

stream_id = stream_id

expected_type instance-attribute

expected_type = expected_type

actual_type instance-attribute

actual_type = actual_type

StreamNotFoundError

StreamNotFoundError(stream_id)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId) -> None:
    """Record the stream id that could not be found."""
    self.stream_id = stream_id
    super().__init__(f'Stream {stream_id} not found')

stream_id instance-attribute

stream_id = stream_id

StreamTooLargeError

StreamTooLargeError(stream_id, max_length)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, max_length: int) -> None:
    """Record the oversized stream and its configured limit; message hints snapshots."""
    self.stream_id = stream_id
    self.max_length = max_length
    super().__init__(
        f'Stream {stream_id} exceeds maximum length of {max_length} events. '
        f'Configure snapshots to reduce stream replay size.'
    )

stream_id instance-attribute

stream_id = stream_id

max_length instance-attribute

max_length = max_length

UnknownEventTypeError

UnknownEventTypeError(event_type_name)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, event_type_name: str) -> None:
    """Record the unregistered event type name."""
    self.event_type_name = event_type_name
    super().__init__(f'Unknown event type: {event_type_name!r}')

event_type_name instance-attribute

event_type_name = event_type_name

UpcasterChainError

EventSourcedCommandHandler

EventSourcedCommandHandler(repository, publisher)

Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, AggregateT]

Source code in src/waku/eventsourcing/handler.py
def __init__(
    self,
    repository: EventSourcedRepository[AggregateT],
    publisher: IPublisher,
) -> None:
    """Store the repository and publisher collaborators used by handle()."""
    self._repository = repository
    self._publisher = publisher

handle async

handle(request)
Source code in src/waku/eventsourcing/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    """Create or load the aggregate, execute the command, persist, publish.

    Steps: resolve the aggregate id; obtain the aggregate (fresh for
    creation commands, loaded otherwise); run the command against it; save
    it and collect the persisted events; publish each event; build the
    response from the aggregate.
    """
    aggregate_id = self._aggregate_id(request)

    aggregate = (
        self._repository.create_aggregate()
        if self._is_creation_command(request)
        else await self._repository.load(aggregate_id)
    )

    await self._execute(request, aggregate)
    _, persisted_events = await self._repository.save(
        aggregate_id,
        aggregate,
        idempotency_key=self._idempotency_key(request),
    )

    for persisted in persisted_events:
        await self._publisher.publish(persisted)

    return self._to_response(aggregate)

EventSourcedVoidCommandHandler

EventSourcedVoidCommandHandler(repository, publisher)

Bases: EventSourcedCommandHandler[RequestT, None, AggregateT], ABC, Generic[RequestT, AggregateT]

Source code in src/waku/eventsourcing/handler.py
def __init__(
    self,
    repository: EventSourcedRepository[AggregateT],
    publisher: IPublisher,
) -> None:
    """Store the repository and publisher collaborators.

    NOTE(review): this body duplicates EventSourcedCommandHandler.__init__
    verbatim; the override could likely be removed — confirm.
    """
    self._repository = repository
    self._publisher = publisher

handle async

handle(request)
Source code in src/waku/eventsourcing/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    """Create/load, execute, persist, publish; body identical to EventSourcedCommandHandler.handle.

    NOTE(review): despite the void-handler class name this still returns
    self._to_response(aggregate) — presumably _to_response yields None
    here; confirm against the subclass contract.
    """
    aggregate_id = self._aggregate_id(request)

    if self._is_creation_command(request):
        aggregate = self._repository.create_aggregate()
    else:
        aggregate = await self._repository.load(aggregate_id)

    await self._execute(request, aggregate)
    _, events = await self._repository.save(
        aggregate_id,
        aggregate,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(aggregate)

EventSourcingConfig dataclass

EventSourcingConfig(
    *,
    store=None,
    event_serializer=None,
    snapshot_store=None,
    snapshot_state_serializer=None,
    checkpoint_store=None,
    enrichers=(),
)

store class-attribute instance-attribute

store = None

event_serializer class-attribute instance-attribute

event_serializer = None

snapshot_store class-attribute instance-attribute

snapshot_store = None

snapshot_state_serializer class-attribute instance-attribute

snapshot_state_serializer = None

checkpoint_store class-attribute instance-attribute

checkpoint_store = None

enrichers class-attribute instance-attribute

enrichers = ()

EventSourcingExtension dataclass

EventSourcingExtension()

Bases: OnModuleConfigure

registry property

registry

bind_aggregate

bind_aggregate(
    repository,
    event_types=(),
    projections=(),
    snapshot=None,
)
Source code in src/waku/eventsourcing/modules.py
def bind_aggregate(
    self,
    repository: type[EventSourcedRepository[Any]],
    event_types: Sequence[EventTypeSpec] = (),
    projections: Sequence[type[IProjection]] = (),
    snapshot: SnapshotOptions | None = None,
) -> Self:
    """Register an event-sourced aggregate repository with its events and projections.

    Appends an AggregateBinding and records the projections and event-type
    specs on the shared registry. Returns self for fluent chaining.
    """
    self._bindings.append(
        AggregateBinding(
            repository=repository,
            event_types=event_types,
            projections=projections,
            snapshot=snapshot,
        )
    )
    self._registry.projection_types.extend(projections)
    self._registry.event_type_bindings.extend(event_types)
    return self

bind_decider

bind_decider(
    repository,
    decider,
    event_types=(),
    projections=(),
    snapshot=None,
)
Source code in src/waku/eventsourcing/modules.py
def bind_decider(
    self,
    repository: type[DeciderRepository[Any, Any, Any]],
    decider: type[IDecider[Any, Any, Any]],
    event_types: Sequence[EventTypeSpec] = (),
    projections: Sequence[type[IProjection]] = (),
    snapshot: SnapshotOptions | None = None,
) -> Self:
    """Register a decider-style repository with its decider, events, and projections.

    Appends a DeciderBinding and records the projections and event-type
    specs on the shared registry. Returns self for fluent chaining.
    """
    self._decider_bindings.append(
        DeciderBinding(
            repository=repository,
            decider=decider,
            event_types=event_types,
            projections=projections,
            snapshot=snapshot,
        )
    )
    self._registry.projection_types.extend(projections)
    self._registry.event_type_bindings.extend(event_types)
    return self

bind_catch_up_projections

bind_catch_up_projections(projections)
Source code in src/waku/eventsourcing/modules.py
def bind_catch_up_projections(self, projections: Sequence[type[ICatchUpProjection]]) -> Self:
    """Record catch-up projection types on the registry; returns self for chaining."""
    self._registry.catch_up_projection_types.extend(projections)
    return self

aggregate_names

aggregate_names()
Source code in src/waku/eventsourcing/modules.py
def aggregate_names(self) -> Iterator[tuple[str, type]]:
    """Yield (aggregate_name, repository class) for every binding, aggregates first."""
    for binding in (*self._bindings, *self._decider_bindings):
        yield binding.repository.aggregate_name, binding.repository

snapshot_bindings

snapshot_bindings()
Source code in src/waku/eventsourcing/modules.py
def snapshot_bindings(self) -> Iterator[tuple[str, SnapshotOptions]]:
    """Yield (aggregate_name, snapshot options) for bindings that configure snapshots."""
    for binding in (*self._bindings, *self._decider_bindings):
        if binding.snapshot is not None:
            yield binding.repository.aggregate_name, binding.snapshot

on_module_configure

on_module_configure(metadata)
Source code in src/waku/eventsourcing/modules.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Contribute request-scoped providers for every bound repository and decider."""
    for binding in self._bindings:
        repo_type = binding.repository
        # Expose each repository under itself and all of its parent types.
        metadata.providers.append(scoped(WithParents[repo_type], repo_type))  # type: ignore[misc,valid-type]

    for binding in self._decider_bindings:
        repo_type = binding.repository
        metadata.providers.append(scoped(WithParents[repo_type], repo_type))  # type: ignore[misc,valid-type]
        # Each decider is bound under the interface resolved from its repository.
        decider_iface = self._resolve_decider_interface(repo_type)
        metadata.providers.append(scoped(decider_iface, binding.decider))

EventSourcingModule

register classmethod

register(config=None)
Source code in src/waku/eventsourcing/modules.py
@classmethod
def register(cls, config: EventSourcingConfig | None = None, /) -> DynamicModule:
    config_ = config or EventSourcingConfig()
    providers: list[Provider] = []

    if config_.store is not None:
        providers.append(scoped(IEventStore, config_.store))
    else:
        providers.append(scoped(WithParents[IEventStore], InMemoryEventStore))  # ty:ignore[not-subscriptable]

    if config_.event_serializer is not None:
        providers.append(scoped(IEventSerializer, config_.event_serializer))

    if config_.snapshot_store is not None:
        providers.append(scoped(ISnapshotStore, config_.snapshot_store))

    if config_.snapshot_state_serializer is not None:
        providers.append(scoped(ISnapshotStateSerializer, config_.snapshot_state_serializer))

    if config_.checkpoint_store is not None:
        providers.append(scoped(ICheckpointStore, config_.checkpoint_store))

    providers.append(many(IMetadataEnricher, *config_.enrichers))

    return DynamicModule(
        parent_module=cls,
        providers=providers,
        extensions=[
            EventSourcingRegistryAggregator(has_serializer=config_.event_serializer is not None),
        ],
        is_global=True,
    )

EventType dataclass

EventType(
    event_type,
    *,
    name=None,
    aliases=(),
    version=1,
    upcasters=(),
)

event_type instance-attribute

event_type

name class-attribute instance-attribute

name = field(default=None, kw_only=True)

aliases class-attribute instance-attribute

aliases = field(default=(), kw_only=True)

version class-attribute instance-attribute

version = field(default=1, kw_only=True)

upcasters class-attribute instance-attribute

upcasters = field(default=(), kw_only=True)

SnapshotOptions dataclass

SnapshotOptions(
    *, strategy, schema_version=1, migrations=()
)

strategy instance-attribute

strategy

schema_version class-attribute instance-attribute

schema_version = 1

migrations class-attribute instance-attribute

migrations = ()

EventSourcedRepository

EventSourcedRepository(event_store)

Bases: ABC, Generic[AggregateT]

Source code in src/waku/eventsourcing/repository.py
def __init__(self, event_store: IEventStore) -> None:
    self._event_store = event_store

aggregate_name class-attribute

aggregate_name

max_stream_length class-attribute

max_stream_length = None

load async

load(aggregate_id)
Source code in src/waku/eventsourcing/repository.py
async def load(self, aggregate_id: str) -> AggregateT:
    stream_id = self._stream_id(aggregate_id)
    count = self.max_stream_length + 1 if self.max_stream_length is not None else None
    try:
        stored_events = await self._event_store.read_stream(stream_id, count=count)
    except StreamNotFoundError:
        raise AggregateNotFoundError(
            aggregate_type=self.aggregate_name,
            aggregate_id=aggregate_id,
        ) from None
    if self.max_stream_length is not None and len(stored_events) > self.max_stream_length:
        raise StreamTooLargeError(stream_id, self.max_stream_length)
    if not stored_events:
        msg = f'Stream contains no events: {stream_id}'
        raise EventSourcingError(msg)
    aggregate = self.create_aggregate()
    domain_events = [e.data for e in stored_events]
    version = len(stored_events) - 1
    aggregate.load_from_history(domain_events, version)
    return aggregate

save async

save(aggregate_id, aggregate, *, idempotency_key=None)
Source code in src/waku/eventsourcing/repository.py
async def save(
    self,
    aggregate_id: str,
    aggregate: AggregateT,
    *,
    idempotency_key: str | None = None,
) -> tuple[int, list[INotification]]:
    stream_id = self._stream_id(aggregate_id)
    events = aggregate.collect_events()
    if not events:
        return aggregate.version, []

    envelopes = [
        EventEnvelope(
            domain_event=event,
            idempotency_key=f'{idempotency_key}:{i}' if idempotency_key else str(uuid.uuid4()),
        )
        for i, event in enumerate(events)
    ]
    expected = Exact(version=aggregate.version) if aggregate.version >= 0 else NoStream()
    new_version = await self._event_store.append_to_stream(stream_id, envelopes, expected_version=expected)
    aggregate.mark_persisted(new_version)
    return new_version, events

create_aggregate

create_aggregate()
Source code in src/waku/eventsourcing/repository.py
def create_aggregate(self) -> AggregateT:
    aggregate_cls = self._resolve_aggregate_type()
    if aggregate_cls is None:
        msg = f'{type(self).__name__}: cannot auto-create aggregate, override create_aggregate()'
        raise TypeError(msg)
    return aggregate_cls()

FnUpcaster

FnUpcaster(from_version, fn)

Bases: IEventUpcaster

Source code in src/waku/eventsourcing/upcasting/fn.py
def __init__(self, from_version: int, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> None:
    self.from_version = from_version
    self._fn = fn

from_version instance-attribute

from_version = from_version

upcast

upcast(data)
Source code in src/waku/eventsourcing/upcasting/fn.py
def upcast(self, data: dict[str, Any], /) -> dict[str, Any]:
    return self._fn(data)

IEventUpcaster

Bases: ABC

from_version instance-attribute

from_version

upcast abstractmethod

upcast(data)
Source code in src/waku/eventsourcing/upcasting/interfaces.py
@abc.abstractmethod
def upcast(self, data: dict[str, Any], /) -> dict[str, Any]: ...

UpcasterChain

UpcasterChain(upcasters_by_type)
Source code in src/waku/eventsourcing/upcasting/chain.py
def __init__(self, upcasters_by_type: Mapping[str, Sequence[IEventUpcaster]]) -> None:
    chains: dict[str, tuple[IEventUpcaster, ...]] = {}
    for event_type, upcasters in upcasters_by_type.items():
        sorted_upcasters = sorted(upcasters, key=lambda u: u.from_version)
        seen: set[int] = set()
        for u in sorted_upcasters:
            if u.from_version < 1:
                msg = f'Invalid from_version {u.from_version} for event type {event_type!r}: must be >= 1'
                raise UpcasterChainError(msg)
            if u.from_version in seen:
                msg = f'Duplicate upcaster for event type {event_type!r} at from_version {u.from_version}'
                raise UpcasterChainError(msg)
            seen.add(u.from_version)
        chains[event_type] = tuple(sorted_upcasters)
    self._chains = chains

upcast

upcast(event_type, data, schema_version)
Source code in src/waku/eventsourcing/upcasting/chain.py
def upcast(self, event_type: str, data: dict[str, Any], schema_version: int) -> dict[str, Any]:
    upcasters = self._chains.get(event_type)
    if not upcasters:
        return data
    if schema_version > upcasters[-1].from_version:
        return data
    for u in upcasters:
        if u.from_version >= schema_version:
            data = u.upcast(data)
    return data

add_field

add_field(from_version, *, field, default)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def add_field(from_version: int, *, field: str, default: Any) -> IEventUpcaster:
    def _add(data: dict[str, Any]) -> dict[str, Any]:
        result = dict(data)
        if field not in result:
            result[field] = copy.copy(default)
        return result

    return FnUpcaster(from_version, fn=_add)

noop

noop(from_version)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def noop(from_version: int) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=dict)

remove_field

remove_field(from_version, *, field)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def remove_field(from_version: int, *, field: str) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=lambda data: {k: v for k, v in data.items() if k != field})

rename_field

rename_field(from_version, *, old, new)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def rename_field(from_version: int, *, old: str, new: str) -> IEventUpcaster:
    def _rename(data: dict[str, Any]) -> dict[str, Any]:
        result = {k: v for k, v in data.items() if k != old}
        if old in data:
            result[new] = data[old]
        return result

    return FnUpcaster(from_version, fn=_rename)

upcast

upcast(from_version, fn)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def upcast(from_version: int, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=fn)

contracts

ExpectedVersion module-attribute

ExpectedVersion = (
    Exact | NoStream | StreamExists | AnyVersion
)

EventSourcedAggregate

EventSourcedAggregate()

Bases: ABC

Source code in src/waku/eventsourcing/contracts/aggregate.py
def __init__(self) -> None:
    self._version = -1
    self._pending_events = []
version property
version
collect_events
collect_events()
Source code in src/waku/eventsourcing/contracts/aggregate.py
def collect_events(self) -> list[INotification]:
    events = list(self._pending_events)
    self._pending_events.clear()
    return events
mark_persisted
mark_persisted(version)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def mark_persisted(self, version: int) -> None:
    self._version = version
load_from_history
load_from_history(events, version)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def load_from_history(self, events: Sequence[INotification], version: int) -> None:
    for event in events:
        self._apply(event)
    self._version = version

EventEnvelope dataclass

EventEnvelope(
    *,
    domain_event,
    idempotency_key,
    metadata=EventMetadata(),
)
domain_event instance-attribute
domain_event
idempotency_key instance-attribute
idempotency_key
metadata class-attribute instance-attribute
metadata = field(default_factory=EventMetadata)

EventMetadata dataclass

EventMetadata(
    *, correlation_id=None, causation_id=None, extra=dict()
)
correlation_id class-attribute instance-attribute
correlation_id = None
causation_id class-attribute instance-attribute
causation_id = None
extra class-attribute instance-attribute
extra = field(default_factory=dict)

IMetadataEnricher

Bases: ABC

Enriches event metadata before persistence.

enrich abstractmethod
enrich(metadata)
Source code in src/waku/eventsourcing/contracts/event.py
@abc.abstractmethod
def enrich(self, metadata: EventMetadata, /) -> EventMetadata: ...

StoredEvent dataclass

StoredEvent(
    *,
    event_id,
    stream_id,
    event_type,
    position,
    global_position,
    timestamp,
    data,
    metadata,
    idempotency_key,
    schema_version=1,
)
event_id instance-attribute
event_id
stream_id instance-attribute
stream_id
event_type instance-attribute
event_type
position instance-attribute
position
global_position instance-attribute
global_position
timestamp instance-attribute
timestamp
data instance-attribute
data
metadata instance-attribute
metadata
idempotency_key instance-attribute
idempotency_key
schema_version class-attribute instance-attribute
schema_version = 1

AnyVersion dataclass

AnyVersion()

Exact dataclass

Exact(version)
version instance-attribute
version

NoStream dataclass

NoStream()

StreamExists dataclass

StreamExists()

StreamId dataclass

StreamId(stream_type, stream_key)
stream_type instance-attribute
stream_type
stream_key instance-attribute
stream_key
value property
value
for_aggregate classmethod
for_aggregate(aggregate_type, aggregate_id)
Source code in src/waku/eventsourcing/contracts/stream.py
@classmethod
def for_aggregate(cls, aggregate_type: str, aggregate_id: str) -> StreamId:
    return cls(stream_type=aggregate_type, stream_key=aggregate_id)
from_value classmethod
from_value(value)
Source code in src/waku/eventsourcing/contracts/stream.py
@classmethod
def from_value(cls, value: str) -> StreamId:
    stream_type, sep, stream_key = value.partition('-')
    if not sep or not stream_type or not stream_key:
        msg = f"Invalid stream ID format: {value!r}. Expected '{{stream_type}}-{{stream_key}}'"
        raise ValueError(msg)
    return cls(stream_type=stream_type, stream_key=stream_key)

StreamPosition

Bases: Enum

START class-attribute instance-attribute
START = 'start'
END class-attribute instance-attribute
END = 'end'

aggregate

StateT module-attribute
StateT = TypeVar('StateT')
CommandT module-attribute
CommandT = TypeVar('CommandT', contravariant=True)
EventT module-attribute
EventT = TypeVar('EventT', bound=INotification)
IDecider

Bases: Protocol[StateT, CommandT, EventT]

initial_state
initial_state()
Source code in src/waku/eventsourcing/contracts/aggregate.py
def initial_state(self) -> StateT: ...
decide
decide(command, state)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def decide(self, command: CommandT, state: StateT) -> Sequence[EventT]: ...
evolve
evolve(state, event)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def evolve(self, state: StateT, event: EventT) -> StateT: ...
EventSourcedAggregate
EventSourcedAggregate()

Bases: ABC

Source code in src/waku/eventsourcing/contracts/aggregate.py
def __init__(self) -> None:
    self._version = -1
    self._pending_events = []
version property
version
collect_events
collect_events()
Source code in src/waku/eventsourcing/contracts/aggregate.py
def collect_events(self) -> list[INotification]:
    events = list(self._pending_events)
    self._pending_events.clear()
    return events
mark_persisted
mark_persisted(version)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def mark_persisted(self, version: int) -> None:
    self._version = version
load_from_history
load_from_history(events, version)
Source code in src/waku/eventsourcing/contracts/aggregate.py
def load_from_history(self, events: Sequence[INotification], version: int) -> None:
    for event in events:
        self._apply(event)
    self._version = version

event

EventMetadata dataclass
EventMetadata(
    *, correlation_id=None, causation_id=None, extra=dict()
)
correlation_id class-attribute instance-attribute
correlation_id = None
causation_id class-attribute instance-attribute
causation_id = None
extra class-attribute instance-attribute
extra = field(default_factory=dict)
IMetadataEnricher

Bases: ABC

Enriches event metadata before persistence.

enrich abstractmethod
enrich(metadata)
Source code in src/waku/eventsourcing/contracts/event.py
@abc.abstractmethod
def enrich(self, metadata: EventMetadata, /) -> EventMetadata: ...
EventEnvelope dataclass
EventEnvelope(
    *,
    domain_event,
    idempotency_key,
    metadata=EventMetadata(),
)
domain_event instance-attribute
domain_event
idempotency_key instance-attribute
idempotency_key
metadata class-attribute instance-attribute
metadata = field(default_factory=EventMetadata)
StoredEvent dataclass
StoredEvent(
    *,
    event_id,
    stream_id,
    event_type,
    position,
    global_position,
    timestamp,
    data,
    metadata,
    idempotency_key,
    schema_version=1,
)
event_id instance-attribute
event_id
stream_id instance-attribute
stream_id
event_type instance-attribute
event_type
position instance-attribute
position
global_position instance-attribute
global_position
timestamp instance-attribute
timestamp
data instance-attribute
data
metadata instance-attribute
metadata
idempotency_key instance-attribute
idempotency_key
schema_version class-attribute instance-attribute
schema_version = 1

stream

ExpectedVersion module-attribute
ExpectedVersion = (
    Exact | NoStream | StreamExists | AnyVersion
)
StreamId dataclass
StreamId(stream_type, stream_key)
stream_type instance-attribute
stream_type
stream_key instance-attribute
stream_key
value property
value
for_aggregate classmethod
for_aggregate(aggregate_type, aggregate_id)
Source code in src/waku/eventsourcing/contracts/stream.py
@classmethod
def for_aggregate(cls, aggregate_type: str, aggregate_id: str) -> StreamId:
    return cls(stream_type=aggregate_type, stream_key=aggregate_id)
from_value classmethod
from_value(value)
Source code in src/waku/eventsourcing/contracts/stream.py
@classmethod
def from_value(cls, value: str) -> StreamId:
    stream_type, sep, stream_key = value.partition('-')
    if not sep or not stream_type or not stream_key:
        msg = f"Invalid stream ID format: {value!r}. Expected '{{stream_type}}-{{stream_key}}'"
        raise ValueError(msg)
    return cls(stream_type=stream_type, stream_key=stream_key)
Exact dataclass
Exact(version)
version instance-attribute
version
NoStream dataclass
NoStream()
StreamExists dataclass
StreamExists()
AnyVersion dataclass
AnyVersion()
StreamPosition

Bases: Enum

START class-attribute instance-attribute
START = 'start'
END class-attribute instance-attribute
END = 'end'

decider

DeciderCommandHandler

DeciderCommandHandler(repository, decider, publisher)

Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/handler.py
def __init__(
    self,
    repository: DeciderRepository[StateT, CommandT, EventT],
    decider: IDecider[StateT, CommandT, EventT],
    publisher: IPublisher,
) -> None:
    self._repository = repository
    self._decider = decider
    self._publisher = publisher
handle async
handle(request)
Source code in src/waku/eventsourcing/decider/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    aggregate_id = self._aggregate_id(request)
    command = self._to_command(request)

    if self._is_creation_command(request):
        state = self._decider.initial_state()
        version = -1
    else:
        state, version = await self._repository.load(aggregate_id)

    events = self._decider.decide(command, state)

    for event in events:
        state = self._decider.evolve(state, event)

    new_version = await self._repository.save(
        aggregate_id,
        events,
        version,
        current_state=state,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(state, new_version)

DeciderVoidCommandHandler

DeciderVoidCommandHandler(repository, decider, publisher)

Bases: DeciderCommandHandler[RequestT, None, StateT, CommandT, EventT], ABC, Generic[RequestT, StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/handler.py
def __init__(
    self,
    repository: DeciderRepository[StateT, CommandT, EventT],
    decider: IDecider[StateT, CommandT, EventT],
    publisher: IPublisher,
) -> None:
    self._repository = repository
    self._decider = decider
    self._publisher = publisher
handle async
handle(request)
Source code in src/waku/eventsourcing/decider/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    aggregate_id = self._aggregate_id(request)
    command = self._to_command(request)

    if self._is_creation_command(request):
        state = self._decider.initial_state()
        version = -1
    else:
        state, version = await self._repository.load(aggregate_id)

    events = self._decider.decide(command, state)

    for event in events:
        state = self._decider.evolve(state, event)

    new_version = await self._repository.save(
        aggregate_id,
        events,
        version,
        current_state=state,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(state, new_version)

DeciderRepository

DeciderRepository(decider, event_store)

Bases: ABC, Generic[StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/repository.py
def __init__(
    self,
    decider: IDecider[StateT, CommandT, EventT],
    event_store: IEventStore,
) -> None:
    self._decider = decider
    self._event_store = event_store
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/decider/repository.py
async def load(self, aggregate_id: str) -> tuple[StateT, int]:
    stream_id = self._stream_id(aggregate_id)
    count = self.max_stream_length + 1 if self.max_stream_length is not None else None
    try:
        stored_events = await self._event_store.read_stream(stream_id, count=count)
    except StreamNotFoundError:
        raise AggregateNotFoundError(
            aggregate_type=self.aggregate_name,
            aggregate_id=aggregate_id,
        ) from None
    if self.max_stream_length is not None and len(stored_events) > self.max_stream_length:
        raise StreamTooLargeError(stream_id, self.max_stream_length)
    state = self._decider.initial_state()
    for stored in stored_events:
        state = self._decider.evolve(state, stored.data)  # type: ignore[arg-type]
    version = len(stored_events) - 1
    return state, version
save async
save(
    aggregate_id,
    events,
    expected_version,
    *,
    current_state=None,
    idempotency_key=None,
)
Source code in src/waku/eventsourcing/decider/repository.py
async def save(
    self,
    aggregate_id: str,
    events: typing.Sequence[EventT],
    expected_version: int,
    *,
    current_state: StateT | None = None,  # noqa: ARG002
    idempotency_key: str | None = None,
) -> int:
    if not events:
        return expected_version
    stream_id = self._stream_id(aggregate_id)
    envelopes = [
        EventEnvelope(
            domain_event=e,
            idempotency_key=f'{idempotency_key}:{i}' if idempotency_key else str(uuid.uuid4()),
        )
        for i, e in enumerate(events)
    ]
    expected = Exact(version=expected_version) if expected_version >= 0 else NoStream()
    return await self._event_store.append_to_stream(stream_id, envelopes, expected_version=expected)

SnapshotDeciderRepository

SnapshotDeciderRepository(
    decider,
    event_store,
    snapshot_store,
    snapshot_config_registry,
    state_serializer,
)

Bases: DeciderRepository[StateT, CommandT, EventT], ABC

Source code in src/waku/eventsourcing/decider/repository.py
def __init__(
    self,
    decider: IDecider[StateT, CommandT, EventT],
    event_store: IEventStore,
    snapshot_store: ISnapshotStore,
    snapshot_config_registry: SnapshotConfigRegistry,
    state_serializer: ISnapshotStateSerializer,
) -> None:
    super().__init__(decider, event_store)
    self._snapshot_store = snapshot_store
    self._state_serializer = state_serializer
    self._last_snapshot_versions: dict[str, int] = {}
    self._state_type: type[StateT] = type(self._decider.initial_state())
    snapshot_config = snapshot_config_registry.get(self.aggregate_name)
    self._snapshot_strategy = snapshot_config.strategy
    self._snapshot_schema_version = snapshot_config.schema_version
    self._migration_chain = snapshot_config.migration_chain
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/decider/repository.py
async def load(self, aggregate_id: str) -> tuple[StateT, int]:
    stream_id = self._stream_id(aggregate_id)
    snapshot = await self._snapshot_store.load(stream_id)

    if snapshot is not None:
        if snapshot.state_type != self._state_type.__name__:
            raise SnapshotTypeMismatchError(stream_id, self._state_type.__name__, snapshot.state_type)

        if snapshot.schema_version != self._snapshot_schema_version:
            snapshot = migrate_snapshot_or_discard(
                self._migration_chain,
                snapshot,
                self._snapshot_schema_version,
                stream_id,
            )
            if snapshot is None:
                self._last_snapshot_versions[aggregate_id] = -1
                return await super().load(aggregate_id)

        self._last_snapshot_versions[aggregate_id] = snapshot.version
        state = self._state_serializer.deserialize(snapshot.state, self._state_type)
        try:
            stored_events = await self._event_store.read_stream(stream_id, start=snapshot.version + 1)
        except StreamNotFoundError:
            stored_events = []
        for stored in stored_events:
            state = self._decider.evolve(state, stored.data)  # type: ignore[arg-type]
        version = snapshot.version + len(stored_events)
        return state, version

    self._last_snapshot_versions[aggregate_id] = -1
    return await super().load(aggregate_id)
save async
save(
    aggregate_id,
    events,
    expected_version,
    *,
    current_state=None,
    idempotency_key=None,
)
Source code in src/waku/eventsourcing/decider/repository.py
async def save(
    self,
    aggregate_id: str,
    events: typing.Sequence[EventT],
    expected_version: int,
    *,
    current_state: StateT | None = None,
    idempotency_key: str | None = None,
) -> int:
    new_version = await super().save(
        aggregate_id,
        events,
        expected_version,
        current_state=current_state,
        idempotency_key=idempotency_key,
    )

    if events:
        last_snapshot_version = self._last_snapshot_versions.get(aggregate_id, -1)
        events_since_snapshot = new_version - last_snapshot_version
        if self._snapshot_strategy.should_snapshot(new_version, events_since_snapshot):
            if current_state is not None:
                state = current_state
            else:
                state, _ = await self.load(aggregate_id)
            state_data = self._state_serializer.serialize(state)
            new_snapshot = Snapshot(
                stream_id=self._stream_id(aggregate_id),
                state=state_data,
                version=new_version,
                state_type=self._state_type.__name__,
                schema_version=self._snapshot_schema_version,
            )
            await self._snapshot_store.save(new_snapshot)
            self._last_snapshot_versions[aggregate_id] = new_version

    return new_version

handler

StateT module-attribute
StateT = TypeVar('StateT', default=object)
CommandT module-attribute
CommandT = TypeVar('CommandT', default=object)
EventT module-attribute
EventT = TypeVar(
    'EventT', bound=INotification, default=INotification
)
DeciderCommandHandler
DeciderCommandHandler(repository, decider, publisher)

Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/handler.py
def __init__(
    self,
    repository: DeciderRepository[StateT, CommandT, EventT],
    decider: IDecider[StateT, CommandT, EventT],
    publisher: IPublisher,
) -> None:
    self._repository = repository
    self._decider = decider
    self._publisher = publisher
handle async
handle(request)
Source code in src/waku/eventsourcing/decider/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    aggregate_id = self._aggregate_id(request)
    command = self._to_command(request)

    if self._is_creation_command(request):
        state = self._decider.initial_state()
        version = -1
    else:
        state, version = await self._repository.load(aggregate_id)

    events = self._decider.decide(command, state)

    for event in events:
        state = self._decider.evolve(state, event)

    new_version = await self._repository.save(
        aggregate_id,
        events,
        version,
        current_state=state,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(state, new_version)
DeciderVoidCommandHandler
DeciderVoidCommandHandler(repository, decider, publisher)

Bases: DeciderCommandHandler[RequestT, None, StateT, CommandT, EventT], ABC, Generic[RequestT, StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/handler.py
def __init__(
    self,
    repository: DeciderRepository[StateT, CommandT, EventT],
    decider: IDecider[StateT, CommandT, EventT],
    publisher: IPublisher,
) -> None:
    self._repository = repository
    self._decider = decider
    self._publisher = publisher
handle async
handle(request)
Source code in src/waku/eventsourcing/decider/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    aggregate_id = self._aggregate_id(request)
    command = self._to_command(request)

    if self._is_creation_command(request):
        state = self._decider.initial_state()
        version = -1
    else:
        state, version = await self._repository.load(aggregate_id)

    events = self._decider.decide(command, state)

    for event in events:
        state = self._decider.evolve(state, event)

    new_version = await self._repository.save(
        aggregate_id,
        events,
        version,
        current_state=state,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(state, new_version)

repository

DeciderRepository
DeciderRepository(decider, event_store)

Bases: ABC, Generic[StateT, CommandT, EventT]

Source code in src/waku/eventsourcing/decider/repository.py
def __init__(
    self,
    decider: IDecider[StateT, CommandT, EventT],
    event_store: IEventStore,
) -> None:
    self._decider = decider
    self._event_store = event_store
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/decider/repository.py
async def load(self, aggregate_id: str) -> tuple[StateT, int]:
    stream_id = self._stream_id(aggregate_id)
    count = self.max_stream_length + 1 if self.max_stream_length is not None else None
    try:
        stored_events = await self._event_store.read_stream(stream_id, count=count)
    except StreamNotFoundError:
        raise AggregateNotFoundError(
            aggregate_type=self.aggregate_name,
            aggregate_id=aggregate_id,
        ) from None
    if self.max_stream_length is not None and len(stored_events) > self.max_stream_length:
        raise StreamTooLargeError(stream_id, self.max_stream_length)
    state = self._decider.initial_state()
    for stored in stored_events:
        state = self._decider.evolve(state, stored.data)  # type: ignore[arg-type]
    version = len(stored_events) - 1
    return state, version
save async
save(
    aggregate_id,
    events,
    expected_version,
    *,
    current_state=None,
    idempotency_key=None,
)
Source code in src/waku/eventsourcing/decider/repository.py
async def save(
    self,
    aggregate_id: str,
    events: typing.Sequence[EventT],
    expected_version: int,
    *,
    current_state: StateT | None = None,  # noqa: ARG002
    idempotency_key: str | None = None,
) -> int:
    if not events:
        return expected_version
    stream_id = self._stream_id(aggregate_id)
    envelopes = [
        EventEnvelope(
            domain_event=e,
            idempotency_key=f'{idempotency_key}:{i}' if idempotency_key else str(uuid.uuid4()),
        )
        for i, e in enumerate(events)
    ]
    expected = Exact(version=expected_version) if expected_version >= 0 else NoStream()
    return await self._event_store.append_to_stream(stream_id, envelopes, expected_version=expected)
SnapshotDeciderRepository
SnapshotDeciderRepository(
    decider,
    event_store,
    snapshot_store,
    snapshot_config_registry,
    state_serializer,
)

Bases: DeciderRepository[StateT, CommandT, EventT], ABC

Source code in src/waku/eventsourcing/decider/repository.py
def __init__(
    self,
    decider: IDecider[StateT, CommandT, EventT],
    event_store: IEventStore,
    snapshot_store: ISnapshotStore,
    snapshot_config_registry: SnapshotConfigRegistry,
    state_serializer: ISnapshotStateSerializer,
) -> None:
    super().__init__(decider, event_store)
    self._snapshot_store = snapshot_store
    self._state_serializer = state_serializer
    self._last_snapshot_versions: dict[str, int] = {}
    self._state_type: type[StateT] = type(self._decider.initial_state())
    snapshot_config = snapshot_config_registry.get(self.aggregate_name)
    self._snapshot_strategy = snapshot_config.strategy
    self._snapshot_schema_version = snapshot_config.schema_version
    self._migration_chain = snapshot_config.migration_chain
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/decider/repository.py
async def load(self, aggregate_id: str) -> tuple[StateT, int]:
    stream_id = self._stream_id(aggregate_id)
    snapshot = await self._snapshot_store.load(stream_id)

    if snapshot is not None:
        if snapshot.state_type != self._state_type.__name__:
            raise SnapshotTypeMismatchError(stream_id, self._state_type.__name__, snapshot.state_type)

        if snapshot.schema_version != self._snapshot_schema_version:
            snapshot = migrate_snapshot_or_discard(
                self._migration_chain,
                snapshot,
                self._snapshot_schema_version,
                stream_id,
            )
            if snapshot is None:
                self._last_snapshot_versions[aggregate_id] = -1
                return await super().load(aggregate_id)

        self._last_snapshot_versions[aggregate_id] = snapshot.version
        state = self._state_serializer.deserialize(snapshot.state, self._state_type)
        try:
            stored_events = await self._event_store.read_stream(stream_id, start=snapshot.version + 1)
        except StreamNotFoundError:
            stored_events = []
        for stored in stored_events:
            state = self._decider.evolve(state, stored.data)  # type: ignore[arg-type]
        version = snapshot.version + len(stored_events)
        return state, version

    self._last_snapshot_versions[aggregate_id] = -1
    return await super().load(aggregate_id)
save async
save(
    aggregate_id,
    events,
    expected_version,
    *,
    current_state=None,
    idempotency_key=None,
)
Source code in src/waku/eventsourcing/decider/repository.py
async def save(
    self,
    aggregate_id: str,
    events: typing.Sequence[EventT],
    expected_version: int,
    *,
    current_state: StateT | None = None,
    idempotency_key: str | None = None,
) -> int:
    new_version = await super().save(
        aggregate_id,
        events,
        expected_version,
        current_state=current_state,
        idempotency_key=idempotency_key,
    )

    if events:
        last_snapshot_version = self._last_snapshot_versions.get(aggregate_id, -1)
        events_since_snapshot = new_version - last_snapshot_version
        if self._snapshot_strategy.should_snapshot(new_version, events_since_snapshot):
            if current_state is not None:
                state = current_state
            else:
                state, _ = await self.load(aggregate_id)
            state_data = self._state_serializer.serialize(state)
            new_snapshot = Snapshot(
                stream_id=self._stream_id(aggregate_id),
                state=state_data,
                version=new_version,
                state_type=self._state_type.__name__,
                schema_version=self._snapshot_schema_version,
            )
            await self._snapshot_store.save(new_snapshot)
            self._last_snapshot_versions[aggregate_id] = new_version

    return new_version

exceptions

EventSourcingError

Bases: WakuError

StreamNotFoundError

StreamNotFoundError(stream_id)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId) -> None:
    self.stream_id = stream_id
    super().__init__(f'Stream {stream_id} not found')
stream_id instance-attribute
stream_id = stream_id

ConcurrencyConflictError

ConcurrencyConflictError(
    stream_id, expected_version, actual_version
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, expected_version: int, actual_version: int) -> None:
    self.stream_id = stream_id
    self.expected_version = expected_version
    self.actual_version = actual_version
    super().__init__(
        f'Concurrency conflict on stream {stream_id}: expected version {expected_version}, actual {actual_version}'
    )
stream_id instance-attribute
stream_id = stream_id
expected_version instance-attribute
expected_version = expected_version
actual_version instance-attribute
actual_version = actual_version

AggregateNotFoundError

AggregateNotFoundError(aggregate_type, aggregate_id)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, aggregate_type: str, aggregate_id: str) -> None:
    self.aggregate_type = aggregate_type
    self.aggregate_id = aggregate_id
    super().__init__(f'{aggregate_type} with id {aggregate_id!r} not found')
aggregate_type instance-attribute
aggregate_type = aggregate_type
aggregate_id instance-attribute
aggregate_id = aggregate_id

DuplicateAggregateNameError

DuplicateAggregateNameError(aggregate_name, repositories)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, aggregate_name: str, repositories: list[type]) -> None:
    self.aggregate_name = aggregate_name
    self.repositories = repositories
    repo_names = ', '.join(r.__name__ for r in repositories)
    super().__init__(f'Duplicate aggregate name {aggregate_name!r} used by multiple repositories: {repo_names}')
aggregate_name instance-attribute
aggregate_name = aggregate_name
repositories instance-attribute
repositories = repositories

UnknownEventTypeError

UnknownEventTypeError(event_type_name)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, event_type_name: str) -> None:
    self.event_type_name = event_type_name
    super().__init__(f'Unknown event type: {event_type_name!r}')
event_type_name instance-attribute
event_type_name = event_type_name

DuplicateEventTypeError

DuplicateEventTypeError(event_type_name)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, event_type_name: str) -> None:
    self.event_type_name = event_type_name
    super().__init__(f'Event type {event_type_name!r} is already registered')
event_type_name instance-attribute
event_type_name = event_type_name

ConflictingEventTypeError

ConflictingEventTypeError(
    event_type,
    existing_name,
    existing_version,
    attempted_name,
    attempted_version,
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(
    self,
    event_type: type,
    existing_name: str,
    existing_version: int,
    attempted_name: str,
    attempted_version: int,
) -> None:
    self.event_type = event_type
    self.existing_name = existing_name
    self.existing_version = existing_version
    self.attempted_name = attempted_name
    self.attempted_version = attempted_version
    if existing_name != attempted_name:
        detail = f'name {existing_name!r} → {attempted_name!r}'
    else:
        detail = f'version v{existing_version} → v{attempted_version}'
    super().__init__(f'Conflicting registration for event type {event_type.__name__!r}: {detail}')
event_type instance-attribute
event_type = event_type
existing_name instance-attribute
existing_name = existing_name
existing_version instance-attribute
existing_version = existing_version
attempted_name instance-attribute
attempted_name = attempted_name
attempted_version instance-attribute
attempted_version = attempted_version

SnapshotTypeMismatchError

SnapshotTypeMismatchError(
    stream_id, expected_type, actual_type
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, expected_type: str, actual_type: str) -> None:
    self.stream_id = stream_id
    self.expected_type = expected_type
    self.actual_type = actual_type
    super().__init__(
        f'Snapshot type mismatch on stream {stream_id}: expected {expected_type!r}, got {actual_type!r}'
    )
stream_id instance-attribute
stream_id = stream_id
expected_type instance-attribute
expected_type = expected_type
actual_type instance-attribute
actual_type = actual_type

StreamTooLargeError

StreamTooLargeError(stream_id, max_length)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, max_length: int) -> None:
    self.stream_id = stream_id
    self.max_length = max_length
    super().__init__(
        f'Stream {stream_id} exceeds maximum length of {max_length} events. '
        f'Configure snapshots to reduce stream replay size.'
    )
stream_id instance-attribute
stream_id = stream_id
max_length instance-attribute
max_length = max_length

RegistryFrozenError

RegistryFrozenError()

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self) -> None:
    super().__init__('Cannot register event types after registry is frozen')

ProjectionError

ProjectionStoppedError

ProjectionStoppedError(projection_name, cause)

Bases: ProjectionError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, projection_name: str, cause: Exception) -> None:
    self.projection_name = projection_name
    self.cause = cause
    super().__init__(f'Projection {projection_name!r} stopped due to error: {cause}')
projection_name instance-attribute
projection_name = projection_name
cause instance-attribute
cause = cause

RetryExhaustedError

RetryExhaustedError(projection_name, attempts, cause)

Bases: ProjectionError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, projection_name: str, attempts: int, cause: Exception) -> None:
    self.projection_name = projection_name
    self.attempts = attempts
    self.cause = cause
    super().__init__(f'Projection {projection_name!r} exhausted {attempts} retry attempts: {cause}')
projection_name instance-attribute
projection_name = projection_name
attempts instance-attribute
attempts = attempts
cause instance-attribute
cause = cause

DuplicateIdempotencyKeyError

DuplicateIdempotencyKeyError(stream_id, *, reason)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, *, reason: str) -> None:
    self.stream_id = stream_id
    self.reason = reason
    super().__init__(f'Duplicate idempotency keys ({reason}) on stream {stream_id}')
stream_id instance-attribute
stream_id = stream_id
reason instance-attribute
reason = reason

PartialDuplicateAppendError

PartialDuplicateAppendError(
    stream_id, existing_count, total_count
)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, stream_id: StreamId, existing_count: int, total_count: int) -> None:
    self.stream_id = stream_id
    self.existing_count = existing_count
    self.total_count = total_count
    super().__init__(
        f'Partial duplicate append on stream {stream_id}: '
        f'{existing_count} of {total_count} idempotency keys already exist'
    )
stream_id instance-attribute
stream_id = stream_id
existing_count instance-attribute
existing_count = existing_count
total_count instance-attribute
total_count = total_count

SnapshotConfigNotFoundError

SnapshotConfigNotFoundError(aggregate_name)

Bases: EventSourcingError

Source code in src/waku/eventsourcing/exceptions.py
def __init__(self, aggregate_name: str) -> None:
    self.aggregate_name = aggregate_name
    super().__init__(
        f'No snapshot config found for aggregate {aggregate_name!r}. '
        f'Provide snapshot=SnapshotOptions(...) via bind_aggregate() or bind_decider().'
    )
aggregate_name instance-attribute
aggregate_name = aggregate_name

SnapshotMigrationChainError

UpcasterChainError

handler

AggregateT module-attribute

AggregateT = TypeVar(
    'AggregateT',
    bound=EventSourcedAggregate,
    default=EventSourcedAggregate,
)

EventSourcedCommandHandler

EventSourcedCommandHandler(repository, publisher)

Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, AggregateT]

Source code in src/waku/eventsourcing/handler.py
def __init__(
    self,
    repository: EventSourcedRepository[AggregateT],
    publisher: IPublisher,
) -> None:
    self._repository = repository
    self._publisher = publisher
handle async
handle(request)
Source code in src/waku/eventsourcing/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    aggregate_id = self._aggregate_id(request)

    if self._is_creation_command(request):
        aggregate = self._repository.create_aggregate()
    else:
        aggregate = await self._repository.load(aggregate_id)

    await self._execute(request, aggregate)
    _, events = await self._repository.save(
        aggregate_id,
        aggregate,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(aggregate)

EventSourcedVoidCommandHandler

EventSourcedVoidCommandHandler(repository, publisher)

Bases: EventSourcedCommandHandler[RequestT, None, AggregateT], ABC, Generic[RequestT, AggregateT]

Source code in src/waku/eventsourcing/handler.py
def __init__(
    self,
    repository: EventSourcedRepository[AggregateT],
    publisher: IPublisher,
) -> None:
    self._repository = repository
    self._publisher = publisher
handle async
handle(request)
Source code in src/waku/eventsourcing/handler.py
async def handle(self, request: RequestT, /) -> ResponseT:
    aggregate_id = self._aggregate_id(request)

    if self._is_creation_command(request):
        aggregate = self._repository.create_aggregate()
    else:
        aggregate = await self._repository.load(aggregate_id)

    await self._execute(request, aggregate)
    _, events = await self._repository.save(
        aggregate_id,
        aggregate,
        idempotency_key=self._idempotency_key(request),
    )

    for event in events:
        await self._publisher.publish(event)

    return self._to_response(aggregate)

modules

EventTypeSpec module-attribute

EventTypeSpec = 'type[INotification] | EventType'

EventType dataclass

EventType(
    event_type,
    *,
    name=None,
    aliases=(),
    version=1,
    upcasters=(),
)
event_type instance-attribute
event_type
name class-attribute instance-attribute
name = field(default=None, kw_only=True)
aliases class-attribute instance-attribute
aliases = field(default=(), kw_only=True)
version class-attribute instance-attribute
version = field(default=1, kw_only=True)
upcasters class-attribute instance-attribute
upcasters = field(default=(), kw_only=True)

SnapshotOptions dataclass

SnapshotOptions(
    *, strategy, schema_version=1, migrations=()
)
strategy instance-attribute
strategy
schema_version class-attribute instance-attribute
schema_version = 1
migrations class-attribute instance-attribute
migrations = ()

AggregateBinding dataclass

AggregateBinding(
    *,
    repository,
    event_types=(),
    projections=(),
    snapshot=None,
)
repository instance-attribute
repository
event_types class-attribute instance-attribute
event_types = ()
projections class-attribute instance-attribute
projections = ()
snapshot class-attribute instance-attribute
snapshot = None

DeciderBinding dataclass

DeciderBinding(
    *,
    repository,
    decider,
    event_types=(),
    projections=(),
    snapshot=None,
)
repository instance-attribute
repository
decider instance-attribute
decider
event_types class-attribute instance-attribute
event_types = ()
projections class-attribute instance-attribute
projections = ()
snapshot class-attribute instance-attribute
snapshot = None

EventSourcingConfig dataclass

EventSourcingConfig(
    *,
    store=None,
    event_serializer=None,
    snapshot_store=None,
    snapshot_state_serializer=None,
    checkpoint_store=None,
    enrichers=(),
)
store class-attribute instance-attribute
store = None
event_serializer class-attribute instance-attribute
event_serializer = None
snapshot_store class-attribute instance-attribute
snapshot_store = None
snapshot_state_serializer class-attribute instance-attribute
snapshot_state_serializer = None
checkpoint_store class-attribute instance-attribute
checkpoint_store = None
enrichers class-attribute instance-attribute
enrichers = ()

EventSourcingRegistry dataclass

EventSourcingRegistry(
    projection_types=list(),
    catch_up_projection_types=list(),
    event_type_bindings=list(),
)
projection_types class-attribute instance-attribute
projection_types = field(default_factory=list)
catch_up_projection_types class-attribute instance-attribute
catch_up_projection_types = field(default_factory=list)
event_type_bindings class-attribute instance-attribute
event_type_bindings = field(default_factory=list)
merge
merge(other)
Source code in src/waku/eventsourcing/modules.py
def merge(self, other: EventSourcingRegistry) -> None:
    self._check_not_frozen()
    self.projection_types.extend(other.projection_types)
    self.catch_up_projection_types.extend(other.catch_up_projection_types)
    self.event_type_bindings.extend(other.event_type_bindings)
freeze
freeze()
Source code in src/waku/eventsourcing/modules.py
def freeze(self) -> None:
    self._frozen = True
handler_providers
handler_providers()
Source code in src/waku/eventsourcing/modules.py
def handler_providers(self) -> Iterator[Provider]:
    if self.projection_types:
        yield many(IProjection, *self.projection_types, collect=False)
    if self.catch_up_projection_types:
        yield many(ICatchUpProjection, *self.catch_up_projection_types, collect=False)
collector_providers staticmethod
collector_providers()
Source code in src/waku/eventsourcing/modules.py
@staticmethod
def collector_providers() -> Iterator[Provider]:
    yield many(IProjection, collect=True)
    yield many(ICatchUpProjection, collect=True)

EventSourcingModule

register classmethod
register(config=None)
Source code in src/waku/eventsourcing/modules.py
@classmethod
def register(cls, config: EventSourcingConfig | None = None, /) -> DynamicModule:
    config_ = config or EventSourcingConfig()
    providers: list[Provider] = []

    if config_.store is not None:
        providers.append(scoped(IEventStore, config_.store))
    else:
        providers.append(scoped(WithParents[IEventStore], InMemoryEventStore))  # ty:ignore[not-subscriptable]

    if config_.event_serializer is not None:
        providers.append(scoped(IEventSerializer, config_.event_serializer))

    if config_.snapshot_store is not None:
        providers.append(scoped(ISnapshotStore, config_.snapshot_store))

    if config_.snapshot_state_serializer is not None:
        providers.append(scoped(ISnapshotStateSerializer, config_.snapshot_state_serializer))

    if config_.checkpoint_store is not None:
        providers.append(scoped(ICheckpointStore, config_.checkpoint_store))

    providers.append(many(IMetadataEnricher, *config_.enrichers))

    return DynamicModule(
        parent_module=cls,
        providers=providers,
        extensions=[
            EventSourcingRegistryAggregator(has_serializer=config_.event_serializer is not None),
        ],
        is_global=True,
    )

EventSourcingExtension dataclass

EventSourcingExtension()

Bases: OnModuleConfigure

registry property
registry
bind_aggregate
bind_aggregate(
    repository,
    event_types=(),
    projections=(),
    snapshot=None,
)
Source code in src/waku/eventsourcing/modules.py
def bind_aggregate(
    self,
    repository: type[EventSourcedRepository[Any]],
    event_types: Sequence[EventTypeSpec] = (),
    projections: Sequence[type[IProjection]] = (),
    snapshot: SnapshotOptions | None = None,
) -> Self:
    self._bindings.append(
        AggregateBinding(
            repository=repository,
            event_types=event_types,
            projections=projections,
            snapshot=snapshot,
        )
    )
    self._registry.projection_types.extend(projections)
    self._registry.event_type_bindings.extend(event_types)
    return self
bind_decider
bind_decider(
    repository,
    decider,
    event_types=(),
    projections=(),
    snapshot=None,
)
Source code in src/waku/eventsourcing/modules.py
def bind_decider(
    self,
    repository: type[DeciderRepository[Any, Any, Any]],
    decider: type[IDecider[Any, Any, Any]],
    event_types: Sequence[EventTypeSpec] = (),
    projections: Sequence[type[IProjection]] = (),
    snapshot: SnapshotOptions | None = None,
) -> Self:
    self._decider_bindings.append(
        DeciderBinding(
            repository=repository,
            decider=decider,
            event_types=event_types,
            projections=projections,
            snapshot=snapshot,
        )
    )
    self._registry.projection_types.extend(projections)
    self._registry.event_type_bindings.extend(event_types)
    return self
bind_catch_up_projections
bind_catch_up_projections(projections)
Source code in src/waku/eventsourcing/modules.py
def bind_catch_up_projections(self, projections: Sequence[type[ICatchUpProjection]]) -> Self:
    self._registry.catch_up_projection_types.extend(projections)
    return self
aggregate_names
aggregate_names()
Source code in src/waku/eventsourcing/modules.py
def aggregate_names(self) -> Iterator[tuple[str, type]]:
    for binding in self._bindings:
        yield binding.repository.aggregate_name, binding.repository
    for binding in self._decider_bindings:
        yield binding.repository.aggregate_name, binding.repository
snapshot_bindings
snapshot_bindings()
Source code in src/waku/eventsourcing/modules.py
def snapshot_bindings(self) -> Iterator[tuple[str, SnapshotOptions]]:
    for binding in self._bindings:
        if binding.snapshot is not None:
            yield binding.repository.aggregate_name, binding.snapshot
    for binding in self._decider_bindings:
        if binding.snapshot is not None:
            yield binding.repository.aggregate_name, binding.snapshot
on_module_configure
on_module_configure(metadata)
Source code in src/waku/eventsourcing/modules.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    for binding in self._bindings:
        repo_type = binding.repository
        metadata.providers.append(scoped(WithParents[repo_type], repo_type))  # type: ignore[misc,valid-type]

    for binding in self._decider_bindings:
        repo_type = binding.repository
        metadata.providers.append(scoped(WithParents[repo_type], repo_type))  # type: ignore[misc,valid-type]
        decider_iface = self._resolve_decider_interface(repo_type)
        metadata.providers.append(scoped(decider_iface, binding.decider))

EventSourcingRegistryAggregator

EventSourcingRegistryAggregator(*, has_serializer=False)

Bases: OnModuleRegistration

Source code in src/waku/eventsourcing/modules.py
def __init__(self, *, has_serializer: bool = False) -> None:
    self._has_serializer = has_serializer
on_module_registration
on_module_registration(registry, owning_module, context)
Source code in src/waku/eventsourcing/modules.py
@override
def on_module_registration(
    self,
    registry: ModuleMetadataRegistry,
    owning_module: ModuleType,
    context: Mapping[Any, Any] | None,
) -> None:
    aggregated = EventSourcingRegistry()
    all_aggregate_names: dict[str, list[type]] = {}

    for module_type, ext in registry.find_extensions(EventSourcingExtension):
        aggregated.merge(ext.registry)
        for provider in ext.registry.handler_providers():
            registry.add_provider(module_type, provider)
        for name, repo_type in ext.aggregate_names():
            all_aggregate_names.setdefault(name, []).append(repo_type)

    for name, repos in all_aggregate_names.items():
        if len(repos) > 1:
            raise DuplicateAggregateNameError(name, repos)

    for provider in aggregated.collector_providers():
        registry.add_provider(owning_module, provider)

    aggregated.freeze()
    registry.add_provider(owning_module, object_(aggregated))

    event_type_registry, upcaster_chain = self._build_type_registry(aggregated)
    registry.add_provider(owning_module, object_(event_type_registry))
    registry.add_provider(owning_module, object_(upcaster_chain))

    snapshot_config_registry = self._build_snapshot_config_registry(
        registry.find_extensions(EventSourcingExtension),
    )
    registry.add_provider(owning_module, object_(snapshot_config_registry))

    if self._has_serializer and len(event_type_registry) == 0:
        warnings.warn(
            'A serializer is configured but no event types were registered via '
            'bind_aggregate(event_types=[...]). Deserialization will fail at runtime.',
            UserWarning,
            stacklevel=1,
        )

projection

Checkpoint dataclass

Checkpoint(*, projection_name, position, updated_at)
projection_name instance-attribute
projection_name
position instance-attribute
position
updated_at instance-attribute
updated_at

CatchUpProjectionConfig dataclass

CatchUpProjectionConfig(
    *,
    batch_size=100,
    max_attempts=3,
    base_retry_delay_seconds=10.0,
    max_retry_delay_seconds=300.0,
    poll_interval_min_seconds=0.5,
    poll_interval_max_seconds=5.0,
    poll_interval_step_seconds=1.0,
    poll_interval_jitter_factor=0.1,
)
batch_size class-attribute instance-attribute
batch_size = 100
max_attempts class-attribute instance-attribute
max_attempts = 3
base_retry_delay_seconds class-attribute instance-attribute
base_retry_delay_seconds = 10.0
max_retry_delay_seconds class-attribute instance-attribute
max_retry_delay_seconds = 300.0
poll_interval_min_seconds class-attribute instance-attribute
poll_interval_min_seconds = 0.5
poll_interval_max_seconds class-attribute instance-attribute
poll_interval_max_seconds = 5.0
poll_interval_step_seconds class-attribute instance-attribute
poll_interval_step_seconds = 1.0
poll_interval_jitter_factor class-attribute instance-attribute
poll_interval_jitter_factor = 0.1

LeaseConfig dataclass

LeaseConfig(
    *, ttl_seconds=30.0, renew_interval_factor=1 / 3
)
ttl_seconds class-attribute instance-attribute
ttl_seconds = 30.0
renew_interval_factor class-attribute instance-attribute
renew_interval_factor = 1 / 3
renew_interval_seconds property
renew_interval_seconds

InMemoryCheckpointStore

InMemoryCheckpointStore()

Bases: ICheckpointStore

Source code in src/waku/eventsourcing/projection/in_memory.py
def __init__(self) -> None:
    self._checkpoints: dict[str, Checkpoint] = {}
load async
load(projection_name)
Source code in src/waku/eventsourcing/projection/in_memory.py
async def load(self, projection_name: str, /) -> Checkpoint | None:
    return self._checkpoints.get(projection_name)
save async
save(checkpoint)
Source code in src/waku/eventsourcing/projection/in_memory.py
async def save(self, checkpoint: Checkpoint, /) -> None:
    self._checkpoints[checkpoint.projection_name] = checkpoint

ErrorPolicy

Bases: Enum

RETRY class-attribute instance-attribute
RETRY = 'retry'
SKIP class-attribute instance-attribute
SKIP = 'skip'
STOP class-attribute instance-attribute
STOP = 'stop'

ICatchUpProjection

Bases: IProjection

projection_name class-attribute
projection_name
error_policy class-attribute
error_policy = RETRY
project abstractmethod async
project(events)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def project(self, events: Sequence[StoredEvent], /) -> None: ...
teardown async
teardown()
Source code in src/waku/eventsourcing/projection/interfaces.py
async def teardown(self) -> None:
    pass

ICheckpointStore

Bases: ABC

load abstractmethod async
load(projection_name)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def load(self, projection_name: str, /) -> Checkpoint | None: ...
save abstractmethod async
save(checkpoint)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def save(self, checkpoint: Checkpoint, /) -> None: ...

IProjection

Bases: ABC

projection_name class-attribute
projection_name
project abstractmethod async
project(events)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def project(self, events: Sequence[StoredEvent], /) -> None: ...

CatchUpProjectionRunner

CatchUpProjectionRunner(
    container, lock, projection_types, config
)
Source code in src/waku/eventsourcing/projection/runner.py
def __init__(
    self,
    container: AsyncContainer,
    lock: IProjectionLock,
    projection_types: Sequence[type[ICatchUpProjection]],
    config: CatchUpProjectionConfig,
) -> None:
    self._container = container
    self._lock = lock
    self._specs = tuple(
        _ProjectionSpec(
            projection_type=pt,
            name=pt.projection_name,
            error_policy=pt.error_policy,
        )
        for pt in projection_types
    )
    self._config = config
    self._shutdown_event = anyio.Event()
run async
run()
Source code in src/waku/eventsourcing/projection/runner.py
async def run(self) -> None:
    if not self._specs:
        logger.warning('No catch-up projections registered, exiting')
        return

    async with anyio.create_task_group() as tg:
        tg.start_soon(self._signal_listener, tg.cancel_scope)
        tg.start_soon(self._run_all_projections, tg.cancel_scope)
rebuild async
rebuild(projection_name)
Source code in src/waku/eventsourcing/projection/runner.py
async def rebuild(self, projection_name: str) -> None:
    spec = self._find_spec(projection_name)

    async with self._lock.acquire(projection_name) as acquired:
        if not acquired:
            msg = f'Projection {projection_name!r} is locked by another instance'
            raise RuntimeError(msg)

        async with self._container() as scope:
            projection = await scope.get(spec.projection_type)
            await projection.teardown()

        processor = ProjectionProcessor(projection_name, spec.error_policy, self._config)

        async with self._container() as scope:
            checkpoint_store = await scope.get(ICheckpointStore)
            await processor.reset_checkpoint(checkpoint_store)

        while True:
            async with self._container() as scope:
                projection = await scope.get(spec.projection_type)
                reader = await scope.get(IEventReader)
                checkpoint_store = await scope.get(ICheckpointStore)
                processed = await processor.run_once(projection, reader, checkpoint_store)

            if processed == 0:
                break
request_shutdown
request_shutdown()
Source code in src/waku/eventsourcing/projection/runner.py
def request_shutdown(self) -> None:
    self._shutdown_event.set()

adaptive_interval

AdaptiveInterval
AdaptiveInterval(
    min_seconds,
    max_seconds,
    step_seconds,
    jitter_factor=0.1,
)

Fast when busy, slow when idle.

Source code in src/waku/eventsourcing/projection/adaptive_interval.py
def __init__(
    self,
    min_seconds: float,
    max_seconds: float,
    step_seconds: float,
    jitter_factor: float = 0.1,
) -> None:
    self._min = min_seconds
    self._max = max_seconds
    self._step = step_seconds
    self._jitter_factor = jitter_factor
    self._current = min_seconds
current property
current
current_with_jitter
current_with_jitter()
Source code in src/waku/eventsourcing/projection/adaptive_interval.py
def current_with_jitter(self) -> float:
    return self._current * random.uniform(1 - self._jitter_factor, 1 + self._jitter_factor)  # noqa: S311
on_work_done
on_work_done()
Source code in src/waku/eventsourcing/projection/adaptive_interval.py
def on_work_done(self) -> None:
    self._current = self._min
on_idle
on_idle()
Source code in src/waku/eventsourcing/projection/adaptive_interval.py
def on_idle(self) -> None:
    self._current = min(self._current + self._step, self._max)
calculate_backoff_with_jitter
calculate_backoff_with_jitter(
    attempt, base_delay_seconds, max_delay_seconds
)

Full jitter: random(0, min(base * 2^attempt, max_delay)).

Source code in src/waku/eventsourcing/projection/adaptive_interval.py
def calculate_backoff_with_jitter(
    attempt: int,
    base_delay_seconds: float,
    max_delay_seconds: float,
) -> float:
    """Full jitter: random(0, min(base * 2^attempt, max_delay))."""
    max_delay = min(base_delay_seconds * (2**attempt), max_delay_seconds)
    return random.uniform(0, max_delay)  # noqa: S311

checkpoint

Checkpoint dataclass
Checkpoint(*, projection_name, position, updated_at)
projection_name instance-attribute
projection_name
position instance-attribute
position
updated_at instance-attribute
updated_at

config

CatchUpProjectionConfig dataclass
CatchUpProjectionConfig(
    *,
    batch_size=100,
    max_attempts=3,
    base_retry_delay_seconds=10.0,
    max_retry_delay_seconds=300.0,
    poll_interval_min_seconds=0.5,
    poll_interval_max_seconds=5.0,
    poll_interval_step_seconds=1.0,
    poll_interval_jitter_factor=0.1,
)
batch_size class-attribute instance-attribute
batch_size = 100
max_attempts class-attribute instance-attribute
max_attempts = 3
base_retry_delay_seconds class-attribute instance-attribute
base_retry_delay_seconds = 10.0
max_retry_delay_seconds class-attribute instance-attribute
max_retry_delay_seconds = 300.0
poll_interval_min_seconds class-attribute instance-attribute
poll_interval_min_seconds = 0.5
poll_interval_max_seconds class-attribute instance-attribute
poll_interval_max_seconds = 5.0
poll_interval_step_seconds class-attribute instance-attribute
poll_interval_step_seconds = 1.0
poll_interval_jitter_factor class-attribute instance-attribute
poll_interval_jitter_factor = 0.1
LeaseConfig dataclass
LeaseConfig(
    *, ttl_seconds=30.0, renew_interval_factor=1 / 3
)
ttl_seconds class-attribute instance-attribute
ttl_seconds = 30.0
renew_interval_factor class-attribute instance-attribute
renew_interval_factor = 1 / 3
renew_interval_seconds property
renew_interval_seconds

in_memory

InMemoryCheckpointStore
InMemoryCheckpointStore()

Bases: ICheckpointStore

Source code in src/waku/eventsourcing/projection/in_memory.py
def __init__(self) -> None:
    self._checkpoints: dict[str, Checkpoint] = {}
load async
load(projection_name)
Source code in src/waku/eventsourcing/projection/in_memory.py
async def load(self, projection_name: str, /) -> Checkpoint | None:
    return self._checkpoints.get(projection_name)
save async
save(checkpoint)
Source code in src/waku/eventsourcing/projection/in_memory.py
async def save(self, checkpoint: Checkpoint, /) -> None:
    self._checkpoints[checkpoint.projection_name] = checkpoint

interfaces

IProjection

Bases: ABC

projection_name class-attribute
projection_name
project abstractmethod async
project(events)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def project(self, events: Sequence[StoredEvent], /) -> None: ...
ErrorPolicy

Bases: Enum

RETRY class-attribute instance-attribute
RETRY = 'retry'
SKIP class-attribute instance-attribute
SKIP = 'skip'
STOP class-attribute instance-attribute
STOP = 'stop'
ICatchUpProjection

Bases: IProjection

error_policy class-attribute
error_policy = RETRY
projection_name class-attribute
projection_name
teardown async
teardown()
Source code in src/waku/eventsourcing/projection/interfaces.py
async def teardown(self) -> None:
    pass
project abstractmethod async
project(events)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def project(self, events: Sequence[StoredEvent], /) -> None: ...
ICheckpointStore

Bases: ABC

load abstractmethod async
load(projection_name)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def load(self, projection_name: str, /) -> Checkpoint | None: ...
save abstractmethod async
save(checkpoint)
Source code in src/waku/eventsourcing/projection/interfaces.py
@abc.abstractmethod
async def save(self, checkpoint: Checkpoint, /) -> None: ...

lock

InMemoryProjectionLock
InMemoryProjectionLock()

Bases: IProjectionLock

Always grants the lock when running in a single process. Tracks held locks for testing.

Source code in src/waku/eventsourcing/projection/lock/in_memory.py
def __init__(self) -> None:
    self._held: set[str] = set()
acquire async
acquire(projection_name)
Source code in src/waku/eventsourcing/projection/lock/in_memory.py
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    if projection_name in self._held:
        yield False
        return

    self._held.add(projection_name)
    try:
        yield True
    finally:
        self._held.discard(projection_name)
IProjectionLock

Bases: ABC

Lock abstraction ensuring only one catch-up projection instance runs at a time.

acquire abstractmethod async
acquire(projection_name)

Yields True if the lock was acquired, False if held by another instance.

Source code in src/waku/eventsourcing/projection/lock/interfaces.py
@abc.abstractmethod
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    """Yields True if the lock was acquired, False if held by another instance."""
    yield False  # pragma: no cover
in_memory
InMemoryProjectionLock
InMemoryProjectionLock()

Bases: IProjectionLock

Always grants the lock when running in a single process. Tracks held locks for testing.

Source code in src/waku/eventsourcing/projection/lock/in_memory.py
def __init__(self) -> None:
    self._held: set[str] = set()
acquire async
acquire(projection_name)
Source code in src/waku/eventsourcing/projection/lock/in_memory.py
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    if projection_name in self._held:
        yield False
        return

    self._held.add(projection_name)
    try:
        yield True
    finally:
        self._held.discard(projection_name)
interfaces
IProjectionLock

Bases: ABC

Lock abstraction ensuring only one catch-up projection instance runs at a time.

acquire abstractmethod async
acquire(projection_name)

Yields True if the lock was acquired, False if held by another instance.

Source code in src/waku/eventsourcing/projection/lock/interfaces.py
@abc.abstractmethod
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    """Yields True if the lock was acquired, False if held by another instance."""
    yield False  # pragma: no cover
sqlalchemy
PostgresAdvisoryProjectionLock
PostgresAdvisoryProjectionLock(engine)

Bases: IProjectionLock

Session-level PostgreSQL advisory lock.

Holds a database connection for the entire duration of the lock because pg_advisory_lock is bound to the session — releasing the connection releases the lock. For long-running projections, consider :class:PostgresLeaseProjectionLock, which only connects during heartbeats.

Not compatible with PgBouncer in transaction-pooling mode.

Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
def __init__(self, engine: AsyncEngine) -> None:
    self._engine = engine
acquire async
acquire(projection_name)
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    async with self._engine.connect() as conn:
        await conn.execution_options(isolation_level='AUTOCOMMIT')
        result = await conn.execute(_LOCK_SQL, {'name': projection_name})
        acquired = bool(result.scalar_one())

        if not acquired:
            yield False
            return

        logger.debug('Advisory lock acquired for %s', projection_name)
        try:
            yield True
        finally:
            try:
                await conn.execute(_UNLOCK_SQL, {'name': projection_name})
                logger.debug('Advisory lock released for %s', projection_name)
            except Exception:
                logger.warning('Failed to release advisory lock for %s', projection_name, exc_info=True)
PostgresLeaseProjectionLock
PostgresLeaseProjectionLock(engine, config)

Bases: IProjectionLock

Production lease-based projection lock backed by PostgreSQL.

Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
def __init__(self, engine: AsyncEngine, config: LeaseConfig) -> None:
    self._engine = engine
    self._config = config
    self._holder_id = str(uuid.uuid4())
acquire async
acquire(projection_name)
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    async with self._engine.connect() as conn:
        await conn.execution_options(isolation_level='AUTOCOMMIT')
        result = await conn.execute(
            _UPSERT_SQL,
            {'name': projection_name, 'holder': self._holder_id, 'ttl': self._config.ttl_seconds},
        )
        row = result.fetchone()

    if row is None:
        yield False
        return

    logger.debug('Lease acquired for %s by %s', projection_name, self._holder_id)

    try:
        async with anyio.create_task_group() as tg:
            tg.start_soon(self._heartbeat, projection_name, tg.cancel_scope)
            try:
                yield True
            finally:
                tg.cancel_scope.cancel()
    finally:
        await self._release(projection_name)
bind_lease_tables
bind_lease_tables(metadata)
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/tables.py
def bind_lease_tables(metadata: MetaData) -> Table:
    return es_projection_leases_table.to_metadata(metadata)
advisory
logger module-attribute
logger = getLogger(__name__)
PostgresAdvisoryProjectionLock
PostgresAdvisoryProjectionLock(engine)

Bases: IProjectionLock

Session-level PostgreSQL advisory lock.

Holds a database connection for the entire duration of the lock because pg_advisory_lock is bound to the session — releasing the connection releases the lock. For long-running projections, consider :class:PostgresLeaseProjectionLock, which only connects during heartbeats.

Not compatible with PgBouncer in transaction-pooling mode.

Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
def __init__(self, engine: AsyncEngine) -> None:
    self._engine = engine
acquire async
acquire(projection_name)
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    async with self._engine.connect() as conn:
        await conn.execution_options(isolation_level='AUTOCOMMIT')
        result = await conn.execute(_LOCK_SQL, {'name': projection_name})
        acquired = bool(result.scalar_one())

        if not acquired:
            yield False
            return

        logger.debug('Advisory lock acquired for %s', projection_name)
        try:
            yield True
        finally:
            try:
                await conn.execute(_UNLOCK_SQL, {'name': projection_name})
                logger.debug('Advisory lock released for %s', projection_name)
            except Exception:
                logger.warning('Failed to release advisory lock for %s', projection_name, exc_info=True)
lock
logger module-attribute
logger = getLogger(__name__)
PostgresLeaseProjectionLock
PostgresLeaseProjectionLock(engine, config)

Bases: IProjectionLock

Production lease-based projection lock backed by PostgreSQL.

Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
def __init__(self, engine: AsyncEngine, config: LeaseConfig) -> None:
    self._engine = engine
    self._config = config
    self._holder_id = str(uuid.uuid4())
acquire async
acquire(projection_name)
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
@contextlib.asynccontextmanager
async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
    async with self._engine.connect() as conn:
        await conn.execution_options(isolation_level='AUTOCOMMIT')
        result = await conn.execute(
            _UPSERT_SQL,
            {'name': projection_name, 'holder': self._holder_id, 'ttl': self._config.ttl_seconds},
        )
        row = result.fetchone()

    if row is None:
        yield False
        return

    logger.debug('Lease acquired for %s by %s', projection_name, self._holder_id)

    try:
        async with anyio.create_task_group() as tg:
            tg.start_soon(self._heartbeat, projection_name, tg.cancel_scope)
            try:
                yield True
            finally:
                tg.cancel_scope.cancel()
    finally:
        await self._release(projection_name)
tables
es_projection_leases_table module-attribute
es_projection_leases_table = Table(
    'es_projection_leases',
    _internal_metadata,
    Column('projection_name', Text, primary_key=True),
    Column('holder_id', Text, nullable=False),
    Column(
        'acquired_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
    ),
    Column(
        'renewed_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
    ),
    Column(
        'expires_at',
        TIMESTAMP(timezone=True),
        nullable=False,
    ),
)
bind_lease_tables
bind_lease_tables(metadata)
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/tables.py
def bind_lease_tables(metadata: MetaData) -> Table:
    return es_projection_leases_table.to_metadata(metadata)

processor

logger module-attribute
logger = getLogger(__name__)
ProjectionProcessor
ProjectionProcessor(projection_name, error_policy, config)
Source code in src/waku/eventsourcing/projection/processor.py
def __init__(self, projection_name: str, error_policy: ErrorPolicy, config: CatchUpProjectionConfig) -> None:
    self._projection_name = projection_name
    self._error_policy = error_policy
    self._config = config
    self._attempts: int = 0
projection_name property
projection_name
run_once async
run_once(projection, event_reader, checkpoint_store)
Source code in src/waku/eventsourcing/projection/processor.py
async def run_once(
    self,
    projection: ICatchUpProjection,
    event_reader: IEventReader,
    checkpoint_store: ICheckpointStore,
) -> int:
    checkpoint = await checkpoint_store.load(self._projection_name)
    position = checkpoint.position if checkpoint is not None else -1

    events = await event_reader.read_all(after_position=position, count=self._config.batch_size)
    if not events:
        return 0

    try:
        await projection.project(events)
    except Exception as exc:  # noqa: BLE001
        return await self._handle_error(exc, events[-1].global_position, checkpoint_store)

    await checkpoint_store.save(
        Checkpoint(
            projection_name=self._projection_name,
            position=events[-1].global_position,
            updated_at=datetime.now(UTC),
        ),
    )
    self._attempts = 0
    return len(events)
reset_checkpoint async
reset_checkpoint(checkpoint_store)
Source code in src/waku/eventsourcing/projection/processor.py
async def reset_checkpoint(self, checkpoint_store: ICheckpointStore) -> None:
    await checkpoint_store.save(
        Checkpoint(
            projection_name=self._projection_name,
            position=-1,
            updated_at=datetime.now(UTC),
        ),
    )

runner

logger module-attribute
logger = getLogger(__name__)
CatchUpProjectionRunner
CatchUpProjectionRunner(
    container, lock, projection_types, config
)
Source code in src/waku/eventsourcing/projection/runner.py
def __init__(
    self,
    container: AsyncContainer,
    lock: IProjectionLock,
    projection_types: Sequence[type[ICatchUpProjection]],
    config: CatchUpProjectionConfig,
) -> None:
    self._container = container
    self._lock = lock
    self._specs = tuple(
        _ProjectionSpec(
            projection_type=pt,
            name=pt.projection_name,
            error_policy=pt.error_policy,
        )
        for pt in projection_types
    )
    self._config = config
    self._shutdown_event = anyio.Event()
run async
run()
Source code in src/waku/eventsourcing/projection/runner.py
async def run(self) -> None:
    if not self._specs:
        logger.warning('No catch-up projections registered, exiting')
        return

    async with anyio.create_task_group() as tg:
        tg.start_soon(self._signal_listener, tg.cancel_scope)
        tg.start_soon(self._run_all_projections, tg.cancel_scope)
rebuild async
rebuild(projection_name)
Source code in src/waku/eventsourcing/projection/runner.py
async def rebuild(self, projection_name: str) -> None:
    spec = self._find_spec(projection_name)

    async with self._lock.acquire(projection_name) as acquired:
        if not acquired:
            msg = f'Projection {projection_name!r} is locked by another instance'
            raise RuntimeError(msg)

        async with self._container() as scope:
            projection = await scope.get(spec.projection_type)
            await projection.teardown()

        processor = ProjectionProcessor(projection_name, spec.error_policy, self._config)

        async with self._container() as scope:
            checkpoint_store = await scope.get(ICheckpointStore)
            await processor.reset_checkpoint(checkpoint_store)

        while True:
            async with self._container() as scope:
                projection = await scope.get(spec.projection_type)
                reader = await scope.get(IEventReader)
                checkpoint_store = await scope.get(ICheckpointStore)
                processed = await processor.run_once(projection, reader, checkpoint_store)

            if processed == 0:
                break
request_shutdown
request_shutdown()
Source code in src/waku/eventsourcing/projection/runner.py
def request_shutdown(self) -> None:
    self._shutdown_event.set()

sqlalchemy

SqlAlchemyCheckpointStore
SqlAlchemyCheckpointStore(session, checkpoints_table)

Bases: ICheckpointStore

Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
def __init__(self, session: AsyncSession, checkpoints_table: Table) -> None:
    self._session = session
    self._checkpoints = checkpoints_table
load async
load(projection_name)
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
async def load(self, projection_name: str, /) -> Checkpoint | None:
    query = select(self._checkpoints).where(self._checkpoints.c.projection_name == projection_name)
    result = await self._session.execute(query)
    row: Any = result.one_or_none()
    if row is None:
        return None
    return Checkpoint(
        projection_name=row.projection_name,
        position=row.position,
        updated_at=row.updated_at,
    )
save async
save(checkpoint)
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
async def save(self, checkpoint: Checkpoint, /) -> None:
    stmt = pg_insert(self._checkpoints).values(
        projection_name=checkpoint.projection_name,
        position=checkpoint.position,
        updated_at=checkpoint.updated_at,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=['projection_name'],
        set_={
            'position': stmt.excluded.position,
            'updated_at': stmt.excluded.updated_at,
        },
    )
    await self._session.execute(stmt)
    await self._session.flush()
make_sqlalchemy_checkpoint_store
make_sqlalchemy_checkpoint_store(checkpoints_table)
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
def make_sqlalchemy_checkpoint_store(
    checkpoints_table: Table,
) -> Callable[..., SqlAlchemyCheckpointStore]:
    def factory(session: AsyncSession) -> SqlAlchemyCheckpointStore:
        return SqlAlchemyCheckpointStore(session, checkpoints_table)

    return factory
bind_checkpoint_tables
bind_checkpoint_tables(metadata)
Source code in src/waku/eventsourcing/projection/sqlalchemy/tables.py
def bind_checkpoint_tables(metadata: MetaData) -> Table:
    return es_checkpoints_table.to_metadata(metadata)
store
SqlAlchemyCheckpointStore
SqlAlchemyCheckpointStore(session, checkpoints_table)

Bases: ICheckpointStore

Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
def __init__(self, session: AsyncSession, checkpoints_table: Table) -> None:
    self._session = session
    self._checkpoints = checkpoints_table
load async
load(projection_name)
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
async def load(self, projection_name: str, /) -> Checkpoint | None:
    query = select(self._checkpoints).where(self._checkpoints.c.projection_name == projection_name)
    result = await self._session.execute(query)
    row: Any = result.one_or_none()
    if row is None:
        return None
    return Checkpoint(
        projection_name=row.projection_name,
        position=row.position,
        updated_at=row.updated_at,
    )
save async
save(checkpoint)
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
async def save(self, checkpoint: Checkpoint, /) -> None:
    stmt = pg_insert(self._checkpoints).values(
        projection_name=checkpoint.projection_name,
        position=checkpoint.position,
        updated_at=checkpoint.updated_at,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=['projection_name'],
        set_={
            'position': stmt.excluded.position,
            'updated_at': stmt.excluded.updated_at,
        },
    )
    await self._session.execute(stmt)
    await self._session.flush()
make_sqlalchemy_checkpoint_store
make_sqlalchemy_checkpoint_store(checkpoints_table)
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
def make_sqlalchemy_checkpoint_store(
    checkpoints_table: Table,
) -> Callable[..., SqlAlchemyCheckpointStore]:
    def factory(session: AsyncSession) -> SqlAlchemyCheckpointStore:
        return SqlAlchemyCheckpointStore(session, checkpoints_table)

    return factory
tables
es_checkpoints_table module-attribute
es_checkpoints_table = Table(
    'es_checkpoints',
    _internal_metadata,
    Column('projection_name', Text, primary_key=True),
    Column('position', BigInteger, nullable=False),
    Column(
        'updated_at',
        TIMESTAMP(timezone=True),
        nullable=False,
    ),
    Column(
        'created_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
    ),
)
bind_checkpoint_tables
bind_checkpoint_tables(metadata)
Source code in src/waku/eventsourcing/projection/sqlalchemy/tables.py
def bind_checkpoint_tables(metadata: MetaData) -> Table:
    return es_checkpoints_table.to_metadata(metadata)

repository

AggregateT module-attribute

AggregateT = TypeVar(
    'AggregateT', bound=EventSourcedAggregate
)

EventSourcedRepository

EventSourcedRepository(event_store)

Bases: ABC, Generic[AggregateT]

Source code in src/waku/eventsourcing/repository.py
def __init__(self, event_store: IEventStore) -> None:
    self._event_store = event_store
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/repository.py
async def load(self, aggregate_id: str) -> AggregateT:
    stream_id = self._stream_id(aggregate_id)
    count = self.max_stream_length + 1 if self.max_stream_length is not None else None
    try:
        stored_events = await self._event_store.read_stream(stream_id, count=count)
    except StreamNotFoundError:
        raise AggregateNotFoundError(
            aggregate_type=self.aggregate_name,
            aggregate_id=aggregate_id,
        ) from None
    if self.max_stream_length is not None and len(stored_events) > self.max_stream_length:
        raise StreamTooLargeError(stream_id, self.max_stream_length)
    if not stored_events:
        msg = f'Stream contains no events: {stream_id}'
        raise EventSourcingError(msg)
    aggregate = self.create_aggregate()
    domain_events = [e.data for e in stored_events]
    version = len(stored_events) - 1
    aggregate.load_from_history(domain_events, version)
    return aggregate
save async
save(aggregate_id, aggregate, *, idempotency_key=None)
Source code in src/waku/eventsourcing/repository.py
async def save(
    self,
    aggregate_id: str,
    aggregate: AggregateT,
    *,
    idempotency_key: str | None = None,
) -> tuple[int, list[INotification]]:
    stream_id = self._stream_id(aggregate_id)
    events = aggregate.collect_events()
    if not events:
        return aggregate.version, []

    envelopes = [
        EventEnvelope(
            domain_event=event,
            idempotency_key=f'{idempotency_key}:{i}' if idempotency_key else str(uuid.uuid4()),
        )
        for i, event in enumerate(events)
    ]
    expected = Exact(version=aggregate.version) if aggregate.version >= 0 else NoStream()
    new_version = await self._event_store.append_to_stream(stream_id, envelopes, expected_version=expected)
    aggregate.mark_persisted(new_version)
    return new_version, events
create_aggregate
create_aggregate()
Source code in src/waku/eventsourcing/repository.py
def create_aggregate(self) -> AggregateT:
    aggregate_cls = self._resolve_aggregate_type()
    if aggregate_cls is None:
        msg = f'{type(self).__name__}: cannot auto-create aggregate, override create_aggregate()'
        raise TypeError(msg)
    return aggregate_cls()

serialization

default_retort module-attribute

default_retort = Retort(
    recipe=[
        loader(UUID, UUID),
        dumper(UUID, str),
        loader(StreamId, from_value),
        dumper(StreamId, str),
    ]
)

IEventSerializer

Bases: ABC

serialize abstractmethod
serialize(event)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def serialize(self, event: INotification, /) -> dict[str, Any]: ...
deserialize abstractmethod
deserialize(data, event_type)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def deserialize(self, data: dict[str, Any], event_type: str, /) -> INotification: ...

ISnapshotStateSerializer

Bases: ABC

serialize abstractmethod
serialize(state)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def serialize(self, state: object, /) -> dict[str, Any]: ...
deserialize abstractmethod
deserialize(data, state_type)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT: ...

JsonEventSerializer

JsonEventSerializer(registry)

Bases: IEventSerializer

Source code in src/waku/eventsourcing/serialization/json.py
def __init__(self, registry: EventTypeRegistry) -> None:
    self._registry = registry
serialize
serialize(event)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def serialize(self, event: INotification, /) -> dict[str, Any]:
    validate_dataclass_instance(event)
    return cast('dict[str, Any]', default_retort.dump(event, type(event)))
deserialize
deserialize(data, event_type)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def deserialize(self, data: dict[str, Any], event_type: str, /) -> INotification:
    cls = self._registry.resolve(event_type)
    return default_retort.load(data, cls)

JsonSnapshotStateSerializer

Bases: ISnapshotStateSerializer

serialize
serialize(state)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def serialize(self, state: object, /) -> dict[str, Any]:
    validate_dataclass_instance(state)
    return cast('dict[str, Any]', default_retort.dump(state, type(state)))
deserialize
deserialize(data, state_type)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT:
    return default_retort.load(data, state_type)

EventTypeRegistry

EventTypeRegistry()
Source code in src/waku/eventsourcing/serialization/registry.py
def __init__(self) -> None:
    self._name_to_type: dict[str, type[INotification]] = {}
    self._type_to_name: dict[type[INotification], str] = {}
    self._type_to_version: dict[type[INotification], int] = {}
    self._frozen = False
is_frozen property
is_frozen
register
register(event_type, /, *, name=None, version=1)
Source code in src/waku/eventsourcing/serialization/registry.py
def register(self, event_type: type[INotification], /, *, name: str | None = None, version: int = 1) -> None:
    if self._frozen:
        raise RegistryFrozenError
    type_name = name or event_type.__name__

    if event_type in self._type_to_name:
        existing_name = self._type_to_name[event_type]
        existing_version = self._type_to_version[event_type]
        if existing_name == type_name and existing_version == version:
            return
        raise ConflictingEventTypeError(event_type, existing_name, existing_version, type_name, version)

    if type_name in self._name_to_type:
        raise DuplicateEventTypeError(type_name)

    self._name_to_type[type_name] = event_type
    self._type_to_name[event_type] = type_name
    self._type_to_version[event_type] = version
add_alias
add_alias(event_type, alias)
Source code in src/waku/eventsourcing/serialization/registry.py
def add_alias(self, event_type: type[INotification], alias: str, /) -> None:
    if self._frozen:
        raise RegistryFrozenError
    if event_type not in self._type_to_name:
        raise UnknownEventTypeError(event_type.__name__)
    if alias in self._name_to_type:
        if self._name_to_type[alias] is event_type:
            return
        raise DuplicateEventTypeError(alias)
    self._name_to_type[alias] = event_type
resolve
resolve(event_type_name)
Source code in src/waku/eventsourcing/serialization/registry.py
def resolve(self, event_type_name: str, /) -> type[INotification]:
    try:
        return self._name_to_type[event_type_name]
    except KeyError:
        raise UnknownEventTypeError(event_type_name) from None
get_name
get_name(event_type)
Source code in src/waku/eventsourcing/serialization/registry.py
def get_name(self, event_type: type[INotification], /) -> str:
    try:
        return self._type_to_name[event_type]
    except KeyError:
        raise UnknownEventTypeError(event_type.__name__) from None
get_version
get_version(event_type)
Source code in src/waku/eventsourcing/serialization/registry.py
def get_version(self, event_type: type[INotification], /) -> int:
    try:
        return self._type_to_version[event_type]
    except KeyError:
        raise UnknownEventTypeError(event_type.__name__) from None
freeze
freeze()
Source code in src/waku/eventsourcing/serialization/registry.py
def freeze(self) -> None:
    self._frozen = True

interfaces

IEventSerializer

Bases: ABC

serialize abstractmethod
serialize(event)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def serialize(self, event: INotification, /) -> dict[str, Any]: ...
deserialize abstractmethod
deserialize(data, event_type)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def deserialize(self, data: dict[str, Any], event_type: str, /) -> INotification: ...
ISnapshotStateSerializer

Bases: ABC

serialize abstractmethod
serialize(state)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def serialize(self, state: object, /) -> dict[str, Any]: ...
deserialize abstractmethod
deserialize(data, state_type)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT: ...

json

JsonEventSerializer
JsonEventSerializer(registry)

Bases: IEventSerializer

Source code in src/waku/eventsourcing/serialization/json.py
def __init__(self, registry: EventTypeRegistry) -> None:
    self._registry = registry
serialize
serialize(event)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def serialize(self, event: INotification, /) -> dict[str, Any]:
    validate_dataclass_instance(event)
    return cast('dict[str, Any]', default_retort.dump(event, type(event)))
deserialize
deserialize(data, event_type)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def deserialize(self, data: dict[str, Any], event_type: str, /) -> INotification:
    cls = self._registry.resolve(event_type)
    return default_retort.load(data, cls)
JsonSnapshotStateSerializer

Bases: ISnapshotStateSerializer

serialize
serialize(state)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def serialize(self, state: object, /) -> dict[str, Any]:
    validate_dataclass_instance(state)
    return cast('dict[str, Any]', default_retort.dump(state, type(state)))
deserialize
deserialize(data, state_type)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT:
    return default_retort.load(data, state_type)

registry

EventTypeRegistry
EventTypeRegistry()
Source code in src/waku/eventsourcing/serialization/registry.py
def __init__(self) -> None:
    self._name_to_type: dict[str, type[INotification]] = {}
    self._type_to_name: dict[type[INotification], str] = {}
    self._type_to_version: dict[type[INotification], int] = {}
    self._frozen = False
is_frozen property
is_frozen
register
register(event_type, /, *, name=None, version=1)
Source code in src/waku/eventsourcing/serialization/registry.py
def register(self, event_type: type[INotification], /, *, name: str | None = None, version: int = 1) -> None:
    if self._frozen:
        raise RegistryFrozenError
    type_name = name or event_type.__name__

    if event_type in self._type_to_name:
        existing_name = self._type_to_name[event_type]
        existing_version = self._type_to_version[event_type]
        if existing_name == type_name and existing_version == version:
            return
        raise ConflictingEventTypeError(event_type, existing_name, existing_version, type_name, version)

    if type_name in self._name_to_type:
        raise DuplicateEventTypeError(type_name)

    self._name_to_type[type_name] = event_type
    self._type_to_name[event_type] = type_name
    self._type_to_version[event_type] = version
add_alias
add_alias(event_type, alias)
Source code in src/waku/eventsourcing/serialization/registry.py
def add_alias(self, event_type: type[INotification], alias: str, /) -> None:
    if self._frozen:
        raise RegistryFrozenError
    if event_type not in self._type_to_name:
        raise UnknownEventTypeError(event_type.__name__)
    if alias in self._name_to_type:
        if self._name_to_type[alias] is event_type:
            return
        raise DuplicateEventTypeError(alias)
    self._name_to_type[alias] = event_type
resolve
resolve(event_type_name)
Source code in src/waku/eventsourcing/serialization/registry.py
def resolve(self, event_type_name: str, /) -> type[INotification]:
    try:
        return self._name_to_type[event_type_name]
    except KeyError:
        raise UnknownEventTypeError(event_type_name) from None
get_name
get_name(event_type)
Source code in src/waku/eventsourcing/serialization/registry.py
def get_name(self, event_type: type[INotification], /) -> str:
    try:
        return self._type_to_name[event_type]
    except KeyError:
        raise UnknownEventTypeError(event_type.__name__) from None
get_version
get_version(event_type)
Source code in src/waku/eventsourcing/serialization/registry.py
def get_version(self, event_type: type[INotification], /) -> int:
    try:
        return self._type_to_version[event_type]
    except KeyError:
        raise UnknownEventTypeError(event_type.__name__) from None
freeze
freeze()
Source code in src/waku/eventsourcing/serialization/registry.py
def freeze(self) -> None:
    self._frozen = True

snapshot

ISnapshotStateSerializer

Bases: ABC

serialize abstractmethod
serialize(state)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def serialize(self, state: object, /) -> dict[str, Any]: ...
deserialize abstractmethod
deserialize(data, state_type)
Source code in src/waku/eventsourcing/serialization/interfaces.py
@abc.abstractmethod
def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT: ...

JsonSnapshotStateSerializer

Bases: ISnapshotStateSerializer

serialize
serialize(state)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def serialize(self, state: object, /) -> dict[str, Any]:
    validate_dataclass_instance(state)
    return cast('dict[str, Any]', default_retort.dump(state, type(state)))
deserialize
deserialize(data, state_type)
Source code in src/waku/eventsourcing/serialization/json.py
@override
def deserialize(self, data: dict[str, Any], state_type: type[StateT], /) -> StateT:
    return default_retort.load(data, state_type)

InMemorySnapshotStore

InMemorySnapshotStore()

Bases: ISnapshotStore

Source code in src/waku/eventsourcing/snapshot/in_memory.py
def __init__(self) -> None:
    self._snapshots: dict[StreamId, Snapshot] = {}
load async
load(stream_id)
Source code in src/waku/eventsourcing/snapshot/in_memory.py
async def load(self, stream_id: StreamId, /) -> Snapshot | None:
    return self._snapshots.get(stream_id)
save async
save(snapshot)
Source code in src/waku/eventsourcing/snapshot/in_memory.py
async def save(self, snapshot: Snapshot, /) -> None:
    self._snapshots[snapshot.stream_id] = snapshot

ISnapshotStore

Bases: ABC

load abstractmethod async
load(stream_id)
Source code in src/waku/eventsourcing/snapshot/interfaces.py
@abc.abstractmethod
async def load(self, stream_id: StreamId, /) -> Snapshot | None: ...
save abstractmethod async
save(snapshot)
Source code in src/waku/eventsourcing/snapshot/interfaces.py
@abc.abstractmethod
async def save(self, snapshot: Snapshot, /) -> None: ...

ISnapshotStrategy

Bases: ABC

should_snapshot abstractmethod
should_snapshot(version, events_since_snapshot)
Source code in src/waku/eventsourcing/snapshot/interfaces.py
@abc.abstractmethod
def should_snapshot(self, version: int, events_since_snapshot: int) -> bool: ...

Snapshot dataclass

Snapshot(
    *,
    stream_id,
    state,
    version,
    state_type,
    schema_version=1,
)
stream_id instance-attribute
stream_id
state instance-attribute
state
version instance-attribute
version
state_type instance-attribute
state_type
schema_version class-attribute instance-attribute
schema_version = 1

ISnapshotMigration

Bases: ABC

from_version instance-attribute
from_version
to_version instance-attribute
to_version
migrate abstractmethod
migrate(state)
Source code in src/waku/eventsourcing/snapshot/migration.py
@abc.abstractmethod
def migrate(self, state: dict[str, Any], /) -> dict[str, Any]: ...

SnapshotMigrationChain

SnapshotMigrationChain(migrations)
Source code in src/waku/eventsourcing/snapshot/migration.py
def __init__(self, migrations: Sequence[ISnapshotMigration]) -> None:
    sorted_migrations = sorted(migrations, key=lambda m: m.from_version)
    seen: set[int] = set()
    prev_to: int | None = None
    for m in sorted_migrations:
        if m.from_version < 1:
            msg = f'Invalid from_version {m.from_version}: must be >= 1'
            raise SnapshotMigrationChainError(msg)
        if m.to_version <= m.from_version:
            msg = f'Invalid migration: to_version {m.to_version} must be > from_version {m.from_version}'
            raise SnapshotMigrationChainError(msg)
        if m.from_version in seen:
            msg = f'Duplicate snapshot migration at from_version {m.from_version}'
            raise SnapshotMigrationChainError(msg)
        if prev_to is not None and m.from_version != prev_to:
            msg = (
                f'Gap in snapshot migration chain: '
                f'migration to version {prev_to} is not followed by migration from version {prev_to} '
                f'(found from_version {m.from_version})'
            )
            raise SnapshotMigrationChainError(msg)
        seen.add(m.from_version)
        prev_to = m.to_version
    self._migrations = tuple(sorted_migrations)
migrations property
migrations
migrate
migrate(state, from_version)
Source code in src/waku/eventsourcing/snapshot/migration.py
def migrate(self, state: dict[str, Any], from_version: int) -> tuple[dict[str, Any], int]:
    current = from_version
    for m in self._migrations:
        if m.from_version == current:
            state = m.migrate(state)
            current = m.to_version
    return state, current

SnapshotConfig dataclass

SnapshotConfig(
    *,
    strategy,
    schema_version=1,
    migration_chain=_EMPTY_CHAIN,
)
strategy instance-attribute
strategy
schema_version class-attribute instance-attribute
schema_version = 1
migration_chain class-attribute instance-attribute
migration_chain = field(default=_EMPTY_CHAIN)

SnapshotConfigRegistry

SnapshotConfigRegistry(configs)
Source code in src/waku/eventsourcing/snapshot/registry.py
def __init__(self, configs: Mapping[str, SnapshotConfig]) -> None:
    self._configs = dict(configs)
get
get(aggregate_name)
Source code in src/waku/eventsourcing/snapshot/registry.py
def get(self, aggregate_name: str) -> SnapshotConfig:
    config = self._configs.get(aggregate_name)
    if config is None:
        raise SnapshotConfigNotFoundError(aggregate_name)
    return config

SnapshotEventSourcedRepository

SnapshotEventSourcedRepository(
    event_store,
    snapshot_store,
    snapshot_config_registry,
    state_serializer,
)

Bases: EventSourcedRepository[AggregateT], ABC, Generic[AggregateT]

Source code in src/waku/eventsourcing/snapshot/repository.py
def __init__(
    self,
    event_store: IEventStore,
    snapshot_store: ISnapshotStore,
    snapshot_config_registry: SnapshotConfigRegistry,
    state_serializer: ISnapshotStateSerializer,
) -> None:
    super().__init__(event_store)
    self._snapshot_store = snapshot_store
    self._state_serializer = state_serializer
    self._last_snapshot_versions: dict[str, int] = {}
    snapshot_config = snapshot_config_registry.get(self.aggregate_name)
    self._snapshot_strategy = snapshot_config.strategy
    self._snapshot_schema_version = snapshot_config.schema_version
    self._migration_chain = snapshot_config.migration_chain
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
create_aggregate
create_aggregate()
Source code in src/waku/eventsourcing/repository.py
def create_aggregate(self) -> AggregateT:
    aggregate_cls = self._resolve_aggregate_type()
    if aggregate_cls is None:
        msg = f'{type(self).__name__}: cannot auto-create aggregate, override create_aggregate()'
        raise TypeError(msg)
    return aggregate_cls()
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/snapshot/repository.py
async def load(self, aggregate_id: str) -> AggregateT:
    stream_id = self._stream_id(aggregate_id)
    snapshot = await self._snapshot_store.load(stream_id)

    if snapshot is not None:
        if snapshot.state_type != self.aggregate_name:
            raise SnapshotTypeMismatchError(stream_id, self.aggregate_name, snapshot.state_type)

        if snapshot.schema_version != self._snapshot_schema_version:
            snapshot = migrate_snapshot_or_discard(
                self._migration_chain,
                snapshot,
                self._snapshot_schema_version,
                stream_id,
            )
            if snapshot is None:
                self._last_snapshot_versions[aggregate_id] = -1
                return await super().load(aggregate_id)

        self._last_snapshot_versions[aggregate_id] = snapshot.version
        aggregate = self._restore_from_snapshot(snapshot)
        start = snapshot.version + 1
        try:
            stored_events = await self._event_store.read_stream(stream_id, start=start)
        except StreamNotFoundError:
            stored_events = []
        domain_events = [e.data for e in stored_events]
        version = snapshot.version + len(stored_events)
        if domain_events:
            aggregate.load_from_history(domain_events, version)
        else:
            aggregate.mark_persisted(version)
    else:
        self._last_snapshot_versions[aggregate_id] = -1
        aggregate = await super().load(aggregate_id)

    return aggregate
save async
save(aggregate_id, aggregate, *, idempotency_key=None)
Source code in src/waku/eventsourcing/snapshot/repository.py
async def save(
    self,
    aggregate_id: str,
    aggregate: AggregateT,
    *,
    idempotency_key: str | None = None,
) -> tuple[int, list[INotification]]:
    new_version, events = await super().save(aggregate_id, aggregate, idempotency_key=idempotency_key)

    if events:
        stream_id = self._stream_id(aggregate_id)
        last_snapshot_version = self._last_snapshot_versions.get(aggregate_id, -1)
        events_since_snapshot = new_version - last_snapshot_version

        if self._snapshot_strategy.should_snapshot(new_version, events_since_snapshot):
            state_obj = self._snapshot_state(aggregate)
            state_data = self._state_serializer.serialize(state_obj)
            new_snapshot = Snapshot(
                stream_id=stream_id,
                state=state_data,
                version=new_version,
                state_type=self.aggregate_name,
                schema_version=self._snapshot_schema_version,
            )
            await self._snapshot_store.save(new_snapshot)
            self._last_snapshot_versions[aggregate_id] = new_version

    return new_version, events

EventCountStrategy

EventCountStrategy(threshold=100)

Bases: ISnapshotStrategy

Source code in src/waku/eventsourcing/snapshot/strategy.py
def __init__(self, threshold: int = 100) -> None:
    if threshold < 1:
        msg = f'Threshold must be at least 1, got {threshold}'
        raise ValueError(msg)
    self._threshold = threshold
should_snapshot
should_snapshot(version, events_since_snapshot)
Source code in src/waku/eventsourcing/snapshot/strategy.py
def should_snapshot(self, version: int, events_since_snapshot: int) -> bool:  # noqa: ARG002
    return events_since_snapshot >= self._threshold

in_memory

InMemorySnapshotStore
InMemorySnapshotStore()

Bases: ISnapshotStore

Source code in src/waku/eventsourcing/snapshot/in_memory.py
def __init__(self) -> None:
    self._snapshots: dict[StreamId, Snapshot] = {}
load async
load(stream_id)
Source code in src/waku/eventsourcing/snapshot/in_memory.py
async def load(self, stream_id: StreamId, /) -> Snapshot | None:
    return self._snapshots.get(stream_id)
save async
save(snapshot)
Source code in src/waku/eventsourcing/snapshot/in_memory.py
async def save(self, snapshot: Snapshot, /) -> None:
    self._snapshots[snapshot.stream_id] = snapshot

interfaces

Snapshot dataclass
Snapshot(
    *,
    stream_id,
    state,
    version,
    state_type,
    schema_version=1,
)
stream_id instance-attribute
stream_id
state instance-attribute
state
version instance-attribute
version
state_type instance-attribute
state_type
schema_version class-attribute instance-attribute
schema_version = 1
ISnapshotStore

Bases: ABC

load abstractmethod async
load(stream_id)
Source code in src/waku/eventsourcing/snapshot/interfaces.py
@abc.abstractmethod
async def load(self, stream_id: StreamId, /) -> Snapshot | None: ...
save abstractmethod async
save(snapshot)
Source code in src/waku/eventsourcing/snapshot/interfaces.py
@abc.abstractmethod
async def save(self, snapshot: Snapshot, /) -> None: ...
ISnapshotStrategy

Bases: ABC

should_snapshot abstractmethod
should_snapshot(version, events_since_snapshot)
Source code in src/waku/eventsourcing/snapshot/interfaces.py
@abc.abstractmethod
def should_snapshot(self, version: int, events_since_snapshot: int) -> bool: ...

migration

logger module-attribute
logger = getLogger(__name__)
ISnapshotMigration

Bases: ABC

from_version instance-attribute
from_version
to_version instance-attribute
to_version
migrate abstractmethod
migrate(state)
Source code in src/waku/eventsourcing/snapshot/migration.py
@abc.abstractmethod
def migrate(self, state: dict[str, Any], /) -> dict[str, Any]: ...
SnapshotMigrationChain
SnapshotMigrationChain(migrations)
Source code in src/waku/eventsourcing/snapshot/migration.py
def __init__(self, migrations: Sequence[ISnapshotMigration]) -> None:
    sorted_migrations = sorted(migrations, key=lambda m: m.from_version)
    seen: set[int] = set()
    prev_to: int | None = None
    for m in sorted_migrations:
        if m.from_version < 1:
            msg = f'Invalid from_version {m.from_version}: must be >= 1'
            raise SnapshotMigrationChainError(msg)
        if m.to_version <= m.from_version:
            msg = f'Invalid migration: to_version {m.to_version} must be > from_version {m.from_version}'
            raise SnapshotMigrationChainError(msg)
        if m.from_version in seen:
            msg = f'Duplicate snapshot migration at from_version {m.from_version}'
            raise SnapshotMigrationChainError(msg)
        if prev_to is not None and m.from_version != prev_to:
            msg = (
                f'Gap in snapshot migration chain: '
                f'migration to version {prev_to} is not followed by migration from version {prev_to} '
                f'(found from_version {m.from_version})'
            )
            raise SnapshotMigrationChainError(msg)
        seen.add(m.from_version)
        prev_to = m.to_version
    self._migrations = tuple(sorted_migrations)
migrations property
migrations
migrate
migrate(state, from_version)
Source code in src/waku/eventsourcing/snapshot/migration.py
def migrate(self, state: dict[str, Any], from_version: int) -> tuple[dict[str, Any], int]:
    current = from_version
    for m in self._migrations:
        if m.from_version == current:
            state = m.migrate(state)
            current = m.to_version
    return state, current
migrate_snapshot_or_discard
migrate_snapshot_or_discard(
    chain, snapshot, target_version, stream_id
)
Source code in src/waku/eventsourcing/snapshot/migration.py
def migrate_snapshot_or_discard(
    chain: SnapshotMigrationChain,
    snapshot: Snapshot,
    target_version: int,
    stream_id: StreamId,
) -> Snapshot | None:
    migrated_state, reached = chain.migrate(snapshot.state, snapshot.schema_version)
    if reached != target_version:
        logger.warning(
            'Snapshot schema version %d does not match expected %d for stream %s. '
            'No complete migration path. Discarding snapshot and replaying from events.',
            snapshot.schema_version,
            target_version,
            stream_id,
        )
        return None
    return Snapshot(
        stream_id=snapshot.stream_id,
        state=migrated_state,
        version=snapshot.version,
        state_type=snapshot.state_type,
        schema_version=reached,
    )

registry

SnapshotConfig dataclass
SnapshotConfig(
    *,
    strategy,
    schema_version=1,
    migration_chain=_EMPTY_CHAIN,
)
strategy instance-attribute
strategy
schema_version class-attribute instance-attribute
schema_version = 1
migration_chain class-attribute instance-attribute
migration_chain = field(default=_EMPTY_CHAIN)
SnapshotConfigRegistry
SnapshotConfigRegistry(configs)
Source code in src/waku/eventsourcing/snapshot/registry.py
def __init__(self, configs: Mapping[str, SnapshotConfig]) -> None:
    self._configs = dict(configs)
get
get(aggregate_name)
Source code in src/waku/eventsourcing/snapshot/registry.py
def get(self, aggregate_name: str) -> SnapshotConfig:
    config = self._configs.get(aggregate_name)
    if config is None:
        raise SnapshotConfigNotFoundError(aggregate_name)
    return config

repository

AggregateT module-attribute
AggregateT = TypeVar(
    'AggregateT', bound=EventSourcedAggregate
)
SnapshotEventSourcedRepository
SnapshotEventSourcedRepository(
    event_store,
    snapshot_store,
    snapshot_config_registry,
    state_serializer,
)

Bases: EventSourcedRepository[AggregateT], ABC, Generic[AggregateT]

Source code in src/waku/eventsourcing/snapshot/repository.py
def __init__(
    self,
    event_store: IEventStore,
    snapshot_store: ISnapshotStore,
    snapshot_config_registry: SnapshotConfigRegistry,
    state_serializer: ISnapshotStateSerializer,
) -> None:
    super().__init__(event_store)
    self._snapshot_store = snapshot_store
    self._state_serializer = state_serializer
    self._last_snapshot_versions: dict[str, int] = {}
    snapshot_config = snapshot_config_registry.get(self.aggregate_name)
    self._snapshot_strategy = snapshot_config.strategy
    self._snapshot_schema_version = snapshot_config.schema_version
    self._migration_chain = snapshot_config.migration_chain
aggregate_name class-attribute
aggregate_name
max_stream_length class-attribute
max_stream_length = None
load async
load(aggregate_id)
Source code in src/waku/eventsourcing/snapshot/repository.py
async def load(self, aggregate_id: str) -> AggregateT:
    stream_id = self._stream_id(aggregate_id)
    snapshot = await self._snapshot_store.load(stream_id)

    if snapshot is not None:
        if snapshot.state_type != self.aggregate_name:
            raise SnapshotTypeMismatchError(stream_id, self.aggregate_name, snapshot.state_type)

        if snapshot.schema_version != self._snapshot_schema_version:
            snapshot = migrate_snapshot_or_discard(
                self._migration_chain,
                snapshot,
                self._snapshot_schema_version,
                stream_id,
            )
            if snapshot is None:
                self._last_snapshot_versions[aggregate_id] = -1
                return await super().load(aggregate_id)

        self._last_snapshot_versions[aggregate_id] = snapshot.version
        aggregate = self._restore_from_snapshot(snapshot)
        start = snapshot.version + 1
        try:
            stored_events = await self._event_store.read_stream(stream_id, start=start)
        except StreamNotFoundError:
            stored_events = []
        domain_events = [e.data for e in stored_events]
        version = snapshot.version + len(stored_events)
        if domain_events:
            aggregate.load_from_history(domain_events, version)
        else:
            aggregate.mark_persisted(version)
    else:
        self._last_snapshot_versions[aggregate_id] = -1
        aggregate = await super().load(aggregate_id)

    return aggregate
save async
save(aggregate_id, aggregate, *, idempotency_key=None)
Source code in src/waku/eventsourcing/snapshot/repository.py
async def save(
    self,
    aggregate_id: str,
    aggregate: AggregateT,
    *,
    idempotency_key: str | None = None,
) -> tuple[int, list[INotification]]:
    new_version, events = await super().save(aggregate_id, aggregate, idempotency_key=idempotency_key)

    if events:
        stream_id = self._stream_id(aggregate_id)
        last_snapshot_version = self._last_snapshot_versions.get(aggregate_id, -1)
        events_since_snapshot = new_version - last_snapshot_version

        if self._snapshot_strategy.should_snapshot(new_version, events_since_snapshot):
            state_obj = self._snapshot_state(aggregate)
            state_data = self._state_serializer.serialize(state_obj)
            new_snapshot = Snapshot(
                stream_id=stream_id,
                state=state_data,
                version=new_version,
                state_type=self.aggregate_name,
                schema_version=self._snapshot_schema_version,
            )
            await self._snapshot_store.save(new_snapshot)
            self._last_snapshot_versions[aggregate_id] = new_version

    return new_version, events
create_aggregate
create_aggregate()
Source code in src/waku/eventsourcing/repository.py
def create_aggregate(self) -> AggregateT:
    aggregate_cls = self._resolve_aggregate_type()
    if aggregate_cls is None:
        msg = f'{type(self).__name__}: cannot auto-create aggregate, override create_aggregate()'
        raise TypeError(msg)
    return aggregate_cls()

sqlalchemy

SqlAlchemySnapshotStore
SqlAlchemySnapshotStore(session, snapshots_table)

Bases: ISnapshotStore

Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
def __init__(self, session: AsyncSession, snapshots_table: Table) -> None:
    self._session = session
    self._snapshots = snapshots_table
load async
load(stream_id)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
async def load(self, stream_id: StreamId, /) -> Snapshot | None:
    key = str(stream_id)
    query = select(self._snapshots).where(self._snapshots.c.stream_id == key)
    result = await self._session.execute(query)
    row: Any = result.one_or_none()
    if row is None:
        return None
    return Snapshot(
        stream_id=StreamId.from_value(row.stream_id),
        state=row.state,
        version=row.version,
        state_type=row.state_type,
        schema_version=row.schema_version,
    )
save async
save(snapshot)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
async def save(self, snapshot: Snapshot, /) -> None:
    stmt = pg_insert(self._snapshots).values(
        stream_id=str(snapshot.stream_id),
        state=snapshot.state,
        version=snapshot.version,
        state_type=snapshot.state_type,
        schema_version=snapshot.schema_version,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=['stream_id'],
        set_={
            'state': stmt.excluded.state,
            'version': stmt.excluded.version,
            'state_type': stmt.excluded.state_type,
            'schema_version': stmt.excluded.schema_version,
            'updated_at': sa_func.now(),
        },
    )
    await self._session.execute(stmt)
    await self._session.flush()
make_sqlalchemy_snapshot_store
make_sqlalchemy_snapshot_store(snapshots_table)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
def make_sqlalchemy_snapshot_store(
    snapshots_table: Table,
) -> Callable[..., SqlAlchemySnapshotStore]:
    def factory(session: AsyncSession) -> SqlAlchemySnapshotStore:
        return SqlAlchemySnapshotStore(session, snapshots_table)

    return factory
bind_snapshot_tables
bind_snapshot_tables(metadata)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/tables.py
def bind_snapshot_tables(metadata: MetaData) -> Table:
    return es_snapshots_table.to_metadata(metadata)
store
SqlAlchemySnapshotStore
SqlAlchemySnapshotStore(session, snapshots_table)

Bases: ISnapshotStore

Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
def __init__(self, session: AsyncSession, snapshots_table: Table) -> None:
    self._session = session
    self._snapshots = snapshots_table
load async
load(stream_id)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
async def load(self, stream_id: StreamId, /) -> Snapshot | None:
    key = str(stream_id)
    query = select(self._snapshots).where(self._snapshots.c.stream_id == key)
    result = await self._session.execute(query)
    row: Any = result.one_or_none()
    if row is None:
        return None
    return Snapshot(
        stream_id=StreamId.from_value(row.stream_id),
        state=row.state,
        version=row.version,
        state_type=row.state_type,
        schema_version=row.schema_version,
    )
save async
save(snapshot)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
async def save(self, snapshot: Snapshot, /) -> None:
    stmt = pg_insert(self._snapshots).values(
        stream_id=str(snapshot.stream_id),
        state=snapshot.state,
        version=snapshot.version,
        state_type=snapshot.state_type,
        schema_version=snapshot.schema_version,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=['stream_id'],
        set_={
            'state': stmt.excluded.state,
            'version': stmt.excluded.version,
            'state_type': stmt.excluded.state_type,
            'schema_version': stmt.excluded.schema_version,
            'updated_at': sa_func.now(),
        },
    )
    await self._session.execute(stmt)
    await self._session.flush()
make_sqlalchemy_snapshot_store
make_sqlalchemy_snapshot_store(snapshots_table)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
def make_sqlalchemy_snapshot_store(
    snapshots_table: Table,
) -> Callable[..., SqlAlchemySnapshotStore]:
    def factory(session: AsyncSession) -> SqlAlchemySnapshotStore:
        return SqlAlchemySnapshotStore(session, snapshots_table)

    return factory
tables
es_snapshots_table module-attribute
es_snapshots_table = Table(
    'es_snapshots',
    _internal_metadata,
    Column('stream_id', Text, primary_key=True),
    Column('state', JSONB, nullable=False),
    Column('version', Integer, nullable=False),
    Column('state_type', Text, nullable=False),
    Column(
        'schema_version',
        Integer,
        nullable=False,
        server_default='1',
    ),
    Column(
        'created_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
    ),
    Column(
        'updated_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
        onupdate=now(),
    ),
)
bind_snapshot_tables
bind_snapshot_tables(metadata)
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/tables.py
def bind_snapshot_tables(metadata: MetaData) -> Table:
    return es_snapshots_table.to_metadata(metadata)

strategy

EventCountStrategy
EventCountStrategy(threshold=100)

Bases: ISnapshotStrategy

Source code in src/waku/eventsourcing/snapshot/strategy.py
def __init__(self, threshold: int = 100) -> None:
    if threshold < 1:
        msg = f'Threshold must be at least 1, got {threshold}'
        raise ValueError(msg)
    self._threshold = threshold
should_snapshot
should_snapshot(version, events_since_snapshot)
Source code in src/waku/eventsourcing/snapshot/strategy.py
def should_snapshot(self, version: int, events_since_snapshot: int) -> bool:  # noqa: ARG002
    return events_since_snapshot >= self._threshold

store

InMemoryEventStore

InMemoryEventStore(registry, projections=(), enrichers=())

Bases: IEventStore

Source code in src/waku/eventsourcing/store/in_memory.py
def __init__(
    self,
    registry: EventTypeRegistry,
    projections: Sequence[IProjection] = (),
    enrichers: Sequence[IMetadataEnricher] = (),
) -> None:
    self._registry = registry
    self._streams: dict[str, list[StoredEvent]] = {}
    self._idempotency_keys: dict[str, set[str]] = {}
    self._global_position: int = 0
    self._lock = anyio.Lock()
    self._projections = projections
    self._enrichers = enrichers
read_stream async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/in_memory.py
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]:
    async with self._lock:
        key = str(stream_id)
        if key not in self._streams:
            raise StreamNotFoundError(stream_id)
        events = self._streams[key]
        match start:
            case StreamPosition.START:
                offset = 0
            case StreamPosition.END:
                offset = max(len(events) - 1, 0)
            case int() as offset:
                pass
            case _:  # pragma: no cover
                assert_never(start)
        subset = events[offset:]
        if count is not None:
            subset = subset[:count]
        return list(subset)
read_all async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/in_memory.py
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]:
    async with self._lock:
        all_events: list[StoredEvent] = []
        for stream_events in self._streams.values():
            all_events.extend(stream_events)
        all_events.sort(key=lambda e: e.global_position)
        filtered = [e for e in all_events if e.global_position > after_position]
        if count is not None:
            filtered = filtered[:count]
        return filtered
stream_exists async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/in_memory.py
async def stream_exists(self, stream_id: StreamId, /) -> bool:
    async with self._lock:
        return str(stream_id) in self._streams
append_to_stream async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/in_memory.py
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int:
    async with self._lock:
        key = str(stream_id)
        stream = self._streams.get(key)
        current_version = len(stream) - 1 if stream is not None else -1

        if not events:
            check_expected_version(stream_id, expected_version, current_version, exists=stream is not None)
            return current_version

        dedup_version = self._check_idempotency(stream_id, events, current_version)
        if dedup_version is not None:
            return dedup_version

        check_expected_version(stream_id, expected_version, current_version, exists=stream is not None)

        if stream is None:
            stream = []
            self._streams[key] = stream

        stored_events: list[StoredEvent] = []
        for envelope in events:
            position = len(stream)
            stored = StoredEvent(
                event_id=uuid.uuid4(),
                stream_id=stream_id,
                event_type=self._registry.get_name(
                    type(envelope.domain_event)  # pyrefly: ignore[bad-argument-type]
                ),
                position=position,
                global_position=self._global_position,
                timestamp=datetime.now(UTC),
                data=envelope.domain_event,
                metadata=enrich_metadata(envelope.metadata, self._enrichers),
                idempotency_key=envelope.idempotency_key,
                schema_version=self._registry.get_version(
                    type(envelope.domain_event)  # pyrefly: ignore[bad-argument-type]
                ),
            )
            stream.append(stored)
            stored_events.append(stored)
            self._global_position += 1

        stream_keys = self._idempotency_keys.setdefault(key, set())
        for envelope in events:
            stream_keys.add(envelope.idempotency_key)

        for projection in self._projections:
            await projection.project(stored_events)

        return len(stream) - 1

IEventReader

Bases: ABC

read_stream abstractmethod async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]: ...
read_all abstractmethod async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]: ...
stream_exists abstractmethod async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def stream_exists(self, stream_id: StreamId, /) -> bool: ...

IEventStore

Bases: IEventReader, IEventWriter, ABC

append_to_stream abstractmethod async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int: ...
read_stream abstractmethod async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]: ...
read_all abstractmethod async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]: ...
stream_exists abstractmethod async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def stream_exists(self, stream_id: StreamId, /) -> bool: ...

IEventWriter

Bases: ABC

append_to_stream abstractmethod async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int: ...

in_memory

InMemoryEventStore
InMemoryEventStore(registry, projections=(), enrichers=())

Bases: IEventStore

Source code in src/waku/eventsourcing/store/in_memory.py
def __init__(
    self,
    registry: EventTypeRegistry,
    projections: Sequence[IProjection] = (),
    enrichers: Sequence[IMetadataEnricher] = (),
) -> None:
    self._registry = registry
    self._streams: dict[str, list[StoredEvent]] = {}
    self._idempotency_keys: dict[str, set[str]] = {}
    self._global_position: int = 0
    self._lock = anyio.Lock()
    self._projections = projections
    self._enrichers = enrichers
read_stream async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/in_memory.py
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]:
    async with self._lock:
        key = str(stream_id)
        if key not in self._streams:
            raise StreamNotFoundError(stream_id)
        events = self._streams[key]
        match start:
            case StreamPosition.START:
                offset = 0
            case StreamPosition.END:
                offset = max(len(events) - 1, 0)
            case int() as offset:
                pass
            case _:  # pragma: no cover
                assert_never(start)
        subset = events[offset:]
        if count is not None:
            subset = subset[:count]
        return list(subset)
read_all async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/in_memory.py
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]:
    async with self._lock:
        all_events: list[StoredEvent] = []
        for stream_events in self._streams.values():
            all_events.extend(stream_events)
        all_events.sort(key=lambda e: e.global_position)
        filtered = [e for e in all_events if e.global_position > after_position]
        if count is not None:
            filtered = filtered[:count]
        return filtered
stream_exists async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/in_memory.py
async def stream_exists(self, stream_id: StreamId, /) -> bool:
    async with self._lock:
        return str(stream_id) in self._streams
append_to_stream async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/in_memory.py
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int:
    async with self._lock:
        key = str(stream_id)
        stream = self._streams.get(key)
        current_version = len(stream) - 1 if stream is not None else -1

        if not events:
            check_expected_version(stream_id, expected_version, current_version, exists=stream is not None)
            return current_version

        dedup_version = self._check_idempotency(stream_id, events, current_version)
        if dedup_version is not None:
            return dedup_version

        check_expected_version(stream_id, expected_version, current_version, exists=stream is not None)

        if stream is None:
            stream = []
            self._streams[key] = stream

        stored_events: list[StoredEvent] = []
        for envelope in events:
            position = len(stream)
            stored = StoredEvent(
                event_id=uuid.uuid4(),
                stream_id=stream_id,
                event_type=self._registry.get_name(
                    type(envelope.domain_event)  # pyrefly: ignore[bad-argument-type]
                ),
                position=position,
                global_position=self._global_position,
                timestamp=datetime.now(UTC),
                data=envelope.domain_event,
                metadata=enrich_metadata(envelope.metadata, self._enrichers),
                idempotency_key=envelope.idempotency_key,
                schema_version=self._registry.get_version(
                    type(envelope.domain_event)  # pyrefly: ignore[bad-argument-type]
                ),
            )
            stream.append(stored)
            stored_events.append(stored)
            self._global_position += 1

        stream_keys = self._idempotency_keys.setdefault(key, set())
        for envelope in events:
            stream_keys.add(envelope.idempotency_key)

        for projection in self._projections:
            await projection.project(stored_events)

        return len(stream) - 1

interfaces

IEventReader

Bases: ABC

read_stream abstractmethod async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]: ...
read_all abstractmethod async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]: ...
stream_exists abstractmethod async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def stream_exists(self, stream_id: StreamId, /) -> bool: ...
IEventWriter

Bases: ABC

append_to_stream abstractmethod async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int: ...
IEventStore

Bases: IEventReader, IEventWriter, ABC

append_to_stream abstractmethod async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int: ...
read_stream abstractmethod async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]: ...
read_all abstractmethod async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]: ...
stream_exists abstractmethod async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/interfaces.py
@abc.abstractmethod
async def stream_exists(self, stream_id: StreamId, /) -> bool: ...

sqlalchemy

SqlAlchemyEventStore
SqlAlchemyEventStore(
    session,
    serializer,
    registry,
    tables,
    upcaster_chain,
    projections=(),
    enrichers=(),
)

Bases: IEventStore

Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
def __init__(
    self,
    session: AsyncSession,
    serializer: IEventSerializer,
    registry: EventTypeRegistry,
    tables: EventStoreTables,
    upcaster_chain: UpcasterChain,
    projections: Sequence[IProjection] = (),
    enrichers: Sequence[IMetadataEnricher] = (),
) -> None:
    self._session = session
    self._serializer = serializer
    self._registry = registry
    self._streams = tables.streams
    self._events = tables.events
    self._upcaster_chain = upcaster_chain
    self._projections = projections
    self._enrichers = enrichers
read_stream async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]:
    key = str(stream_id)

    if count == 0:
        await self._ensure_stream_exists(stream_id)
        return []

    if start is StreamPosition.END:
        return await self._read_stream_end(stream_id, key)

    match start:
        case StreamPosition.START:
            offset = 0
        case int() as offset:
            pass
        case _:  # pragma: no cover
            assert_never(start)

    query = (
        select(self._events)
        .where(self._events.c.stream_id == key)
        .where(self._events.c.position >= offset)
        .order_by(self._events.c.position)
    )
    if count is not None:
        query = query.limit(count)

    result = await self._session.execute(query)
    rows = result.fetchall()

    if not rows:
        await self._ensure_stream_exists(stream_id)

    return [
        row_to_stored_event(
            row, registry=self._registry, upcaster_chain=self._upcaster_chain, serializer=self._serializer
        )
        for row in rows
    ]
read_all async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]:
    query = (
        select(self._events)
        .where(self._events.c.global_position > after_position)
        .order_by(self._events.c.global_position)
    )
    if count is not None:
        query = query.limit(count)

    result = await self._session.execute(query)
    rows = result.fetchall()
    return [
        row_to_stored_event(
            row, registry=self._registry, upcaster_chain=self._upcaster_chain, serializer=self._serializer
        )
        for row in rows
    ]
stream_exists async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def stream_exists(self, stream_id: StreamId, /) -> bool:
    key = str(stream_id)
    query = select(self._streams.c.stream_id).where(self._streams.c.stream_id == key)
    result = await self._session.execute(query)
    return result.scalar_one_or_none() is not None
append_to_stream async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int:
    if not events:
        return await self._resolve_current_version(stream_id, expected_version)

    dedup_version = await self._check_idempotency(stream_id, events)
    if dedup_version is not None:
        return dedup_version

    current_version = await self._resolve_current_version(stream_id, expected_version)
    new_version = current_version + len(events)

    try:
        async with self._session.begin_nested():
            await self._ensure_stream_row(stream_id)
            await self._update_stream_version(stream_id, current_version, new_version)
            stored_events = await self._insert_events(stream_id, events, start_position=current_version + 1)
    except IntegrityError as exc:
        if IDEMPOTENCY_KEY_CONSTRAINT in str(exc):
            logger.warning(
                'Idempotency race condition on stream %s: duplicate key caught by DB constraint',
                stream_id,
            )
            dedup_version = await self._check_idempotency(stream_id, events)
            if dedup_version is not None:
                return dedup_version
            logger.exception(  # pragma: no cover
                'Idempotency re-check returned no match after IntegrityError on stream %s — '
                'this should not happen under normal conditions',
                stream_id,
            )
            raise DuplicateIdempotencyKeyError(
                stream_id,
                reason='conflict with existing keys',
            ) from exc  # pragma: no cover
        raise  # pragma: no cover

    for projection in self._projections:
        await projection.project(stored_events)

    return new_version
EventStoreTables dataclass
EventStoreTables(streams, events)
streams instance-attribute
streams
events instance-attribute
events
make_sqlalchemy_event_store
make_sqlalchemy_event_store(tables)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
def make_sqlalchemy_event_store(tables: EventStoreTables) -> SqlAlchemyEventStoreFactory:
    def factory(
        session: AsyncSession,
        serializer: IEventSerializer,
        registry: EventTypeRegistry,
        upcaster_chain: UpcasterChain,
        projections: Sequence[IProjection] = (),
        enrichers: Sequence[IMetadataEnricher] = (),
    ) -> SqlAlchemyEventStore:
        return SqlAlchemyEventStore(session, serializer, registry, tables, upcaster_chain, projections, enrichers)

    return factory
bind_event_store_tables
bind_event_store_tables(metadata)
Source code in src/waku/eventsourcing/store/sqlalchemy/tables.py
def bind_event_store_tables(metadata: MetaData) -> EventStoreTables:
    streams = es_streams_table.to_metadata(metadata)
    events = es_events_table.to_metadata(metadata)
    return EventStoreTables(streams=streams, events=events)
store
logger module-attribute
logger = getLogger(__name__)
SqlAlchemyEventStoreFactory

Bases: Protocol

SqlAlchemyEventStore
SqlAlchemyEventStore(
    session,
    serializer,
    registry,
    tables,
    upcaster_chain,
    projections=(),
    enrichers=(),
)

Bases: IEventStore

Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
def __init__(
    self,
    session: AsyncSession,
    serializer: IEventSerializer,
    registry: EventTypeRegistry,
    tables: EventStoreTables,
    upcaster_chain: UpcasterChain,
    projections: Sequence[IProjection] = (),
    enrichers: Sequence[IMetadataEnricher] = (),
) -> None:
    self._session = session
    self._serializer = serializer
    self._registry = registry
    self._streams = tables.streams
    self._events = tables.events
    self._upcaster_chain = upcaster_chain
    self._projections = projections
    self._enrichers = enrichers
read_stream async
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def read_stream(
    self,
    stream_id: StreamId,
    /,
    *,
    start: int | StreamPosition = StreamPosition.START,
    count: int | None = None,
) -> list[StoredEvent]:
    key = str(stream_id)

    if count == 0:
        await self._ensure_stream_exists(stream_id)
        return []

    if start is StreamPosition.END:
        return await self._read_stream_end(stream_id, key)

    match start:
        case StreamPosition.START:
            offset = 0
        case int() as offset:
            pass
        case _:  # pragma: no cover
            assert_never(start)

    query = (
        select(self._events)
        .where(self._events.c.stream_id == key)
        .where(self._events.c.position >= offset)
        .order_by(self._events.c.position)
    )
    if count is not None:
        query = query.limit(count)

    result = await self._session.execute(query)
    rows = result.fetchall()

    if not rows:
        await self._ensure_stream_exists(stream_id)

    return [
        row_to_stored_event(
            row, registry=self._registry, upcaster_chain=self._upcaster_chain, serializer=self._serializer
        )
        for row in rows
    ]
read_all async
read_all(*, after_position=-1, count=None)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def read_all(
    self,
    *,
    after_position: int = -1,
    count: int | None = None,
) -> list[StoredEvent]:
    query = (
        select(self._events)
        .where(self._events.c.global_position > after_position)
        .order_by(self._events.c.global_position)
    )
    if count is not None:
        query = query.limit(count)

    result = await self._session.execute(query)
    rows = result.fetchall()
    return [
        row_to_stored_event(
            row, registry=self._registry, upcaster_chain=self._upcaster_chain, serializer=self._serializer
        )
        for row in rows
    ]
stream_exists async
stream_exists(stream_id)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def stream_exists(self, stream_id: StreamId, /) -> bool:
    key = str(stream_id)
    query = select(self._streams.c.stream_id).where(self._streams.c.stream_id == key)
    result = await self._session.execute(query)
    return result.scalar_one_or_none() is not None
append_to_stream async
append_to_stream(stream_id, /, events, *, expected_version)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
async def append_to_stream(
    self,
    stream_id: StreamId,
    /,
    events: Sequence[EventEnvelope],
    *,
    expected_version: ExpectedVersion,
) -> int:
    if not events:
        return await self._resolve_current_version(stream_id, expected_version)

    dedup_version = await self._check_idempotency(stream_id, events)
    if dedup_version is not None:
        return dedup_version

    current_version = await self._resolve_current_version(stream_id, expected_version)
    new_version = current_version + len(events)

    try:
        async with self._session.begin_nested():
            await self._ensure_stream_row(stream_id)
            await self._update_stream_version(stream_id, current_version, new_version)
            stored_events = await self._insert_events(stream_id, events, start_position=current_version + 1)
    except IntegrityError as exc:
        if IDEMPOTENCY_KEY_CONSTRAINT in str(exc):
            logger.warning(
                'Idempotency race condition on stream %s: duplicate key caught by DB constraint',
                stream_id,
            )
            dedup_version = await self._check_idempotency(stream_id, events)
            if dedup_version is not None:
                return dedup_version
            logger.exception(  # pragma: no cover
                'Idempotency re-check returned no match after IntegrityError on stream %s — '
                'this should not happen under normal conditions',
                stream_id,
            )
            raise DuplicateIdempotencyKeyError(
                stream_id,
                reason='conflict with existing keys',
            ) from exc  # pragma: no cover
        raise  # pragma: no cover

    for projection in self._projections:
        await projection.project(stored_events)

    return new_version
make_sqlalchemy_event_store
make_sqlalchemy_event_store(tables)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
def make_sqlalchemy_event_store(tables: EventStoreTables) -> SqlAlchemyEventStoreFactory:
    def factory(
        session: AsyncSession,
        serializer: IEventSerializer,
        registry: EventTypeRegistry,
        upcaster_chain: UpcasterChain,
        projections: Sequence[IProjection] = (),
        enrichers: Sequence[IMetadataEnricher] = (),
    ) -> SqlAlchemyEventStore:
        return SqlAlchemyEventStore(session, serializer, registry, tables, upcaster_chain, projections, enrichers)

    return factory
tables
IDEMPOTENCY_KEY_CONSTRAINT module-attribute
IDEMPOTENCY_KEY_CONSTRAINT = 'uq_es_events_idempotency_key'
es_streams_table module-attribute
es_streams_table = Table(
    'es_streams',
    _internal_metadata,
    Column('stream_id', Text, primary_key=True),
    Column('stream_type', Text, nullable=False),
    Column(
        'version',
        Integer,
        nullable=False,
        server_default='0',
    ),
    Column(
        'created_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
    ),
    Column(
        'updated_at',
        TIMESTAMP(timezone=True),
        server_default=now(),
        onupdate=now(),
    ),
)
es_events_table module-attribute
es_events_table = Table(
    'es_events',
    _internal_metadata,
    Column(
        'event_id', UUID(as_uuid=True), primary_key=True
    ),
    Column('stream_id', Text, nullable=False),
    Column('event_type', Text, nullable=False),
    Column('position', Integer, nullable=False),
    Column(
        'global_position',
        BigInteger,
        Identity(
            always=True, start=0, minvalue=0, cycle=False
        ),
        nullable=False,
    ),
    Column('data', JSONB, nullable=False),
    Column('metadata', JSONB, nullable=False),
    Column(
        'timestamp',
        TIMESTAMP(timezone=True),
        nullable=False,
    ),
    Column(
        'schema_version',
        Integer,
        nullable=False,
        server_default='1',
    ),
    Column('idempotency_key', Text, nullable=False),
    UniqueConstraint(
        'stream_id',
        'position',
        name='uq_es_events_stream_id_position',
    ),
    UniqueConstraint(
        'stream_id',
        'idempotency_key',
        name=IDEMPOTENCY_KEY_CONSTRAINT,
    ),
    Index(
        'ix_es_events_global_position', 'global_position'
    ),
    Index('ix_es_events_event_type', 'event_type'),
)
EventStoreTables dataclass
EventStoreTables(streams, events)
streams instance-attribute
streams
events instance-attribute
events
bind_event_store_tables
bind_event_store_tables(metadata)
Source code in src/waku/eventsourcing/store/sqlalchemy/tables.py
def bind_event_store_tables(metadata: MetaData) -> EventStoreTables:
    streams = es_streams_table.to_metadata(metadata)
    events = es_events_table.to_metadata(metadata)
    return EventStoreTables(streams=streams, events=events)

testing

DeciderSpec

DeciderSpec(decider)

Bases: Generic[StateT, CommandT, EventT]

Given/When/Then DSL for testing IDecider implementations.

Example::

DeciderSpec.for_(decider).given([event]).when(command).then([expected])
Source code in src/waku/eventsourcing/testing.py
def __init__(self, decider: IDecider[StateT, CommandT, EventT]) -> None:
    self._decider = decider
    self._events: list[EventT] = []
for_ classmethod
for_(decider)
Source code in src/waku/eventsourcing/testing.py
@classmethod
def for_(cls, decider: IDecider[StateT, CommandT, EventT]) -> DeciderSpec[StateT, CommandT, EventT]:
    return cls(decider)
given
given(events)
Source code in src/waku/eventsourcing/testing.py
def given(self, events: Sequence[EventT]) -> DeciderSpec[StateT, CommandT, EventT]:
    self._events = list(events)
    return self
when
when(command)
Source code in src/waku/eventsourcing/testing.py
def when(self, command: CommandT) -> _DeciderWhenResult[StateT, CommandT, EventT]:
    state = self._decider.initial_state()
    for event in self._events:
        state = self._decider.evolve(state, event)
    return _DeciderWhenResult(self._decider, state, command)
then_state
then_state(predicate)
Source code in src/waku/eventsourcing/testing.py
def then_state(self, predicate: Callable[[StateT], bool]) -> None:
    state = self._decider.initial_state()
    for event in self._events:
        state = self._decider.evolve(state, event)
    assert predicate(state), f'State predicate failed for state: {state}'  # noqa: S101

upcasting

UpcasterChain

UpcasterChain(upcasters_by_type)
Source code in src/waku/eventsourcing/upcasting/chain.py
def __init__(self, upcasters_by_type: Mapping[str, Sequence[IEventUpcaster]]) -> None:
    chains: dict[str, tuple[IEventUpcaster, ...]] = {}
    for event_type, upcasters in upcasters_by_type.items():
        sorted_upcasters = sorted(upcasters, key=lambda u: u.from_version)
        seen: set[int] = set()
        for u in sorted_upcasters:
            if u.from_version < 1:
                msg = f'Invalid from_version {u.from_version} for event type {event_type!r}: must be >= 1'
                raise UpcasterChainError(msg)
            if u.from_version in seen:
                msg = f'Duplicate upcaster for event type {event_type!r} at from_version {u.from_version}'
                raise UpcasterChainError(msg)
            seen.add(u.from_version)
        chains[event_type] = tuple(sorted_upcasters)
    self._chains = chains
upcast
upcast(event_type, data, schema_version)
Source code in src/waku/eventsourcing/upcasting/chain.py
def upcast(self, event_type: str, data: dict[str, Any], schema_version: int) -> dict[str, Any]:
    upcasters = self._chains.get(event_type)
    if not upcasters:
        return data
    if schema_version > upcasters[-1].from_version:
        return data
    for u in upcasters:
        if u.from_version >= schema_version:
            data = u.upcast(data)
    return data

FnUpcaster

FnUpcaster(from_version, fn)

Bases: IEventUpcaster

Source code in src/waku/eventsourcing/upcasting/fn.py
def __init__(self, from_version: int, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> None:
    self.from_version = from_version
    self._fn = fn
from_version instance-attribute
from_version = from_version
upcast
upcast(data)
Source code in src/waku/eventsourcing/upcasting/fn.py
def upcast(self, data: dict[str, Any], /) -> dict[str, Any]:
    return self._fn(data)

IEventUpcaster

Bases: ABC

from_version instance-attribute
from_version
upcast abstractmethod
upcast(data)
Source code in src/waku/eventsourcing/upcasting/interfaces.py
@abc.abstractmethod
def upcast(self, data: dict[str, Any], /) -> dict[str, Any]: ...

add_field

add_field(from_version, *, field, default)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def add_field(from_version: int, *, field: str, default: Any) -> IEventUpcaster:
    def _add(data: dict[str, Any]) -> dict[str, Any]:
        result = dict(data)
        if field not in result:
            result[field] = copy.copy(default)
        return result

    return FnUpcaster(from_version, fn=_add)

noop

noop(from_version)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def noop(from_version: int) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=dict)

remove_field

remove_field(from_version, *, field)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def remove_field(from_version: int, *, field: str) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=lambda data: {k: v for k, v in data.items() if k != field})

rename_field

rename_field(from_version, *, old, new)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def rename_field(from_version: int, *, old: str, new: str) -> IEventUpcaster:
    def _rename(data: dict[str, Any]) -> dict[str, Any]:
        result = {k: v for k, v in data.items() if k != old}
        if old in data:
            result[new] = data[old]
        return result

    return FnUpcaster(from_version, fn=_rename)

upcast

upcast(from_version, fn)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def upcast(from_version: int, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=fn)

chain

UpcasterChain
UpcasterChain(upcasters_by_type)
Source code in src/waku/eventsourcing/upcasting/chain.py
def __init__(self, upcasters_by_type: Mapping[str, Sequence[IEventUpcaster]]) -> None:
    chains: dict[str, tuple[IEventUpcaster, ...]] = {}
    for event_type, upcasters in upcasters_by_type.items():
        sorted_upcasters = sorted(upcasters, key=lambda u: u.from_version)
        seen: set[int] = set()
        for u in sorted_upcasters:
            if u.from_version < 1:
                msg = f'Invalid from_version {u.from_version} for event type {event_type!r}: must be >= 1'
                raise UpcasterChainError(msg)
            if u.from_version in seen:
                msg = f'Duplicate upcaster for event type {event_type!r} at from_version {u.from_version}'
                raise UpcasterChainError(msg)
            seen.add(u.from_version)
        chains[event_type] = tuple(sorted_upcasters)
    self._chains = chains
upcast
upcast(event_type, data, schema_version)
Source code in src/waku/eventsourcing/upcasting/chain.py
def upcast(self, event_type: str, data: dict[str, Any], schema_version: int) -> dict[str, Any]:
    upcasters = self._chains.get(event_type)
    if not upcasters:
        return data
    if schema_version > upcasters[-1].from_version:
        return data
    for u in upcasters:
        if u.from_version >= schema_version:
            data = u.upcast(data)
    return data

fn

FnUpcaster
FnUpcaster(from_version, fn)

Bases: IEventUpcaster

Source code in src/waku/eventsourcing/upcasting/fn.py
def __init__(self, from_version: int, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> None:
    self.from_version = from_version
    self._fn = fn
from_version instance-attribute
from_version = from_version
upcast
upcast(data)
Source code in src/waku/eventsourcing/upcasting/fn.py
def upcast(self, data: dict[str, Any], /) -> dict[str, Any]:
    return self._fn(data)

helpers

noop
noop(from_version)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def noop(from_version: int) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=dict)
rename_field
rename_field(from_version, *, old, new)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def rename_field(from_version: int, *, old: str, new: str) -> IEventUpcaster:
    def _rename(data: dict[str, Any]) -> dict[str, Any]:
        result = {k: v for k, v in data.items() if k != old}
        if old in data:
            result[new] = data[old]
        return result

    return FnUpcaster(from_version, fn=_rename)
add_field
add_field(from_version, *, field, default)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def add_field(from_version: int, *, field: str, default: Any) -> IEventUpcaster:
    def _add(data: dict[str, Any]) -> dict[str, Any]:
        result = dict(data)
        if field not in result:
            result[field] = copy.copy(default)
        return result

    return FnUpcaster(from_version, fn=_add)
remove_field
remove_field(from_version, *, field)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def remove_field(from_version: int, *, field: str) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=lambda data: {k: v for k, v in data.items() if k != field})
upcast
upcast(from_version, fn)
Source code in src/waku/eventsourcing/upcasting/helpers.py
def upcast(from_version: int, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> IEventUpcaster:
    return FnUpcaster(from_version, fn=fn)

interfaces

IEventUpcaster

Bases: ABC

from_version instance-attribute
from_version
upcast abstractmethod
upcast(data)
Source code in src/waku/eventsourcing/upcasting/interfaces.py
@abc.abstractmethod
def upcast(self, data: dict[str, Any], /) -> dict[str, Any]: ...

exceptions

WakuError

Bases: Exception

extensions

ApplicationExtension module-attribute

ModuleExtension module-attribute

DEFAULT_EXTENSIONS module-attribute

DEFAULT_EXTENSIONS = (
    ValidationExtension(
        [DependenciesAccessibleRule()], strict=True
    ),
)

AfterApplicationInit

Bases: Protocol

Extension for application post-initialization actions.

after_app_init async

after_app_init(app)
Source code in src/waku/extensions/protocols.py
async def after_app_init(self, app: WakuApplication) -> None: ...

OnApplicationInit

Bases: Protocol

Extension for application pre-initialization actions.

on_app_init async

on_app_init(app)
Source code in src/waku/extensions/protocols.py
async def on_app_init(self, app: WakuApplication) -> None: ...

OnApplicationShutdown

Bases: Protocol

Extension for application shutdown actions.

on_app_shutdown async

on_app_shutdown(app)
Source code in src/waku/extensions/protocols.py
async def on_app_shutdown(self, app: WakuApplication) -> None: ...

OnModuleConfigure

Bases: Protocol

Extension for module configuration.

on_module_configure

on_module_configure(metadata)

Perform actions before module metadata is transformed to a module.

Source code in src/waku/extensions/protocols.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Perform actions before module metadata transformed to module."""
    ...

OnModuleDestroy

Bases: Protocol

Extension for module destroying.

on_module_destroy async

on_module_destroy(module)
Source code in src/waku/extensions/protocols.py
async def on_module_destroy(self, module: Module) -> None: ...

OnModuleInit

Bases: Protocol

Extension for module initialization.

on_module_init async

on_module_init(module)
Source code in src/waku/extensions/protocols.py
async def on_module_init(self, module: Module) -> None: ...

OnModuleRegistration

Bases: Protocol

Extension for contributing providers to module metadata during registration.

This hook runs after all module metadata is collected but before Module objects are created. Use this for cross-module aggregation that produces providers which should belong to the owning module.

Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).

Execution order
  1. Application-level extensions (assigned to root module)
  2. Module-level extensions (in topological order)
Key differences from OnModuleConfigure
  • Runs after ALL modules are collected (cross-module visibility)
  • Receives registry with access to all modules' metadata
  • Can add providers to owning module

on_module_registration

on_module_registration(registry, owning_module, context)

Contribute providers to module metadata before Module objects are created.

PARAMETER DESCRIPTION
registry

Registry of all collected module metadata. Use find_extensions() to discover extensions across modules, add_provider() to contribute.

TYPE: ModuleMetadataRegistry

owning_module

The module type that owns this extension. Providers added via registry.add_provider() should target this module.

TYPE: ModuleType

context

Application context passed to WakuFactory (read-only).

TYPE: Mapping[Any, Any] | None

Source code in src/waku/extensions/protocols.py
def on_module_registration(
    self,
    registry: ModuleMetadataRegistry,
    owning_module: ModuleType,
    context: Mapping[Any, Any] | None,
) -> None:
    """Contribute providers to module metadata before Module objects are created.

    Args:
        registry: Registry of all collected module metadata. Use find_extensions()
                  to discover extensions across modules, add_provider() to contribute.
        owning_module: The module type that owns this extension. Providers
                      added via registry.add_provider() should target this module.
        context: Application context passed to WakuFactory (read-only).
    """
    ...

ExtensionRegistry

ExtensionRegistry()

Registry for extensions.

This registry maintains references to all extensions in the application, allowing for centralized management and discovery.

Source code in src/waku/extensions/registry.py
def __init__(self) -> None:
    self._app_extensions: dict[type[ApplicationExtension], list[ApplicationExtension]] = defaultdict(list)
    self._module_extensions: dict[ModuleType, list[ModuleExtension]] = defaultdict(list)

register_application_extension

register_application_extension(extension)

Register an application extension with optional priority and tags.

Source code in src/waku/extensions/registry.py
def register_application_extension(self, extension: ApplicationExtension) -> Self:
    """Register an application extension with optional priority and tags."""
    ext_type = type(extension)
    extension_bases = [
        base
        for base in inspect.getmro(ext_type)
        if (isinstance(base, ApplicationExtension) and base != ext_type)  # type: ignore[unreachable]
    ]
    for base in extension_bases:
        self._app_extensions[cast('type[ApplicationExtension]', base)].append(extension)
    return self

register_module_extension

register_module_extension(module_type, extension)
Source code in src/waku/extensions/registry.py
def register_module_extension(self, module_type: ModuleType, extension: ModuleExtension) -> Self:
    self._module_extensions[module_type].append(extension)
    return self

get_application_extensions

get_application_extensions(extension_type)
Source code in src/waku/extensions/registry.py
def get_application_extensions(self, extension_type: type[_AppExtT]) -> list[_AppExtT]:
    return cast('list[_AppExtT]', self._app_extensions.get(cast('type[ApplicationExtension]', extension_type), []))

get_module_extensions

get_module_extensions(module_type, extension_type)
Source code in src/waku/extensions/registry.py
def get_module_extensions(self, module_type: ModuleType, extension_type: type[_ModExtT]) -> list[_ModExtT]:
    extensions = cast('list[_ModExtT]', self._module_extensions.get(module_type, []))
    return [ext for ext in extensions if isinstance(ext, extension_type)]

protocols

Extension protocols for application and module lifecycle hooks.

ApplicationExtension module-attribute

ModuleExtension module-attribute

OnApplicationInit

Bases: Protocol

Extension for application pre-initialization actions.

on_app_init async
on_app_init(app)
Source code in src/waku/extensions/protocols.py
async def on_app_init(self, app: WakuApplication) -> None: ...

AfterApplicationInit

Bases: Protocol

Extension for application post-initialization actions.

after_app_init async
after_app_init(app)
Source code in src/waku/extensions/protocols.py
async def after_app_init(self, app: WakuApplication) -> None: ...

OnApplicationShutdown

Bases: Protocol

Extension for application shutdown actions.

on_app_shutdown async
on_app_shutdown(app)
Source code in src/waku/extensions/protocols.py
async def on_app_shutdown(self, app: WakuApplication) -> None: ...

OnModuleRegistration

Bases: Protocol

Extension for contributing providers to module metadata during registration.

This hook runs after all module metadata is collected but before Module objects are created. Use this for cross-module aggregation that produces providers which should belong to the owning module.

Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).

Execution order
  1. Application-level extensions (assigned to root module)
  2. Module-level extensions (in topological order)
Key differences from OnModuleConfigure
  • Runs after ALL modules are collected (cross-module visibility)
  • Receives registry with access to all modules' metadata
  • Can add providers to owning module
on_module_registration
on_module_registration(registry, owning_module, context)

Contribute providers to module metadata before Module objects are created.

PARAMETER DESCRIPTION
registry

Registry of all collected module metadata. Use find_extensions() to discover extensions across modules, add_provider() to contribute.

TYPE: ModuleMetadataRegistry

owning_module

The module type that owns this extension. Providers added via registry.add_provider() should target this module.

TYPE: ModuleType

context

Application context passed to WakuFactory (read-only).

TYPE: Mapping[Any, Any] | None

Source code in src/waku/extensions/protocols.py
def on_module_registration(
    self,
    registry: ModuleMetadataRegistry,
    owning_module: ModuleType,
    context: Mapping[Any, Any] | None,
) -> None:
    """Contribute providers to module metadata before Module objects are created.

    Args:
        registry: Registry of all collected module metadata. Use find_extensions()
                  to discover extensions across modules, add_provider() to contribute.
        owning_module: The module type that owns this extension. Providers
                      added via registry.add_provider() should target this module.
        context: Application context passed to WakuFactory (read-only).
    """
    ...

OnModuleConfigure

Bases: Protocol

Extension for module configuration.

on_module_configure
on_module_configure(metadata)

Perform actions before module metadata is transformed to a module.

Source code in src/waku/extensions/protocols.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Perform actions before module metadata transformed to module."""
    ...

OnModuleInit

Bases: Protocol

Extension for module initialization.

on_module_init async
on_module_init(module)
Source code in src/waku/extensions/protocols.py
async def on_module_init(self, module: Module) -> None: ...

OnModuleDestroy

Bases: Protocol

Extension for module destroying.

on_module_destroy async
on_module_destroy(module)
Source code in src/waku/extensions/protocols.py
async def on_module_destroy(self, module: Module) -> None: ...

registry

Extension registry for centralized management of extensions.

ExtensionRegistry

ExtensionRegistry()

Registry for extensions.

This registry maintains references to all extensions in the application, allowing for centralized management and discovery.

Source code in src/waku/extensions/registry.py
def __init__(self) -> None:
    self._app_extensions: dict[type[ApplicationExtension], list[ApplicationExtension]] = defaultdict(list)
    self._module_extensions: dict[ModuleType, list[ModuleExtension]] = defaultdict(list)
register_application_extension
register_application_extension(extension)

Register an application extension with optional priority and tags.

Source code in src/waku/extensions/registry.py
def register_application_extension(self, extension: ApplicationExtension) -> Self:
    """Register an application extension with optional priority and tags."""
    ext_type = type(extension)
    extension_bases = [
        base
        for base in inspect.getmro(ext_type)
        if (isinstance(base, ApplicationExtension) and base != ext_type)  # type: ignore[unreachable]
    ]
    for base in extension_bases:
        self._app_extensions[cast('type[ApplicationExtension]', base)].append(extension)
    return self
register_module_extension
register_module_extension(module_type, extension)
Source code in src/waku/extensions/registry.py
def register_module_extension(self, module_type: ModuleType, extension: ModuleExtension) -> Self:
    self._module_extensions[module_type].append(extension)
    return self
get_application_extensions
get_application_extensions(extension_type)
Source code in src/waku/extensions/registry.py
def get_application_extensions(self, extension_type: type[_AppExtT]) -> list[_AppExtT]:
    return cast('list[_AppExtT]', self._app_extensions.get(cast('type[ApplicationExtension]', extension_type), []))
get_module_extensions
get_module_extensions(module_type, extension_type)
Source code in src/waku/extensions/registry.py
def get_module_extensions(self, module_type: ModuleType, extension_type: type[_ModExtT]) -> list[_ModExtT]:
    extensions = cast('list[_ModExtT]', self._module_extensions.get(module_type, []))
    return [ext for ext in extensions if isinstance(ext, extension_type)]

factory

ContainerConfig dataclass

ContainerConfig(
    *,
    lock_factory=Lock,
    start_scope=None,
    skip_validation=False,
)

lock_factory class-attribute instance-attribute

lock_factory = Lock

start_scope class-attribute instance-attribute

start_scope = None

skip_validation class-attribute instance-attribute

skip_validation = False

WakuFactory

WakuFactory(
    root_module_type,
    /,
    context=None,
    lifespan=(),
    extensions=DEFAULT_EXTENSIONS,
    container_config=None,
)
Source code in src/waku/factory.py
def __init__(
    self,
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
) -> None:
    self._root_module_type = root_module_type

    self._context = context
    self._lifespan = lifespan
    self._extensions = extensions
    self._container_config = container_config or ContainerConfig()

create

create()
Source code in src/waku/factory.py
def create(self) -> WakuApplication:
    registry = ModuleRegistryBuilder(
        self._root_module_type,
        context=self._context,
        app_extensions=self._extensions,
    ).build()

    container = self._build_container(registry.providers)
    return WakuApplication(
        container=container,
        registry=registry,
        lifespan=self._lifespan,
        extension_registry=self._build_extension_registry(registry.modules),
    )

lifespan

LifespanFunc module-attribute

LifespanFunc = (
    Callable[
        ['WakuApplication'],
        AbstractAsyncContextManager[None],
    ]
    | AbstractAsyncContextManager[None]
)

LifespanWrapper

LifespanWrapper(lifespan_func)
Source code in src/waku/lifespan.py
def __init__(self, lifespan_func: LifespanFunc) -> None:
    self._lifespan_func = lifespan_func

lifespan async

lifespan(app)
Source code in src/waku/lifespan.py
@asynccontextmanager
async def lifespan(self, app: WakuApplication) -> AsyncIterator[None]:
    ctx_manager = (
        self._lifespan_func
        if isinstance(self._lifespan_func, AbstractAsyncContextManager)
        else self._lifespan_func(app)
    )
    async with ctx_manager:
        yield

modules

ModuleType module-attribute

ModuleType = type[object | HasModuleMetadata]

DynamicModule dataclass

DynamicModule(
    *,
    providers=list(),
    imports=list(),
    exports=list(),
    extensions=list(),
    is_global=False,
    id=uuid4(),
    parent_module,
)

Bases: ModuleMetadata

providers class-attribute instance-attribute

providers = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports = field(default_factory=list)

List of modules imported by this module.

exports class-attribute instance-attribute

exports = field(default_factory=list)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions = field(default_factory=list)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global = False

Whether this module is global or not.

id class-attribute instance-attribute

id = field(default_factory=uuid4)

parent_module instance-attribute

parent_module

HasModuleMetadata

Bases: Protocol

ModuleCompiler

extract_metadata

extract_metadata(module_type)
Source code in src/waku/modules/_metadata.py
def extract_metadata(self, module_type: ModuleType | DynamicModule) -> tuple[ModuleType, ModuleMetadata]:
    try:
        return self._extract_metadata(cast('Hashable', module_type))
    except AttributeError:
        msg = f'{type(module_type).__name__} is not module'
        raise ValueError(msg) from None

ModuleMetadata dataclass

ModuleMetadata(
    *,
    providers=list(),
    imports=list(),
    exports=list(),
    extensions=list(),
    is_global=False,
    id=uuid4(),
)

providers class-attribute instance-attribute

providers = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports = field(default_factory=list)

List of modules imported by this module.

exports class-attribute instance-attribute

exports = field(default_factory=list)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions = field(default_factory=list)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global = False

Whether this module is global or not.

id class-attribute instance-attribute

id = field(default_factory=uuid4)

ModuleMetadataRegistry

ModuleMetadataRegistry(metadata_by_type, topological_order)

Registry providing access to collected module metadata.

Provides read access to all modules' metadata for aggregation purposes, with controlled write access through explicit methods.

This class is used during the module registration phase to enable cross-module aggregation of providers.

Source code in src/waku/modules/_metadata_registry.py
def __init__(
    self,
    metadata_by_type: dict[ModuleType, ModuleMetadata],
    topological_order: tuple[ModuleType, ...],
) -> None:
    self._metadata_by_type = metadata_by_type
    self._topological_order = topological_order

modules property

modules

All module types in topological order (dependencies first).

get_metadata

get_metadata(module_type)

Get metadata for a specific module.

Source code in src/waku/modules/_metadata_registry.py
def get_metadata(self, module_type: ModuleType) -> ModuleMetadata:
    """Get metadata for a specific module."""
    return self._metadata_by_type[module_type]

find_extensions

find_extensions(protocol)

Find all extensions of a given type across all modules.

Yields (module_type, extension) pairs in topological order. This is useful for aggregating data from extensions across modules.

PARAMETER DESCRIPTION
protocol

The extension protocol/type to search for.

TYPE: type[_ExtT]

YIELDS DESCRIPTION
tuple[ModuleType, _ExtT]

Tuples of (module_type, extension) for each matching extension.

Source code in src/waku/modules/_metadata_registry.py
def find_extensions(self, protocol: type[_ExtT]) -> Iterator[tuple[ModuleType, _ExtT]]:
    """Find all extensions of a given type across all modules.

    Yields (module_type, extension) pairs in topological order.
    This is useful for aggregating data from extensions across modules.

    Args:
        protocol: The extension protocol/type to search for.

    Yields:
        Tuples of (module_type, extension) for each matching extension.
    """
    for module_type in self._topological_order:
        metadata = self._metadata_by_type[module_type]
        for ext in metadata.extensions:
            if isinstance(ext, protocol):
                yield module_type, ext

add_provider

add_provider(module_type, provider)

Add a provider to a module's metadata.

This is the preferred way to add providers during registration hooks. The provider will become part of the owning module.

PARAMETER DESCRIPTION
module_type

The module to add the provider to.

TYPE: ModuleType

provider

The provider specification to add.

TYPE: Provider

RAISES DESCRIPTION
KeyError

If module_type is not in the registry.

Source code in src/waku/modules/_metadata_registry.py
def add_provider(self, module_type: ModuleType, provider: Provider) -> None:
    """Add a provider to a module's metadata.

    This is the preferred way to add providers during registration hooks.
    The provider will become part of the owning module.

    Args:
        module_type: The module to add the provider to.
        provider: The provider specification to add.

    Raises:
        KeyError: If module_type is not in the registry.
    """
    self._metadata_by_type[module_type].providers.append(provider)

Module

Module(module_type, metadata)
Source code in src/waku/modules/_module.py
def __init__(self, module_type: ModuleType, metadata: ModuleMetadata) -> None:
    self.id: Final[UUID] = metadata.id
    self.target: Final[ModuleType] = module_type

    self.providers: Final[Sequence[Provider]] = metadata.providers
    self.imports: Final[Sequence[ModuleType | DynamicModule]] = metadata.imports
    self.exports: Final[Sequence[type[object] | ModuleType | DynamicModule]] = metadata.exports
    self.extensions: Final[Sequence[ModuleExtension]] = metadata.extensions
    self.is_global: Final[bool] = metadata.is_global

    self._provider: BaseProvider | None = None

id instance-attribute

id = id

target instance-attribute

target = module_type

providers instance-attribute

providers = providers

imports instance-attribute

imports = imports

exports instance-attribute

exports = exports

extensions instance-attribute

extensions = extensions

is_global instance-attribute

is_global = is_global

name property

name

provider property

provider

create_provider

create_provider()
Source code in src/waku/modules/_module.py
def create_provider(self) -> BaseProvider:
    cls = cast('type[_ModuleProvider]', type(f'{self.name}Provider', (_ModuleProvider,), {}))
    self._provider = cls(self.providers)
    return self._provider

ModuleRegistry

ModuleRegistry(
    *, compiler, root_module, modules, providers, adjacency
)

Immutable registry and graph for module queries, traversal, and lookups.

Source code in src/waku/modules/_registry.py
def __init__(
    self,
    *,
    compiler: ModuleCompiler,
    root_module: Module,
    modules: dict[UUID, Module],
    providers: list[BaseProvider],
    adjacency: AdjacencyMatrix,
) -> None:
    self._compiler = compiler
    self._root_module = root_module
    self._modules = modules
    self._providers = tuple(providers)
    self._adjacency = adjacency
    self._parent_to_module = self._build_parent_mapping(modules)

root_module property

root_module

modules property

modules

providers property

providers

compiler property

compiler

get

get(module_type)
Source code in src/waku/modules/_registry.py
def get(self, module_type: ModuleType | DynamicModule) -> Module:
    # For plain module classes, check if they're registered via parent mapping first.
    # This handles the case where ConfigModule.register() was imported,
    # but ConfigModule (the class) is being exported.
    if isinstance(module_type, type) and module_type in self._parent_to_module:
        return self._parent_to_module[module_type]

    module_id = self.compiler.extract_metadata(module_type)[1].id
    return self.get_by_id(module_id)

get_by_id

get_by_id(module_id)
Source code in src/waku/modules/_registry.py
def get_by_id(self, module_id: UUID) -> Module:
    module = self._modules.get(module_id)
    if module is None:
        msg = f'Module with ID {module_id} is not registered in the graph.'
        raise KeyError(msg)
    return module

traverse

traverse(from_=None)

Traverse the module graph in depth-first post-order (children before parent) recursively.

PARAMETER DESCRIPTION
from_

Start module (default: root)

TYPE: Module | None DEFAULT: None

YIELDS DESCRIPTION
Module

Each traversed module (post-order)

TYPE: Module

Source code in src/waku/modules/_registry.py
def traverse(self, from_: Module | None = None) -> Iterator[Module]:
    """Traverse the module graph in depth-first post-order (children before parent) recursively.

    Args:
        from_: Start module (default: root)

    Yields:
        Module: Each traversed module (post-order)
    """
    start_module = from_ or self._root_module
    visited: set[UUID] = set()

    def _dfs(module: Module) -> Iterator[Module]:
        if module.id in visited:
            return

        visited.add(module.id)

        # Process children first (maintain original order)
        neighbor_ids = self._adjacency[module.id]
        for neighbor_id in neighbor_ids:
            if neighbor_id == module.id:
                continue
            neighbor = self.get_by_id(neighbor_id)
            yield from _dfs(neighbor)

        # Process current module after children (post-order)
        yield module

    yield from _dfs(start_module)

ModuleRegistryBuilder

ModuleRegistryBuilder(
    root_module_type,
    compiler=None,
    context=None,
    app_extensions=(),
)
Source code in src/waku/modules/_registry_builder.py
def __init__(
    self,
    root_module_type: ModuleType,
    compiler: ModuleCompiler | None = None,
    context: dict[Any, Any] | None = None,
    app_extensions: Sequence[ApplicationExtension] = (),
) -> None:
    self._compiler: Final = compiler or ModuleCompiler()
    self._root_module_type: Final = root_module_type
    self._context: Final = context
    self._app_extensions: Final = app_extensions
    self._modules: dict[UUID, Module] = {}
    self._providers: list[BaseProvider] = []

    self._metadata_cache: dict[ModuleType | DynamicModule, tuple[ModuleType, ModuleMetadata]] = {}

build

build()
Source code in src/waku/modules/_registry_builder.py
def build(self) -> ModuleRegistry:
    modules, adjacency = self._collect_modules()
    self._execute_registration_hooks(modules)
    root_module = self._register_modules(modules)
    return self._build_registry(root_module, adjacency)

module

module(
    *,
    providers=(),
    imports=(),
    exports=(),
    extensions=(),
    is_global=False,
)

Decorator to define a module.

PARAMETER DESCRIPTION
providers

Sequence of providers for dependency injection.

TYPE: Sequence[Provider] DEFAULT: ()

imports

Sequence of modules imported by this module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

exports

Sequence of types or modules exported by this module.

TYPE: Sequence[type[object] | ModuleType | DynamicModule] DEFAULT: ()

extensions

Sequence of module extensions for lifecycle hooks.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

is_global

Whether this module is global or not.

TYPE: bool DEFAULT: False

Source code in src/waku/modules/_metadata.py
def module(
    *,
    providers: Sequence[Provider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[type[object] | ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    """Decorator to define a module.

    Args:
        providers: Sequence of providers for dependency injection.
        imports: Sequence of modules imported by this module.
        exports: Sequence of types or modules exported by this module.
        extensions: Sequence of module extensions for lifecycle hooks.
        is_global: Whether this module is global or not.
    """

    def decorator(cls: type[_T]) -> type[_T]:
        metadata = ModuleMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            extensions=list(extensions),
            is_global=is_global,
        )
        for extension in metadata.extensions:
            if isinstance(extension, OnModuleConfigure):
                extension.on_module_configure(metadata)

        setattr(cls, _MODULE_METADATA_KEY, metadata)
        return cls

    return decorator

testing

override

override(container, *providers, context=None)

Temporarily override providers and/or context in an AsyncContainer for testing.

PARAMETER DESCRIPTION
container

The container whose providers/context will be overridden.

TYPE: AsyncContainer

*providers

Providers to override in the container.

TYPE: BaseProvider DEFAULT: ()

context

Context values to override.

TYPE: dict[Any, Any] | None DEFAULT: None

YIELDS DESCRIPTION
None

Context in which the container uses the overridden providers/context.

TYPE: None

Example
from waku import WakuFactory, module
from waku.di import Scope, singleton
from waku.testing import override


class Service: ...


class ServiceOverride(Service): ...


# Override providers
with override(application.container, singleton(ServiceOverride, provided_type=Service)):
    service = await application.container.get(Service)
    assert isinstance(service, ServiceOverride)

# Override context
with override(application.container, context={int: 123}):
    ...
RAISES DESCRIPTION
ValueError

If container is not at root (APP) scope.

Source code in src/waku/testing.py
@contextmanager
def override(
    container: AsyncContainer,
    *providers: BaseProvider,
    context: dict[Any, Any] | None = None,
) -> Iterator[None]:
    """Temporarily override providers and/or context in an AsyncContainer for testing.

    Args:
        container: The container whose providers/context will be overridden.
        *providers: Providers to override in the container.
        context: Context values to override.

    Yields:
        None: Context in which the container uses the overridden providers/context.

    Example:
        ```python
        from waku import WakuFactory, module
        from waku.di import Scope, singleton
        from waku.testing import override


        class Service: ...


        class ServiceOverride(Service): ...


        # Override providers
        with override(application.container, singleton(ServiceOverride, provided_type=Service)):
            service = await application.container.get(Service)
            assert isinstance(service, ServiceOverride)

        # Override context
        with override(application.container, context={int: 123}):
            ...
        ```

    Raises:
        ValueError: If container is not at root (APP) scope.
    """
    if container.scope != Scope.APP:
        msg = (
            f'override() only supports root (APP scope) containers, '
            f'got {container.scope.name} scope. '
            f'Use application.container instead of a scoped container.'
        )
        raise ValueError(msg)

    _mark_as_overrides(providers)

    original_context = cast('dict[Any, Any]', container._context)  # noqa: SLF001
    merged_context = {**original_context, **(context or {})}
    new_container = make_async_container(
        _container_provider(container),
        *providers,
        context=merged_context,
        start_scope=container.scope,
        validation_settings=STRICT_VALIDATION,
    )

    # Only copy cache when no providers are overridden (context-only override)
    # Provider overrides may have transitive effects, so rebuild everything
    if not providers:
        _copy_cache(container, new_container)

    _swap(container, new_container)
    container._cache[CONTAINER_KEY] = container  # noqa: SLF001
    try:
        yield
    finally:
        # Restore the original container even if the with-body raised;
        # without try/finally an exception would leave the swap in place
        # and leak the override into subsequent tests.
        _swap(new_container, container)

create_test_app async

create_test_app(
    *,
    base=None,
    providers=(),
    imports=(),
    extensions=(),
    app_extensions=DEFAULT_EXTENSIONS,
    context=None,
)

Create a minimal test application with given configuration.

Useful for testing extensions and module configurations in isolation without needing to set up a full application structure.

PARAMETER DESCRIPTION
base

Base module to build upon. When provided, the test module imports this module and providers act as overrides.

TYPE: ModuleType | DynamicModule | None DEFAULT: None

providers

Providers to register in the test module. When base is provided, these override existing providers.

TYPE: Sequence[Provider] DEFAULT: ()

imports

Additional modules to import into the test module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

extensions

Module extensions to register.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

app_extensions

Application extensions to register (default: DEFAULT_EXTENSIONS).

TYPE: Sequence[ApplicationExtension] DEFAULT: DEFAULT_EXTENSIONS

context

Context values to pass to the container.

TYPE: dict[Any, Any] | None DEFAULT: None

YIELDS DESCRIPTION
AsyncIterator[WakuApplication]

Initialized WakuApplication.

Example
from waku.testing import create_test_app
from waku.di import singleton


class IRepository(Protocol):
    async def get(self, id: str) -> Entity: ...


class FakeRepository(IRepository):
    async def get(self, id: str) -> Entity:
        return Entity(id=id)


# Create test app from scratch
async def test_my_extension():
    extension = MyExtension().bind(SomeEvent, SomeHandler)

    async with create_test_app(
        extensions=[extension],
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        service = await app.container.get(MyService)
        result = await service.do_something()
        assert result == expected


# Create test app based on existing module with overrides
async def test_with_base_module():
    async with create_test_app(
        base=AppModule,
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        # FakeRepository replaces the real one from AppModule
        repo = await app.container.get(IRepository)
        assert isinstance(repo, FakeRepository)
Source code in src/waku/testing.py
@asynccontextmanager
async def create_test_app(
    *,
    base: ModuleType | DynamicModule | None = None,
    providers: Sequence[Provider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    app_extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    context: dict[Any, Any] | None = None,
) -> AsyncIterator[WakuApplication]:
    """Create a minimal test application with given configuration.

    Useful for testing extensions and module configurations in isolation
    without needing to set up a full application structure.

    Args:
        base: Base module to build upon. When provided, the test module
            imports this module and providers act as overrides.
        providers: Providers to register in the test module.
            When `base` is provided, these override existing providers.
        imports: Additional modules to import into the test module.
        extensions: Module extensions to register.
        app_extensions: Application extensions to register (default: DEFAULT_EXTENSIONS).
        context: Context values to pass to the container.

    Yields:
        Initialized WakuApplication.

    Example:
        ```python
        from waku.testing import create_test_app
        from waku.di import singleton


        class IRepository(Protocol):
            async def get(self, id: str) -> Entity: ...


        class FakeRepository(IRepository):
            async def get(self, id: str) -> Entity:
                return Entity(id=id)


        # Create test app from scratch
        async def test_my_extension():
            extension = MyExtension().bind(SomeEvent, SomeHandler)

            async with create_test_app(
                extensions=[extension],
                providers=[singleton(IRepository, FakeRepository)],
            ) as app:
                service = await app.container.get(MyService)
                result = await service.do_something()
                assert result == expected


        # Create test app based on existing module with overrides
        async def test_with_base_module():
            async with create_test_app(
                base=AppModule,
                providers=[singleton(IRepository, FakeRepository)],
            ) as app:
                # FakeRepository replaces the real one from AppModule
                repo = await app.container.get(IRepository)
                assert isinstance(repo, FakeRepository)
        ```
    """
    module_providers = list(providers)
    if base is not None:
        # Layering on a base module: it is imported first, and the test
        # providers are flagged so they take precedence over its providers.
        module_imports: list[ModuleType | DynamicModule] = [base, *imports]
        _mark_as_overrides(module_providers)
    else:
        module_imports = list(imports)

    @module(
        providers=module_providers,
        imports=module_imports,
        extensions=list(extensions),
    )
    class _TestModule:
        pass

    application = WakuFactory(_TestModule, context=context, extensions=app_extensions).create()
    async with application:
        yield application

validation

ValidationRule

Bases: Protocol

validate

validate(context)
Source code in src/waku/validation/_abc.py
# Protocol member: implementations inspect `context` and return every
# validation error found (an empty list means the application is valid).
def validate(self, context: ValidationContext) -> list[ValidationError]: ...

ValidationError

Bases: WakuError

ValidationContext dataclass

ValidationContext(*, app)

app instance-attribute

app

ValidationExtension

ValidationExtension(rules, *, strict=True)

Bases: AfterApplicationInit

Source code in src/waku/validation/_extension.py
def __init__(self, rules: Sequence[ValidationRule], *, strict: bool = True) -> None:
    """Store the validation rules and the strictness flag.

    Args:
        rules: Validation rules to run after application initialization.
        strict: Strictness flag consumed elsewhere (presumably controls
            whether errors raise or are only reported — TODO confirm
            against ``_raise``).
    """
    self.rules = rules
    self.strict: Final = strict

rules instance-attribute

rules = rules

strict instance-attribute

strict = strict

after_app_init async

after_app_init(app)
Source code in src/waku/validation/_extension.py
async def after_app_init(self, app: WakuApplication) -> None:
    """Run every configured validation rule against the initialized app."""
    context = ValidationContext(app=app)

    # Flatten the per-rule error lists into a single list.
    errors: list[ValidationError] = []
    for rule in self.rules:
        errors.extend(rule.validate(context))

    if errors:
        self._raise(errors)

rules

DependenciesAccessibleRule

DependenciesAccessibleRule(cache_size=1000)

Bases: ValidationRule

Validates that all dependencies required by providers are accessible.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, cache_size: int = 1000) -> None:
    """Create the rule with an LRU cache shared by the type extractor.

    Args:
        cache_size: Maximum number of entries kept in the cache of
            per-module provided-type sets.
    """
    self._cache = LRUCache[set[type[object]]](cache_size)
    self._types_extractor = ModuleTypesExtractor(self._cache)
validate
validate(context)
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def validate(self, context: ValidationContext) -> list[ValidationError]:
    """Report every provider dependency that its module cannot access."""
    # Cached lookups may be stale across applications; start fresh.
    self._cache.clear()

    registry = context.app.registry
    all_modules = list(registry.modules)
    app_container = context.app.container

    # Each strategy covers one way a type can be reachable from a module.
    strategies: list[AccessibilityStrategy] = [
        GlobalProvidersStrategy(all_modules, app_container, self._types_extractor, registry),
        LocalProvidersStrategy(self._types_extractor),
        ContextVarsStrategy(self._types_extractor),
        ImportedModulesStrategy(registry, self._types_extractor),
    ]
    checker = DependencyAccessChecker(strategies)

    # One error per (factory, unreachable dependency type) pair.
    errors: list[ValidationError] = []
    for mod in all_modules:
        for factory in mod.provider.factories:
            for dep_type in checker.find_inaccessible_dependencies(
                dependencies=factory.dependencies,
                module=mod,
            ):
                errors.append(
                    DependencyInaccessibleError(
                        required_type=dep_type,
                        required_by=factory.source,
                        from_module=mod,
                    )
                )

    return errors

DependencyInaccessibleError

DependencyInaccessibleError(
    required_type, required_by, from_module
)

Bases: ValidationError

Error indicating a dependency is not accessible to a provider/module.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    required_type: type[object],
    required_by: object,
    from_module: Module,
) -> None:
    """Record what was required, by whom, and from which module.

    Args:
        required_type: The dependency type that could not be resolved.
        required_by: The provider/factory source that needs the type.
        from_module: The module in which the requirement originates.
    """
    self.required_type = required_type
    self.required_by = required_by
    self.from_module = from_module
    # The exception message is this instance's string form; __str__ is
    # defined elsewhere and presumably formats the attributes set above
    # — TODO confirm.
    super().__init__(str(self))
required_type instance-attribute
required_type = required_type
required_by instance-attribute
required_by = required_by
from_module instance-attribute
from_module = from_module

dependency_accessible

DependencyInaccessibleError
DependencyInaccessibleError(
    required_type, required_by, from_module
)

Bases: ValidationError

Error indicating a dependency is not accessible to a provider/module.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    required_type: type[object],
    required_by: object,
    from_module: Module,
) -> None:
    """Record what was required, by whom, and from which module.

    Args:
        required_type: The dependency type that could not be resolved.
        required_by: The provider/factory source that needs the type.
        from_module: The module in which the requirement originates.
    """
    self.required_type = required_type
    self.required_by = required_by
    self.from_module = from_module
    # The exception message is this instance's string form; __str__ is
    # defined elsewhere and presumably formats the attributes set above
    # — TODO confirm.
    super().__init__(str(self))
required_type instance-attribute
required_type = required_type
required_by instance-attribute
required_by = required_by
from_module instance-attribute
from_module = from_module
AccessibilityStrategy

Bases: ABC

Strategy for checking if a type is accessible to a module.

is_accessible abstractmethod
is_accessible(required_type, module)

Check if the required type is accessible to the given module.

Source code in src/waku/validation/rules/dependency_accessible.py
# Strategy-pattern hook: each subclass implements one accessibility check.
@abstractmethod
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    """Check if the required type is accessible to the given module."""
GlobalProvidersStrategy
GlobalProvidersStrategy(
    modules, container, types_extractor, registry
)

Bases: AccessibilityStrategy

Check if type is provided by a global module or APP-scoped context.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    modules: Sequence[Module],
    container: AsyncContainer,
    types_extractor: ModuleTypesExtractor,
    registry: ModuleRegistry,
) -> None:
    """Precompute the set of globally reachable types once, up front.

    Args:
        modules: All modules of the application.
        container: The application's root container.
        types_extractor: Helper that extracts provided types per module.
        registry: Module registry used for lookups.
    """
    # _build_global_types is defined elsewhere; presumably it gathers types
    # from global modules and APP-scoped context — TODO confirm.
    self._global_types = self._build_global_types(modules, container, types_extractor, registry)
is_accessible
is_accessible(required_type, module)
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    # Global types are reachable from every module; `module` is ignored.
    return required_type in self._global_types
LocalProvidersStrategy
LocalProvidersStrategy(types_extractor)

Bases: AccessibilityStrategy

Check if type is provided by the module itself.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, types_extractor: ModuleTypesExtractor) -> None:
    # Shared extractor (and its cache) is injected rather than created here.
    self._types_extractor = types_extractor
is_accessible
is_accessible(required_type, module)
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    # A module can always use the types it provides itself.
    return required_type in self._types_extractor.get_provided_types(module)
ContextVarsStrategy
ContextVarsStrategy(types_extractor)

Bases: AccessibilityStrategy

Check if type is provided by application or request container context.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, types_extractor: ModuleTypesExtractor) -> None:
    # Shared extractor (and its cache) is injected rather than created here.
    self._types_extractor = types_extractor
is_accessible
is_accessible(required_type, module)
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    # Types supplied via container context count as accessible too.
    return required_type in self._types_extractor.get_context_vars(module)
ImportedModulesStrategy
ImportedModulesStrategy(registry, types_extractor)

Bases: AccessibilityStrategy

Check if type is accessible via imported modules (direct export or re-export).

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, registry: ModuleRegistry, types_extractor: ModuleTypesExtractor) -> None:
    # Registry resolves import references to Module instances; the extractor
    # (with its cache) is shared with the other strategies.
    self._registry = registry
    self._types_extractor = types_extractor
is_accessible
is_accessible(required_type, module)
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    # Accessible when any directly imported module exports the type itself
    # or re-exports it; both helper predicates are defined elsewhere in
    # this class — TODO confirm their exact semantics.
    for imported in module.imports:
        imported_module = self._registry.get(imported)
        if self._is_directly_exported(required_type, imported_module):
            return True
        if self._is_reexported(required_type, imported_module):
            return True
    return False
DependencyAccessChecker
DependencyAccessChecker(strategies)

Handles dependency accessibility checks between modules.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, strategies: Sequence[AccessibilityStrategy]) -> None:
    # Strategies are consulted when checking whether a dependency is reachable.
    self._strategies = strategies
find_inaccessible_dependencies
find_inaccessible_dependencies(dependencies, module)
Source code in src/waku/validation/rules/dependency_accessible.py
def find_inaccessible_dependencies(
    self,
    dependencies: Sequence[DependencyKey],
    module: Module,
) -> Iterable[type[object]]:
    """Lazily yield the type hint of each dependency `module` cannot access."""
    unreachable = (
        dep.type_hint
        for dep in dependencies
        if not self._is_accessible(dep.type_hint, module)
    )
    yield from unreachable
DependenciesAccessibleRule
DependenciesAccessibleRule(cache_size=1000)

Bases: ValidationRule

Validates that all dependencies required by providers are accessible.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, cache_size: int = 1000) -> None:
    """Create the rule with an LRU cache shared by the type extractor.

    Args:
        cache_size: Maximum number of entries kept in the cache of
            per-module provided-type sets.
    """
    self._cache = LRUCache[set[type[object]]](cache_size)
    self._types_extractor = ModuleTypesExtractor(self._cache)
validate
validate(context)
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def validate(self, context: ValidationContext) -> list[ValidationError]:
    """Report every provider dependency that its module cannot access."""
    # Cached type lookups may be stale across applications; start fresh.
    self._cache.clear()

    registry = context.app.registry
    modules = list(registry.modules)
    container = context.app.container

    # Each strategy covers one way a type can be reachable from a module.
    strategies: list[AccessibilityStrategy] = [
        GlobalProvidersStrategy(modules, container, self._types_extractor, registry),
        LocalProvidersStrategy(self._types_extractor),
        ContextVarsStrategy(self._types_extractor),
        ImportedModulesStrategy(registry, self._types_extractor),
    ]

    checker = DependencyAccessChecker(strategies)
    errors: list[ValidationError] = []

    # One error per (factory, unreachable dependency type) pair.
    for module in modules:
        for factory in module.provider.factories:
            inaccessible_deps = checker.find_inaccessible_dependencies(
                dependencies=factory.dependencies,
                module=module,
            )
            errors.extend(
                DependencyInaccessibleError(
                    required_type=dep_type,
                    required_by=factory.source,
                    from_module=module,
                )
                for dep_type in inaccessible_deps
            )

    return errors