waku
¶
WakuApplication
¶
Source code in src/waku/application.py
initialize
async
¶
WakuFactory
¶
WakuFactory(
root_module_type,
/,
context=None,
lifespan=(),
extensions=DEFAULT_EXTENSIONS,
container_config=None,
)
Source code in src/waku/factory.py
create
¶
Source code in src/waku/factory.py
DynamicModule
dataclass
¶
DynamicModule(
*,
providers=list(),
imports=list(),
exports=list(),
extensions=list(),
is_global=False,
id=uuid4(),
parent_module,
)
Bases: ModuleMetadata
providers
class-attribute
instance-attribute
¶
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
List of module extensions for lifecycle hooks.
is_global
class-attribute
instance-attribute
¶
Whether this module is global or not.
Module
¶
Source code in src/waku/modules/_module.py
module
¶
Decorator to define a module.
| PARAMETER | DESCRIPTION |
|---|---|
| providers | Sequence of providers for dependency injection. |
| imports | Sequence of modules imported by this module. |
| exports | Sequence of types or modules exported by this module. |
| extensions | Sequence of module extensions for lifecycle hooks. |
| is_global | Whether this module is global or not. |
Source code in src/waku/modules/_metadata.py
application
¶
WakuApplication
¶
Source code in src/waku/application.py
initialize
async
¶
cqrs
¶
Event
dataclass
¶
Event(*, event_id=uuid4())
Bases: INotification
Convenience base class for events with auto-generated ID.
Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.
Example::

    @dataclass(frozen=True, kw_only=True)
    class UserCreated(Event):
        user_id: str
        email: str
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::

    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle. |
| next_handler | Function to call the next handler in the pipeline. |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline. |
Source code in src/waku/cqrs/contracts/pipeline.py
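The `handle(request, next_handler)` contract can be illustrated with a small standalone sketch. `PipelineBehaviorSketch`, `LoggingBehavior`, and `final_handler` are hypothetical names, and the callable shape of `next_handler` (request in, awaitable response out) is an assumption rather than the library's definition.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

RequestT = TypeVar("RequestT")
ResponseT = TypeVar("ResponseT")


class PipelineBehaviorSketch(ABC, Generic[RequestT, ResponseT]):
    """Hypothetical stand-in mirroring the documented handle() contract."""

    @abstractmethod
    async def handle(
        self,
        request: RequestT,
        next_handler: Callable[[RequestT], Awaitable[ResponseT]],
    ) -> ResponseT: ...


class LoggingBehavior(PipelineBehaviorSketch[str, str]):
    def __init__(self) -> None:
        self.log: list[str] = []

    async def handle(self, request, next_handler):
        self.log.append(f"before {request}")
        # Delegating to next_handler is what keeps the pipeline moving.
        response = await next_handler(request)
        self.log.append(f"after {response}")
        return response


async def final_handler(request: str) -> str:
    # Plays the role of the real request handler at the end of the pipeline.
    return request.upper()


behavior = LoggingBehavior()
result = asyncio.run(behavior.handle("ping", final_handler))
```

A behavior that returns without awaiting `next_handler` short-circuits the pipeline, which is one way to sketch caching or validation behaviors.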
IRequest
¶
Marker interface for request-type objects (commands/queries).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.
MediatR equivalent: IRequest
Example::

    @dataclass(frozen=True)
    class GetUserQuery(IRequest[UserDTO]):
        user_id: str

    @dataclass(frozen=True)
    class CreateOrderCommand(IRequest[OrderId]):
        customer_id: str
        items: list[OrderItem]
Request
dataclass
¶
Request(*, request_id=uuid4())
Bases: IRequest[ResponseT], Generic[ResponseT]
Convenience base class for requests with auto-generated ID.
Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.
Example::

    @dataclass(frozen=True, kw_only=True)
    class GetUserQuery(Request[UserDTO]):
        user_id: str
EventHandler
¶
Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]
Abstract base class for event handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.
Example::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
INotificationHandler
¶
Bases: Protocol[NotificationT]
Protocol for notification/event handlers.
MediatR equivalent: INotificationHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::

    class OrderPlacedHandler(INotificationHandler[OrderPlaced]):
        async def handle(self, event: OrderPlaced, /) -> None:
            await self._send_confirmation_email(event.order_id)
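Structural subtyping means a handler does not have to inherit from the protocol at all. The sketch below uses a hypothetical `NotificationHandlerSketch` protocol; marking it `runtime_checkable` is an addition for demonstration only and is not claimed of the library's protocol.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, TypeVar, runtime_checkable

EventT = TypeVar("EventT", contravariant=True)


@runtime_checkable
class NotificationHandlerSketch(Protocol[EventT]):
    """Hypothetical protocol with the same handle() shape as in the docs."""

    async def handle(self, event: EventT, /) -> None: ...


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str


class OrderPlacedAudit:
    # No base class: having a matching handle() method is enough.
    def __init__(self) -> None:
        self.seen: list[str] = []

    async def handle(self, event: OrderPlaced, /) -> None:
        self.seen.append(event.order_id)


async def dispatch(
    handler: NotificationHandlerSketch[OrderPlaced], event: OrderPlaced
) -> None:
    await handler.handle(event)


audit = OrderPlacedAudit()
asyncio.run(dispatch(audit, OrderPlaced(order_id="o-42")))
```

Note that the runtime `isinstance` check only verifies that a `handle` attribute exists; the full signature is checked by a static type checker, not at runtime.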
Mediator
¶
IMediator
¶
Bases: ISender, IPublisher, ABC
Defines a mediator that encapsulates request/response and publishing interaction patterns.
IPublisher
¶
ISender
¶
MediatorConfig
dataclass
¶
MediatorConfig(
*,
mediator_implementation_type=Mediator,
event_publisher=SequentialEventPublisher,
pipeline_behaviors=(),
)
Configuration for the Mediator extension.
This class defines the configuration options for setting up the mediator pattern implementation in the application.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mediator_implementation_type | The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class. |
| event_publisher | The implementation class for publishing events. Defaults to SequentialEventPublisher. |
| pipeline_behaviors | A sequence of pipeline behavior configurations that will be applied to the mediator pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence. |
MediatorExtension
¶
Bases: OnModuleConfigure
Source code in src/waku/cqrs/modules.py
on_module_configure
¶
bind_request
¶
Source code in src/waku/cqrs/modules.py
bind_event
¶
MediatorModule
¶
register
classmethod
¶
Application-level module for Mediator setup.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Configuration for the Mediator extension. |
Source code in src/waku/cqrs/modules.py
IRequestHandler
¶
Bases: Protocol[RequestT, ResponseT]
Protocol for request handlers (commands/queries).
MediatR equivalent: IRequestHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::

    class GetUserQueryHandler(IRequestHandler[GetUserQuery, UserDTO]):
        async def handle(self, request: GetUserQuery, /) -> UserDTO:
            return await self._repository.get(request.user_id)
RequestHandler
¶
Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]
Abstract base class for request handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.
Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
contracts
¶
NotificationT
module-attribute
¶
NotificationT = TypeVar(
'NotificationT', bound=INotification, contravariant=True
)
RequestT
module-attribute
¶
ResponseT
module-attribute
¶
Event
dataclass
¶
Event(*, event_id=uuid4())
Bases: INotification
Convenience base class for events with auto-generated ID.
Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.
Example::

    @dataclass(frozen=True, kw_only=True)
    class UserCreated(Event):
        user_id: str
        email: str
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::

    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle. |
| next_handler | Function to call the next handler in the pipeline. |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline. |
Source code in src/waku/cqrs/contracts/pipeline.py
IRequest
¶
Marker interface for request-type objects (commands/queries).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.
MediatR equivalent: IRequest
Example::

    @dataclass(frozen=True)
    class GetUserQuery(IRequest[UserDTO]):
        user_id: str

    @dataclass(frozen=True)
    class CreateOrderCommand(IRequest[OrderId]):
        customer_id: str
        items: list[OrderItem]
Request
dataclass
¶
Request(*, request_id=uuid4())
Bases: IRequest[ResponseT], Generic[ResponseT]
Convenience base class for requests with auto-generated ID.
Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.
Example::

    @dataclass(frozen=True, kw_only=True)
    class GetUserQuery(Request[UserDTO]):
        user_id: str
event
¶
NotificationT
module-attribute
¶
NotificationT = TypeVar(
'NotificationT', bound=INotification, contravariant=True
)
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::

    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
Event
dataclass
¶
Event(*, event_id=uuid4())
Bases: INotification
Convenience base class for events with auto-generated ID.
Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.
Example::

    @dataclass(frozen=True, kw_only=True)
    class UserCreated(Event):
        user_id: str
        email: str
notification
¶
NotificationT
module-attribute
¶
NotificationT = TypeVar(
'NotificationT', bound=INotification, contravariant=True
)
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::

    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
pipeline
¶
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle. |
| next_handler | Function to call the next handler in the pipeline. |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline. |
Source code in src/waku/cqrs/contracts/pipeline.py
request
¶
ResponseT
module-attribute
¶
RequestT
module-attribute
¶
IRequest
¶
Marker interface for request-type objects (commands/queries).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.
MediatR equivalent: IRequest
Example::

    @dataclass(frozen=True)
    class GetUserQuery(IRequest[UserDTO]):
        user_id: str

    @dataclass(frozen=True)
    class CreateOrderCommand(IRequest[OrderId]):
        customer_id: str
        items: list[OrderItem]
Request
dataclass
¶
Request(*, request_id=uuid4())
Bases: IRequest[ResponseT], Generic[ResponseT]
Convenience base class for requests with auto-generated ID.
Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.
Example::

    @dataclass(frozen=True, kw_only=True)
    class GetUserQuery(Request[UserDTO]):
        user_id: str
events
¶
EventHandler
¶
Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]
Abstract base class for event handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.
Example::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
GroupEventPublisher
¶
Bases: EventPublisher
SequentialEventPublisher
¶
Bases: EventPublisher
handler
¶
INotificationHandler
¶
Bases: Protocol[NotificationT]
Protocol for notification/event handlers.
MediatR equivalent: INotificationHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::

    class OrderPlacedHandler(INotificationHandler[OrderPlaced]):
        async def handle(self, event: OrderPlaced, /) -> None:
            await self._send_confirmation_email(event.order_id)
EventHandler
¶
Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]
Abstract base class for event handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.
Example::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
map
¶
EventMapRegistry
module-attribute
¶
EventMapRegistry = MutableMapping[
type[INotification], EventMapEntry[INotification]
]
EventMapEntry
dataclass
¶
EventMapEntry(
event_type, di_lookup_type, handler_types=list()
)
EventMap
¶
Source code in src/waku/cqrs/events/map.py
freeze
¶
bind
¶
Source code in src/waku/cqrs/events/map.py
merge
¶
has_handlers
¶
publish
¶
SequentialEventPublisher
¶
Bases: EventPublisher
GroupEventPublisher
¶
Bases: EventPublisher
exceptions
¶
MapFrozenError
¶
ImproperlyConfiguredError
¶
Bases: MediatorError
Raised when the mediator configuration is invalid.
RequestHandlerAlreadyRegistered
¶
Bases: MediatorError, KeyError
Raised when a request handler is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
| handler_type | The type of handler that was already registered. |
Source code in src/waku/cqrs/exceptions.py
RequestHandlerNotFound
¶
Bases: MediatorError, TypeError
Raised when a request handler is not found.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
Source code in src/waku/cqrs/exceptions.py
EventHandlerAlreadyRegistered
¶
Bases: MediatorError, KeyError
Raised when a notification handler is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| event_type | The type of notification that caused the error. |
| handler_type | The type of handler that was already registered. |
Source code in src/waku/cqrs/exceptions.py
PipelineBehaviorAlreadyRegistered
¶
Bases: MediatorError, KeyError
Raised when a pipeline behavior is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
| behavior_type | The type of behavior that was already registered. |
Source code in src/waku/cqrs/exceptions.py
impl
¶
interfaces
¶
ISender
¶
IPublisher
¶
modules
¶
MediatorConfig
dataclass
¶
MediatorConfig(
*,
mediator_implementation_type=Mediator,
event_publisher=SequentialEventPublisher,
pipeline_behaviors=(),
)
Configuration for the Mediator extension.
This class defines the configuration options for setting up the mediator pattern implementation in the application.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mediator_implementation_type | The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class. |
| event_publisher | The implementation class for publishing events. Defaults to SequentialEventPublisher. |
| pipeline_behaviors | A sequence of pipeline behavior configurations that will be applied to the mediator pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence. |
MediatorModule
¶
register
classmethod
¶
Application-level module for Mediator setup.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Configuration for the Mediator extension. |
Source code in src/waku/cqrs/modules.py
MediatorExtension
¶
Bases: OnModuleConfigure
Source code in src/waku/cqrs/modules.py
on_module_configure
¶
bind_request
¶
Source code in src/waku/cqrs/modules.py
bind_event
¶
MediatorRegistryAggregator
¶
Bases: OnModuleRegistration
on_module_registration
¶
Source code in src/waku/cqrs/modules.py
pipeline
¶
PipelineBehaviorWrapper
¶
Bases: Generic[RequestT, ResponseT]
Composes pipeline behaviors into a processing chain.
Source code in src/waku/cqrs/pipeline/chain.py
wrap
¶
Create a pipeline that wraps the handler function with behaviors.
Pipeline behaviors are executed in the order they are provided.
| PARAMETER | DESCRIPTION |
|---|---|
| handle | The handler function to wrap with behaviors. |

| RETURNS | DESCRIPTION |
|---|---|
| NextHandlerType[RequestT, ResponseT] | A function that executes the entire pipeline. |
Source code in src/waku/cqrs/pipeline/chain.py
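One way to get "executed in the order they are provided" is to wrap the handler from the last behavior to the first, so the first behavior ends up outermost. The sketch below is an assumption about the general technique, not the library's implementation; `wrap_sketch`, `tracing`, and the string-only types are hypothetical.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence

Handler = Callable[[str], Awaitable[str]]
Behavior = Callable[[str, Handler], Awaitable[str]]


def bind(behavior: Behavior, next_handler: Handler) -> Handler:
    # Freeze one behavior together with the handler it should delegate to.
    async def call(request: str) -> str:
        return await behavior(request, next_handler)

    return call


def wrap_sketch(handle: Handler, behaviors: Sequence[Behavior]) -> Handler:
    pipeline = handle
    # Wrap back to front so behaviors[0] is the outermost layer and runs first.
    for behavior in reversed(behaviors):
        pipeline = bind(behavior, pipeline)
    return pipeline


trace: list[str] = []


def tracing(name: str) -> Behavior:
    async def behavior(request: str, next_handler: Handler) -> str:
        trace.append(f"{name}:in")
        response = await next_handler(request)
        trace.append(f"{name}:out")
        return response

    return behavior


async def handler(request: str) -> str:
    trace.append("handler")
    return request[::-1]


pipeline = wrap_sketch(handler, [tracing("first"), tracing("second")])
result = asyncio.run(pipeline("abc"))
```

The trace shows the usual onion shape: behaviors enter in the given order and exit in reverse.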
chain
¶
PipelineBehaviorWrapper
¶
Bases: Generic[RequestT, ResponseT]
Composes pipeline behaviors into a processing chain.
Source code in src/waku/cqrs/pipeline/chain.py
wrap
¶
Create a pipeline that wraps the handler function with behaviors.
Pipeline behaviors are executed in the order they are provided.
| PARAMETER | DESCRIPTION |
|---|---|
| handle | The handler function to wrap with behaviors. |

| RETURNS | DESCRIPTION |
|---|---|
| NextHandlerType[RequestT, ResponseT] | A function that executes the entire pipeline. |
Source code in src/waku/cqrs/pipeline/chain.py
map
¶
PipelineBehaviorMapRegistry
module-attribute
¶
PipelineBehaviorMapRegistry = MutableMapping[
type[RequestT],
PipelineBehaviorMapEntry[RequestT, ResponseT],
]
PipelineBehaviorMapEntry
dataclass
¶
PipelineBehaviorMapEntry(
request_type, di_lookup_type, behavior_types=list()
)
Bases: Generic[RequestT, ResponseT]
for_request
classmethod
¶
Source code in src/waku/cqrs/pipeline/map.py
add
¶
PipelineBehaviorMap
¶
Source code in src/waku/cqrs/pipeline/map.py
freeze
¶
bind
¶
Source code in src/waku/cqrs/pipeline/map.py
merge
¶
has_behaviors
¶
registry
¶
MediatorRegistry
dataclass
¶
MediatorRegistry(
*,
request_map=RequestMap(),
event_map=EventMap(),
behavior_map=PipelineBehaviorMap(),
)
behavior_map
class-attribute
instance-attribute
¶
behavior_map = field(default_factory=PipelineBehaviorMap)
merge
¶
freeze
¶
handler_providers
¶
Source code in src/waku/cqrs/registry.py
collector_providers
¶
requests
¶
RequestHandler
¶
Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]
Abstract base class for request handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.
Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handler
¶
IRequestHandler
¶
Bases: Protocol[RequestT, ResponseT]
Protocol for request handlers (commands/queries).
MediatR equivalent: IRequestHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::

    class GetUserQueryHandler(IRequestHandler[GetUserQuery, UserDTO]):
        async def handle(self, request: GetUserQuery, /) -> UserDTO:
            return await self._repository.get(request.user_id)
RequestHandler
¶
Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]
Abstract base class for request handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.
Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
map
¶
RequestMapRegistry
module-attribute
¶
RequestMapRegistry = MutableMapping[
type[IRequest[Response | None]],
RequestMapEntry[
IRequest[Response | None], Response | None
],
]
RequestMapEntry
dataclass
¶
RequestMap
¶
Source code in src/waku/cqrs/requests/map.py
freeze
¶
bind
¶
Source code in src/waku/cqrs/requests/map.py
merge
¶
has_handler
¶
utils
¶
get_request_response_type
cached
¶
Extract the response type from an IRequest implementation.
Searches through the class hierarchy to find IRequest or its subclasses, supporting direct implementations, Request subclasses, and nested inheritance.
| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the response type cannot be extracted from the request type. |
Source code in src/waku/cqrs/utils.py
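The general technique of reading a type argument off a generic base can be sketched with the standard `typing` helpers. This is an illustration under stated assumptions, not the function's actual source: `RequestSketch` and `response_type_of` are hypothetical names, and the sketch assumes the response type is always the first type argument.

```python
from functools import cache
from typing import Generic, TypeVar, get_args, get_origin

ResponseT = TypeVar("ResponseT")


class RequestSketch(Generic[ResponseT]):
    """Hypothetical generic marker standing in for a request interface."""


@cache
def response_type_of(request_type: type) -> object:
    # Walk the class hierarchy and inspect each class's own parameterized bases.
    for klass in request_type.__mro__:
        for base in klass.__dict__.get("__orig_bases__", ()):
            origin = get_origin(base)
            if isinstance(origin, type) and issubclass(origin, RequestSketch):
                args = get_args(base)
                # Skip unresolved type variables such as RequestSketch[ResponseT].
                if args and not isinstance(args[0], TypeVar):
                    return args[0]
    raise TypeError(f"Cannot extract response type from {request_type.__name__}")


class GetUser(RequestSketch[int]):
    pass


class BaseQuery(RequestSketch[ResponseT], Generic[ResponseT]):
    pass


class GetName(BaseQuery[str]):
    pass


class GetUserV2(GetUser):
    # Nested inheritance: the argument is found on the parent class.
    pass
```

The cache is safe here because classes are hashable and their bases do not change after creation.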
di
¶
activator
¶
Create a Provider with an activator for simple cases.
| PARAMETER | DESCRIPTION |
|---|---|
| fn | Callable that returns bool to determine marker activation. |
| *markers | Marker instances or types to activate. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider with the activator registered. |
Source code in src/waku/di/_providers.py
contextual
¶
Provide a dependency from the current context (e.g., app/request).
| PARAMETER | DESCRIPTION |
|---|---|
| provided_type | The type to resolve from context. |
| scope | Scope of the context variable (default: Scope.REQUEST). |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider configured for context resolution. |
Source code in src/waku/di/_providers.py
many
¶
Register multiple implementations as a collection.
| PARAMETER | DESCRIPTION |
|---|---|
| interface | Interface type for the collection. |
| *implementations | Implementation types or factory functions to include in the collection. |
| scope | Scope of the collection (default: Scope.REQUEST). |
| cache | Whether to cache the resolved results within the scope. |
| when | Optional marker to conditionally activate the provider. |
| collect | Whether to include collect+alias for Sequence/list resolution. Set to False to only register implementations without the collector. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider configured for collection resolution. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If no implementations are given and collect is False. |
Source code in src/waku/di/_providers.py
object_
¶
Provide the exact object passed at creation time as a singleton dependency.
| PARAMETER | DESCRIPTION |
|---|---|
| obj | The instance to provide as-is. |
| provided_type | Explicit type to provide (default: inferred). |
| when | Optional marker to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider configured to return the given object. |
Source code in src/waku/di/_providers.py
provider
¶
Source code in src/waku/di/_providers.py
scoped
¶
Create a scoped provider (lifetime: request).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional marker to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider configured for request scope. |
Source code in src/waku/di/_providers.py
singleton
¶
Create a singleton provider (lifetime: app).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional marker to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider configured for singleton scope. |
Source code in src/waku/di/_providers.py
transient
¶
Create a transient provider (new instance per injection).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional marker to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Provider configured for transient (no cache) scope. |
Source code in src/waku/di/_providers.py
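The three lifetimes described for singleton, scoped, and transient providers differ only in where an instance is cached. The toy container below illustrates that idea; `ToyContainer` and `ToyRequestScope` are hypothetical and unrelated to the library's real container.

```python
from collections.abc import Callable
from typing import Any


class ToyContainer:
    """Toy illustration of lifetimes; not the library's container."""

    def __init__(self) -> None:
        self.factories: dict[type, tuple[str, Callable[[], Any]]] = {}
        self.app_cache: dict[type, Any] = {}  # lives as long as the app

    def register(self, lifetime: str, type_: type) -> None:
        self.factories[type_] = (lifetime, type_)

    def open_request(self) -> "ToyRequestScope":
        return ToyRequestScope(self)


class ToyRequestScope:
    def __init__(self, container: ToyContainer) -> None:
        self._container = container
        self._request_cache: dict[type, Any] = {}  # lives for one request

    def get(self, type_: type) -> Any:
        lifetime, factory = self._container.factories[type_]
        if lifetime == "transient":
            return factory()  # never cached: a new instance on every resolve
        if lifetime == "singleton":
            cache = self._container.app_cache
        else:  # "scoped"
            cache = self._request_cache
        if type_ not in cache:
            cache[type_] = factory()
        return cache[type_]


class Settings: ...
class Session: ...
class Token: ...


container = ToyContainer()
container.register("singleton", Settings)
container.register("scoped", Session)
container.register("transient", Token)

first, second = container.open_request(), container.open_request()
```

Resolving `Settings` from either request yields the same object, `Session` is shared only within one request, and `Token` is fresh every time.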
eventsourcing
¶
ExpectedVersion
module-attribute
¶
ExpectedVersion = (
Exact | NoStream | StreamExists | AnyVersion
)
EventSourcedAggregate
¶
IDecider
¶
EventEnvelope
dataclass
¶
EventEnvelope(
*,
domain_event,
idempotency_key,
metadata=EventMetadata(),
)
IMetadataEnricher
¶
StoredEvent
dataclass
¶
StoredEvent(
*,
event_id,
stream_id,
event_type,
position,
global_position,
timestamp,
data,
metadata,
idempotency_key,
schema_version=1,
)
StreamId
dataclass
¶
for_aggregate
classmethod
¶
from_value
classmethod
¶
Source code in src/waku/eventsourcing/contracts/stream.py
StreamPosition
¶
DeciderCommandHandler
¶
Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/decider/handler.py
DeciderRepository
¶
Bases: ABC, Generic[StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/repository.py
load
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
save
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
DeciderVoidCommandHandler
¶
Bases: DeciderCommandHandler[RequestT, None, StateT, CommandT, EventT], ABC, Generic[RequestT, StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/decider/handler.py
SnapshotDeciderRepository
¶
SnapshotDeciderRepository(
decider,
event_store,
snapshot_store,
snapshot_config_registry,
state_serializer,
)
Bases: DeciderRepository[StateT, CommandT, EventT], ABC
Source code in src/waku/eventsourcing/decider/repository.py
load
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
save
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
AggregateNotFoundError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
ConcurrencyConflictError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
ConflictingEventTypeError
¶
ConflictingEventTypeError(
event_type,
existing_name,
existing_version,
attempted_name,
attempted_version,
)
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
DuplicateAggregateNameError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
DuplicateEventTypeError
¶
DuplicateIdempotencyKeyError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
PartialDuplicateAppendError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
ProjectionError
¶
Bases: EventSourcingError
ProjectionStoppedError
¶
Bases: ProjectionError
Source code in src/waku/eventsourcing/exceptions.py
RegistryFrozenError
¶
RetryExhaustedError
¶
Bases: ProjectionError
Source code in src/waku/eventsourcing/exceptions.py
SnapshotConfigNotFoundError
¶
SnapshotMigrationChainError
¶
Bases: EventSourcingError
SnapshotTypeMismatchError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
StreamNotFoundError
¶
StreamTooLargeError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
UnknownEventTypeError
¶
UpcasterChainError
¶
Bases: EventSourcingError
EventSourcedCommandHandler
¶
Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, AggregateT]
Source code in src/waku/eventsourcing/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/handler.py
EventSourcedVoidCommandHandler
¶
Bases: EventSourcedCommandHandler[RequestT, None, AggregateT], ABC, Generic[RequestT, AggregateT]
Source code in src/waku/eventsourcing/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/handler.py
EventSourcingConfig
dataclass
¶
EventSourcingConfig(
*,
store=None,
event_serializer=None,
snapshot_store=None,
snapshot_state_serializer=None,
checkpoint_store=None,
enrichers=(),
)
EventSourcingExtension
dataclass
¶
Bases: OnModuleConfigure
bind_aggregate
¶
Source code in src/waku/eventsourcing/modules.py
bind_decider
¶
Source code in src/waku/eventsourcing/modules.py
bind_catch_up_projections
¶
aggregate_names
¶
Source code in src/waku/eventsourcing/modules.py
snapshot_bindings
¶
Source code in src/waku/eventsourcing/modules.py
on_module_configure
¶
Source code in src/waku/eventsourcing/modules.py
EventSourcingModule
¶
register
classmethod
¶
Source code in src/waku/eventsourcing/modules.py
EventType
dataclass
¶
SnapshotOptions
dataclass
¶
EventSourcedRepository
¶
Bases: ABC, Generic[AggregateT]
Source code in src/waku/eventsourcing/repository.py
load
async
¶
Source code in src/waku/eventsourcing/repository.py
save
async
¶
Source code in src/waku/eventsourcing/repository.py
create_aggregate
¶
Source code in src/waku/eventsourcing/repository.py
FnUpcaster
¶
Bases: IEventUpcaster
Source code in src/waku/eventsourcing/upcasting/fn.py
IEventUpcaster
¶
UpcasterChain
¶
Source code in src/waku/eventsourcing/upcasting/chain.py
upcast
¶
Source code in src/waku/eventsourcing/upcasting/chain.py
add_field
¶
Source code in src/waku/eventsourcing/upcasting/helpers.py
noop
¶
remove_field
¶
rename_field
¶
Source code in src/waku/eventsourcing/upcasting/helpers.py
upcast
¶
contracts
¶
ExpectedVersion
module-attribute
¶
ExpectedVersion = (
Exact | NoStream | StreamExists | AnyVersion
)
EventSourcedAggregate
¶
EventEnvelope
dataclass
¶
EventEnvelope(
*,
domain_event,
idempotency_key,
metadata=EventMetadata(),
)
IMetadataEnricher
¶
StoredEvent
dataclass
¶
StoredEvent(
*,
event_id,
stream_id,
event_type,
position,
global_position,
timestamp,
data,
metadata,
idempotency_key,
schema_version=1,
)
StreamId
dataclass
¶
for_aggregate
classmethod
¶
from_value
classmethod
¶
Source code in src/waku/eventsourcing/contracts/stream.py
StreamPosition
¶
aggregate
¶
IDecider
¶
event
¶
IMetadataEnricher
¶
EventEnvelope
dataclass
¶
EventEnvelope(
*,
domain_event,
idempotency_key,
metadata=EventMetadata(),
)
StoredEvent
dataclass
¶
StoredEvent(
*,
event_id,
stream_id,
event_type,
position,
global_position,
timestamp,
data,
metadata,
idempotency_key,
schema_version=1,
)
stream
¶
ExpectedVersion
module-attribute
¶
ExpectedVersion = (
Exact | NoStream | StreamExists | AnyVersion
)
StreamId
dataclass
¶
for_aggregate
classmethod
¶
from_value
classmethod
¶
Source code in src/waku/eventsourcing/contracts/stream.py
decider
¶
DeciderCommandHandler
¶
Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/decider/handler.py
DeciderVoidCommandHandler
¶
Bases: DeciderCommandHandler[RequestT, None, StateT, CommandT, EventT], ABC, Generic[RequestT, StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/decider/handler.py
DeciderRepository
¶
Bases: ABC, Generic[StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/repository.py
load
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
save
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
SnapshotDeciderRepository
¶
SnapshotDeciderRepository(
decider,
event_store,
snapshot_store,
snapshot_config_registry,
state_serializer,
)
Bases: DeciderRepository[StateT, CommandT, EventT], ABC
Source code in src/waku/eventsourcing/decider/repository.py
load
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
save
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
handler
¶
DeciderCommandHandler
¶
Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/decider/handler.py
DeciderVoidCommandHandler
¶
Bases: DeciderCommandHandler[RequestT, None, StateT, CommandT, EventT], ABC, Generic[RequestT, StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/decider/handler.py
repository
¶
DeciderRepository
¶
Bases: ABC, Generic[StateT, CommandT, EventT]
Source code in src/waku/eventsourcing/decider/repository.py
load
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
save
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
SnapshotDeciderRepository
¶
SnapshotDeciderRepository(
decider,
event_store,
snapshot_store,
snapshot_config_registry,
state_serializer,
)
Bases: DeciderRepository[StateT, CommandT, EventT], ABC
Source code in src/waku/eventsourcing/decider/repository.py
load
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
save
async
¶
Source code in src/waku/eventsourcing/decider/repository.py
exceptions
¶
StreamNotFoundError
¶
ConcurrencyConflictError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
AggregateNotFoundError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
DuplicateAggregateNameError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
UnknownEventTypeError
¶
DuplicateEventTypeError
¶
ConflictingEventTypeError
¶
ConflictingEventTypeError(
event_type,
existing_name,
existing_version,
attempted_name,
attempted_version,
)
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
SnapshotTypeMismatchError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
StreamTooLargeError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
RegistryFrozenError
¶
ProjectionError
¶
Bases: EventSourcingError
ProjectionStoppedError
¶
Bases: ProjectionError
Source code in src/waku/eventsourcing/exceptions.py
RetryExhaustedError
¶
Bases: ProjectionError
Source code in src/waku/eventsourcing/exceptions.py
DuplicateIdempotencyKeyError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
PartialDuplicateAppendError
¶
Bases: EventSourcingError
Source code in src/waku/eventsourcing/exceptions.py
SnapshotConfigNotFoundError
¶
SnapshotMigrationChainError
¶
Bases: EventSourcingError
UpcasterChainError
¶
Bases: EventSourcingError
handler
¶
AggregateT
module-attribute
¶
AggregateT = TypeVar(
'AggregateT',
bound=EventSourcedAggregate,
default=EventSourcedAggregate,
)
EventSourcedCommandHandler
¶
Bases: RequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT, AggregateT]
Source code in src/waku/eventsourcing/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/handler.py
EventSourcedVoidCommandHandler
¶
Bases: EventSourcedCommandHandler[RequestT, None, AggregateT], ABC, Generic[RequestT, AggregateT]
Source code in src/waku/eventsourcing/handler.py
handle
async
¶
Source code in src/waku/eventsourcing/handler.py
modules
¶
EventType
dataclass
¶
SnapshotOptions
dataclass
¶
AggregateBinding
dataclass
¶
DeciderBinding
dataclass
¶
EventSourcingConfig
dataclass
¶
EventSourcingConfig(
*,
store=None,
event_serializer=None,
snapshot_store=None,
snapshot_state_serializer=None,
checkpoint_store=None,
enrichers=(),
)
EventSourcingRegistry
dataclass
¶
EventSourcingRegistry(
projection_types=list(),
catch_up_projection_types=list(),
event_type_bindings=list(),
)
projection_types
class-attribute
instance-attribute
¶
catch_up_projection_types
class-attribute
instance-attribute
¶
event_type_bindings
class-attribute
instance-attribute
¶
merge
¶
Source code in src/waku/eventsourcing/modules.py
freeze
¶
handler_providers
¶
Source code in src/waku/eventsourcing/modules.py
EventSourcingModule
¶
register
classmethod
¶
Source code in src/waku/eventsourcing/modules.py
EventSourcingExtension
dataclass
¶
Bases: OnModuleConfigure
bind_aggregate
¶
Source code in src/waku/eventsourcing/modules.py
bind_decider
¶
Source code in src/waku/eventsourcing/modules.py
bind_catch_up_projections
¶
aggregate_names
¶
Source code in src/waku/eventsourcing/modules.py
snapshot_bindings
¶
Source code in src/waku/eventsourcing/modules.py
on_module_configure
¶
Source code in src/waku/eventsourcing/modules.py
EventSourcingRegistryAggregator
¶
Bases: OnModuleRegistration
Source code in src/waku/eventsourcing/modules.py
on_module_registration
¶
Source code in src/waku/eventsourcing/modules.py
projection
¶
Checkpoint
dataclass
¶
CatchUpProjectionConfig
dataclass
¶
CatchUpProjectionConfig(
*,
batch_size=100,
max_attempts=3,
base_retry_delay_seconds=10.0,
max_retry_delay_seconds=300.0,
poll_interval_min_seconds=0.5,
poll_interval_max_seconds=5.0,
poll_interval_step_seconds=1.0,
poll_interval_jitter_factor=0.1,
)
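The `poll_interval_*` fields suggest an idle backoff between polls. How waku actually combines them is not shown on this page; the sketch below is one plausible reading (grow by the step while idle, reset to the minimum when events arrive, apply proportional jitter). `PollInterval` and its method names are hypothetical.

```python
import random


class PollInterval:
    """Hypothetical adaptive poll interval; not waku's AdaptiveInterval."""

    def __init__(
        self,
        min_seconds: float = 0.5,
        max_seconds: float = 5.0,
        step_seconds: float = 1.0,
        jitter_factor: float = 0.1,
    ) -> None:
        self.min_seconds = min_seconds
        self.max_seconds = max_seconds
        self.step_seconds = step_seconds
        self.jitter_factor = jitter_factor
        self.current = min_seconds

    def on_work(self) -> None:
        # Events were processed: poll again quickly.
        self.current = self.min_seconds

    def on_idle(self) -> None:
        # Nothing to do: back off by one step, capped at the maximum.
        self.current = min(self.current + self.step_seconds, self.max_seconds)

    def next_sleep(self) -> float:
        # Spread the interval by +/- jitter_factor to avoid synchronized polling.
        jitter = self.current * self.jitter_factor
        return max(0.0, self.current + random.uniform(-jitter, jitter))
```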
LeaseConfig
dataclass
¶
InMemoryCheckpointStore
¶
ErrorPolicy
¶
ICatchUpProjection
¶
Bases: IProjection
project
abstractmethod
async
¶
IProjection
¶
CatchUpProjectionRunner
¶
Source code in src/waku/eventsourcing/projection/runner.py
run
async
¶
Source code in src/waku/eventsourcing/projection/runner.py
rebuild
async
¶
Source code in src/waku/eventsourcing/projection/runner.py
adaptive_interval
¶
AdaptiveInterval
¶
calculate_backoff_with_jitter
¶
Full jitter: random(0, min(base * 2^attempt, max_delay)).
Source code in src/waku/eventsourcing/projection/adaptive_interval.py
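The full-jitter formula above can be sketched directly. The function name and parameter names here are illustrative assumptions, not the signature of `calculate_backoff_with_jitter`.

```python
import random


def full_jitter_delay(attempt: int, base: float, max_delay: float) -> float:
    """Draw a delay uniformly from [0, min(base * 2**attempt, max_delay)]."""
    cap = min(base * (2 ** attempt), max_delay)
    return random.uniform(0.0, cap)
```

With a base of 10 seconds and a maximum of 300 seconds, the upper bound grows 10, 20, 40, ... and is clamped at 300 from the fifth retry onward, while the random draw keeps concurrent retriers from waking at the same moment.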
checkpoint
¶
config
¶
CatchUpProjectionConfig
dataclass
¶
CatchUpProjectionConfig(
*,
batch_size=100,
max_attempts=3,
base_retry_delay_seconds=10.0,
max_retry_delay_seconds=300.0,
poll_interval_min_seconds=0.5,
poll_interval_max_seconds=5.0,
poll_interval_step_seconds=1.0,
poll_interval_jitter_factor=0.1,
)
LeaseConfig
dataclass
¶
in_memory
¶
InMemoryCheckpointStore
¶
interfaces
¶
lock
¶
InMemoryProjectionLock
¶
Bases: IProjectionLock
Always acquires the lock when running in a single process. Tracks held locks for testing.
Source code in src/waku/eventsourcing/projection/lock/in_memory.py
acquire
async
¶
Source code in src/waku/eventsourcing/projection/lock/in_memory.py
IProjectionLock
¶
in_memory
¶
InMemoryProjectionLock
¶
Bases: IProjectionLock
Always acquires the lock when running in a single process. Tracks held locks for testing.
Source code in src/waku/eventsourcing/projection/lock/in_memory.py
acquire
async
¶
Source code in src/waku/eventsourcing/projection/lock/in_memory.py
interfaces
¶
sqlalchemy
¶
PostgresAdvisoryProjectionLock
¶
Bases: IProjectionLock
Session-level PostgreSQL advisory lock.
Holds a database connection for the entire duration of the lock because
pg_advisory_lock is bound to the session: releasing the connection
releases the lock. For long-running projections, consider
PostgresLeaseProjectionLock, which only connects during heartbeats.
Not compatible with PgBouncer in transaction-pooling mode.
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
acquire
async
¶
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
PostgresLeaseProjectionLock
¶
Bases: IProjectionLock
Production lease-based projection lock backed by PostgreSQL.
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
acquire
async
¶
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
bind_lease_tables
¶
advisory
¶
PostgresAdvisoryProjectionLock
¶
Bases: IProjectionLock
Session-level PostgreSQL advisory lock.
Holds a database connection for the entire duration of the lock because
pg_advisory_lock is bound to the session: releasing the connection
releases the lock. For long-running projections, consider
PostgresLeaseProjectionLock, which only connects during heartbeats.
Not compatible with PgBouncer in transaction-pooling mode.
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
acquire
async
¶
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/advisory.py
lock
¶
PostgresLeaseProjectionLock
¶
Bases: IProjectionLock
Production lease-based projection lock backed by PostgreSQL.
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
acquire
async
¶
Source code in src/waku/eventsourcing/projection/lock/sqlalchemy/lock.py
tables
¶
es_projection_leases_table
module-attribute
¶
es_projection_leases_table = Table(
'es_projection_leases',
_internal_metadata,
Column('projection_name', Text, primary_key=True),
Column('holder_id', Text, nullable=False),
Column(
'acquired_at',
TIMESTAMP(timezone=True),
server_default=now(),
),
Column(
'renewed_at',
TIMESTAMP(timezone=True),
server_default=now(),
),
Column(
'expires_at',
TIMESTAMP(timezone=True),
nullable=False,
),
)
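The columns above (one row per projection, a holder, and an expiry) suggest lease semantics: a holder may take or renew the lease unless another holder's lease is still live. The in-memory sketch below illustrates that reading only; it is not the SQL that waku issues, and `InMemoryLeases` and `try_acquire` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Lease:
    holder_id: str
    acquired_at: datetime
    renewed_at: datetime
    expires_at: datetime


class InMemoryLeases:
    """Hypothetical model of a lease table keyed by projection name."""

    def __init__(self) -> None:
        self._rows: dict[str, Lease] = {}

    def try_acquire(
        self, projection_name: str, holder_id: str, ttl: timedelta, now: datetime
    ) -> bool:
        row = self._rows.get(projection_name)
        if row is not None and row.holder_id != holder_id and row.expires_at > now:
            return False  # another holder still has a live lease
        # Renewal by the same holder keeps the original acquisition time.
        same_holder = row is not None and row.holder_id == holder_id
        acquired = row.acquired_at if same_holder else now
        self._rows[projection_name] = Lease(holder_id, acquired, now, now + ttl)
        return True
```

Because the lease expires on its own, a crashed holder does not block the projection forever; a competing worker can take over once `expires_at` has passed.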
processor
¶
ProjectionProcessor
¶
Source code in src/waku/eventsourcing/projection/processor.py
run_once
async
¶
Source code in src/waku/eventsourcing/projection/processor.py
reset_checkpoint
async
¶
runner
¶
CatchUpProjectionRunner
¶
Source code in src/waku/eventsourcing/projection/runner.py
run
async
¶
Source code in src/waku/eventsourcing/projection/runner.py
rebuild
async
¶
Source code in src/waku/eventsourcing/projection/runner.py
sqlalchemy
¶
SqlAlchemyCheckpointStore
¶
Bases: ICheckpointStore
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
load
async
¶
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
save
async
¶
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
make_sqlalchemy_checkpoint_store
¶
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
bind_checkpoint_tables
¶
store
¶
SqlAlchemyCheckpointStore
¶
Bases: ICheckpointStore
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
load
async
¶
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
save
async
¶
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
make_sqlalchemy_checkpoint_store
¶
Source code in src/waku/eventsourcing/projection/sqlalchemy/store.py
tables
¶
es_checkpoints_table
module-attribute
¶
es_checkpoints_table = Table(
'es_checkpoints',
_internal_metadata,
Column('projection_name', Text, primary_key=True),
Column('position', BigInteger, nullable=False),
Column(
'updated_at',
TIMESTAMP(timezone=True),
nullable=False,
),
Column(
'created_at',
TIMESTAMP(timezone=True),
server_default=now(),
),
)
repository
¶
EventSourcedRepository
¶
Bases: ABC, Generic[AggregateT]
Source code in src/waku/eventsourcing/repository.py
load
async
¶
Source code in src/waku/eventsourcing/repository.py
save
async
¶
Source code in src/waku/eventsourcing/repository.py
create_aggregate
¶
Source code in src/waku/eventsourcing/repository.py
serialization
¶
default_retort
module-attribute
¶
default_retort = Retort(
recipe=[
loader(UUID, UUID),
dumper(UUID, str),
loader(StreamId, from_value),
dumper(StreamId, str),
]
)
JsonEventSerializer
¶
JsonSnapshotStateSerializer
¶
Bases: ISnapshotStateSerializer
serialize
¶
EventTypeRegistry
¶
Source code in src/waku/eventsourcing/serialization/registry.py
register
¶
Source code in src/waku/eventsourcing/serialization/registry.py
add_alias
¶
Source code in src/waku/eventsourcing/serialization/registry.py
resolve
¶
get_name
¶
get_version
¶
interfaces
¶
json
¶
JsonEventSerializer
¶
JsonSnapshotStateSerializer
¶
Bases: ISnapshotStateSerializer
serialize
¶
registry
¶
EventTypeRegistry
¶
Source code in src/waku/eventsourcing/serialization/registry.py
register
¶
Source code in src/waku/eventsourcing/serialization/registry.py
add_alias
¶
Source code in src/waku/eventsourcing/serialization/registry.py
resolve
¶
get_name
¶
get_version
¶
snapshot
¶
JsonSnapshotStateSerializer
¶
Bases: ISnapshotStateSerializer
serialize
¶
InMemorySnapshotStore
¶
ISnapshotStrategy
¶
Snapshot
dataclass
¶
ISnapshotMigration
¶
SnapshotMigrationChain
¶
Source code in src/waku/eventsourcing/snapshot/migration.py
migrate
¶
Source code in src/waku/eventsourcing/snapshot/migration.py
SnapshotConfig
dataclass
¶
SnapshotConfigRegistry
¶
Source code in src/waku/eventsourcing/snapshot/registry.py
SnapshotEventSourcedRepository
¶
SnapshotEventSourcedRepository(
event_store,
snapshot_store,
snapshot_config_registry,
state_serializer,
)
Bases: EventSourcedRepository[AggregateT], ABC, Generic[AggregateT]
Source code in src/waku/eventsourcing/snapshot/repository.py
create_aggregate
¶
Source code in src/waku/eventsourcing/repository.py
load
async
¶
Source code in src/waku/eventsourcing/snapshot/repository.py
save
async
¶
Source code in src/waku/eventsourcing/snapshot/repository.py
EventCountStrategy
¶
Bases: ISnapshotStrategy
Source code in src/waku/eventsourcing/snapshot/strategy.py
in_memory
¶
InMemorySnapshotStore
¶
migration
¶
ISnapshotMigration
¶
SnapshotMigrationChain
¶
Source code in src/waku/eventsourcing/snapshot/migration.py
migrate
¶
Source code in src/waku/eventsourcing/snapshot/migration.py
migrate_snapshot_or_discard
¶
Source code in src/waku/eventsourcing/snapshot/migration.py
registry
¶
repository
¶
SnapshotEventSourcedRepository
¶
SnapshotEventSourcedRepository(
event_store,
snapshot_store,
snapshot_config_registry,
state_serializer,
)
Bases: EventSourcedRepository[AggregateT], ABC, Generic[AggregateT]
Source code in src/waku/eventsourcing/snapshot/repository.py
load
async
¶
Source code in src/waku/eventsourcing/snapshot/repository.py
save
async
¶
Source code in src/waku/eventsourcing/snapshot/repository.py
create_aggregate
¶
Source code in src/waku/eventsourcing/repository.py
sqlalchemy
¶
SqlAlchemySnapshotStore
¶
Bases: ISnapshotStore
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
load
async
¶
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
save
async
¶
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
make_sqlalchemy_snapshot_store
¶
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
bind_snapshot_tables
¶
store
¶
SqlAlchemySnapshotStore
¶
Bases: ISnapshotStore
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
load
async
¶
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
save
async
¶
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
make_sqlalchemy_snapshot_store
¶
Source code in src/waku/eventsourcing/snapshot/sqlalchemy/store.py
tables
¶
es_snapshots_table
module-attribute
¶
es_snapshots_table = Table(
'es_snapshots',
_internal_metadata,
Column('stream_id', Text, primary_key=True),
Column('state', JSONB, nullable=False),
Column('version', Integer, nullable=False),
Column('state_type', Text, nullable=False),
Column(
'schema_version',
Integer,
nullable=False,
server_default='1',
),
Column(
'created_at',
TIMESTAMP(timezone=True),
server_default=now(),
),
Column(
'updated_at',
TIMESTAMP(timezone=True),
server_default=now(),
onupdate=now(),
),
)
strategy
¶
EventCountStrategy
¶
Bases: ISnapshotStrategy
Source code in src/waku/eventsourcing/snapshot/strategy.py
store
¶
InMemoryEventStore
¶
Bases: IEventStore
Source code in src/waku/eventsourcing/store/in_memory.py
read_stream
async
¶
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/in_memory.py
read_all
async
¶
Source code in src/waku/eventsourcing/store/in_memory.py
stream_exists
async
¶
append_to_stream
async
¶
Source code in src/waku/eventsourcing/store/in_memory.py
IEventReader
¶
IEventStore
¶
Bases: IEventReader, IEventWriter, ABC
in_memory
¶
InMemoryEventStore
¶
Bases: IEventStore
Source code in src/waku/eventsourcing/store/in_memory.py
read_stream
async
¶
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/in_memory.py
read_all
async
¶
Source code in src/waku/eventsourcing/store/in_memory.py
stream_exists
async
¶
append_to_stream
async
¶
Source code in src/waku/eventsourcing/store/in_memory.py
interfaces
¶
IEventReader
¶
IEventStore
¶
Bases: IEventReader, IEventWriter, ABC
sqlalchemy
¶
SqlAlchemyEventStore
¶
SqlAlchemyEventStore(
session,
serializer,
registry,
tables,
upcaster_chain,
projections=(),
enrichers=(),
)
Bases: IEventStore
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
read_stream
async
¶
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
read_all
async
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
stream_exists
async
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
append_to_stream
async
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
EventStoreTables
dataclass
¶
make_sqlalchemy_event_store
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
bind_event_store_tables
¶
store
¶
SqlAlchemyEventStore
¶
SqlAlchemyEventStore(
session,
serializer,
registry,
tables,
upcaster_chain,
projections=(),
enrichers=(),
)
Bases: IEventStore
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
read_stream
async
¶
read_stream(stream_id, /, *, start=START, count=None)
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
read_all
async
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
stream_exists
async
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
append_to_stream
async
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
make_sqlalchemy_event_store
¶
Source code in src/waku/eventsourcing/store/sqlalchemy/store.py
tables
¶
IDEMPOTENCY_KEY_CONSTRAINT
module-attribute
¶
es_streams_table
module-attribute
¶
es_streams_table = Table(
'es_streams',
_internal_metadata,
Column('stream_id', Text, primary_key=True),
Column('stream_type', Text, nullable=False),
Column(
'version',
Integer,
nullable=False,
server_default='0',
),
Column(
'created_at',
TIMESTAMP(timezone=True),
server_default=now(),
),
Column(
'updated_at',
TIMESTAMP(timezone=True),
server_default=now(),
onupdate=now(),
),
)
es_events_table
module-attribute
¶
es_events_table = Table(
'es_events',
_internal_metadata,
Column(
'event_id', UUID(as_uuid=True), primary_key=True
),
Column('stream_id', Text, nullable=False),
Column('event_type', Text, nullable=False),
Column('position', Integer, nullable=False),
Column(
'global_position',
BigInteger,
Identity(
always=True, start=0, minvalue=0, cycle=False
),
nullable=False,
),
Column('data', JSONB, nullable=False),
Column('metadata', JSONB, nullable=False),
Column(
'timestamp',
TIMESTAMP(timezone=True),
nullable=False,
),
Column(
'schema_version',
Integer,
nullable=False,
server_default='1',
),
Column('idempotency_key', Text, nullable=False),
UniqueConstraint(
'stream_id',
'position',
name='uq_es_events_stream_id_position',
),
UniqueConstraint(
'stream_id',
'idempotency_key',
name=IDEMPOTENCY_KEY_CONSTRAINT,
),
Index(
'ix_es_events_global_position', 'global_position'
),
Index('ix_es_events_event_type', 'event_type'),
)
EventStoreTables
dataclass
¶
bind_event_store_tables
¶
testing
¶
DeciderSpec
¶
Bases: Generic[StateT, CommandT, EventT]
Given/When/Then DSL for testing IDecider implementations.
Example::
DeciderSpec.for_(decider).given([event]).when(command).then([expected])
Source code in src/waku/eventsourcing/testing.py
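The Given/When/Then flow can be pictured as: fold the given events into a state, run the command against that state, and compare the produced events with the expectation. The sketch below does this without importing waku. The decider's method names (`initial_state`, `decide`, `evolve`) and the `given_when` helper are assumptions for illustration and are not confirmed by this page.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Deposit:  # command
    amount: int


@dataclass(frozen=True)
class Deposited:  # event
    amount: int


class AccountDecider:
    """Hypothetical decider whose state is the account balance (an int)."""

    def initial_state(self) -> int:
        return 0

    def decide(self, command: Deposit, state: int) -> list[Deposited]:
        if command.amount <= 0:
            raise ValueError("amount must be positive")
        return [Deposited(command.amount)]

    def evolve(self, state: int, event: Deposited) -> int:
        return state + event.amount


def given_when(
    decider: AccountDecider, given: list[Deposited], when: Deposit
) -> list[Deposited]:
    # "Given": rebuild state from history. "When": decide on the command.
    state = reduce(decider.evolve, given, decider.initial_state())
    return decider.decide(when, state)
```

The "Then" step is a plain equality check on the returned events, which is what keeps decider tests free of stores and repositories.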
upcasting
¶
UpcasterChain
¶
Source code in src/waku/eventsourcing/upcasting/chain.py
upcast
¶
Source code in src/waku/eventsourcing/upcasting/chain.py
FnUpcaster
¶
Bases: IEventUpcaster
Source code in src/waku/eventsourcing/upcasting/fn.py
IEventUpcaster
¶
add_field
¶
Source code in src/waku/eventsourcing/upcasting/helpers.py
noop
¶
remove_field
¶
rename_field
¶
Source code in src/waku/eventsourcing/upcasting/helpers.py
upcast
¶
chain
¶
UpcasterChain
¶
Source code in src/waku/eventsourcing/upcasting/chain.py
upcast
¶
Source code in src/waku/eventsourcing/upcasting/chain.py
fn
¶
FnUpcaster
¶
Bases: IEventUpcaster
Source code in src/waku/eventsourcing/upcasting/fn.py
extensions
¶
ApplicationExtension
module-attribute
¶
ApplicationExtension = (
OnApplicationInit
| AfterApplicationInit
| OnApplicationShutdown
| OnModuleRegistration
)
ModuleExtension
module-attribute
¶
ModuleExtension = (
OnModuleConfigure
| OnModuleInit
| OnModuleDestroy
| OnModuleRegistration
)
DEFAULT_EXTENSIONS
module-attribute
¶
DEFAULT_EXTENSIONS = (
ValidationExtension(
[DependenciesAccessibleRule()], strict=True
),
)
AfterApplicationInit
¶
OnApplicationInit
¶
OnApplicationShutdown
¶
OnModuleDestroy
¶
OnModuleInit
¶
OnModuleRegistration
¶
Bases: Protocol
Extension for contributing providers to module metadata during registration.
This hook runs after all module metadata is collected but before Module objects are created. Use this for cross-module aggregation that produces providers which should belong to the owning module.
Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).
Execution order
- Application-level extensions (assigned to root module)
- Module-level extensions (in topological order)
Key differences from OnModuleConfigure
- Runs after ALL modules are collected (cross-module visibility)
- Receives registry with access to all modules' metadata
- Can add providers to owning module
on_module_registration
¶
Contribute providers to module metadata before Module objects are created.
| PARAMETER | DESCRIPTION |
|---|---|
| registry | Registry of all collected module metadata. Use find_extensions() to discover extensions across modules and add_provider() to contribute providers. |
| owning_module | The module type that owns this extension. Providers added via registry.add_provider() should target this module. |
| context | Application context passed to WakuFactory (read-only). |
Source code in src/waku/extensions/protocols.py
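The hook described above (discover extensions across modules, then contribute a provider to the owning module) can be sketched without the framework. `FakeRegistry` is a hypothetical stand-in that exposes only the two registry methods named in the parameter table; `RouteExtension`, `RouteAggregator`, and the tuple used as a "provider" are illustrative placeholders, not waku types.

```python
from collections.abc import Iterator


class FakeRegistry:
    """Hypothetical stand-in for the registry passed to the hook."""

    def __init__(self, extensions: dict[type, list[object]]) -> None:
        self._extensions = extensions
        self.added: list[tuple[type, object]] = []

    def find_extensions(self, protocol: type) -> Iterator[tuple[type, object]]:
        # Yield (module_type, extension) pairs for matching extensions.
        for module_type, exts in self._extensions.items():
            for ext in exts:
                if isinstance(ext, protocol):
                    yield module_type, ext

    def add_provider(self, module_type: type, provider: object) -> None:
        if module_type not in self._extensions:
            raise KeyError(module_type)
        self.added.append((module_type, provider))


class RouteExtension:
    def __init__(self, *routes: str) -> None:
        self.routes = routes


class RouteAggregator:
    def on_module_registration(self, registry, owning_module, context) -> None:
        # Aggregate data from every module, then attach the result to the
        # owning module. Real code would pass a provider specification here.
        routes = [
            route
            for _, ext in registry.find_extensions(RouteExtension)
            for route in ext.routes
        ]
        registry.add_provider(owning_module, tuple(routes))
```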
ExtensionRegistry
¶
Registry for extensions.
This registry maintains references to all extensions in the application, allowing for centralized management and discovery.
Source code in src/waku/extensions/registry.py
register_application_extension
¶
Register an application extension with optional priority and tags.
Source code in src/waku/extensions/registry.py
register_module_extension
¶
get_application_extensions
¶
get_module_extensions
¶
Source code in src/waku/extensions/registry.py
protocols
¶
Extension protocols for application and module lifecycle hooks.
ApplicationExtension
module-attribute
¶
ApplicationExtension = (
OnApplicationInit
| AfterApplicationInit
| OnApplicationShutdown
| OnModuleRegistration
)
ModuleExtension
module-attribute
¶
ModuleExtension = (
OnModuleConfigure
| OnModuleInit
| OnModuleDestroy
| OnModuleRegistration
)
OnApplicationInit
¶
AfterApplicationInit
¶
OnApplicationShutdown
¶
OnModuleRegistration
¶
Bases: Protocol
Extension for contributing providers to module metadata during registration.
This hook runs after all module metadata is collected but before Module objects are created. Use this for cross-module aggregation that produces providers which should belong to the owning module.
Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).
Execution order
- Application-level extensions (assigned to root module)
- Module-level extensions (in topological order)
Key differences from OnModuleConfigure
- Runs after ALL modules are collected (cross-module visibility)
- Receives registry with access to all modules' metadata
- Can add providers to owning module
on_module_registration
¶
Contribute providers to module metadata before Module objects are created.
| PARAMETER | DESCRIPTION |
|---|---|
| registry | Registry of all collected module metadata. Use find_extensions() to discover extensions across modules and add_provider() to contribute providers. |
| owning_module | The module type that owns this extension. Providers added via registry.add_provider() should target this module. |
| context | Application context passed to WakuFactory (read-only). |
Source code in src/waku/extensions/protocols.py
OnModuleInit
¶
registry
¶
Extension registry for centralized management of extensions.
ExtensionRegistry
¶
Registry for extensions.
This registry maintains references to all extensions in the application, allowing for centralized management and discovery.
Source code in src/waku/extensions/registry.py
register_application_extension
¶
Register an application extension with optional priority and tags.
Source code in src/waku/extensions/registry.py
register_module_extension
¶
get_application_extensions
¶
get_module_extensions
¶
Source code in src/waku/extensions/registry.py
factory
¶
ContainerConfig
dataclass
¶
ContainerConfig(
*,
lock_factory=Lock,
start_scope=None,
skip_validation=False,
)
WakuFactory
¶
WakuFactory(
root_module_type,
/,
context=None,
lifespan=(),
extensions=DEFAULT_EXTENSIONS,
container_config=None,
)
Source code in src/waku/factory.py
create
¶
Source code in src/waku/factory.py
lifespan
¶
LifespanFunc
module-attribute
¶
LifespanFunc = (
Callable[
['WakuApplication'],
AbstractAsyncContextManager[None],
]
| AbstractAsyncContextManager[None]
)
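The callable form of `LifespanFunc` takes the application and returns an async context manager. A function decorated with `contextlib.asynccontextmanager` fits that shape. In the sketch below, `lifespan`, `events`, and `main` are illustrative names, and a plain `object()` stands in for the application.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app: object) -> AsyncIterator[None]:
    # Code before `yield` runs on startup, code after it on shutdown.
    events.append("startup")
    try:
        yield
    finally:
        events.append("shutdown")


async def main() -> list[str]:
    # Stand-in for what an application would do around its lifetime.
    async with lifespan(object()):
        events.append("running")
    return events
```

The `try`/`finally` ensures the shutdown step runs even if the body raises.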
modules
¶
DynamicModule
dataclass
¶
DynamicModule(
*,
providers=list(),
imports=list(),
exports=list(),
extensions=list(),
is_global=False,
id=uuid4(),
parent_module,
)
Bases: ModuleMetadata
providers
class-attribute
instance-attribute
¶
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
List of module extensions for lifecycle hooks.
is_global
class-attribute
instance-attribute
¶
Whether this module is global or not.
ModuleMetadata
dataclass
¶
ModuleMetadata(
*,
providers=list(),
imports=list(),
exports=list(),
extensions=list(),
is_global=False,
id=uuid4(),
)
providers
class-attribute
instance-attribute
¶
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
List of module extensions for lifecycle hooks.
is_global
class-attribute
instance-attribute
¶
Whether this module is global or not.
ModuleMetadataRegistry
¶
Registry providing access to collected module metadata.
Provides read access to all modules' metadata for aggregation purposes, with controlled write access through explicit methods.
This class is used during the module registration phase to enable cross-module aggregation of providers.
Source code in src/waku/modules/_metadata_registry.py
get_metadata
¶
find_extensions
¶
Find all extensions of a given type across all modules.
Yields (module_type, extension) pairs in topological order. This is useful for aggregating data from extensions across modules.
| PARAMETER | DESCRIPTION |
|---|---|
| protocol | The extension protocol/type to search for. |

| YIELDS | DESCRIPTION |
|---|---|
| tuple[ModuleType, _ExtT] | Tuples of (module_type, extension) for each matching extension. |
Source code in src/waku/modules/_metadata_registry.py
add_provider
¶
Add a provider to a module's metadata.
This is the preferred way to add providers during registration hooks. The provider will become part of the owning module.
| PARAMETER | DESCRIPTION |
|---|---|
| module_type | The module to add the provider to. |
| provider | The provider specification to add. |

| RAISES | DESCRIPTION |
|---|---|
| KeyError | If module_type is not in the registry. |
Source code in src/waku/modules/_metadata_registry.py
Module
¶
Source code in src/waku/modules/_module.py
ModuleRegistry
¶
Immutable registry and graph for module queries, traversal, and lookups.
Source code in src/waku/modules/_registry.py
get
¶
Source code in src/waku/modules/_registry.py
get_by_id
¶
traverse
¶
Traverse the module graph in depth-first post-order (children before parent) recursively.
| PARAMETER | DESCRIPTION |
|---|---|
| from_ | Start module (default: root). |

| YIELDS | DESCRIPTION |
|---|---|
| Module | Each traversed module (post-order). |
Source code in src/waku/modules/_registry.py
ModuleRegistryBuilder
¶
Source code in src/waku/modules/_registry_builder.py
build
¶
module
¶
Decorator to define a module.
| PARAMETER | DESCRIPTION |
|---|---|
| providers | Sequence of providers for dependency injection. |
| imports | Sequence of modules imported by this module. |
| exports | Sequence of types or modules exported by this module. |
| extensions | Sequence of module extensions for lifecycle hooks. |
| is_global | Whether this module is global or not. |
Source code in src/waku/modules/_metadata.py
testing
¶
override
¶
Temporarily override providers and/or context in an AsyncContainer for testing.
| PARAMETER | DESCRIPTION |
|---|---|
| container | The container whose providers/context will be overridden. |
| *providers | Providers to override in the container. |
| context | Context values to override. |

| YIELDS | DESCRIPTION |
|---|---|
| None | Context in which the container uses the overridden providers/context. |
Example
from waku import WakuFactory, module
from waku.di import Scope, singleton
from waku.testing import override


class Service: ...


class ServiceOverride(Service): ...


# Override providers
with override(application.container, singleton(ServiceOverride, provided_type=Service)):
    service = await application.container.get(Service)
    assert isinstance(service, ServiceOverride)

# Override context
with override(application.container, context={int: 123}):
    ...
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If container is not at root (APP) scope. |
Source code in src/waku/testing.py
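The "temporarily override, then restore" behaviour described above can be illustrated with a small standard-library sketch. `ToyContainer` and `toy_override` are hypothetical names; the real `override` operates on an AsyncContainer and its internals are not reproduced here.

```python
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from typing import Any


class ToyContainer:
    """Hypothetical container mapping a type to a zero-argument factory."""

    def __init__(self, factories: dict[type, Callable[[], Any]]) -> None:
        self.factories = dict(factories)

    def get(self, tp: type) -> Any:
        return self.factories[tp]()


@contextmanager
def toy_override(
    container: ToyContainer,
    overrides: dict[type, Callable[[], Any]],
) -> Iterator[None]:
    """Swap in override factories and restore the originals on exit."""
    saved = dict(container.factories)
    container.factories.update(overrides)
    try:
        yield
    finally:
        # Restore even if the body raised, so later tests see the original setup.
        container.factories = saved


class Service: ...


class ServiceOverride(Service): ...


container = ToyContainer({Service: Service})

with toy_override(container, {Service: ServiceOverride}):
    inside = container.get(Service)

outside = container.get(Service)
print(type(inside).__name__, type(outside).__name__)
```

The `try`/`finally` is the important part of the design: the original registrations come back even when an assertion inside the `with` block fails.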
create_test_app
async
¶
create_test_app(
*,
base=None,
providers=(),
imports=(),
extensions=(),
app_extensions=DEFAULT_EXTENSIONS,
context=None,
)
Create a minimal test application with given configuration.
Useful for testing extensions and module configurations in isolation without needing to set up a full application structure.
| PARAMETER | DESCRIPTION |
|---|---|
| base | Base module to build upon. When provided, the test module imports this module and providers act as overrides. |
| providers | Providers to register in the test module. When base is provided, these act as overrides. |
| imports | Additional modules to import into the test module. |
| extensions | Module extensions to register. |
| app_extensions | Application extensions to register (default: DEFAULT_EXTENSIONS). |
| context | Context values to pass to the container. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[WakuApplication] | Initialized WakuApplication. |
Example
from typing import Protocol

from waku.testing import create_test_app
from waku.di import singleton

# Entity, MyExtension, SomeEvent, SomeHandler, MyService, AppModule and
# expected are placeholders assumed to be defined elsewhere.

class IRepository(Protocol):
    async def get(self, id: str) -> Entity: ...

class FakeRepository(IRepository):
    async def get(self, id: str) -> Entity:
        return Entity(id=id)

# Create test app from scratch
async def test_my_extension():
    extension = MyExtension().bind(SomeEvent, SomeHandler)
    async with create_test_app(
        extensions=[extension],
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        service = await app.container.get(MyService)
        result = await service.do_something()
        assert result == expected

# Create test app based on existing module with overrides
async def test_with_base_module():
    async with create_test_app(
        base=AppModule,
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        # FakeRepository replaces the real one from AppModule
        repo = await app.container.get(IRepository)
        assert isinstance(repo, FakeRepository)
Source code in src/waku/testing.py
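The rule that test providers win over those of the base module can be pictured as layering one registry over another inside an async context manager. The sketch below is a simplification that uses plain dictionaries; `toy_test_app`, `Registry`, and `BASE_PROVIDERS` are hypothetical names, and nothing here reflects how `create_test_app` is actually implemented.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from typing import Any, Optional

# Hypothetical registry type: maps a requested type to a zero-argument factory.
Registry = dict[type, Callable[[], Any]]


class RealRepository: ...


class FakeRepository(RealRepository): ...


# Stand-in for the providers a base module would contribute.
BASE_PROVIDERS: Registry = {RealRepository: RealRepository}


@asynccontextmanager
async def toy_test_app(
    *,
    base: Optional[Registry] = None,
    providers: Optional[Registry] = None,
) -> AsyncIterator[Registry]:
    """Yield a registry in which test providers shadow base providers."""
    registry: Registry = dict(base or {})
    registry.update(providers or {})  # later entries replace base entries
    yield registry


async def main() -> Any:
    async with toy_test_app(
        base=BASE_PROVIDERS,
        providers={RealRepository: FakeRepository},
    ) as app:
        return app[RealRepository]()


repo = asyncio.run(main())
print(type(repo).__name__)
```

Because the base registry is copied before being updated, the base module's own registrations stay untouched for other tests.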
validation
¶
ValidationRule
¶
ValidationExtension
¶
rules
¶
DependenciesAccessibleRule
¶
Bases: ValidationRule
Validates that all dependencies required by providers are accessible.
Source code in src/waku/validation/rules/dependency_accessible.py
validate
¶
Source code in src/waku/validation/rules/dependency_accessible.py
DependencyInaccessibleError
¶
Bases: ValidationError
Error indicating a dependency is not accessible to a provider/module.
Source code in src/waku/validation/rules/dependency_accessible.py
dependency_accessible
¶
DependencyInaccessibleError
¶
Bases: ValidationError
Error indicating a dependency is not accessible to a provider/module.
Source code in src/waku/validation/rules/dependency_accessible.py
AccessibilityStrategy
¶
GlobalProvidersStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by a global module or APP-scoped context.
Source code in src/waku/validation/rules/dependency_accessible.py
LocalProvidersStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by the module itself.
Source code in src/waku/validation/rules/dependency_accessible.py
ContextVarsStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by application or request container context.
Source code in src/waku/validation/rules/dependency_accessible.py
ImportedModulesStrategy
¶
Bases: AccessibilityStrategy
Check if type is accessible via imported modules (direct export or re-export).
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
Source code in src/waku/validation/rules/dependency_accessible.py
DependencyAccessChecker
¶
Handles dependency accessibility checks between modules.
Source code in src/waku/validation/rules/dependency_accessible.py
find_inaccessible_dependencies
¶
Source code in src/waku/validation/rules/dependency_accessible.py
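The strategies listed above suggest a check of the form "a dependency is accessible if any strategy says so". The following sketch shows that pattern with hypothetical classes (`ToyModule`, `LocalProviders`, `GlobalProviders`, `ImportedModules`, `find_inaccessible`). It is not waku's code: it covers direct exports only, not re-exports, and omits the context-based strategy.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class ToyModule:
    """Hypothetical module description used only for this sketch."""

    name: str
    provides: set[type] = field(default_factory=set)
    requires: set[type] = field(default_factory=set)
    exports: set[type] = field(default_factory=set)
    imports: list[ToyModule] = field(default_factory=list)
    is_global: bool = False


class Strategy(Protocol):
    def is_accessible(self, required: type, module: ToyModule) -> bool: ...


class LocalProviders:
    """Accessible if the module provides the type itself."""

    def is_accessible(self, required: type, module: ToyModule) -> bool:
        return required in module.provides


class GlobalProviders:
    """Accessible if any global module provides the type."""

    def __init__(self, all_modules: list[ToyModule]) -> None:
        self.all_modules = all_modules

    def is_accessible(self, required: type, module: ToyModule) -> bool:
        return any(m.is_global and required in m.provides for m in self.all_modules)


class ImportedModules:
    """Accessible if a directly imported module exports the type."""

    def is_accessible(self, required: type, module: ToyModule) -> bool:
        return any(required in imported.exports for imported in module.imports)


def find_inaccessible(module: ToyModule, strategies: list[Strategy]) -> list[type]:
    """Return required types that no strategy can resolve, sorted by name."""
    missing = [
        dep
        for dep in module.requires
        if not any(s.is_accessible(dep, module) for s in strategies)
    ]
    return sorted(missing, key=lambda tp: tp.__name__)


class Database: ...
class Cache: ...
class Logger: ...


logging_mod = ToyModule("logging", provides={Logger}, is_global=True)
db_mod = ToyModule("db", provides={Database, Cache}, exports={Database})
users_mod = ToyModule("users", requires={Database, Cache, Logger}, imports=[db_mod])

strategies: list[Strategy] = [
    GlobalProviders([logging_mod, db_mod, users_mod]),
    LocalProviders(),
    ImportedModules(),
]
missing = find_inaccessible(users_mod, strategies)
print([tp.__name__ for tp in missing])
```

Here `Cache` is reported because `db_mod` provides it without exporting it, while `Database` is reachable through the import and `Logger` through the global module.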
DependenciesAccessibleRule
¶
Bases: ValidationRule
Validates that all dependencies required by providers are accessible.