waku
¶
WakuApplication
¶
WakuApplication(
*,
container: AsyncContainer,
registry: ModuleRegistry,
lifespan: Sequence[LifespanFunc | LifespanWrapper],
extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
initialize
async
¶
WakuFactory
¶
WakuFactory(
root_module_type: ModuleType,
/,
context: dict[Any, Any] | None = None,
lifespan: Sequence[LifespanFunc] = (),
extensions: Sequence[
ApplicationExtension
] = DEFAULT_EXTENSIONS,
container_config: ContainerConfig | None = None,
provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/factory.py
create
¶
create() -> WakuApplication
Source code in src/waku/factory.py
DynamicModule
dataclass
¶
DynamicModule(
*,
providers: list[ProviderSpec] = list(),
imports: list[ModuleType | DynamicModule] = list(),
exports: list[
type[object] | ModuleType | DynamicModule
] = list(),
extensions: list[ModuleExtension] = list(),
is_global: bool = False,
id: UUID = uuid4(),
parent_module: ModuleType,
)
Bases: ModuleMetadata
providers
class-attribute
instance-attribute
¶
providers: list[ProviderSpec] = field(default_factory=list)
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
imports: list[ModuleType | DynamicModule] = field(
default_factory=list
)
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
exports: list[type[object] | ModuleType | DynamicModule] = (
field(default_factory=list)
)
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
extensions: list[ModuleExtension] = field(
default_factory=list
)
List of module extensions for lifecycle hooks.
Module
¶
Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
exports
instance-attribute
¶
exports: Final[
Sequence[type[object] | ModuleType | DynamicModule]
] = exports
provider
property
¶
Get the aggregated provider for this module.
This property returns the provider created by create_provider(). Must be called after create_provider() has been invoked.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If create_provider() has not been called yet. |
create_provider
¶
create_provider(
context: dict[Any, Any] | None,
builder: ActivationBuilder,
provider_filter: IProviderFilter,
) -> BaseProvider
Create aggregated provider with activation filtering applied.
| PARAMETER | DESCRIPTION |
|---|---|
| context | Context dict for activation decisions. |
| builder | Activation builder for checking if types are registered. |
| provider_filter | Filter strategy for conditional provider activation. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseProvider | BaseProvider with only active providers aggregated. |
Source code in src/waku/modules/_module.py
module
¶
module(
*,
providers: Sequence[ProviderSpec] = (),
imports: Sequence[ModuleType | DynamicModule] = (),
exports: Sequence[
type[object] | ModuleType | DynamicModule
] = (),
extensions: Sequence[ModuleExtension] = (),
is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]
Decorator to define a module.
| PARAMETER | DESCRIPTION |
|---|---|
| providers | Sequence of providers for dependency injection. |
| imports | Sequence of modules imported by this module. |
| exports | Sequence of types or modules exported by this module. |
| extensions | Sequence of module extensions for lifecycle hooks. |
| is_global | Whether this module is global or not. |
Source code in src/waku/modules/_metadata.py
application
¶
WakuApplication
¶
WakuApplication(
*,
container: AsyncContainer,
registry: ModuleRegistry,
lifespan: Sequence[LifespanFunc | LifespanWrapper],
extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
initialize
async
¶
cqrs
¶
NextHandlerType
module-attribute
¶
Event
dataclass
¶
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
handle(
request: RequestT,
/,
next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle |
| next_handler | Function to call the next handler in the pipeline |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline |
Source code in src/waku/cqrs/contracts/pipeline.py
Request
dataclass
¶
EventHandler
¶
The event handler interface.
Usage::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
Mediator
¶
Mediator(
container: AsyncContainer,
event_publisher: EventPublisher,
)
Bases: IMediator
Default CQRS implementation.
Initialize the mediator.
| PARAMETER | DESCRIPTION |
|---|---|
| container | Container used to resolve handlers and behaviors |
| event_publisher | Function to publish events to handlers |
Source code in src/waku/cqrs/impl.py
send
async
¶
Send a request through the CQRS pipeline chain.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to process |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Response from the handler |

| RAISES | DESCRIPTION |
|---|---|
| RequestHandlerNotFound | If no handler is registered for the request type |
Source code in src/waku/cqrs/impl.py
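As a rough illustration of the dispatch contract described above (look up a handler by request type, raise if none is registered), here is a standalone sketch. `ToyMediator`, `HandlerNotFound`, `Ping`, and `PingHandler` are hypothetical names; the real Mediator resolves handlers through the container and runs pipeline behaviors, which this sketch omits.

```python
import asyncio
from dataclasses import dataclass


class HandlerNotFound(TypeError):
    """Hypothetical stand-in for RequestHandlerNotFound."""


@dataclass(frozen=True)
class Ping:
    value: int


class PingHandler:
    async def handle(self, request: Ping, /) -> int:
        return request.value + 1


class ToyMediator:
    def __init__(self, handlers: dict[type, type]) -> None:
        # request type -> handler type
        self._handlers = handlers

    async def send(self, request: object, /) -> object:
        handler_type = self._handlers.get(type(request))
        if handler_type is None:
            raise HandlerNotFound(type(request))
        # A real mediator would resolve the handler from a DI container.
        return await handler_type().handle(request)


mediator = ToyMediator({Ping: PingHandler})
result = asyncio.run(mediator.send(Ping(41)))
```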
IMediator
¶
ISender
¶
MediatorConfig
dataclass
¶
MediatorConfig(
*,
mediator_implementation_type: type[
IMediator
] = Mediator,
event_publisher: type[
EventPublisher
] = SequentialEventPublisher,
pipeline_behaviors: Sequence[
type[IPipelineBehavior[Any, Any]]
] = (),
)
Configuration for the Mediator extension.
This class defines the configuration options for setting up the CQRS pattern implementation in the application.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mediator_implementation_type | The concrete implementation class for the CQRS mediator interface (IMediator). Defaults to the standard Mediator class. |
| event_publisher | The implementation class for publishing events. Defaults to SequentialEventPublisher. |
| pipeline_behaviors | A sequence of pipeline behavior types that will be applied to the CQRS pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence. |
mediator_implementation_type
class-attribute
instance-attribute
¶
event_publisher
class-attribute
instance-attribute
¶
event_publisher: type[EventPublisher] = (
SequentialEventPublisher
)
MediatorExtension
¶
Bases: OnModuleConfigure
Source code in src/waku/cqrs/modules.py
bind_request
¶
bind_request(
request_type: type[RequestT],
handler_type: type[RequestHandler[RequestT, Any]],
*,
behaviors: list[type[IPipelineBehavior[RequestT, Any]]]
| None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
bind_event
¶
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
MediatorModule
¶
register
classmethod
¶
register(
config: MediatorConfig | None = None,
) -> DynamicModule
Application-level module for Mediator setup.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Configuration for the Mediator extension. |
Source code in src/waku/cqrs/modules.py
RequestHandler
¶
Bases: ABC, Generic[RequestT, ResponseT]
The request handler interface.
A request handler is an object that takes a request as input and may return a response.
Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
contracts
¶
NextHandlerType
module-attribute
¶
RequestT
module-attribute
¶
ResponseT
module-attribute
¶
Event
dataclass
¶
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
handle(
request: RequestT,
/,
next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle |
| next_handler | Function to call the next handler in the pipeline |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline |
Source code in src/waku/cqrs/contracts/pipeline.py
Request
dataclass
¶
event
¶
pipeline
¶
NextHandlerType
module-attribute
¶
IPipelineBehavior
¶
Bases: ABC, Generic[RequestT, ResponseT]
Interface for pipeline behaviors that wrap request handling.
handle
abstractmethod
async
¶
handle(
request: RequestT,
/,
next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT
Handle the request and call the next handler in the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to handle |
| next_handler | Function to call the next handler in the pipeline |

| RETURNS | DESCRIPTION |
|---|---|
| ResponseT | The response from the pipeline |
Source code in src/waku/cqrs/contracts/pipeline.py
events
¶
EventHandler
¶
The event handler interface.
Usage::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
GroupEventPublisher
¶
Bases: EventPublisher
SequentialEventPublisher
¶
Bases: EventPublisher
handler
¶
EventHandler
¶
The event handler interface.
Usage::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
map
¶
EventMapRegistry
module-attribute
¶
EventMapRegistry: TypeAlias = MutableMapping[
type[EventT], list[type[EventHandler[EventT]]]
]
EventMap
¶
Source code in src/waku/cqrs/events/map.py
bind
¶
Source code in src/waku/cqrs/events/map.py
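The EventMapRegistry alias above maps an event type to a list of handler types. A standalone sketch of how a `bind` operation over such a mapping might behave follows. The `bind` signature and the duplicate check are assumptions (the reference does not show them), and `ToyEventMap` and `AlreadyRegistered` are hypothetical names.

```python
from collections import defaultdict


class AlreadyRegistered(KeyError):
    """Hypothetical stand-in for EventHandlerAlreadyRegistered."""


class ToyEventMap:
    def __init__(self) -> None:
        # event type -> handler types, kept in registration order
        self.registry: defaultdict[type, list[type]] = defaultdict(list)

    def bind(self, event_type: type, handler_types: list[type]) -> "ToyEventMap":
        for handler_type in handler_types:
            if handler_type in self.registry[event_type]:
                raise AlreadyRegistered(event_type, handler_type)
            self.registry[event_type].append(handler_type)
        return self  # returning self allows chained bind() calls


class UserJoined: ...
class NotifyRoom: ...
class SendEmail: ...


event_map = ToyEventMap().bind(UserJoined, [NotifyRoom]).bind(UserJoined, [SendEmail])
```

Unlike a request, which maps to exactly one handler, one event type can fan out to several handlers, which is why the mapping's values are lists.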
publish
¶
SequentialEventPublisher
¶
Bases: EventPublisher
GroupEventPublisher
¶
Bases: EventPublisher
exceptions
¶
ImproperlyConfiguredError
¶
Bases: MediatorError
Raised when the CQRS configuration is invalid.
RequestHandlerAlreadyRegistered
¶
RequestHandlerAlreadyRegistered(
request_type: type[Request[Any]],
handler_type: type[RequestHandler[Any, Any]],
)
Bases: MediatorError, KeyError
Raised when a request handler is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
| handler_type | The type of handler that was already registered. |
Source code in src/waku/cqrs/exceptions.py
RequestHandlerNotFound
¶
Bases: MediatorError, TypeError
Raised when a request handler is not found.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
Source code in src/waku/cqrs/exceptions.py
EventHandlerAlreadyRegistered
¶
EventHandlerAlreadyRegistered(
event_type: type[Event],
handler_type: type[EventHandler[Any]],
)
Bases: MediatorError, KeyError
Raised when an event handler is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| event_type | The type of event that caused the error. |
| handler_type | The type of handler that was already registered. |
Source code in src/waku/cqrs/exceptions.py
PipelineBehaviorAlreadyRegistered
¶
PipelineBehaviorAlreadyRegistered(
request_type: type[Request],
behavior_type: type[IPipelineBehavior[Any, Any]],
)
Bases: MediatorError, KeyError
Raised when a pipeline behavior is already registered.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| request_type | The type of request that caused the error. |
| behavior_type | The type of behavior that was already registered. |
Source code in src/waku/cqrs/exceptions.py
impl
¶
Mediator
¶
Mediator(
container: AsyncContainer,
event_publisher: EventPublisher,
)
Bases: IMediator
Default CQRS implementation.
Initialize the mediator.
| PARAMETER | DESCRIPTION |
|---|---|
| container | Container used to resolve handlers and behaviors |
| event_publisher | Function to publish events to handlers |
Source code in src/waku/cqrs/impl.py
send
async
¶
Send a request through the CQRS pipeline chain.
| PARAMETER | DESCRIPTION |
|---|---|
| request | The request to process |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Response from the handler |

| RAISES | DESCRIPTION |
|---|---|
| RequestHandlerNotFound | If no handler is registered for the request type |
Source code in src/waku/cqrs/impl.py
interfaces
¶
ISender
¶
modules
¶
MediatorConfig
dataclass
¶
MediatorConfig(
*,
mediator_implementation_type: type[
IMediator
] = Mediator,
event_publisher: type[
EventPublisher
] = SequentialEventPublisher,
pipeline_behaviors: Sequence[
type[IPipelineBehavior[Any, Any]]
] = (),
)
Configuration for the Mediator extension.
This class defines the configuration options for setting up the CQRS pattern implementation in the application.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mediator_implementation_type | The concrete implementation class for the CQRS mediator interface (IMediator). Defaults to the standard Mediator class. |
| event_publisher | The implementation class for publishing events. Defaults to SequentialEventPublisher. |
| pipeline_behaviors | A sequence of pipeline behavior types that will be applied to the CQRS pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence. |
mediator_implementation_type
class-attribute
instance-attribute
¶
event_publisher
class-attribute
instance-attribute
¶
event_publisher: type[EventPublisher] = (
SequentialEventPublisher
)
MediatorModule
¶
register
classmethod
¶
register(
config: MediatorConfig | None = None,
) -> DynamicModule
Application-level module for Mediator setup.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Configuration for the Mediator extension. |
Source code in src/waku/cqrs/modules.py
MediatorExtension
¶
Bases: OnModuleConfigure
Source code in src/waku/cqrs/modules.py
bind_request
¶
bind_request(
request_type: type[RequestT],
handler_type: type[RequestHandler[RequestT, Any]],
*,
behaviors: list[type[IPipelineBehavior[RequestT, Any]]]
| None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
bind_event
¶
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
pipeline
¶
PipelineBehaviorWrapper
¶
PipelineBehaviorWrapper(
behaviors: Sequence[
IPipelineBehavior[RequestT, ResponseT]
],
)
Bases: Generic[RequestT, ResponseT]
Composes pipeline behaviors into a processing chain.
Source code in src/waku/cqrs/pipeline/chain.py
wrap
¶
wrap(
handle: NextHandlerType[RequestT, ResponseT],
) -> NextHandlerType[RequestT, ResponseT]
Create a pipeline that wraps the handler function with behaviors.
Pipeline behaviors are executed in the order they are provided.
| PARAMETER | DESCRIPTION |
|---|---|
| handle | The handler function to wrap with behaviors |

| RETURNS | DESCRIPTION |
|---|---|
| NextHandlerType[RequestT, ResponseT] | A function that executes the entire pipeline |
Source code in src/waku/cqrs/pipeline/chain.py
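To make "executed in the order they are provided" concrete, here is a standalone sketch of one common way to compose such a chain: fold the behaviors from last to first so the first behavior ends up outermost. This is an illustration under that assumption, not the library's implementation; `Tag`, `wrap`, and `final_handler` are hypothetical names.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence
from functools import partial

Handler = Callable[[str], Awaitable[str]]


class Tag:
    """Hypothetical behavior that records when it runs and then delegates."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def handle(self, request: str, /, next_handler: Handler) -> str:
        self.log.append(f"{self.name}:before")
        response = await next_handler(request)
        self.log.append(f"{self.name}:after")
        return response


def wrap(behaviors: Sequence[Tag], handle: Handler) -> Handler:
    # Fold from the last behavior inward so the first one ends up outermost.
    for behavior in reversed(behaviors):
        handle = partial(behavior.handle, next_handler=handle)
    return handle


async def final_handler(request: str) -> str:
    return request.upper()


log: list[str] = []
pipeline = wrap([Tag("a", log), Tag("b", log)], final_handler)
result = asyncio.run(pipeline("ping"))
```

With this composition, behavior `a` sees the request first and the response last, so code placed after the `await` runs in reverse order.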
chain
¶
PipelineBehaviorWrapper
¶
PipelineBehaviorWrapper(
behaviors: Sequence[
IPipelineBehavior[RequestT, ResponseT]
],
)
Bases: Generic[RequestT, ResponseT]
Composes pipeline behaviors into a processing chain.
Source code in src/waku/cqrs/pipeline/chain.py
wrap
¶
wrap(
handle: NextHandlerType[RequestT, ResponseT],
) -> NextHandlerType[RequestT, ResponseT]
Create a pipeline that wraps the handler function with behaviors.
Pipeline behaviors are executed in the order they are provided.
| PARAMETER | DESCRIPTION |
|---|---|
| handle | The handler function to wrap with behaviors |

| RETURNS | DESCRIPTION |
|---|---|
| NextHandlerType[RequestT, ResponseT] | A function that executes the entire pipeline |
Source code in src/waku/cqrs/pipeline/chain.py
map
¶
PipelineBehaviorMapRegistry
module-attribute
¶
PipelineBehaviorMapRegistry = MutableMapping[
type[RequestT],
list[type[IPipelineBehavior[RequestT, ResponseT]]],
]
PipelineBehaviorMap
¶
Source code in src/waku/cqrs/pipeline/map.py
bind
¶
bind(
request_type: type[RequestT],
behavior_types: list[
type[IPipelineBehavior[RequestT, ResponseT]]
],
) -> Self
Source code in src/waku/cqrs/pipeline/map.py
requests
¶
RequestHandler
¶
Bases: ABC, Generic[RequestT, ResponseT]
The request handler interface.
A request handler is an object that takes a request as input and may return a response.
Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handler
¶
RequestHandler
¶
Bases: ABC, Generic[RequestT, ResponseT]
The request handler interface.
A request handler is an object that takes a request as input and may return a response.
Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
map
¶
RequestMapRegistry
module-attribute
¶
RequestMapRegistry: TypeAlias = MutableMapping[
type[RequestT],
type[RequestHandler[RequestT, ResponseT]],
]
di
¶
ActivationBuilder
¶
ActivationContext
¶
ConditionalProvider
dataclass
¶
IProviderFilter
¶
Bases: Protocol
Strategy for filtering providers based on activation context.
filter
¶
filter(
providers: list[ProviderSpec],
context: dict[Any, Any] | None,
module_type: ModuleType | DynamicModule,
builder: ActivationBuilder,
) -> list[Provider]
ProviderFilter
dataclass
¶
Default provider filter implementation.
filter
¶
filter(
providers: list[ProviderSpec],
context: dict[Any, Any] | None,
module_type: ModuleType | DynamicModule,
builder: ActivationBuilder,
) -> list[Provider]
Source code in src/waku/di/_activation.py
contextual
¶
contextual(
provided_type: Any, *, scope: Scope = REQUEST
) -> Provider
contextual(
provided_type: Any,
*,
scope: Scope = REQUEST,
when: Activator,
) -> ConditionalProvider
contextual(
provided_type: Any,
*,
scope: Scope = REQUEST,
when: Activator | None = None,
) -> ProviderSpec
Provide a dependency from the current context (e.g., app/request).
| PARAMETER | DESCRIPTION |
|---|---|
| provided_type | The type to resolve from context. |
| scope | Scope of the context variable (default: Scope.REQUEST). |
| when | Optional predicate to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if a when predicate is given. |
Source code in src/waku/di/_providers.py
many
¶
many(
interface: Any,
*implementations: Any,
scope: Scope = REQUEST,
cache: bool = True,
) -> Provider
many(
interface: Any,
*implementations: Any,
scope: Scope = REQUEST,
cache: bool = True,
when: Activator,
) -> ConditionalProvider
many(
interface: Any,
*implementations: Any,
scope: Scope = REQUEST,
cache: bool = True,
when: Activator | None = None,
) -> ProviderSpec
Register multiple implementations as a collection.
| PARAMETER | DESCRIPTION |
|---|---|
| interface | Interface type for the collection. |
| *implementations | Implementation types or factory functions to include in the collection. |
| scope | Scope of the collection (default: Scope.REQUEST). |
| cache | Whether to cache the resolved results within the scope. |
| when | Optional predicate to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if a when predicate is given. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If no implementations are provided. |
| TypeError | If a factory function lacks a return type annotation. |
Source code in src/waku/di/_providers.py
object_
¶
object_(
obj: Any,
*,
provided_type: Any | None = None,
when: Activator,
) -> ConditionalProvider
object_(
obj: Any,
*,
provided_type: Any | None = None,
when: Activator | None = None,
) -> ProviderSpec
Provide the exact object passed at creation time as a singleton dependency.
The provider always returns the same object instance, without instantiation or copying.
| PARAMETER | DESCRIPTION |
|---|---|
| obj | The instance to provide as-is. |
| provided_type | Explicit type to provide (default: inferred). |
| when | Optional predicate to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if a when predicate is given. |
Source code in src/waku/di/_providers.py
provider
¶
provider(
source: Callable[..., Any] | type[Any],
*,
scope: Scope = REQUEST,
provided_type: Any | None = None,
cache: bool = True,
) -> Provider
Create a Dishka provider for a callable or type.
| PARAMETER | DESCRIPTION |
|---|---|
| source | Callable or type to provide as a dependency. |
| scope | Scope of the dependency (default: Scope.REQUEST). |
| provided_type | Explicit type to provide (default: inferred). |
| cache | Whether to cache the instance in the scope. |

| RETURNS | DESCRIPTION |
|---|---|
| Provider | Configured provider instance. |
Source code in src/waku/di/_providers.py
scoped
¶
scoped(
source: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
scoped(
interface: Any,
implementation: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
scoped(
interface_or_source: type[Any] | Callable[..., Any],
implementation: type[Any]
| Callable[..., Any]
| None = None,
/,
*,
when: Activator | None = None,
) -> ProviderSpec
Create a scoped provider (lifetime: request).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional predicate to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if a when predicate is given. |
Source code in src/waku/di/_providers.py
singleton
¶
singleton(
source: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
singleton(
interface: Any,
implementation: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
singleton(
interface_or_source: type[Any] | Callable[..., Any],
implementation: type[Any]
| Callable[..., Any]
| None = None,
/,
*,
when: Activator | None = None,
) -> ProviderSpec
Create a singleton provider (lifetime: app).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional predicate to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if a when predicate is given. |
Source code in src/waku/di/_providers.py
transient
¶
transient(
source: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
transient(
interface: Any,
implementation: type[_T] | Callable[..., _T],
/,
*,
when: Activator,
) -> ConditionalProvider
transient(
interface_or_source: type[Any] | Callable[..., Any],
implementation: type[Any]
| Callable[..., Any]
| None = None,
/,
*,
when: Activator | None = None,
) -> ProviderSpec
Create a transient provider (new instance per injection).
| PARAMETER | DESCRIPTION |
|---|---|
| interface_or_source | Interface type or source if no separate implementation. |
| implementation | Implementation type if interface is provided. |
| when | Optional predicate to conditionally activate the provider. |

| RETURNS | DESCRIPTION |
|---|---|
| ProviderSpec | Provider, or ConditionalProvider if a when predicate is given. |
Source code in src/waku/di/_providers.py
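The three helpers above differ only in lifetime: app (singleton), request (scoped), and new instance per injection (transient). The toy container below sketches those semantics in isolation. It is a hypothetical model for illustration and says nothing about how waku or Dishka actually resolve dependencies.

```python
from collections.abc import Callable
from typing import Any


class ToyContainer:
    """Hypothetical container; not how waku or Dishka resolve dependencies."""

    def __init__(self) -> None:
        self.factories: dict[type, tuple[str, Callable[[], Any]]] = {}
        self.app_cache: dict[type, Any] = {}

    def register(self, lifetime: str, cls: type) -> None:
        self.factories[cls] = (lifetime, cls)

    def request_scope(self) -> "ToyRequestScope":
        return ToyRequestScope(self)


class ToyRequestScope:
    def __init__(self, container: ToyContainer) -> None:
        self._container = container
        self._cache: dict[type, Any] = {}

    def get(self, cls: type) -> Any:
        lifetime, factory = self._container.factories[cls]
        if lifetime == "transient":
            return factory()  # never cached
        # Singletons live in the app-wide cache, scoped objects in this scope.
        cache = self._container.app_cache if lifetime == "singleton" else self._cache
        if cls not in cache:
            cache[cls] = factory()
        return cache[cls]


class Config: ...
class Session: ...
class Token: ...


container = ToyContainer()
container.register("singleton", Config)
container.register("scoped", Session)
container.register("transient", Token)

first, second = container.request_scope(), container.request_scope()
```

In this model, `Config` is shared across both scopes, `Session` is shared only within one scope, and every `Token` lookup produces a fresh object.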
extensions
¶
ApplicationExtension
module-attribute
¶
ApplicationExtension: TypeAlias = (
OnApplicationInit
| AfterApplicationInit
| OnApplicationShutdown
| OnBeforeContainerBuild
)
ModuleExtension
module-attribute
¶
ModuleExtension: TypeAlias = (
OnModuleConfigure
| OnModuleInit
| OnModuleDestroy
| OnBeforeContainerBuild
)
DEFAULT_EXTENSIONS
module-attribute
¶
DEFAULT_EXTENSIONS: Sequence[ApplicationExtension] = (
ValidationExtension(
[DependenciesAccessibleRule()], strict=True
),
)
AfterApplicationInit
¶
Bases: Protocol
Extension for application post-initialization actions.
after_app_init
async
¶
after_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnApplicationInit
¶
Bases: Protocol
Extension for application pre-initialization actions.
on_app_init
async
¶
on_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnApplicationShutdown
¶
Bases: Protocol
Extension for application shutdown actions.
on_app_shutdown
async
¶
on_app_shutdown(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnBeforeContainerBuild
¶
Bases: Protocol
Extension for aggregating configuration across modules before container build.
This hook runs after all modules are collected but before the DI container is created. Use this for cross-module configuration aggregation that needs to produce injectable registries.
Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).
Execution order
- Application-level extensions (in registration order)
- Module-level extensions (in topological order)
on_before_container_build
¶
on_before_container_build(
modules: Sequence[Module],
context: Mapping[Any, Any] | None,
) -> Sequence[ProviderSpec]
Aggregate configuration from modules and return providers to register.
| PARAMETER | DESCRIPTION |
|---|---|
| modules | All application modules in topological order (dependencies first). |
| context | Application context passed to WakuFactory (read-only). |

| RETURNS | DESCRIPTION |
|---|---|
| Sequence[ProviderSpec] | Sequence of providers to add to the container. |
Source code in src/waku/extensions/protocols.py
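"Topological order (dependencies first)" means every module appears after all modules it imports. The standard library's `graphlib` can illustrate the ordering; the module names below are hypothetical, and this is not a claim about how waku computes the order internally.

```python
from graphlib import TopologicalSorter

# Hypothetical module graph: each key imports the modules in its set.
imports = {
    "AppModule": {"UsersModule", "BillingModule"},
    "UsersModule": {"DatabaseModule"},
    "BillingModule": {"DatabaseModule"},
    "DatabaseModule": set(),
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(imports).static_order())
```

Here `DatabaseModule` comes first and `AppModule` last; the relative order of `UsersModule` and `BillingModule` is not constrained by the graph.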
OnModuleConfigure
¶
Bases: Protocol
Extension for module configuration.
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
OnModuleDestroy
¶
OnModuleInit
¶
ExtensionRegistry
¶
Registry for extensions.
This registry maintains references to all extensions in the application, allowing for centralized management and discovery.
Source code in src/waku/extensions/registry.py
register_application_extension
¶
register_application_extension(
extension: ApplicationExtension,
) -> Self
Register an application extension.
Source code in src/waku/extensions/registry.py
register_module_extension
¶
register_module_extension(
module_type: ModuleType, extension: ModuleExtension
) -> Self
get_application_extensions
¶
get_module_extensions
¶
get_module_extensions(
module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
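The `get_module_extensions(module_type, extension_type)` signature suggests a lookup keyed by module and narrowed by extension protocol. A standalone sketch of that pattern, using `runtime_checkable` protocols and `isinstance` filtering, might look as follows. All names here are hypothetical, and the filtering strategy is an assumption rather than the registry's documented behavior.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class OnInit(Protocol):
    def on_init(self) -> None: ...


@runtime_checkable
class OnDestroy(Protocol):
    def on_destroy(self) -> None: ...


class Both:
    def on_init(self) -> None: ...
    def on_destroy(self) -> None: ...


class InitOnly:
    def on_init(self) -> None: ...


class ToyRegistry:
    def __init__(self) -> None:
        self._by_module: dict[type, list[object]] = {}

    def register(self, module_type: type, extension: object) -> "ToyRegistry":
        self._by_module.setdefault(module_type, []).append(extension)
        return self

    def get(self, module_type: type, extension_type: type) -> list[object]:
        # Keep only extensions that structurally satisfy the requested protocol.
        return [
            ext
            for ext in self._by_module.get(module_type, [])
            if isinstance(ext, extension_type)
        ]


class Users: ...
class Billing: ...


both, init_only = Both(), InitOnly()
registry = ToyRegistry().register(Users, both).register(Users, init_only)
```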
protocols
¶
Extension protocols for application and module lifecycle hooks.
ApplicationExtension
module-attribute
¶
ApplicationExtension: TypeAlias = (
OnApplicationInit
| AfterApplicationInit
| OnApplicationShutdown
| OnBeforeContainerBuild
)
ModuleExtension
module-attribute
¶
ModuleExtension: TypeAlias = (
OnModuleConfigure
| OnModuleInit
| OnModuleDestroy
| OnBeforeContainerBuild
)
OnApplicationInit
¶
Bases: Protocol
Extension for application pre-initialization actions.
on_app_init
async
¶
on_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
AfterApplicationInit
¶
Bases: Protocol
Extension for application post-initialization actions.
after_app_init
async
¶
after_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnApplicationShutdown
¶
Bases: Protocol
Extension for application shutdown actions.
on_app_shutdown
async
¶
on_app_shutdown(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
OnBeforeContainerBuild
¶
Bases: Protocol
Extension for aggregating configuration across modules before container build.
This hook runs after all modules are collected but before the DI container is created. Use this for cross-module configuration aggregation that needs to produce injectable registries.
Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).
Execution order
- Application-level extensions (in registration order)
- Module-level extensions (in topological order)
on_before_container_build
¶
on_before_container_build(
modules: Sequence[Module],
context: Mapping[Any, Any] | None,
) -> Sequence[ProviderSpec]
Aggregate configuration from modules and return providers to register.
| PARAMETER | DESCRIPTION |
|---|---|
| modules | All application modules in topological order (dependencies first). |
| context | Application context passed to WakuFactory (read-only). |

| RETURNS | DESCRIPTION |
|---|---|
| Sequence[ProviderSpec] | Sequence of providers to add to the container. |
Source code in src/waku/extensions/protocols.py
OnModuleConfigure
¶
Bases: Protocol
Extension for module configuration.
on_module_configure
¶
on_module_configure(metadata: ModuleMetadata) -> None
OnModuleInit
¶
registry
¶
Extension registry for centralized management of extensions.
ExtensionRegistry
¶
Registry for extensions.
This registry maintains references to all extensions in the application, allowing for centralized management and discovery.
Source code in src/waku/extensions/registry.py
register_application_extension
¶
register_application_extension(
extension: ApplicationExtension,
) -> Self
Register an application extension.
Source code in src/waku/extensions/registry.py
register_module_extension
¶
register_module_extension(
module_type: ModuleType, extension: ModuleExtension
) -> Self
get_application_extensions
¶
get_module_extensions
¶
get_module_extensions(
module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
factory
¶
ContainerConfig
dataclass
¶
WakuFactory
¶
WakuFactory(
root_module_type: ModuleType,
/,
context: dict[Any, Any] | None = None,
lifespan: Sequence[LifespanFunc] = (),
extensions: Sequence[
ApplicationExtension
] = DEFAULT_EXTENSIONS,
container_config: ContainerConfig | None = None,
provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/factory.py
create
¶
create() -> WakuApplication
Source code in src/waku/factory.py
lifespan
¶
LifespanFunc
module-attribute
¶
LifespanFunc: TypeAlias = (
Callable[
['WakuApplication'],
AbstractAsyncContextManager[None],
]
| AbstractAsyncContextManager[None]
)
LifespanWrapper
¶
LifespanWrapper(lifespan_func: LifespanFunc)
Source code in src/waku/lifespan.py
lifespan
async
¶
lifespan(app: WakuApplication) -> AsyncIterator[None]
Source code in src/waku/lifespan.py
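`LifespanFunc` accepts two forms: a callable that takes the application and returns an async context manager, or a ready-made async context manager. The sketch below shows one way a wrapper could normalize both. `FakeApp` and `ToyLifespanWrapper` are hypothetical stand-ins, and the `isinstance` dispatch is an assumption rather than the code in `src/waku/lifespan.py`.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager


class FakeApp:
    """Hypothetical stand-in for WakuApplication."""

    def __init__(self) -> None:
        self.events: list[str] = []


@asynccontextmanager
async def db_lifespan(app: FakeApp) -> AsyncIterator[None]:
    # Code before `yield` runs on startup, code after it on shutdown.
    app.events.append("db: connect")
    try:
        yield
    finally:
        app.events.append("db: close")


class ToyLifespanWrapper:
    def __init__(self, lifespan_func) -> None:
        self._lifespan_func = lifespan_func

    @asynccontextmanager
    async def lifespan(self, app: FakeApp) -> AsyncIterator[None]:
        # A ready-made context manager is entered as-is;
        # a callable is first invoked with the application.
        if isinstance(self._lifespan_func, AbstractAsyncContextManager):
            ctx = self._lifespan_func
        else:
            ctx = self._lifespan_func(app)
        async with ctx:
            yield


async def main() -> list[str]:
    app = FakeApp()
    async with ToyLifespanWrapper(db_lifespan).lifespan(app):
        app.events.append("running")
    return app.events


events = asyncio.run(main())
```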
modules
¶
DynamicModule
dataclass
¶
DynamicModule(
*,
providers: list[ProviderSpec] = list(),
imports: list[ModuleType | DynamicModule] = list(),
exports: list[
type[object] | ModuleType | DynamicModule
] = list(),
extensions: list[ModuleExtension] = list(),
is_global: bool = False,
id: UUID = uuid4(),
parent_module: ModuleType,
)
Bases: ModuleMetadata
providers
class-attribute
instance-attribute
¶
providers: list[ProviderSpec] = field(default_factory=list)
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
imports: list[ModuleType | DynamicModule] = field(
default_factory=list
)
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
exports: list[type[object] | ModuleType | DynamicModule] = (
field(default_factory=list)
)
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
extensions: list[ModuleExtension] = field(
default_factory=list
)
List of module extensions for lifecycle hooks.
ModuleCompiler
¶
extract_metadata
¶
extract_metadata(
module_type: ModuleType | DynamicModule,
) -> tuple[ModuleType, ModuleMetadata]
Source code in src/waku/modules/_metadata.py
ModuleMetadata
dataclass
¶
ModuleMetadata(
*,
providers: list[ProviderSpec] = list(),
imports: list[ModuleType | DynamicModule] = list(),
exports: list[
type[object] | ModuleType | DynamicModule
] = list(),
extensions: list[ModuleExtension] = list(),
is_global: bool = False,
id: UUID = uuid4(),
)
providers
class-attribute
instance-attribute
¶
providers: list[ProviderSpec] = field(default_factory=list)
List of providers for dependency injection.
imports
class-attribute
instance-attribute
¶
imports: list[ModuleType | DynamicModule] = field(
default_factory=list
)
List of modules imported by this module.
exports
class-attribute
instance-attribute
¶
exports: list[type[object] | ModuleType | DynamicModule] = (
field(default_factory=list)
)
List of types or modules exported by this module.
extensions
class-attribute
instance-attribute
¶
extensions: list[ModuleExtension] = field(
default_factory=list
)
List of module extensions for lifecycle hooks.
Module
¶
Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
exports
instance-attribute
¶
exports: Final[
Sequence[type[object] | ModuleType | DynamicModule]
] = exports
provider
property
¶
Get the aggregated provider for this module.
This property returns the provider created by create_provider(). Must be called after create_provider() has been invoked.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If create_provider() has not been called yet. |
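The "must be created before it is read" contract can be sketched with a guarded property. `ToyModule` is a hypothetical stand-in; the real `create_provider()` takes the arguments documented below and builds an actual provider, which this sketch replaces with a plain `object()`.

```python
from __future__ import annotations


class ToyModule:
    """Hypothetical module showing the guarded-property pattern."""

    def __init__(self) -> None:
        self._provider: object | None = None

    @property
    def provider(self) -> object:
        # Reading the property before creation is a programming error.
        if self._provider is None:
            raise RuntimeError("create_provider() has not been called yet")
        return self._provider

    def create_provider(self) -> object:
        # Placeholder for the real aggregation work.
        self._provider = object()
        return self._provider


mod = ToyModule()
try:
    mod.provider
    raised = False
except RuntimeError:
    raised = True

created = mod.create_provider()
```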
create_provider
¶
create_provider(
context: dict[Any, Any] | None,
builder: ActivationBuilder,
provider_filter: IProviderFilter,
) -> BaseProvider
Create aggregated provider with activation filtering applied.
| PARAMETER | DESCRIPTION |
|---|---|
| context | Context dict for activation decisions. |
| builder | Activation builder for checking if types are registered. |
| provider_filter | Filter strategy for conditional provider activation. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseProvider | BaseProvider with only active providers aggregated. |
Source code in src/waku/modules/_module.py
ModuleRegistry
¶
ModuleRegistry(
*,
compiler: ModuleCompiler,
root_module: Module,
modules: dict[UUID, Module],
providers: list[BaseProvider],
adjacency: AdjacencyMatrix,
)
Immutable registry and graph for module queries, traversal, and lookups.
Source code in src/waku/modules/_registry.py
get
¶
get(module_type: ModuleType | DynamicModule) -> Module
get_by_id
¶
traverse
¶
Traverse the module graph in depth-first post-order (children before parent) recursively.
| PARAMETER | DESCRIPTION |
|---|---|
| from_ | Start module (default: root). |

| YIELDS | DESCRIPTION |
|---|---|
| Module | Each traversed module (post-order). |
Source code in src/waku/modules/_registry.py
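Depth-first post-order means every module is yielded only after the modules it imports. A minimal sketch on a plain adjacency dict follows. The function name, the string node names, and the choice to skip already-visited nodes are assumptions for illustration, not the registry's actual code.

```python
from __future__ import annotations

from collections.abc import Iterator


def traverse_post_order(
    graph: dict[str, list[str]],
    start: str,
    _seen: set[str] | None = None,
) -> Iterator[str]:
    """Yield nodes reachable from `start`, children before parents."""
    seen = set() if _seen is None else _seen
    if start in seen:
        return  # assumption: a shared dependency is yielded only once
    seen.add(start)
    for child in graph.get(start, []):
        yield from traverse_post_order(graph, child, seen)
    yield start  # the parent comes after all of its children


# Hypothetical module graph: App imports Users and Orders, both import Db.
graph = {
    "App": ["Users", "Orders"],
    "Users": ["Db"],
    "Orders": ["Db"],
    "Db": [],
}
order = list(traverse_post_order(graph, "App"))
```

In this example `Db` is visited first and `App` last, which is the order in which dependencies would need to be ready.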
ModuleRegistryBuilder
¶
ModuleRegistryBuilder(
root_module_type: ModuleType,
compiler: ModuleCompiler | None = None,
context: dict[Any, Any] | None = None,
provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/modules/_registry_builder.py
build
¶
build() -> ModuleRegistry
module
¶
module(
*,
providers: Sequence[ProviderSpec] = (),
imports: Sequence[ModuleType | DynamicModule] = (),
exports: Sequence[
type[object] | ModuleType | DynamicModule
] = (),
extensions: Sequence[ModuleExtension] = (),
is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]
Decorator to define a module.
| PARAMETER | DESCRIPTION |
|---|---|
| providers | Sequence of providers for dependency injection. |
| imports | Sequence of modules imported by this module. |
| exports | Sequence of types or modules exported by this module. |
| extensions | Sequence of module extensions for lifecycle hooks. |
| is_global | Whether this module is global or not. |
Source code in src/waku/modules/_metadata.py
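A class decorator of this kind typically records its keyword arguments as metadata on the decorated class and returns the class unchanged. The sketch below shows that pattern in isolation. `toy_module`, `ToyMetadata`, and the attribute name `__toy_metadata__` are hypothetical; how waku actually stores metadata is not shown in this reference.

```python
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from typing import TypeVar

_T = TypeVar("_T")


@dataclass
class ToyMetadata:
    """Hypothetical, reduced version of ModuleMetadata."""

    providers: list[object] = field(default_factory=list)
    imports: list[type] = field(default_factory=list)
    exports: list[type] = field(default_factory=list)
    is_global: bool = False


def toy_module(
    *,
    providers: Sequence[object] = (),
    imports: Sequence[type] = (),
    exports: Sequence[type] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    def decorator(cls: type[_T]) -> type[_T]:
        # Sequences are copied into lists so each class owns its metadata.
        cls.__toy_metadata__ = ToyMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            is_global=is_global,
        )
        return cls

    return decorator


class Database: ...


@toy_module(providers=[Database], exports=[Database], is_global=True)
class DbModule: ...


@toy_module(imports=[DbModule])
class AppModule: ...
```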
testing
¶
override
¶
override(
container: AsyncContainer,
*providers: BaseProvider,
context: dict[Any, Any] | None = None,
) -> Iterator[None]
Temporarily override providers and/or context in an AsyncContainer for testing.
| PARAMETER | DESCRIPTION |
|---|---|
| container | The container whose providers/context will be overridden. |
| *providers | Providers to override in the container. |
| context | Context values to override. |

| YIELDS | DESCRIPTION |
|---|---|
| None | Context in which the container uses the overridden providers/context. |
Example
from waku import WakuFactory, module
from waku.di import Scope, singleton
from waku.testing import override


class Service: ...


class ServiceOverride(Service): ...


# `application` is an existing WakuApplication; run this inside an async function.
# Override providers
with override(application.container, singleton(ServiceOverride, provided_type=Service)):
    service = await application.container.get(Service)
    assert isinstance(service, ServiceOverride)

# Override context
with override(application.container, context={int: 123}):
    ...
Source code in src/waku/testing.py
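The temporary-override behaviour can be sketched with a plain `contextlib.contextmanager` that swaps bindings and restores them on exit. `ToyContainer` and `toy_override` are hypothetical and far simpler than an `AsyncContainer`; the sketch only illustrates the restore-on-exit pattern, not how `waku.testing.override` is implemented.

```python
from collections.abc import Callable, Iterator
from contextlib import contextmanager


class ToyContainer:
    """Hypothetical container mapping a type to a zero-argument factory."""

    def __init__(self, bindings: dict[type, Callable[[], object]]) -> None:
        self.bindings = dict(bindings)

    def get(self, key: type) -> object:
        return self.bindings[key]()


@contextmanager
def toy_override(
    container: ToyContainer,
    overrides: dict[type, Callable[[], object]],
) -> Iterator[None]:
    original = container.bindings
    # Overrides shadow the original bindings for the duration of the block.
    container.bindings = {**original, **overrides}
    try:
        yield
    finally:
        # The original bindings come back even if the block raises.
        container.bindings = original


class Service: ...
class ServiceOverride(Service): ...


container = ToyContainer({Service: Service})
with toy_override(container, {Service: ServiceOverride}):
    inside = container.get(Service)
after = container.get(Service)
```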
create_test_app
async
¶
create_test_app(
*,
base: ModuleType | DynamicModule | None = None,
providers: Sequence[ProviderSpec] = (),
imports: Sequence[ModuleType | DynamicModule] = (),
extensions: Sequence[ModuleExtension] = (),
app_extensions: Sequence[
ApplicationExtension
] = DEFAULT_EXTENSIONS,
context: dict[Any, Any] | None = None,
) -> AsyncIterator[WakuApplication]
Create a minimal test application with given configuration.
Useful for testing extensions and module configurations in isolation without needing to set up a full application structure.
| PARAMETER | DESCRIPTION |
|---|---|
| base | Base module to build upon. When provided, the test module imports this module and providers act as overrides. |
| providers | Providers to register in the test module. When base is provided, these act as overrides. |
| imports | Additional modules to import into the test module. |
| extensions | Module extensions to register. |
| app_extensions | Application extensions to register (default: DEFAULT_EXTENSIONS). |
| context | Context values to pass to the container. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[WakuApplication] | Initialized WakuApplication. |
Example
from typing import Protocol

from waku.testing import create_test_app
from waku.di import singleton

# Entity, MyExtension, SomeEvent, SomeHandler, MyService, AppModule and
# `expected` are placeholders for your own application code.


class IRepository(Protocol):
    async def get(self, id: str) -> Entity: ...


class FakeRepository(IRepository):
    async def get(self, id: str) -> Entity:
        return Entity(id=id)


# Create test app from scratch
async def test_my_extension():
    extension = MyExtension().bind(SomeEvent, SomeHandler)

    async with create_test_app(
        extensions=[extension],
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        service = await app.container.get(MyService)
        result = await service.do_something()
        assert result == expected


# Create test app based on existing module with overrides
async def test_with_base_module():
    async with create_test_app(
        base=AppModule,
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        # FakeRepository replaces the real one from AppModule
        repo = await app.container.get(IRepository)
        assert isinstance(repo, FakeRepository)
Source code in src/waku/testing.py
validation
¶
ValidationRule
¶
Bases: Protocol
validate
¶
validate(
context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/_abc.py
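Since `ValidationRule` is a protocol, any class with a matching `validate` method qualifies. The sketch below writes one such rule against simplified stand-ins. `ToyContext`, `ToyValidationError`, and `ExportsAreProvidedRule` are hypothetical; the fields of the real `ValidationContext` are not shown in this reference, so the ones used here are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol


@dataclass
class ToyContext:
    """Hypothetical stand-in for ValidationContext."""

    provided: dict[str, set[str]]
    exported: dict[str, set[str]]


class ToyValidationError(Exception):
    """Hypothetical stand-in for waku's ValidationError."""


class ToyRule(Protocol):
    def validate(self, context: ToyContext) -> list[ToyValidationError]: ...


class ExportsAreProvidedRule:
    """Reports every exported name that its module does not provide."""

    def validate(self, context: ToyContext) -> list[ToyValidationError]:
        errors: list[ToyValidationError] = []
        for module_name, exports in sorted(context.exported.items()):
            missing = exports - context.provided.get(module_name, set())
            errors.extend(
                ToyValidationError(f"{module_name} exports {name!r} without providing it")
                for name in sorted(missing)
            )
        # Errors are collected and returned, not raised, so a caller can
        # gather the results of several rules before deciding what to do.
        return errors


context = ToyContext(
    provided={"users": {"UserService"}},
    exported={"users": {"UserService", "UserRepo"}},
)
errors = ExportsAreProvidedRule().validate(context)
```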
ValidationExtension
¶
ValidationExtension(
rules: Sequence[ValidationRule], *, strict: bool = True
)
Bases: AfterApplicationInit
Source code in src/waku/validation/_extension.py
after_app_init
async
¶
after_app_init(app: WakuApplication) -> None
rules
¶
DependenciesAccessibleRule
¶
DependenciesAccessibleRule(cache_size: int = 1000)
Bases: ValidationRule
Validates that all dependencies required by providers are accessible.
Source code in src/waku/validation/rules/dependency_accessible.py
validate
¶
validate(
context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/rules/dependency_accessible.py
DependencyInaccessibleError
¶
DependencyInaccessibleError(
required_type: type[object],
required_by: object,
from_module: Module,
)
Bases: ValidationError
Error indicating a dependency is not accessible to a provider/module.
Source code in src/waku/validation/rules/dependency_accessible.py
dependency_accessible
¶
DependencyInaccessibleError
¶
DependencyInaccessibleError(
required_type: type[object],
required_by: object,
from_module: Module,
)
Bases: ValidationError
Error indicating a dependency is not accessible to a provider/module.
Source code in src/waku/validation/rules/dependency_accessible.py
AccessibilityStrategy
¶
GlobalProvidersStrategy
¶
GlobalProvidersStrategy(
modules: Sequence[Module],
container: AsyncContainer,
types_extractor: ModuleTypesExtractor,
registry: ModuleRegistry,
)
Bases: AccessibilityStrategy
Check if type is provided by a global module or APP-scoped context.
Source code in src/waku/validation/rules/dependency_accessible.py
LocalProvidersStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by the module itself.
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
ContextVarsStrategy
¶
Bases: AccessibilityStrategy
Check if type is provided by application or request container context.
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
ImportedModulesStrategy
¶
ImportedModulesStrategy(
registry: ModuleRegistry,
types_extractor: ModuleTypesExtractor,
)
Bases: AccessibilityStrategy
Check if type is accessible via imported modules (direct export or re-export).
Source code in src/waku/validation/rules/dependency_accessible.py
is_accessible
¶
Source code in src/waku/validation/rules/dependency_accessible.py
DependencyAccessChecker
¶
DependencyAccessChecker(
strategies: Sequence[AccessibilityStrategy],
)
Handles dependency accessibility checks between modules.
Source code in src/waku/validation/rules/dependency_accessible.py
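A checker built from a sequence of strategies suggests a chain in which a dependency counts as accessible when at least one strategy accepts it. The sketch below illustrates that idea. The `is_accessible(required_type, module)` signature, the "any strategy wins" rule, and all `Toy*` classes are assumptions, since the real signatures are not shown here.

```python
from __future__ import annotations

from collections.abc import Sequence
from typing import Protocol


class ToyStrategy(Protocol):
    def is_accessible(self, required_type: type, module: str) -> bool: ...


class ToyLocalStrategy:
    """Accepts types the requesting module provides itself."""

    def __init__(self, provided: dict[str, set[type]]) -> None:
        self._provided = provided

    def is_accessible(self, required_type: type, module: str) -> bool:
        return required_type in self._provided.get(module, set())


class ToyGlobalStrategy:
    """Accepts types offered by global modules, whoever asks."""

    def __init__(self, global_types: set[type]) -> None:
        self._global_types = global_types

    def is_accessible(self, required_type: type, module: str) -> bool:
        return required_type in self._global_types


class ToyAccessChecker:
    def __init__(self, strategies: Sequence[ToyStrategy]) -> None:
        self._strategies = list(strategies)

    def is_accessible(self, required_type: type, module: str) -> bool:
        # Assumption: the first strategy that accepts the type settles it.
        return any(s.is_accessible(required_type, module) for s in self._strategies)


class Logger: ...
class UserRepo: ...
class Mailer: ...


checker = ToyAccessChecker(
    [
        ToyLocalStrategy({"users": {UserRepo}}),
        ToyGlobalStrategy({Logger}),
    ]
)
results = {
    "local": checker.is_accessible(UserRepo, "users"),
    "global": checker.is_accessible(Logger, "orders"),
    "missing": checker.is_accessible(Mailer, "users"),
}
```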
DependenciesAccessibleRule
¶
DependenciesAccessibleRule(cache_size: int = 1000)
Bases: ValidationRule
Validates that all dependencies required by providers are accessible.
Source code in src/waku/validation/rules/dependency_accessible.py
validate
¶
validate(
context: ValidationContext,
) -> list[ValidationError]