waku
¶
WakuApplication
¶
WakuApplication(
*,
container: AsyncContainer,
registry: ModuleRegistry,
lifespan: Sequence[LifespanFunc | LifespanWrapper],
extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
initialize
async
¶
WakuFactory
¶
WakuFactory(
root_module_type: ModuleType,
/,
context: dict[Any, Any] | None = None,
lifespan: Sequence[LifespanFunc] = (),
extensions: Sequence[
ApplicationExtension
] = DEFAULT_EXTENSIONS,
container_config: ContainerConfig | None = None,
provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/factory.py
create
¶
create() -> WakuApplication
Source code in src/waku/factory.py
DynamicModule
dataclass
¶
DynamicModule(
*,
providers: list[ProviderSpec] = list(),
imports: list[ModuleType | DynamicModule] = list(),
exports: list[
type[object] | ModuleType | DynamicModule
] = list(),
extensions: list[ModuleExtension] = list(),
is_global: bool = False,
id: UUID = uuid4(),
parent_module: ModuleType,
)
Bases: ModuleMetadata
providers
class-attribute
instance-attribute
¶
providers: list[ProviderSpec] = field(default_factory=list)
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
imports: list[ModuleType | DynamicModule] = field(
default_factory=list
)
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
exports: list[type[object] | ModuleType | DynamicModule] = (
field(default_factory=list)
)
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
extensions: list[ModuleExtension] = field(
default_factory=list
)
List of module extensions for lifecycle hooks.
Module
¶
Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
exports
instance-attribute
¶
exports: Final[
Sequence[type[object] | ModuleType | DynamicModule]
] = exports
provider
property
¶
Get the aggregated provider for this module.
This property returns the provider built by create_provider(). Access it only after create_provider() has been invoked.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If create_provider() has not been called yet. |
create_provider
¶
create_provider(
context: dict[Any, Any] | None,
builder: ActivationBuilder,
provider_filter: IProviderFilter,
) -> BaseProvider
Create aggregated provider with activation filtering applied.
| PARAMETER | DESCRIPTION |
|---|---|
| context | Context dict for activation decisions. |
| builder | Activation builder for checking if types are registered. TYPE: ActivationBuilder |
| provider_filter | Filter strategy for conditional provider activation. TYPE: IProviderFilter |

| RETURNS | DESCRIPTION |
|---|---|
| BaseProvider | BaseProvider with only active providers aggregated. |
Source code in src/waku/modules/_module.py
module
¶
module(
*,
providers: Sequence[ProviderSpec] = (),
imports: Sequence[ModuleType | DynamicModule] = (),
exports: Sequence[
type[object] | ModuleType | DynamicModule
] = (),
extensions: Sequence[ModuleExtension] = (),
is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]
Decorator to define a module.
| PARAMETER | DESCRIPTION |
|---|---|
| providers | Sequence of providers for dependency injection. TYPE: Sequence[ProviderSpec] |
| imports | Sequence of modules imported by this module. TYPE: Sequence[ModuleType \| DynamicModule] |
| exports | Sequence of types or modules exported by this module. TYPE: Sequence[type[object] \| ModuleType \| DynamicModule] |
| extensions | Sequence of module extensions for lifecycle hooks. TYPE: Sequence[ModuleExtension] |
| is_global | Whether this module is global. TYPE: bool |
Source code in src/waku/modules/_metadata.py
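The signature above (keyword-only metadata in, a class-preserving callable out) can be illustrated with a minimal standalone sketch. It does not import waku: `sketch_module`, `SketchMetadata`, and the `__sketch_metadata__` attribute are hypothetical stand-ins, and the real decorator may store its metadata differently.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Sequence, TypeVar

_T = TypeVar('_T')


@dataclass
class SketchMetadata:
    """Hypothetical stand-in for the metadata a module carries."""

    providers: list[Any] = field(default_factory=list)
    imports: list[Any] = field(default_factory=list)
    exports: list[Any] = field(default_factory=list)
    is_global: bool = False


def sketch_module(
    *,
    providers: Sequence[Any] = (),
    imports: Sequence[Any] = (),
    exports: Sequence[Any] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    def decorator(cls: type[_T]) -> type[_T]:
        # Attach the metadata and hand back the very same class object.
        cls.__sketch_metadata__ = SketchMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            is_global=is_global,
        )
        return cls

    return decorator


class UserService:
    """Placeholder dependency used only for illustration."""


@sketch_module(providers=[UserService], exports=[UserService])
class UserModule:
    pass


@sketch_module(imports=[UserModule])
class AppModule:
    pass
```

Because the decorator returns the class it received, the decorated class can still be referenced by name in another module's `imports`.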
application
¶
WakuApplication
¶
WakuApplication(
*,
container: AsyncContainer,
registry: ModuleRegistry,
lifespan: Sequence[LifespanFunc | LifespanWrapper],
extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
initialize
async
¶
cqrs
¶
NextHandlerType
module-attribute
¶
Event
dataclass
¶
Bases: INotification
Convenience base class for events with auto-generated ID.
Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.
Example::
    @dataclass(frozen=True, kw_only=True)
    class UserCreated(Event):
        user_id: str
        email: str
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::
    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
handle(
request: RequestT,
/,
next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle. TYPE: RequestT |
| next_handler | Function to call the next handler in the pipeline. TYPE: NextHandlerType[RequestT, ResponseT] |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline. |
Source code in src/waku/cqrs/contracts/pipeline.py
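A behavior wraps request handling by doing work before and after it awaits `next_handler`. The standalone sketch below shows that shape without importing waku. `SketchBehavior`, `AuditBehavior`, and `echo_handler` are hypothetical, and the `NextHandler` alias is an assumption about what NextHandlerType looks like (an async callable that takes the request).

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Generic, TypeVar

RequestT = TypeVar('RequestT')
ResponseT = TypeVar('ResponseT')

# Assumed shape of NextHandlerType: an async callable taking the request.
NextHandler = Callable[[RequestT], Awaitable[ResponseT]]


class SketchBehavior(ABC, Generic[RequestT, ResponseT]):
    """Hypothetical stand-in for IPipelineBehavior."""

    @abstractmethod
    async def handle(
        self,
        request: RequestT,
        /,
        next_handler: NextHandler[RequestT, ResponseT],
    ) -> ResponseT: ...


class AuditBehavior(SketchBehavior[str, str]):
    def __init__(self) -> None:
        self.log: list[str] = []

    async def handle(self, request: str, /, next_handler: NextHandler[str, str]) -> str:
        self.log.append(f'before {request}')
        response = await next_handler(request)  # delegate to the rest of the pipeline
        self.log.append(f'after {response}')
        return response


async def echo_handler(request: str) -> str:
    # Plays the role of the final request handler.
    return request.upper()


audit = AuditBehavior()
result = asyncio.run(audit.handle('ping', echo_handler))
```

A behavior that returns without awaiting `next_handler` would short-circuit the pipeline, which is how a caching or validation behavior could be written under the same assumptions.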
IRequest
¶
Marker interface for request-type objects (commands/queries).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.
MediatR equivalent: IRequest
Example::
    @dataclass(frozen=True)
    class GetUserQuery(IRequest[UserDTO]):
        user_id: str

    @dataclass(frozen=True)
    class CreateOrderCommand(IRequest[OrderId]):
        customer_id: str
        items: list[OrderItem]
Request
dataclass
¶
Bases: IRequest[ResponseT], Generic[ResponseT]
Convenience base class for requests with auto-generated ID.
Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.
Example::
    @dataclass(frozen=True, kw_only=True)
    class GetUserQuery(Request[UserDTO]):
        user_id: str
EventHandler
¶
Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]
Abstract base class for event handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.
Example::
    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
handle
abstractmethod
async
¶
handle(event: NotificationT) -> None
INotificationHandler
¶
Bases: Protocol[NotificationT]
Protocol for notification/event handlers.
MediatR equivalent: INotificationHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::
    class OrderPlacedHandler(INotificationHandler[OrderPlaced]):
        async def handle(self, event: OrderPlaced, /) -> None:
            await self._send_confirmation_email(event.order_id)
handle
async
¶
handle(event: NotificationT) -> None
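To make the structural-subtyping point concrete, the standalone sketch below defines a look-alike protocol and a handler that satisfies it without inheriting from it. Nothing here imports waku: `SketchNotificationHandler`, `EmailOnOrderPlaced`, and `dispatch` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, TypeVar

EventT = TypeVar('EventT', contravariant=True)


class SketchNotificationHandler(Protocol[EventT]):
    """Hypothetical look-alike of INotificationHandler."""

    async def handle(self, event: EventT, /) -> None: ...


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str


class EmailOnOrderPlaced:
    # No base class: compatibility comes from the matching handle signature.
    def __init__(self) -> None:
        self.sent: list[str] = []

    async def handle(self, event: OrderPlaced, /) -> None:
        self.sent.append(event.order_id)


async def dispatch(
    handler: SketchNotificationHandler[OrderPlaced],
    event: OrderPlaced,
) -> None:
    # A static type checker accepts EmailOnOrderPlaced here structurally.
    await handler.handle(event)


handler = EmailOnOrderPlaced()
asyncio.run(dispatch(handler, OrderPlaced(order_id='o-42')))
```

The protocol never appears in the handler's base classes; the match is checked by a static type checker, not at runtime.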
Mediator
¶
Mediator(
container: AsyncContainer,
event_publisher: EventPublisher,
request_map: RequestMap,
event_map: EventMap,
behavior_map: PipelineBehaviorMap,
)
Bases: IMediator
Default CQRS implementation.
Source code in src/waku/cqrs/impl.py
send
async
¶
publish
async
¶
publish(notification: INotification) -> None
IMediator
¶
Bases: ISender, IPublisher, ABC
Defines a mediator to encapsulate request/response and publishing interaction patterns.
publish
abstractmethod
async
¶
publish(notification: INotification) -> None
send
abstractmethod
async
¶
IPublisher
¶
Bases: ABC
Publish a notification through the mediator to be handled by multiple handlers.
publish
abstractmethod
async
¶
publish(notification: INotification) -> None
ISender
¶
MediatorConfig
dataclass
¶
MediatorConfig(
*,
mediator_implementation_type: type[
IMediator
] = Mediator,
event_publisher: type[
EventPublisher
] = SequentialEventPublisher,
pipeline_behaviors: Sequence[
type[IPipelineBehavior[Any, Any]]
] = (),
)
Configuration for the Mediator extension.
This class defines the configuration options for setting up the CQRS pattern implementation in the application.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mediator_implementation_type | The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class. |
| event_publisher | The implementation class for publishing events. Defaults to SequentialEventPublisher. TYPE: type[EventPublisher] |
| pipeline_behaviors | A sequence of pipeline behavior configurations that will be applied to the CQRS pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence. TYPE: Sequence[type[IPipelineBehavior[Any, Any]]] |
mediator_implementation_type
class-attribute
instance-attribute
¶
event_publisher
class-attribute
instance-attribute
¶
event_publisher: type[EventPublisher] = (
SequentialEventPublisher
)
MediatorExtension
¶
Bases: OnModuleConfigure
Source code in src/waku/cqrs/modules.py
bind_request
¶
bind_request(
request_type: type[RequestT],
handler_type: type[RequestHandler[RequestT, Any]],
*,
behaviors: list[type[IPipelineBehavior[RequestT, Any]]]
| None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
bind_event
¶
bind_event(
event_type: type[NotificationT],
handler_types: list[type[EventHandler[NotificationT]]],
) -> Self
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
MediatorModule
¶
register
classmethod
¶
register(
config: MediatorConfig | None = None,
) -> DynamicModule
Application-level module for Mediator setup.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Configuration for the Mediator extension. TYPE: MediatorConfig \| None |
Source code in src/waku/cqrs/modules.py
IRequestHandler
¶
Bases: Protocol[RequestT, ResponseT]
Protocol for request handlers (commands/queries).
MediatR equivalent: IRequestHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::
    class GetUserQueryHandler(IRequestHandler[GetUserQuery, UserDTO]):
        async def handle(self, request: GetUserQuery, /) -> UserDTO:
            return await self._repository.get(request.user_id)
RequestHandler
¶
Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]
Abstract base class for request handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.
Command handler example::
    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)
Query handler example::
    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
contracts
¶
NotificationT
module-attribute
¶
NotificationT = TypeVar(
'NotificationT', bound=INotification, contravariant=True
)
NextHandlerType
module-attribute
¶
RequestT
module-attribute
¶
ResponseT
module-attribute
¶
Event
dataclass
¶
Bases: INotification
Convenience base class for events with auto-generated ID.
Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.
Example::
    @dataclass(frozen=True, kw_only=True)
    class UserCreated(Event):
        user_id: str
        email: str
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::
    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
handle(
request: RequestT,
/,
next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle. TYPE: RequestT |
| next_handler | Function to call the next handler in the pipeline. TYPE: NextHandlerType[RequestT, ResponseT] |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline. |
Source code in src/waku/cqrs/contracts/pipeline.py
IRequest
¶
Marker interface for request-type objects (commands/queries).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.
MediatR equivalent: IRequest
Example::
    @dataclass(frozen=True)
    class GetUserQuery(IRequest[UserDTO]):
        user_id: str

    @dataclass(frozen=True)
    class CreateOrderCommand(IRequest[OrderId]):
        customer_id: str
        items: list[OrderItem]
Request
dataclass
¶
Bases: IRequest[ResponseT], Generic[ResponseT]
Convenience base class for requests with auto-generated ID.
Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.
Example::
    @dataclass(frozen=True, kw_only=True)
    class GetUserQuery(Request[UserDTO]):
        user_id: str
event
¶
NotificationT
module-attribute
¶
NotificationT = TypeVar(
'NotificationT', bound=INotification, contravariant=True
)
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::
    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
Event
dataclass
¶
Bases: INotification
Convenience base class for events with auto-generated ID.
Use this class when you want automatic event_id generation. For custom identification strategies, implement INotification directly.
Example::
    @dataclass(frozen=True, kw_only=True)
    class UserCreated(Event):
        user_id: str
        email: str
notification
¶
NotificationT
module-attribute
¶
NotificationT = TypeVar(
'NotificationT', bound=INotification, contravariant=True
)
INotification
¶
Bases: Protocol
Marker interface for notification-type objects (events).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for domain events that need custom identification strategies or no identification at all.
MediatR equivalent: INotification
Example::
    @dataclass(frozen=True)
    class OrderPlaced(INotification):
        order_id: str
        customer_id: str

    # Or with custom identification:
    @dataclass(frozen=True)
    class DomainEvent(INotification):
        aggregate_id: str
        occurred_at: datetime

    @dataclass(frozen=True)
    class OrderPlaced(DomainEvent):
        order_id: str
pipeline
¶
NextHandlerType
module-attribute
¶
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
handle(
request: RequestT,
/,
next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle. TYPE: RequestT |
| next_handler | Function to call the next handler in the pipeline. TYPE: NextHandlerType[RequestT, ResponseT] |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline. |
Source code in src/waku/cqrs/contracts/pipeline.py
request
¶
ResponseT
module-attribute
¶
RequestT
module-attribute
¶
IRequest
¶
Marker interface for request-type objects (commands/queries).
This is a pure marker protocol with no required attributes or methods. Implement this protocol for requests that need custom identification strategies or no identification at all.
MediatR equivalent: IRequest
Example::
    @dataclass(frozen=True)
    class GetUserQuery(IRequest[UserDTO]):
        user_id: str

    @dataclass(frozen=True)
    class CreateOrderCommand(IRequest[OrderId]):
        customer_id: str
        items: list[OrderItem]
Request
dataclass
¶
Bases: IRequest[ResponseT], Generic[ResponseT]
Convenience base class for requests with auto-generated ID.
Use this class when you want automatic request_id generation. For custom identification strategies, implement IRequest directly.
Example::
    @dataclass(frozen=True, kw_only=True)
    class GetUserQuery(Request[UserDTO]):
        user_id: str
events
¶
EventHandler
¶
Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]
Abstract base class for event handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.
Example::
    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
handle
abstractmethod
async
¶
handle(event: NotificationT) -> None
GroupEventPublisher
¶
Bases: EventPublisher
SequentialEventPublisher
¶
Bases: EventPublisher
handler
¶
INotificationHandler
¶
Bases: Protocol[NotificationT]
Protocol for notification/event handlers.
MediatR equivalent: INotificationHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::
    class OrderPlacedHandler(INotificationHandler[OrderPlaced]):
        async def handle(self, event: OrderPlaced, /) -> None:
            await self._send_confirmation_email(event.order_id)
handle
async
¶
handle(event: NotificationT) -> None
EventHandler
¶
Bases: INotificationHandler[NotificationT], ABC, Generic[NotificationT]
Abstract base class for event handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement INotificationHandler directly.
Example::
    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, 'New user joined!')
handle
abstractmethod
async
¶
handle(event: NotificationT) -> None
map
¶
EventMapRegistry
module-attribute
¶
EventMapRegistry: TypeAlias = MutableMapping[
type[INotification], EventMapEntry[INotification]
]
EventMapEntry
dataclass
¶
EventMapEntry(
di_lookup_type: type[EventHandler[_EventT]],
handler_types: list[
type[EventHandler[_EventT]]
] = list(),
)
Bases: Generic[_EventT]
EventMap
¶
Source code in src/waku/cqrs/events/map.py
bind
¶
bind(
event_type: type[NotificationT],
handler_types: list[type[EventHandler[NotificationT]]],
) -> Self
Source code in src/waku/cqrs/events/map.py
merge
¶
has_handlers
¶
has_handlers(event_type: type[INotification]) -> bool
get_handler_type
¶
get_handler_type(
event_type: type[INotification],
) -> type[EventHandler[INotification]]
publish
¶
SequentialEventPublisher
¶
Bases: EventPublisher
GroupEventPublisher
¶
Bases: EventPublisher
exceptions
¶
ImproperlyConfiguredError
¶
Bases: MediatorError
Raised when the CQRS configuration is invalid.
RequestHandlerAlreadyRegistered
¶
RequestHandlerAlreadyRegistered(
request_type: type[IRequest[Any]],
handler_type: type[IRequestHandler[Any, Any]],
)
Bases: MediatorError, KeyError
Raised when a request handler is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
| handler_type | The type of handler that was already registered. |
Source code in src/waku/cqrs/exceptions.py
RequestHandlerNotFound
¶
Bases: MediatorError, TypeError
Raised when a request handler is not found.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
Source code in src/waku/cqrs/exceptions.py
EventHandlerAlreadyRegistered
¶
EventHandlerAlreadyRegistered(
event_type: type[INotification],
handler_type: type[INotificationHandler[Any]],
)
Bases: MediatorError, KeyError
Raised when a notification handler is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| event_type | The type of notification that caused the error. |
| handler_type | The type of handler that was already registered. |
Source code in src/waku/cqrs/exceptions.py
PipelineBehaviorAlreadyRegistered
¶
PipelineBehaviorAlreadyRegistered(
request_type: type[IRequest[Any]],
behavior_type: type[IPipelineBehavior[Any, Any]],
)
Bases: MediatorError, KeyError
Raised when a pipeline behavior is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
| behavior_type | The type of behavior that was already registered. |
Source code in src/waku/cqrs/exceptions.py
impl
¶
Mediator
¶
Mediator(
container: AsyncContainer,
event_publisher: EventPublisher,
request_map: RequestMap,
event_map: EventMap,
behavior_map: PipelineBehaviorMap,
)
Bases: IMediator
Default CQRS implementation.
Source code in src/waku/cqrs/impl.py
send
async
¶
publish
async
¶
publish(notification: INotification) -> None
interfaces
¶
ISender
¶
IPublisher
¶
Bases: ABC
Publish a notification through the mediator to be handled by multiple handlers.
publish
abstractmethod
async
¶
publish(notification: INotification) -> None
IMediator
¶
Bases: ISender, IPublisher, ABC
Defines a mediator to encapsulate request/response and publishing interaction patterns.
publish
abstractmethod
async
¶
publish(notification: INotification) -> None
send
abstractmethod
async
¶
modules
¶
MediatorConfig
dataclass
¶
MediatorConfig(
*,
mediator_implementation_type: type[
IMediator
] = Mediator,
event_publisher: type[
EventPublisher
] = SequentialEventPublisher,
pipeline_behaviors: Sequence[
type[IPipelineBehavior[Any, Any]]
] = (),
)
Configuration for the Mediator extension.
This class defines the configuration options for setting up the CQRS pattern implementation in the application.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mediator_implementation_type | The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class. |
| event_publisher | The implementation class for publishing events. Defaults to SequentialEventPublisher. TYPE: type[EventPublisher] |
| pipeline_behaviors | A sequence of pipeline behavior configurations that will be applied to the CQRS pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence. TYPE: Sequence[type[IPipelineBehavior[Any, Any]]] |
mediator_implementation_type
class-attribute
instance-attribute
¶
event_publisher
class-attribute
instance-attribute
¶
event_publisher: type[EventPublisher] = (
SequentialEventPublisher
)
MediatorModule
¶
register
classmethod
¶
register(
config: MediatorConfig | None = None,
) -> DynamicModule
Application-level module for Mediator setup.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Configuration for the Mediator extension. TYPE: MediatorConfig \| None |
Source code in src/waku/cqrs/modules.py
MediatorExtension
¶
Bases: OnModuleConfigure
Source code in src/waku/cqrs/modules.py
bind_request
¶
bind_request(
request_type: type[RequestT],
handler_type: type[RequestHandler[RequestT, Any]],
*,
behaviors: list[type[IPipelineBehavior[RequestT, Any]]]
| None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
bind_event
¶
bind_event(
event_type: type[NotificationT],
handler_types: list[type[EventHandler[NotificationT]]],
) -> Self
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
RequestMapAggregator
¶
Bases: OnModuleRegistration
on_module_registration
¶
on_module_registration(
registry: ModuleMetadataRegistry,
owning_module: ModuleType,
context: Mapping[Any, Any] | None,
) -> None
Source code in src/waku/cqrs/modules.py
EventMapAggregator
¶
Bases: OnModuleRegistration
on_module_registration
¶
on_module_registration(
registry: ModuleMetadataRegistry,
owning_module: ModuleType,
context: Mapping[Any, Any] | None,
) -> None
Source code in src/waku/cqrs/modules.py
PipelineBehaviorMapAggregator
¶
Bases: OnModuleRegistration
on_module_registration
¶
on_module_registration(
registry: ModuleMetadataRegistry,
owning_module: ModuleType,
context: Mapping[Any, Any] | None,
) -> None
Source code in src/waku/cqrs/modules.py
pipeline
¶
PipelineBehaviorWrapper
¶
PipelineBehaviorWrapper(
behaviors: Sequence[
IPipelineBehavior[RequestT, ResponseT]
],
)
Bases: Generic[RequestT, ResponseT]
Composes pipeline behaviors into a processing chain.
Source code in src/waku/cqrs/pipeline/chain.py
wrap
¶
wrap(
handle: NextHandlerType[RequestT, ResponseT],
) -> NextHandlerType[RequestT, ResponseT]
Create a pipeline that wraps the handler function with behaviors.
Pipeline behaviors are executed in the order they are provided.
| PARAMETER | DESCRIPTION |
|---|---|
| handle | The handler function to wrap with behaviors. TYPE: NextHandlerType[RequestT, ResponseT] |

| RETURNS | DESCRIPTION |
|---|---|
| NextHandlerType[RequestT, ResponseT] | A function that executes the entire pipeline. |
Source code in src/waku/cqrs/pipeline/chain.py
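One common way to get "executed in the order they are provided" is to fold the behaviors from last to first, so that the first behavior ends up outermost. The standalone sketch below shows that technique. It is not waku's implementation: `TagBehavior`, the free function `wrap`, and the `Handler` alias are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# Assumed handler shape for this sketch: async callable from str to str.
Handler = Callable[[str], Awaitable[str]]
calls: list[str] = []


class TagBehavior:
    """Hypothetical behavior that records when it is entered and exited."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

    async def handle(self, request: str, /, next_handler: Handler) -> str:
        calls.append(f'enter {self.tag}')
        response = await next_handler(request)
        calls.append(f'exit {self.tag}')
        return response


def wrap(behaviors: Sequence[TagBehavior], handle: Handler) -> Handler:
    pipeline = handle
    # Fold from the last behavior inward so the first one is outermost.
    for behavior in reversed(behaviors):

        def step(
            request: str,
            *,
            _behavior: TagBehavior = behavior,  # bind loop values per iteration
            _next: Handler = pipeline,
        ) -> Awaitable[str]:
            return _behavior.handle(request, next_handler=_next)

        pipeline = step
    return pipeline


async def final_handler(request: str) -> str:
    calls.append('handler')
    return request[::-1]


pipeline = wrap([TagBehavior('a'), TagBehavior('b')], final_handler)
result = asyncio.run(pipeline('abc'))
```

With this folding, behavior `a` is entered first and exited last, and the handler runs in the middle.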
chain
¶
PipelineBehaviorWrapper
¶
PipelineBehaviorWrapper(
behaviors: Sequence[
IPipelineBehavior[RequestT, ResponseT]
],
)
Bases: Generic[RequestT, ResponseT]
Composes pipeline behaviors into a processing chain.
Source code in src/waku/cqrs/pipeline/chain.py
wrap
¶
wrap(
handle: NextHandlerType[RequestT, ResponseT],
) -> NextHandlerType[RequestT, ResponseT]
Create a pipeline that wraps the handler function with behaviors.
Pipeline behaviors are executed in the order they are provided.
| PARAMETER | DESCRIPTION |
|---|---|
| handle | The handler function to wrap with behaviors. TYPE: NextHandlerType[RequestT, ResponseT] |

| RETURNS | DESCRIPTION |
|---|---|
| NextHandlerType[RequestT, ResponseT] | A function that executes the entire pipeline. |
Source code in src/waku/cqrs/pipeline/chain.py
map
¶
PipelineBehaviorMapRegistry
module-attribute
¶
PipelineBehaviorMapRegistry: TypeAlias = MutableMapping[
type[RequestT], PipelineBehaviorMapEntry
]
PipelineBehaviorMapEntry
dataclass
¶
PipelineBehaviorMapEntry(
di_lookup_type: type[IPipelineBehavior[Any, Any]],
behavior_types: list[
type[IPipelineBehavior[Any, Any]]
] = list(),
)
PipelineBehaviorMap
¶
Source code in src/waku/cqrs/pipeline/map.py
bind
¶
bind(
request_type: type[RequestT],
behavior_types: list[
type[IPipelineBehavior[RequestT, ResponseT]]
],
) -> Self
Source code in src/waku/cqrs/pipeline/map.py
merge
¶
merge(other: PipelineBehaviorMap) -> Self
has_behaviors
¶
requests
¶
RequestHandler
¶
Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]
Abstract base class for request handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.
Command handler example::
    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)
Query handler example::
    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handler
¶
IRequestHandler
¶
Bases: Protocol[RequestT, ResponseT]
Protocol for request handlers (commands/queries).
MediatR equivalent: IRequestHandler
This protocol allows structural subtyping: any class with a matching handle method signature is compatible.
Example::
    class GetUserQueryHandler(IRequestHandler[GetUserQuery, UserDTO]):
        async def handle(self, request: GetUserQuery, /) -> UserDTO:
            return await self._repository.get(request.user_id)
RequestHandler
¶
Bases: IRequestHandler[RequestT, ResponseT], ABC, Generic[RequestT, ResponseT]
Abstract base class for request handlers.
Use this class when you want explicit ABC inheritance and type checking. For structural subtyping, implement IRequestHandler directly.
Command handler example::
    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)
Query handler example::
    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
map
¶
RequestMapRegistry
module-attribute
¶
RequestMapRegistry: TypeAlias = MutableMapping[
type[IRequest[Response | None]],
RequestMapEntry[
IRequest[Response | None], Response | None
],
]
RequestMapEntry
dataclass
¶
RequestMapEntry(
handler_type: type[RequestHandler[_MapReqT, _MapResT]],
di_lookup_type: type[
RequestHandler[_MapReqT, _MapResT]
],
)
Bases: Generic[_MapReqT, _MapResT]
RequestMap
¶
Source code in src/waku/cqrs/requests/map.py
bind
¶
bind(
request_type: type[RequestT],
handler_type: type[RequestHandler[RequestT, ResponseT]],
) -> Self
Source code in src/waku/cqrs/requests/map.py
merge
¶
merge(other: RequestMap) -> Self
Source code in src/waku/cqrs/requests/map.py
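The `bind` and `merge` signatures both return `Self`, which suggests a chainable registry. The standalone sketch below shows one way such a map could behave. It does not import waku: `SketchRequestMap` and `HandlerAlreadyRegistered` are hypothetical, and both the `has_handler` signature and the choice to raise on a duplicate binding during `merge` are assumptions.

```python
from __future__ import annotations


class HandlerAlreadyRegistered(KeyError):
    """Hypothetical stand-in for RequestHandlerAlreadyRegistered."""


class SketchRequestMap:
    def __init__(self) -> None:
        self._registry: dict[type, type] = {}

    def bind(self, request_type: type, handler_type: type) -> SketchRequestMap:
        # Assumption: one handler per request type, duplicates are an error.
        if request_type in self._registry:
            raise HandlerAlreadyRegistered(request_type, handler_type)
        self._registry[request_type] = handler_type
        return self  # returning self allows chained calls

    def merge(self, other: SketchRequestMap) -> SketchRequestMap:
        for request_type, handler_type in other._registry.items():
            self.bind(request_type, handler_type)
        return self

    def has_handler(self, request_type: type) -> bool:
        return request_type in self._registry


class GetUser: ...
class GetUserHandler: ...
class CreateOrder: ...
class CreateOrderHandler: ...


users = SketchRequestMap().bind(GetUser, GetUserHandler)
orders = SketchRequestMap().bind(CreateOrder, CreateOrderHandler)
combined = SketchRequestMap().merge(users).merge(orders)

duplicate_error = None
try:
    combined.bind(GetUser, GetUserHandler)
except KeyError as exc:  # the sketch's error is also a KeyError
    duplicate_error = exc
```

Deriving the error from `KeyError`, as the documented exception does, lets callers catch it with a generic `except KeyError` as well as by its specific type.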
has_handler
¶
utils
¶
get_request_response_type
cached
¶
Extract the response type from an IRequest implementation.
Searches through the class hierarchy to find IRequest or its subclasses, supporting direct implementations, Request subclasses, and nested inheritance.
| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the response type cannot be extracted from the request type. |
Source code in src/waku/cqrs/utils.py
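The hierarchy search described above can be sketched with `typing.get_origin` and `typing.get_args` over each class's `__orig_bases__`. The example is standalone and does not import waku: `SketchIRequest`, `SketchRequest`, and `response_type_of` are hypothetical, a plain `Generic` base replaces the protocol for simplicity, and the sketch assumes generic subclasses forward the response type as their first type argument.

```python
import functools
from dataclasses import dataclass
from typing import Generic, TypeVar, get_args, get_origin

ResponseT = TypeVar('ResponseT')


class SketchIRequest(Generic[ResponseT]):
    """Hypothetical marker base (plain Generic instead of a Protocol)."""


class SketchRequest(SketchIRequest[ResponseT], Generic[ResponseT]):
    """Hypothetical convenience base that forwards its type parameter."""


@dataclass(frozen=True)
class UserDTO:
    name: str


class GetUserQuery(SketchIRequest[UserDTO]): ...      # direct implementation
class CountUsersQuery(SketchRequest[int]): ...        # via the convenience base
class CachedCountUsersQuery(CountUsersQuery): ...     # nested inheritance
class Untyped(SketchIRequest): ...                    # no response type given


@functools.cache
def response_type_of(request_type: type) -> object:
    # Walk the MRO and inspect only the bases each class declared itself.
    for klass in request_type.__mro__:
        for base in vars(klass).get('__orig_bases__', ()):
            origin = get_origin(base)
            if isinstance(origin, type) and issubclass(origin, SketchIRequest):
                args = get_args(base)
                # Skip unresolved parameters such as SketchIRequest[ResponseT].
                if args and not isinstance(args[0], TypeVar):
                    return args[0]
    raise TypeError(f'Cannot extract response type from {request_type.__name__}')
```

Reading `__orig_bases__` through `vars(klass)` avoids picking up a parent's inherited attribute twice, and `functools.cache` mirrors the "cached" marker on the documented function.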
di
¶
ActivationBuilder
¶
ActivationContext
¶
ConditionalProvider
dataclass
¶
IProviderFilter
¶
Bases: Protocol
Strategy for filtering providers based on activation context.
filter
¶
filter(
providers: list[ProviderSpec],
context: dict[Any, Any] | None,
module_type: ModuleType | DynamicModule,
builder: ActivationBuilder,
) -> list[Provider]
ProviderFilter
dataclass
¶
Default provider filter implementation.
filter
¶
filter(
providers: list[ProviderSpec],
context: dict[Any, Any] | None,
module_type: ModuleType | DynamicModule,
builder: ActivationBuilder,
) -> list[Provider]
Source code in src/waku/di/_activation.py
contextual
¶
contextual(
provided_type: Any, *, scope: Scope = REQUEST
) -> Provider
contextual(
provided_type: Any,
*,
scope: Scope = REQUEST,
when: Activator,
) -> ConditionalProvider
contextual(
provided_type: Any,
*,
scope: Scope = REQUEST,
when: Activator | None = None,
) -> ProviderSpec
Provide a dependency from the current context (e.g., app/request).
| PARAMETER | DESCRIPTION |
|---|---|
| provided_type | The type to resolve from context. TYPE: Any |
| scope | Scope of the context variable (default: Scope.REQUEST). TYPE: Scope |
| when | Optional predicate to conditionally activate the provider. TYPE: Activator \| None |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if when is provided. |
Source code in src/waku/di/_providers.py
many
¶
many(
interface: Any,
*implementations: Any,
scope: Scope = REQUEST,
cache: bool = True,
) -> Provider
many(
interface: Any,
*implementations: Any,
scope: Scope = REQUEST,
cache: bool = True,
when: Activator,
) -> ConditionalProvider
many(
interface: Any,
*implementations: Any,
scope: Scope = REQUEST,
cache: bool = True,
when: Activator | None = None,
) -> ProviderSpec
Register multiple implementations as a collection.
| PARAMETER | DESCRIPTION |
|---|---|
| interface | Interface type for the collection. |
| *implementations | Implementation types or factory functions to include in the collection. |
| scope | Scope of the collection (default: Scope.REQUEST). |
| cache | Whether to cache the resolved results within the scope. |
| when | Optional predicate to conditionally activate the provider. |
| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if when is specified. |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If no implementations are provided. |
| TypeError | If a factory function lacks a return type annotation. |
Source code in src/waku/di/_providers.py
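The two error conditions above can be illustrated with a standard-library sketch. resolve_implementations is a hypothetical helper written for this example, not part of waku; it only shows how a provided type might be read from a class or from a factory's return annotation.

```python
import inspect
from typing import Any, List, get_type_hints


def resolve_implementations(*implementations: Any) -> List[type]:
    """Return the concrete type each implementation would contribute."""
    if not implementations:
        raise ValueError("At least one implementation is required")
    resolved: List[type] = []
    for impl in implementations:
        if inspect.isclass(impl):
            resolved.append(impl)
            continue
        # Factory function: the provided type is read from its return annotation.
        hints = get_type_hints(impl)
        if "return" not in hints:
            raise TypeError(f"{impl.__name__} must declare a return type")
        resolved.append(hints["return"])
    return resolved


class EmailNotifier: ...


class SmsNotifier: ...


def make_sms() -> SmsNotifier:
    return SmsNotifier()


def untyped_factory():
    return SmsNotifier()
```

Here resolve_implementations(EmailNotifier, make_sms) yields both notifier types, an empty call raises ValueError, and untyped_factory raises TypeError.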
object_
¶
object_(
obj: Any,
*,
provided_type: Any | None = None,
when: Activator,
) -> ConditionalProvider
object_(
obj: Any,
*,
provided_type: Any | None = None,
when: Activator | None = None,
) -> ProviderSpec
Provide the exact object passed at creation time as a singleton dependency.
The provider always returns the same object instance, without instantiation or copying.
| PARAMETER | DESCRIPTION |
|---|---|
| obj | The instance to provide as-is. |
| provided_type | Explicit type to provide (default: inferred). |
| when | Optional predicate to conditionally activate the provider. |
| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if when is specified. |
Source code in src/waku/di/_providers.py
provider
¶
provider(
source: Callable[..., Any] | type[Any],
*,
scope: Scope = REQUEST,
provided_type: Any | None = None,
cache: bool = True,
) -> Provider
Create a Dishka provider for a callable or type.
| PARAMETER | DESCRIPTION |
|---|---|
| source | Callable or type to provide as a dependency. |
| scope | Scope of the dependency (default: Scope.REQUEST). |
| provided_type | Explicit type to provide (default: inferred). |
| cache | Whether to cache the instance in the scope. |
| RETURNS | DESCRIPTION |
|---|---|
| Provider | Configured provider instance. |
Source code in src/waku/di/_providers.py
scoped
¶
scoped(
source: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
scoped(
interface: Any,
implementation: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
scoped(
interface_or_source: type[Any] | Callable[..., Any],
implementation: type[Any]
| Callable[..., Any]
| None = None,
/,
*,
when: Activator | None = None,
) -> ProviderSpec
Create a scoped provider (lifetime: request).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional predicate to conditionally activate the provider. |
| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if when is specified. |
Source code in src/waku/di/_providers.py
singleton
¶
singleton(
source: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
singleton(
interface: Any,
implementation: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
singleton(
interface_or_source: type[Any] | Callable[..., Any],
implementation: type[Any]
| Callable[..., Any]
| None = None,
/,
*,
when: Activator | None = None,
) -> ProviderSpec
Create a singleton provider (lifetime: app).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional predicate to conditionally activate the provider. |
| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if when is specified. |
Source code in src/waku/di/_providers.py
transient
¶
transient(
source: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
transient(
interface: Any,
implementation: type[_T] | Callable[..., _T],
) -> Provider
transient(
interface: Any,
implementation: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
transient(
interface_or_source: type[Any] | Callable[..., Any],
implementation: type[Any]
| Callable[..., Any]
| None = None,
/,
*,
when: Activator | None = None,
) -> ProviderSpec
Create a transient provider (new instance per injection).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional predicate to conditionally activate the provider. |
| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if when is specified. |
Source code in src/waku/di/_providers.py
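The three lifetimes (singleton: app, scoped: request, transient: new instance per injection) can be compared with a toy container. ToyContainer and ToyRequestScope are hypothetical and written only for this illustration; waku delegates the real work to its underlying container, which this sketch does not model.

```python
from typing import Any, Callable, Dict


class ToyRequestScope:
    """Stand-in for a per-request scope."""

    def __init__(self) -> None:
        self._cache: Dict[Callable[[], Any], Any] = {}

    def scoped(self, factory: Callable[[], Any]) -> Any:
        # One instance per request scope.
        if factory not in self._cache:
            self._cache[factory] = factory()
        return self._cache[factory]

    def transient(self, factory: Callable[[], Any]) -> Any:
        # A new instance on every resolution, never cached.
        return factory()


class ToyContainer:
    """Stand-in for the application-level container."""

    def __init__(self) -> None:
        self._app_cache: Dict[Callable[[], Any], Any] = {}

    def singleton(self, factory: Callable[[], Any]) -> Any:
        # One instance for the lifetime of the application.
        if factory not in self._app_cache:
            self._app_cache[factory] = factory()
        return self._app_cache[factory]

    def request_scope(self) -> ToyRequestScope:
        return ToyRequestScope()


class Service: ...


container = ToyContainer()
first_request = container.request_scope()
second_request = container.request_scope()
```

Resolving Service as a singleton always returns the same object, as scoped returns one object per request scope, and as transient returns a fresh object each time.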
extensions
¶
ApplicationExtension
module-attribute
¶
ApplicationExtension: TypeAlias = (
OnApplicationInit
| AfterApplicationInit
| OnApplicationShutdown
| OnModuleRegistration
)
ModuleExtension
module-attribute
¶
ModuleExtension: TypeAlias = (
OnModuleConfigure
| OnModuleInit
| OnModuleDestroy
| OnModuleRegistration
)
DEFAULT_EXTENSIONS
module-attribute
¶
DEFAULT_EXTENSIONS: Sequence[ApplicationExtension] = (
ValidationExtension(
[DependenciesAccessibleRule()], strict=True
),
)
AfterApplicationInit
¶
Bases: Protocol
Extension for application post-initialization actions.
after_app_init
async
¶
after_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnApplicationInit
¶
Bases: Protocol
Extension for application pre-initialization actions.
on_app_init
async
¶
on_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnApplicationShutdown
¶
Bases: Protocol
Extension for application shutdown actions.
on_app_shutdown
async
¶
on_app_shutdown(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnModuleConfigure
¶
Bases: Protocol
Extension for module configuration.
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
OnModuleDestroy
¶
OnModuleInit
¶
OnModuleRegistration
¶
Bases: Protocol
Extension for contributing providers to module metadata during registration.
This hook runs after all module metadata is collected but before Module objects are created. Use this for cross-module aggregation that produces providers which should belong to the owning module.
Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).
Execution order
- Application-level extensions (assigned to root module)
- Module-level extensions (in topological order)
Key differences from OnModuleConfigure
- Runs after ALL modules are collected (cross-module visibility)
- Receives registry with access to all modules' metadata
- Can add providers to owning module
on_module_registration
¶
on_module_registration(
registry: ModuleMetadataRegistry,
owning_module: ModuleType,
context: Mapping[Any, Any] | None,
) -> None
Contribute providers to module metadata before Module objects are created.
| PARAMETER | DESCRIPTION |
|---|---|
| registry | Registry of all collected module metadata. Use find_extensions() to discover extensions across modules, add_provider() to contribute. |
| owning_module | The module type that owns this extension. Providers added via registry.add_provider() should target this module. |
| context | Application context passed to WakuFactory (read-only). |
Source code in src/waku/extensions/protocols.py
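A sketch of the cross-module aggregation pattern this hook is meant for. Everything here is a stand-in: ToyRegistry mimics only the find_extensions() and add_provider() calls named above, module types are replaced by strings, and HandlerBinding and AggregateHandlers are hypothetical extensions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional, Tuple


@dataclass
class ToyMetadata:
    providers: List[Any] = field(default_factory=list)
    extensions: List[Any] = field(default_factory=list)


class ToyRegistry:
    """Stand-in registry; dict insertion order plays the role of topological order."""

    def __init__(self, metadata: Dict[str, ToyMetadata]) -> None:
        self._metadata = metadata

    def get_metadata(self, module: str) -> ToyMetadata:
        return self._metadata[module]

    def find_extensions(self, protocol: type) -> Iterator[Tuple[str, Any]]:
        for module, meta in self._metadata.items():
            for ext in meta.extensions:
                if isinstance(ext, protocol):
                    yield module, ext

    def add_provider(self, module: str, provider: Any) -> None:
        # Raises KeyError for a module that is not in the registry.
        self._metadata[module].providers.append(provider)


@dataclass
class HandlerBinding:
    """Hypothetical per-module extension that declares handler names."""

    handlers: List[str]


class AggregateHandlers:
    """Hypothetical hook: merges every module's bindings into one provider."""

    def on_module_registration(
        self, registry: ToyRegistry, owning_module: str, context: Optional[dict]
    ) -> None:
        merged = [h for _, ext in registry.find_extensions(HandlerBinding) for h in ext.handlers]
        registry.add_provider(owning_module, ("handler_map", tuple(merged)))


registry = ToyRegistry(
    {
        "users": ToyMetadata(extensions=[HandlerBinding(["create_user"])]),
        "orders": ToyMetadata(extensions=[HandlerBinding(["place_order"])]),
        "app": ToyMetadata(),
    }
)
AggregateHandlers().on_module_registration(registry, "app", None)
```

After the hook runs, the aggregated provider belongs to the owning module ("app"), while the contributing modules stay unchanged.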
ExtensionRegistry
¶
Registry for extensions.
This registry maintains references to all extensions in the application, allowing for centralized management and discovery.
Source code in src/waku/extensions/registry.py
register_application_extension
¶
register_application_extension(
extension: ApplicationExtension,
) -> Self
Register an application extension.
Source code in src/waku/extensions/registry.py
register_module_extension
¶
register_module_extension(
module_type: ModuleType, extension: ModuleExtension
) -> Self
get_application_extensions
¶
get_module_extensions
¶
get_module_extensions(
module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
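Selecting extensions by type, as get_module_extensions() does through its extension_type argument, can be sketched with runtime-checkable protocols. Whether waku's own protocols are runtime-checkable is not stated on this page, so that is an assumption; OnInit, OnDestroy and select are hypothetical names.

```python
from typing import List, Protocol, Type, TypeVar, runtime_checkable

T = TypeVar("T")


@runtime_checkable
class OnInit(Protocol):
    def on_init(self) -> None: ...


@runtime_checkable
class OnDestroy(Protocol):
    def on_destroy(self) -> None: ...


class Metrics:
    """Implements only the init hook."""

    def on_init(self) -> None: ...


class Cleanup:
    """Implements both hooks, so it matches both protocols."""

    def on_init(self) -> None: ...

    def on_destroy(self) -> None: ...


def select(extensions: List[object], extension_type: Type[T]) -> List[T]:
    # isinstance() against a runtime-checkable Protocol checks for the methods' presence.
    return [ext for ext in extensions if isinstance(ext, extension_type)]


extensions: List[object] = [Metrics(), Cleanup()]
```

A single extension object can therefore be returned for several hook types without any explicit registration per hook.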
protocols
¶
Extension protocols for application and module lifecycle hooks.
ApplicationExtension
module-attribute
¶
ApplicationExtension: TypeAlias = (
OnApplicationInit
| AfterApplicationInit
| OnApplicationShutdown
| OnModuleRegistration
)
ModuleExtension
module-attribute
¶
ModuleExtension: TypeAlias = (
OnModuleConfigure
| OnModuleInit
| OnModuleDestroy
| OnModuleRegistration
)
OnApplicationInit
¶
Bases: Protocol
Extension for application pre-initialization actions.
on_app_init
async
¶
on_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
AfterApplicationInit
¶
Bases: Protocol
Extension for application post-initialization actions.
after_app_init
async
¶
after_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnApplicationShutdown
¶
Bases: Protocol
Extension for application shutdown actions.
on_app_shutdown
async
¶
on_app_shutdown(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnModuleRegistration
¶
Bases: Protocol
Extension for contributing providers to module metadata during registration.
This hook runs after all module metadata is collected but before Module objects are created. Use this for cross-module aggregation that produces providers which should belong to the owning module.
Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).
Execution order
- Application-level extensions (assigned to root module)
- Module-level extensions (in topological order)
Key differences from OnModuleConfigure
- Runs after ALL modules are collected (cross-module visibility)
- Receives registry with access to all modules' metadata
- Can add providers to owning module
on_module_registration
¶
on_module_registration(
registry: ModuleMetadataRegistry,
owning_module: ModuleType,
context: Mapping[Any, Any] | None,
) -> None
Contribute providers to module metadata before Module objects are created.
| PARAMETER | DESCRIPTION |
|---|---|
| registry | Registry of all collected module metadata. Use find_extensions() to discover extensions across modules, add_provider() to contribute. |
| owning_module | The module type that owns this extension. Providers added via registry.add_provider() should target this module. |
| context | Application context passed to WakuFactory (read-only). |
Source code in src/waku/extensions/protocols.py
OnModuleConfigure
¶
Bases: Protocol
Extension for module configuration.
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
OnModuleInit
¶
registry
¶
Extension registry for centralized management of extensions.
ExtensionRegistry
¶
Registry for extensions.
This registry maintains references to all extensions in the application, allowing for centralized management and discovery.
Source code in src/waku/extensions/registry.py
register_application_extension
¶
register_application_extension(
extension: ApplicationExtension,
) -> Self
Register an application extension.
Source code in src/waku/extensions/registry.py
register_module_extension
¶
register_module_extension(
module_type: ModuleType, extension: ModuleExtension
) -> Self
get_application_extensions
¶
get_module_extensions
¶
get_module_extensions(
module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
factory
¶
ContainerConfig
dataclass
¶
WakuFactory
¶
WakuFactory(
root_module_type: ModuleType,
/,
context: dict[Any, Any] | None = None,
lifespan: Sequence[LifespanFunc] = (),
extensions: Sequence[
ApplicationExtension
] = DEFAULT_EXTENSIONS,
container_config: ContainerConfig | None = None,
provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/factory.py
create
¶
create() -> WakuApplication
Source code in src/waku/factory.py
lifespan
¶
LifespanFunc
module-attribute
¶
LifespanFunc: TypeAlias = (
Callable[
['WakuApplication'],
AbstractAsyncContextManager[None],
]
| AbstractAsyncContextManager[None]
)
LifespanWrapper
¶
LifespanWrapper(lifespan_func: LifespanFunc)
Source code in src/waku/lifespan.py
lifespan
async
¶
lifespan(app: WakuApplication) -> AsyncIterator[None]
Source code in src/waku/lifespan.py
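LifespanFunc accepts either a callable that takes the application and returns an async context manager, or an async context manager directly. A sketch of how a wrapper could normalize both forms is shown below; ToyLifespanWrapper is a hypothetical stand-in and the dispatch logic is an assumption, not waku's code.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


@asynccontextmanager
async def db_lifespan(app: object) -> AsyncIterator[None]:
    # Callable form: receives the application.
    events.append("db up")
    try:
        yield
    finally:
        events.append("db down")


@asynccontextmanager
async def cache_lifespan() -> AsyncIterator[None]:
    # Calling this yields a ready-made context manager (second form).
    events.append("cache up")
    try:
        yield
    finally:
        events.append("cache down")


class ToyLifespanWrapper:
    def __init__(self, lifespan_func) -> None:
        self._func = lifespan_func

    @asynccontextmanager
    async def lifespan(self, app: object) -> AsyncIterator[None]:
        if isinstance(self._func, AbstractAsyncContextManager):
            ctx = self._func  # already a context manager
        else:
            ctx = self._func(app)  # factory taking the application
        async with ctx:
            yield


async def main() -> List[str]:
    events.clear()
    app = object()
    async with ToyLifespanWrapper(db_lifespan).lifespan(app):
        async with ToyLifespanWrapper(cache_lifespan()).lifespan(app):
            events.append("running")
    return list(events)
```

Running asyncio.run(main()) enters the lifespans in order and exits them in reverse.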
modules
¶
DynamicModule
dataclass
¶
DynamicModule(
*,
providers: list[ProviderSpec] = list(),
imports: list[ModuleType | DynamicModule] = list(),
exports: list[
type[object] | ModuleType | DynamicModule
] = list(),
extensions: list[ModuleExtension] = list(),
is_global: bool = False,
id: UUID = uuid4(),
parent_module: ModuleType,
)
Bases: ModuleMetadata
providers
class-attribute
instance-attribute
¶
providers: list[ProviderSpec] = field(default_factory=list)
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
imports: list[ModuleType | DynamicModule] = field(
default_factory=list
)
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
exports: list[type[object] | ModuleType | DynamicModule] = (
field(default_factory=list)
)
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
extensions: list[ModuleExtension] = field(
default_factory=list
)
List of module extensions for lifecycle hooks.
ModuleCompiler
¶
extract_metadata
¶
extract_metadata(
module_type: ModuleType | DynamicModule,
) -> tuple[ModuleType, ModuleMetadata]
Source code in src/waku/modules/_metadata.py
ModuleMetadata
dataclass
¶
ModuleMetadata(
*,
providers: list[ProviderSpec] = list(),
imports: list[ModuleType | DynamicModule] = list(),
exports: list[
type[object] | ModuleType | DynamicModule
] = list(),
extensions: list[ModuleExtension] = list(),
is_global: bool = False,
id: UUID = uuid4(),
)
providers
class-attribute
instance-attribute
¶
providers: list[ProviderSpec] = field(default_factory=list)
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
imports: list[ModuleType | DynamicModule] = field(
default_factory=list
)
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
exports: list[type[object] | ModuleType | DynamicModule] = (
field(default_factory=list)
)
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
extensions: list[ModuleExtension] = field(
default_factory=list
)
List of module extensions for lifecycle hooks.
ModuleMetadataRegistry
¶
ModuleMetadataRegistry(
metadata_by_type: dict[ModuleType, ModuleMetadata],
topological_order: tuple[ModuleType, ...],
)
Registry providing access to collected module metadata.
Provides read access to all modules' metadata for aggregation purposes, with controlled write access through explicit methods.
This class is used during the module registration phase to enable cross-module aggregation of providers.
Source code in src/waku/modules/_metadata_registry.py
modules
property
¶
modules: tuple[ModuleType, ...]
All module types in topological order (dependencies first).
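"Dependencies first" ordering can be reproduced with the standard library's graphlib. The module names below are hypothetical strings, and this is only an illustration of the ordering, not how waku computes it.

```python
from graphlib import TopologicalSorter

# Hypothetical module graph: each key maps to the modules it imports.
imports = {
    "AppModule": {"UsersModule", "OrdersModule"},
    "UsersModule": {"DatabaseModule"},
    "OrdersModule": {"DatabaseModule"},
    "DatabaseModule": set(),
}

# static_order() yields a node only after all of its predecessors (here: its imports).
order = tuple(TopologicalSorter(imports).static_order())
```

DatabaseModule always comes first and AppModule last; the relative order of UsersModule and OrdersModule is not fixed by the graph.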
get_metadata
¶
get_metadata(module_type: ModuleType) -> ModuleMetadata
find_extensions
¶
find_extensions(
protocol: type[_ExtT],
) -> Iterator[tuple[ModuleType, _ExtT]]
Find all extensions of a given type across all modules.
Yields (module_type, extension) pairs in topological order. This is useful for aggregating data from extensions across modules.
| PARAMETER | DESCRIPTION |
|---|---|
| protocol | The extension protocol/type to search for. |
| YIELDS | DESCRIPTION |
|---|---|
| tuple[ModuleType, _ExtT] | Tuples of (module_type, extension) for each matching extension. |
Source code in src/waku/modules/_metadata_registry.py
add_provider
¶
add_provider(
module_type: ModuleType, provider: ProviderSpec
) -> None
Add a provider to a module's metadata.
This is the preferred way to add providers during registration hooks. The provider will become part of the owning module.
| PARAMETER | DESCRIPTION |
|---|---|
| module_type | The module to add the provider to. |
| provider | The provider specification to add. |
| RAISES | DESCRIPTION |
|---|---|
| KeyError | If module_type is not in the registry. |
Source code in src/waku/modules/_metadata_registry.py
Module
¶
Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
exports
instance-attribute
¶
exports: Final[
Sequence[type[object] | ModuleType | DynamicModule]
] = exports
provider
property
¶
Get the aggregated provider for this module.
This property returns the provider created by create_provider(). Must be called after create_provider() has been invoked.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If create_provider() has not been called yet. |
create_provider
¶
create_provider(
context: dict[Any, Any] | None,
builder: ActivationBuilder,
provider_filter: IProviderFilter,
) -> BaseProvider
Create aggregated provider with activation filtering applied.
| PARAMETER | DESCRIPTION |
|---|---|
| context | Context dict for activation decisions. |
| builder | Activation builder for checking if types are registered. |
| provider_filter | Filter strategy for conditional provider activation. |
| RETURNS | DESCRIPTION |
|---|---|
| BaseProvider | BaseProvider with only active providers aggregated. |
Source code in src/waku/modules/_module.py
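The ordering requirement between create_provider() and the provider property can be sketched as follows. ToyModule is a hypothetical stand-in: the "provider" is just a list of the specs that passed a keep predicate, which is an assumption made to keep the example self-contained.

```python
from typing import Any, Callable, List, Optional


class ToyModule:
    def __init__(self, specs: List[Any]) -> None:
        self._specs = list(specs)
        self._provider: Optional[List[Any]] = None

    @property
    def provider(self) -> List[Any]:
        # Guard against access before the aggregated provider exists.
        if self._provider is None:
            raise RuntimeError("create_provider() must be called before accessing provider")
        return self._provider

    def create_provider(self, keep: Callable[[Any], bool]) -> List[Any]:
        # Aggregate only the specs that the filter considers active.
        self._provider = [spec for spec in self._specs if keep(spec)]
        return self._provider


toy = ToyModule(["db", "debug_toolbar"])
```

Accessing toy.provider before create_provider() raises RuntimeError; afterwards it returns the filtered aggregate.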
ModuleRegistry
¶
ModuleRegistry(
*,
compiler: ModuleCompiler,
root_module: Module,
modules: dict[UUID, Module],
providers: list[BaseProvider],
adjacency: AdjacencyMatrix,
)
Immutable registry and graph for module queries, traversal, and lookups.
Source code in src/waku/modules/_registry.py
get
¶
get(module_type: ModuleType | DynamicModule) -> Module
Source code in src/waku/modules/_registry.py
get_by_id
¶
traverse
¶
Traverse the module graph in depth-first post-order (children before parent) recursively.
| PARAMETER | DESCRIPTION |
|---|---|
| from_ | Start module (default: root) |
| YIELDS | DESCRIPTION |
|---|---|
| Module | Each traversed module (post-order) |
Source code in src/waku/modules/_registry.py
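A depth-first post-order walk (children before parent) looks like this on a small hypothetical graph. Skipping modules that were already visited is an assumption of this sketch; the page does not say how shared imports are handled.

```python
from typing import Dict, Iterator, List, Optional, Set

# Hypothetical module graph: parent -> imported children.
graph: Dict[str, List[str]] = {
    "App": ["Users", "Orders"],
    "Users": ["Db"],
    "Orders": ["Db"],
    "Db": [],
}


def traverse(node: str, _seen: Optional[Set[str]] = None) -> Iterator[str]:
    seen = set() if _seen is None else _seen
    if node in seen:
        return
    seen.add(node)
    # Recurse into children first, then yield the node itself (post-order).
    for child in graph[node]:
        yield from traverse(child, seen)
    yield node
```

Starting from "App", the shared "Db" module is yielded once, before any module that imports it.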
ModuleRegistryBuilder
¶
ModuleRegistryBuilder(
root_module_type: ModuleType,
compiler: ModuleCompiler | None = None,
context: dict[Any, Any] | None = None,
provider_filter: IProviderFilter | None = None,
app_extensions: Sequence[ApplicationExtension] = (),
)
Source code in src/waku/modules/_registry_builder.py
module
¶
module(
*,
providers: Sequence[ProviderSpec] = (),
imports: Sequence[ModuleType | DynamicModule] = (),
exports: Sequence[
type[object] | ModuleType | DynamicModule
] = (),
extensions: Sequence[ModuleExtension] = (),
is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]
Decorator to define a module.
| PARAMETER | DESCRIPTION |
|---|---|
| providers | Sequence of providers for dependency injection. |
| imports | Sequence of modules imported by this module. |
| exports | Sequence of types or modules exported by this module. |
| extensions | Sequence of module extensions for lifecycle hooks. |
| is_global | Whether this module is global or not. |
Source code in src/waku/modules/_metadata.py
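The general shape of a metadata-attaching class decorator is sketched below. toy_module, ToyMetadata and the __toy_metadata__ attribute are hypothetical names chosen for this example; how waku actually stores module metadata is not shown on this page.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Sequence, Type, TypeVar

T = TypeVar("T")


@dataclass
class ToyMetadata:
    providers: List[Any] = field(default_factory=list)
    imports: List[type] = field(default_factory=list)
    is_global: bool = False


def toy_module(
    *,
    providers: Sequence[Any] = (),
    imports: Sequence[type] = (),
    is_global: bool = False,
) -> Callable[[Type[T]], Type[T]]:
    def decorator(cls: Type[T]) -> Type[T]:
        # Attach metadata to the class and return the class unchanged otherwise.
        setattr(cls, "__toy_metadata__", ToyMetadata(list(providers), list(imports), is_global))
        return cls

    return decorator


@toy_module(providers=["db_connection"], is_global=True)
class DatabaseModule: ...


@toy_module(imports=[DatabaseModule])
class UsersModule: ...
```

The decorated classes stay ordinary classes; a compiler step can later read the attached metadata to build the module graph.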
testing
¶
override
¶
override(
container: AsyncContainer,
*providers: BaseProvider,
context: dict[Any, Any] | None = None,
) -> Iterator[None]
Temporarily override providers and/or context in an AsyncContainer for testing.
| PARAMETER | DESCRIPTION |
|---|---|
| container | The container whose providers/context will be overridden. |
| *providers | Providers to override in the container. |
| context | Context values to override. |
| YIELDS | DESCRIPTION |
|---|---|
| None | Context in which the container uses the overridden providers/context. |
Example
from waku import WakuFactory, module
from waku.di import Scope, singleton
from waku.testing import override
class Service: ...
class ServiceOverride(Service): ...
# `application` is an existing WakuApplication; run this inside an async function.
# Override providers
with override(application.container, singleton(Service, ServiceOverride)):
    service = await application.container.get(Service)
    assert isinstance(service, ServiceOverride)
# Override context
with override(application.container, context={int: 123}):
    ...
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If container is not at root (APP) scope. |
Source code in src/waku/testing.py
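The "temporarily override, then restore" behavior can be sketched with contextlib. ToyContainer and toy_override are hypothetical stand-ins written for this example and do not reflect how waku swaps providers inside a real container.

```python
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator


class ToyContainer:
    def __init__(self, bindings: Dict[type, Callable[[], Any]]) -> None:
        self._bindings = dict(bindings)

    def get(self, key: type) -> Any:
        return self._bindings[key]()


@contextmanager
def toy_override(
    container: ToyContainer, replacements: Dict[type, Callable[[], Any]]
) -> Iterator[None]:
    original = container._bindings
    container._bindings = {**original, **replacements}
    try:
        yield
    finally:
        # Always restore the original bindings, even if the body raises.
        container._bindings = original


class Service: ...


class FakeService(Service): ...


container = ToyContainer({Service: Service})
with toy_override(container, {Service: FakeService}):
    inside = container.get(Service)
after = container.get(Service)
```

Inside the with block the fake is resolved; once the block exits, the original binding is back in place.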
create_test_app
async
¶
create_test_app(
*,
base: ModuleType | DynamicModule | None = None,
providers: Sequence[ProviderSpec] = (),
imports: Sequence[ModuleType | DynamicModule] = (),
extensions: Sequence[ModuleExtension] = (),
app_extensions: Sequence[
ApplicationExtension
] = DEFAULT_EXTENSIONS,
context: dict[Any, Any] | None = None,
) -> AsyncIterator[WakuApplication]
Create a minimal test application with given configuration.
Useful for testing extensions and module configurations in isolation without needing to set up a full application structure.
| PARAMETER | DESCRIPTION |
|---|---|
| base | Base module to build upon. When provided, the test module imports this module and providers act as overrides. |
| providers | Providers to register in the test module. When base is provided, these act as overrides. |
| imports | Additional modules to import into the test module. |
| extensions | Module extensions to register. |
| app_extensions | Application extensions to register (default: DEFAULT_EXTENSIONS). |
| context | Context values to pass to the container. |
| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[WakuApplication] | Initialized WakuApplication. |
Example
from typing import Protocol
from waku.testing import create_test_app
from waku.di import singleton
# Entity, MyExtension, SomeEvent, SomeHandler, MyService, expected and AppModule
# are placeholders for names defined in your own code.
class IRepository(Protocol):
    async def get(self, id: str) -> Entity: ...
class FakeRepository(IRepository):
    async def get(self, id: str) -> Entity:
        return Entity(id=id)
# Create test app from scratch
async def test_my_extension():
    extension = MyExtension().bind(SomeEvent, SomeHandler)
    async with create_test_app(
        extensions=[extension],
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        service = await app.container.get(MyService)
        result = await service.do_something()
        assert result == expected
# Create test app based on existing module with overrides
async def test_with_base_module():
    async with create_test_app(
        base=AppModule,
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        # FakeRepository replaces the real one from AppModule
        repo = await app.container.get(IRepository)
        assert isinstance(repo, FakeRepository)
Source code in src/waku/testing.py
validation
¶
ValidationRule
¶
Bases: Protocol
validate
¶
validate(
context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/_abc.py
ValidationExtension
¶
ValidationExtension(
rules: Sequence[ValidationRule], *, strict: bool = True
)
Bases: AfterApplicationInit
Source code in src/waku/validation/_extension.py
after_app_init
async
¶
after_app_init(app: WakuApplication) -> None
rules
¶
DependenciesAccessibleRule
¶
DependenciesAccessibleRule(cache_size: int = 1000)
Bases: ValidationRule
Validates that all dependencies required by providers are accessible.
Source code in src/waku/validation/rules/dependency_accessible.py
validate
¶
validate(
context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/rules/dependency_accessible.py
DependencyInaccessibleError
¶
DependencyInaccessibleError(
required_type: type[object],
required_by: object,
from_module: Module,
)
Bases: ValidationError
Error indicating a dependency is not accessible to a provider/module.
Source code in src/waku/validation/rules/dependency_accessible.py
dependency_accessible
¶
DependencyInaccessibleError
¶
DependencyInaccessibleError(
required_type: type[object],
required_by: object,
from_module: Module,
)
Bases: ValidationError
Error indicating a dependency is not accessible to a provider/module.
Source code in src/waku/validation/rules/dependency_accessible.py
AccessibilityStrategy
¶
GlobalProvidersStrategy
¶
GlobalProvidersStrategy(
modules: Sequence[Module],
container: AsyncContainer,
types_extractor: ModuleTypesExtractor,
registry: ModuleRegistry,
)
Bases: AccessibilityStrategy
Check if type is provided by a global module or APP-scoped context.
Source code in src/waku/validation/rules/dependency_accessible.py
LocalProvidersStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by the module itself.
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
ContextVarsStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by application or request container context.
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
ImportedModulesStrategy
¶
ImportedModulesStrategy(
registry: ModuleRegistry,
types_extractor: ModuleTypesExtractor,
)
Bases: AccessibilityStrategy
Check if type is accessible via imported modules (direct export or re-export).
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
Source code in src/waku/validation/rules/dependency_accessible.py
DependencyAccessChecker
¶
DependencyAccessChecker(
strategies: Sequence[AccessibilityStrategy],
)
Handles dependency accessibility checks between modules.
Source code in src/waku/validation/rules/dependency_accessible.py
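The checker's strategy chain can be pictured as "a type is accessible if any strategy says so". The classes below are hypothetical stand-ins using strings for modules and types; only two of the strategies listed above are modeled, and the real is_accessible signature may differ.

```python
from typing import Dict, Iterable, List, Protocol, Sequence, Set


class ToyStrategy(Protocol):
    def is_accessible(self, required: str, module: str) -> bool: ...


class LocalProviders:
    """Type is provided by the module itself."""

    def __init__(self, provided: Dict[str, Set[str]]) -> None:
        self._provided = provided

    def is_accessible(self, required: str, module: str) -> bool:
        return required in self._provided.get(module, set())


class ImportedExports:
    """Type is exported by a module that the requesting module imports."""

    def __init__(self, imports: Dict[str, List[str]], exports: Dict[str, Set[str]]) -> None:
        self._imports = imports
        self._exports = exports

    def is_accessible(self, required: str, module: str) -> bool:
        return any(
            required in self._exports.get(dep, set())
            for dep in self._imports.get(module, [])
        )


class ToyAccessChecker:
    def __init__(self, strategies: Sequence[ToyStrategy]) -> None:
        self._strategies = tuple(strategies)

    def find_inaccessible(self, required: Iterable[str], module: str) -> List[str]:
        # A dependency is reported only if every strategy rejects it.
        return [
            t for t in required
            if not any(s.is_accessible(t, module) for s in self._strategies)
        ]


checker = ToyAccessChecker(
    [
        LocalProviders({"users": {"UserService"}, "db": {"Session", "Engine"}}),
        ImportedExports({"users": ["db"]}, {"db": {"Session"}}),
    ]
)
```

In this setup "Engine" is provided by the db module but not exported, so the users module cannot access it.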
DependenciesAccessibleRule
¶
DependenciesAccessibleRule(cache_size: int = 1000)
Bases: ValidationRule
Validates that all dependencies required by providers are accessible.
Source code in src/waku/validation/rules/dependency_accessible.py
validate
¶
validate(
context: ValidationContext,
) -> list[ValidationError]