Skip to content

waku

WakuApplication

WakuApplication(
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
def __init__(
    self,
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
) -> None:
    self._container = container
    self._registry = registry
    self._lifespan = tuple(
        LifespanWrapper(lifespan_func) if not isinstance(lifespan_func, LifespanWrapper) else lifespan_func
        for lifespan_func in lifespan
    )
    self._extension_registry = extension_registry

    self._exit_stack = AsyncExitStack()
    self._initialized = False

container property

container: AsyncContainer

registry property

registry: ModuleRegistry

initialize async

initialize() -> None
Source code in src/waku/application.py
async def initialize(self) -> None:
    if self._initialized:
        return
    await self._call_on_init_extensions()
    self._initialized = True
    await self._call_after_init_extensions()

close async

close() -> None
Source code in src/waku/application.py
async def close(self) -> None:
    if not self._initialized:
        return
    await self._call_on_shutdown_extensions()
    self._initialized = False

WakuFactory

WakuFactory(
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[
        ApplicationExtension
    ] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
    provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/factory.py
def __init__(
    self,
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
    provider_filter: IProviderFilter | None = None,
) -> None:
    self._root_module_type = root_module_type

    self._context = context
    self._lifespan = lifespan
    self._extensions = extensions
    self._container_config = container_config or ContainerConfig()
    self._provider_filter = provider_filter

create

create() -> WakuApplication
Source code in src/waku/factory.py
def create(self) -> WakuApplication:
    registry = ModuleRegistryBuilder(
        self._root_module_type,
        context=self._context,
        provider_filter=self._provider_filter,
    ).build()

    additional_providers = self._execute_before_build_hooks(registry.modules)
    all_providers = (*registry.providers, *additional_providers)

    container = self._build_container(all_providers)
    return WakuApplication(
        container=container,
        registry=registry,
        lifespan=self._lifespan,
        extension_registry=self._build_extension_registry(registry.modules),
    )

DynamicModule dataclass

DynamicModule(
    *,
    providers: list[ProviderSpec] = list(),
    imports: list[ModuleType | DynamicModule] = list(),
    exports: list[
        type[object] | ModuleType | DynamicModule
    ] = list(),
    extensions: list[ModuleExtension] = list(),
    is_global: bool = False,
    id: UUID = uuid4(),
    parent_module: ModuleType,
)

Bases: ModuleMetadata

providers class-attribute instance-attribute

providers: list[ProviderSpec] = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports: list[ModuleType | DynamicModule] = field(
    default_factory=list
)

List of modules imported by this module.

exports class-attribute instance-attribute

exports: list[type[object] | ModuleType | DynamicModule] = (
    field(default_factory=list)
)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions: list[ModuleExtension] = field(
    default_factory=list
)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global: bool = False

Whether this module is global or not.

id class-attribute instance-attribute

id: UUID = field(default_factory=uuid4)

parent_module instance-attribute

parent_module: ModuleType

Module

Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
def __init__(self, module_type: ModuleType, metadata: ModuleMetadata) -> None:
    self.id: Final[UUID] = metadata.id
    self.target: Final[ModuleType] = module_type

    self.providers: Final[Sequence[ProviderSpec]] = metadata.providers
    self.imports: Final[Sequence[ModuleType | DynamicModule]] = metadata.imports
    self.exports: Final[Sequence[type[object] | ModuleType | DynamicModule]] = metadata.exports
    self.extensions: Final[Sequence[ModuleExtension]] = metadata.extensions
    self.is_global: Final[bool] = metadata.is_global

    self._provider: BaseProvider | None = None

id instance-attribute

id: Final[UUID] = id

target instance-attribute

target: Final[ModuleType] = module_type

providers instance-attribute

providers: Final[Sequence[ProviderSpec]] = providers

imports instance-attribute

imports: Final[Sequence[ModuleType | DynamicModule]] = (
    imports
)

exports instance-attribute

exports: Final[
    Sequence[type[object] | ModuleType | DynamicModule]
] = exports

extensions instance-attribute

extensions: Final[Sequence[ModuleExtension]] = extensions

is_global instance-attribute

is_global: Final[bool] = is_global

name property

name: str

provider property

provider: BaseProvider

Get the aggregated provider for this module.

This property returns the provider created by create_provider(). It must only be accessed after create_provider() has been invoked.

RAISES DESCRIPTION
RuntimeError

If create_provider() has not been called yet.

create_provider

create_provider(
    context: dict[Any, Any] | None,
    builder: ActivationBuilder,
    provider_filter: IProviderFilter,
) -> BaseProvider

Create aggregated provider with activation filtering applied.

PARAMETER DESCRIPTION
context

Context dict for activation decisions.

TYPE: dict[Any, Any] | None

builder

Activation builder for checking if types are registered.

TYPE: ActivationBuilder

provider_filter

Filter strategy for conditional provider activation.

TYPE: IProviderFilter

RETURNS DESCRIPTION
BaseProvider

BaseProvider with only active providers aggregated.

Source code in src/waku/modules/_module.py
def create_provider(
    self,
    context: dict[Any, Any] | None,
    builder: ActivationBuilder,
    provider_filter: IProviderFilter,
) -> BaseProvider:
    """Create aggregated provider with activation filtering applied.

    Args:
        context: Context dict for activation decisions.
        builder: Activation builder for checking if types are registered.
        provider_filter: Filter strategy for conditional provider activation.

    Returns:
        BaseProvider with only active providers aggregated.
    """
    active_providers = provider_filter.filter(
        list(self.providers),
        context=context,
        module_type=self.target,
        builder=builder,
    )

    cls = cast(type[_ModuleProvider], type(f'{self.name}Provider', (_ModuleProvider,), {}))
    self._provider = cls(active_providers)
    return self._provider

module

module(
    *,
    providers: Sequence[ProviderSpec] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[
        type[object] | ModuleType | DynamicModule
    ] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]

Decorator to define a module.

PARAMETER DESCRIPTION
providers

Sequence of providers for dependency injection.

TYPE: Sequence[ProviderSpec] DEFAULT: ()

imports

Sequence of modules imported by this module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

exports

Sequence of types or modules exported by this module.

TYPE: Sequence[type[object] | ModuleType | DynamicModule] DEFAULT: ()

extensions

Sequence of module extensions for lifecycle hooks.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

is_global

Whether this module is global or not.

TYPE: bool DEFAULT: False

Source code in src/waku/modules/_metadata.py
def module(
    *,
    providers: Sequence[ProviderSpec] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[type[object] | ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    """Decorator to define a module.

    Args:
        providers: Sequence of providers for dependency injection.
        imports: Sequence of modules imported by this module.
        exports: Sequence of types or modules exported by this module.
        extensions: Sequence of module extensions for lifecycle hooks.
        is_global: Whether this module is global or not.
    """

    def decorator(cls: type[_T]) -> type[_T]:
        metadata = ModuleMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            extensions=list(extensions),
            is_global=is_global,
        )
        for extension in metadata.extensions:
            if isinstance(extension, OnModuleConfigure):
                extension.on_module_configure(metadata)

        setattr(cls, _MODULE_METADATA_KEY, metadata)
        return cls

    return decorator

application

WakuApplication

WakuApplication(
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
def __init__(
    self,
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
) -> None:
    self._container = container
    self._registry = registry
    self._lifespan = tuple(
        LifespanWrapper(lifespan_func) if not isinstance(lifespan_func, LifespanWrapper) else lifespan_func
        for lifespan_func in lifespan
    )
    self._extension_registry = extension_registry

    self._exit_stack = AsyncExitStack()
    self._initialized = False

container property

container: AsyncContainer

registry property

registry: ModuleRegistry

initialize async

initialize() -> None
Source code in src/waku/application.py
async def initialize(self) -> None:
    if self._initialized:
        return
    await self._call_on_init_extensions()
    self._initialized = True
    await self._call_after_init_extensions()

close async

close() -> None
Source code in src/waku/application.py
async def close(self) -> None:
    if not self._initialized:
        return
    await self._call_on_shutdown_extensions()
    self._initialized = False

cqrs

NextHandlerType module-attribute

NextHandlerType: TypeAlias = Callable[
    [RequestT], Awaitable[ResponseT]
]

Event dataclass

Event(*, event_id: UUID = uuid4())

Base class for events.

event_id class-attribute instance-attribute

event_id: UUID = field(default_factory=uuid4)

IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async

handle(
    request: RequestT,
    /,
    next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, /, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

Request dataclass

Request(*, request_id: UUID = uuid4())

Bases: Generic[ResponseT]

Base class for request-type objects.

request_id class-attribute instance-attribute

request_id: UUID = field(default_factory=uuid4)

Response dataclass

Response()

Base class for response type objects.

EventHandler

Bases: ABC, Generic[EventT]

The event handler interface.

Usage::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, "New user joined!")

handle abstractmethod async

handle(event: EventT) -> None
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: EventT, /) -> None:
    raise NotImplementedError

Mediator

Mediator(
    container: AsyncContainer,
    event_publisher: EventPublisher,
)

Bases: IMediator

Default CQRS implementation.

Initialize the mediator.

PARAMETER DESCRIPTION
container

Container used to resolve handlers and behaviors

TYPE: AsyncContainer

event_publisher

Function to publish events to handlers

TYPE: EventPublisher

Source code in src/waku/cqrs/impl.py
def __init__(self, container: AsyncContainer, event_publisher: EventPublisher) -> None:
    """Initialize the mediator.

    Args:
        container: Container used to resolve handlers and behaviors
        event_publisher: Function to publish events to handlers
    """
    self._container = container
    self._event_publisher = event_publisher

send async

send(request: Request[None]) -> None
send(request: Request[ResponseT]) -> ResponseT
send(request: Request[Any]) -> Any

Send a request through the CQRS pipeline chain.

PARAMETER DESCRIPTION
request

The request to process

TYPE: Request[Any]

RETURNS DESCRIPTION
Any

Response from the handler

RAISES DESCRIPTION
RequestHandlerNotFound

If no handler is registered for the request type

Source code in src/waku/cqrs/impl.py
@override
async def send(self, request: Request[Any], /) -> Any:
    """Send a request through the CQRS pipeline chain.

    Args:
        request: The request to process

    Returns:
        Response from the handler

    Raises:
        RequestHandlerNotFound: If no handler is registered for the request type
    """
    request_type = type(request)
    handler = await self._resolve_request_handler(request_type)
    return await self._handle_request(handler, request)

publish async

publish(event: Event) -> None

Publish an event to all registered handlers.

PARAMETER DESCRIPTION
event

The event to publish

TYPE: Event

Source code in src/waku/cqrs/impl.py
@override
async def publish(self, event: Event, /) -> None:
    """Publish an event to all registered handlers.

    Args:
        event: The event to publish
    """
    event_type = type(event)
    handlers = await self._resolve_event_handlers(event_type)
    await self._event_publisher(handlers, event)

IMediator

Bases: ISender, IPublisher, ABC

Defines a mediator to encapsulate request/response and publishing interaction patterns.

publish abstractmethod async

publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event, /) -> None:
    """Asynchronously send event to multiple handlers."""

send abstractmethod async

send(request: Request[None]) -> None
send(request: Request[ResponseT]) -> ResponseT
send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler."""

IPublisher

Bases: ABC

Publish an event through the mediator to be handled by multiple handlers.

publish abstractmethod async

publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event, /) -> None:
    """Asynchronously send event to multiple handlers."""

ISender

Bases: ABC

Send a request through the mediator's middleware chain to be handled by a single handler.

send abstractmethod async

send(request: Request[None]) -> None
send(request: Request[ResponseT]) -> ResponseT
send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler."""

MediatorConfig dataclass

MediatorConfig(
    *,
    mediator_implementation_type: type[
        IMediator
    ] = Mediator,
    event_publisher: type[
        EventPublisher
    ] = SequentialEventPublisher,
    pipeline_behaviors: Sequence[
        type[IPipelineBehavior[Any, Any]]
    ] = (),
)

Configuration for the Mediator extension.

This class defines the configuration options for setting up the CQRS pattern implementation in the application.

ATTRIBUTE DESCRIPTION
mediator_implementation_type

The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class.

TYPE: type[IMediator]

event_publisher

The implementation class for publishing events. Defaults to SequentialEventPublisher.

TYPE: type[EventPublisher]

pipeline_behaviors

A sequence of pipeline behavior types that will be applied to the CQRS pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence.

TYPE: Sequence[type[IPipelineBehavior[Any, Any]]]

Example
config = MediatorConfig(
    pipeline_behaviors=[
        LoggingBehavior,
        ValidationBehavior,
    ]
)

mediator_implementation_type class-attribute instance-attribute

mediator_implementation_type: type[IMediator] = Mediator

event_publisher class-attribute instance-attribute

event_publisher: type[EventPublisher] = SequentialEventPublisher

pipeline_behaviors class-attribute instance-attribute

pipeline_behaviors: Sequence[
    type[IPipelineBehavior[Any, Any]]
] = ()

MediatorExtension

MediatorExtension()

Bases: OnModuleConfigure

Source code in src/waku/cqrs/modules.py
def __init__(self) -> None:
    self._request_map = RequestMap()
    self._event_map = EventMap()
    self._behavior_map = PipelineBehaviorMap()

bind_request

bind_request(
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, Any]],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, Any]]]
    | None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_request(
    self,
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, Any]],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, Any]]] | None = None,
) -> Self:
    self._request_map.bind(request_type, handler_type)
    if behaviors:
        self._behavior_map.bind(request_type, behaviors)
    return self

bind_event

bind_event(
    event_type: type[EventT],
    handler_types: list[type[EventHandler[EventT]]],
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_event(
    self,
    event_type: type[EventT],
    handler_types: list[type[EventHandler[EventT]]],
) -> Self:
    self._event_map.bind(event_type, handler_types)
    return self

on_module_configure

on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    metadata.providers.extend(
        chain(
            self._create_request_handler_providers(),
            self._create_event_handler_providers(),
            self._create_pipeline_behavior_providers(),
        ),
    )

MediatorModule

register classmethod

register(
    config: MediatorConfig | None = None,
) -> DynamicModule

Application-level module for Mediator setup.

PARAMETER DESCRIPTION
config

Configuration for the Mediator extension.

TYPE: MediatorConfig | None DEFAULT: None

Source code in src/waku/cqrs/modules.py
@classmethod
def register(cls, config: MediatorConfig | None = None, /) -> DynamicModule:
    """Application-level module for Mediator setup.

    Args:
        config: Configuration for the Mediator extension.
    """
    config_ = config or MediatorConfig()
    return DynamicModule(
        parent_module=cls,
        providers=[
            *cls._create_mediator_providers(config_),
            *cls._create_pipeline_behavior_providers(config_),
        ],
        is_global=True,
    )

RequestHandler

Bases: ABC, Generic[RequestT, ResponseT]

The request handler interface.

The request handler is an object that receives a request as input and may return a response as a result.

Command handler example::

    class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: JoinMeetingCommand, /) -> None:
            await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

    class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
            link = await self._meetings_api.get_link(request.meeting_id)
            return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)

handle abstractmethod async

handle(request: RequestT) -> ResponseT
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT, /) -> ResponseT:
    raise NotImplementedError

contracts

EventT module-attribute

EventT = TypeVar(
    'EventT', bound='Event', contravariant=True
)

NextHandlerType module-attribute

NextHandlerType: TypeAlias = Callable[
    [RequestT], Awaitable[ResponseT]
]

RequestT module-attribute

RequestT = TypeVar(
    'RequestT', bound='Request[Any]', contravariant=True
)

ResponseT module-attribute

ResponseT = TypeVar(
    'ResponseT',
    bound='Response | None',
    default=None,
    covariant=True,
)

Event dataclass

Event(*, event_id: UUID = uuid4())

Base class for events.

event_id class-attribute instance-attribute
event_id: UUID = field(default_factory=uuid4)

IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async
handle(
    request: RequestT,
    /,
    next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, /, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

Request dataclass

Request(*, request_id: UUID = uuid4())

Bases: Generic[ResponseT]

Base class for request-type objects.

request_id class-attribute instance-attribute
request_id: UUID = field(default_factory=uuid4)

Response dataclass

Response()

Base class for response type objects.

event

EventT module-attribute
EventT = TypeVar(
    'EventT', bound='Event', contravariant=True
)
Event dataclass
Event(*, event_id: UUID = uuid4())

Base class for events.

event_id class-attribute instance-attribute
event_id: UUID = field(default_factory=uuid4)

pipeline

NextHandlerType module-attribute
NextHandlerType: TypeAlias = Callable[
    [RequestT], Awaitable[ResponseT]
]
IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async
handle(
    request: RequestT,
    /,
    next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, /, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

request

RequestT module-attribute
RequestT = TypeVar(
    'RequestT', bound='Request[Any]', contravariant=True
)
ResponseT module-attribute
ResponseT = TypeVar(
    'ResponseT',
    bound='Response | None',
    default=None,
    covariant=True,
)
Request dataclass
Request(*, request_id: UUID = uuid4())

Bases: Generic[ResponseT]

Base class for request-type objects.

request_id class-attribute instance-attribute
request_id: UUID = field(default_factory=uuid4)
Response dataclass
Response()

Base class for response type objects.

events

EventHandler

Bases: ABC, Generic[EventT]

The event handler interface.

Usage::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
handle abstractmethod async
handle(event: EventT) -> None
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: EventT, /) -> None:
    raise NotImplementedError

EventPublisher

Bases: Protocol

GroupEventPublisher

SequentialEventPublisher

handler

EventHandler

Bases: ABC, Generic[EventT]

The event handler interface.

Usage::

    class UserJoinedEventHandler(EventHandler[UserJoinedEvent]):
        def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
            self._meetings_api = meetings_api

        async def handle(self, event: UserJoinedEvent, /) -> None:
            await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
handle abstractmethod async
handle(event: EventT) -> None
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: EventT, /) -> None:
    raise NotImplementedError

map

EventMapRegistry module-attribute
EventMap
EventMap()
Source code in src/waku/cqrs/events/map.py
def __init__(self) -> None:
    self._registry: EventMapRegistry[Any] = defaultdict(list)
registry property
registry: EventMapRegistry[Any]
bind
bind(
    event_type: type[EventT],
    handler_types: list[type[EventHandler[EventT]]],
) -> Self
Source code in src/waku/cqrs/events/map.py
def bind(self, event_type: type[EventT], handler_types: list[type[EventHandler[EventT]]]) -> Self:
    for handler_type in handler_types:
        if handler_type in self._registry[event_type]:
            raise EventHandlerAlreadyRegistered(event_type, handler_type)
        self._registry[event_type].append(handler_type)
    return self
merge
merge(other: EventMap) -> Self
Source code in src/waku/cqrs/events/map.py
def merge(self, other: EventMap) -> Self:
    for event_type, handlers in other.registry.items():
        self.bind(event_type, handlers)
    return self

publish

EventPublisher

Bases: Protocol

SequentialEventPublisher
GroupEventPublisher

exceptions

MediatorError

Bases: WakuError

Base exception for all CQRS-related errors.

ImproperlyConfiguredError

Bases: MediatorError

Raised when the CQRS configuration is invalid.

RequestHandlerAlreadyRegistered

RequestHandlerAlreadyRegistered(
    request_type: type[Request[Any]],
    handler_type: type[RequestHandler[Any, Any]],
)

Bases: MediatorError, KeyError

Raised when a request handler is already registered.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

handler_type

The type of handler that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[Request[Any]], handler_type: type[RequestHandler[Any, Any]]) -> None:
    self.request_type = request_type
    self.handler_type = handler_type
request_type instance-attribute
request_type = request_type
handler_type instance-attribute
handler_type = handler_type

RequestHandlerNotFound

RequestHandlerNotFound(request_type: type[Request[Any]])

Bases: MediatorError, TypeError

Raised when a request handler is not found.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[Request[Any]]) -> None:
    self.request_type = request_type
request_type instance-attribute
request_type = request_type

EventHandlerAlreadyRegistered

EventHandlerAlreadyRegistered(
    event_type: type[Event],
    handler_type: type[EventHandler[Any]],
)

Bases: MediatorError, KeyError

Raised when an event handler is already registered.

ATTRIBUTE DESCRIPTION
event_type

The type of event that caused the error.

handler_type

The type of handler that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, event_type: type[Event], handler_type: type[EventHandler[Any]]) -> None:
    self.event_type = event_type
    self.handler_type = handler_type
event_type instance-attribute
event_type = event_type
handler_type instance-attribute
handler_type = handler_type

PipelineBehaviorAlreadyRegistered

PipelineBehaviorAlreadyRegistered(
    request_type: type[Request],
    behavior_type: type[IPipelineBehavior[Any, Any]],
)

Bases: MediatorError, KeyError

Raised when a pipeline behavior is already registered.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

behavior_type

The type of behavior that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[Request], behavior_type: type[IPipelineBehavior[Any, Any]]) -> None:
    self.request_type = request_type
    self.behavior_type = behavior_type
request_type instance-attribute
request_type = request_type
behavior_type instance-attribute
behavior_type = behavior_type

impl

Mediator

Mediator(
    container: AsyncContainer,
    event_publisher: EventPublisher,
)

Bases: IMediator

Default CQRS implementation.

Initialize the mediator.

PARAMETER DESCRIPTION
container

Container used to resolve handlers and behaviors

TYPE: AsyncContainer

event_publisher

Function to publish events to handlers

TYPE: EventPublisher

Source code in src/waku/cqrs/impl.py
def __init__(self, container: AsyncContainer, event_publisher: EventPublisher) -> None:
    """Initialize the mediator.

    Args:
        container: Container used to resolve handlers and behaviors
        event_publisher: Function to publish events to handlers
    """
    self._container = container
    self._event_publisher = event_publisher
send async
send(request: Request[None]) -> None
send(request: Request[ResponseT]) -> ResponseT
send(request: Request[Any]) -> Any

Send a request through the CQRS pipeline chain.

PARAMETER DESCRIPTION
request

The request to process

TYPE: Request[Any]

RETURNS DESCRIPTION
Any

Response from the handler

RAISES DESCRIPTION
RequestHandlerNotFound

If no handler is registered for the request type

Source code in src/waku/cqrs/impl.py
@override
async def send(self, request: Request[Any], /) -> Any:
    """Dispatch a request through the CQRS pipeline chain.

    The handler is looked up by the concrete class of ``request`` and then
    invoked with the request itself.

    Args:
        request: The request to process (positional-only).

    Returns:
        Response from the handler.

    Raises:
        RequestHandlerNotFound: If no handler is registered for the request type.
    """
    resolved_handler = await self._resolve_request_handler(type(request))
    response = await self._handle_request(resolved_handler, request)
    return response
publish async
publish(event: Event) -> None

Publish an event to all registered handlers.

PARAMETER DESCRIPTION
event

The event to publish

TYPE: Event

Source code in src/waku/cqrs/impl.py
@override
async def publish(self, event: Event, /) -> None:
    """Deliver an event to every handler registered for its type.

    Handlers are resolved from the concrete class of ``event`` and handed,
    together with the event, to the configured event publisher.

    Args:
        event: The event to publish (positional-only).
    """
    resolved_handlers = await self._resolve_event_handlers(type(event))
    await self._event_publisher(resolved_handlers, event)

interfaces

ISender

Bases: ABC

Send a request through the cqrs middleware chain to be handled by a single handler.

send abstractmethod async
send(request: Request[None]) -> None
send(request: Request[ResponseT]) -> ResponseT
send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler.

    Abstract declaration: there is no body here, so concrete senders supply
    the dispatch logic.

    Args:
        request: The request to dispatch (positional-only).

    Returns:
        The response produced for the request.
    """

IPublisher

Bases: ABC

Publish an event through the mediator to be handled by multiple handlers.

publish abstractmethod async
publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event, /) -> None:
    """Asynchronously send an event to multiple handlers.

    Abstract declaration: there is no body here, so concrete publishers
    supply the delivery logic.

    Args:
        event: The event to deliver (positional-only).
    """

IMediator

Bases: ISender, IPublisher, ABC

Defines a mediator to encapsulate request/response and publishing interaction patterns.

publish abstractmethod async
publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event, /) -> None:
    """Asynchronously send an event to multiple handlers.

    Abstract declaration: there is no body here, so concrete publishers
    supply the delivery logic.

    Args:
        event: The event to deliver (positional-only).
    """
send abstractmethod async
send(request: Request[None]) -> None
send(request: Request[ResponseT]) -> ResponseT
send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT], /) -> ResponseT:
    """Asynchronously send a request to a single handler.

    Abstract declaration: there is no body here, so concrete senders supply
    the dispatch logic.

    Args:
        request: The request to dispatch (positional-only).

    Returns:
        The response produced for the request.
    """

modules

MediatorConfig dataclass

MediatorConfig(
    *,
    mediator_implementation_type: type[
        IMediator
    ] = Mediator,
    event_publisher: type[
        EventPublisher
    ] = SequentialEventPublisher,
    pipeline_behaviors: Sequence[
        type[IPipelineBehavior[Any, Any]]
    ] = (),
)

Configuration for the Mediator extension.

This class defines the configuration options for setting up the cqrs pattern implementation in the application.

ATTRIBUTE DESCRIPTION
mediator_implementation_type

The concrete implementation class for the mediator interface (IMediator). Defaults to the standard Mediator class.

TYPE: type[IMediator]

event_publisher

The implementation class for publishing events. Defaults to SequentialEventPublisher.

TYPE: type[EventPublisher]

pipeline_behaviors

A sequence of pipeline behavior configurations that will be applied to the cqrs pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence.

TYPE: Sequence[type[IPipelineBehavior[Any, Any]]]

Example
config = MediatorConfig(
    pipeline_behaviors=[
        LoggingBehavior,
        ValidationBehavior,
    ]
)
mediator_implementation_type class-attribute instance-attribute
mediator_implementation_type: type[IMediator] = Mediator
event_publisher class-attribute instance-attribute
event_publisher: type[EventPublisher] = SequentialEventPublisher
pipeline_behaviors class-attribute instance-attribute
pipeline_behaviors: Sequence[
    type[IPipelineBehavior[Any, Any]]
] = ()

MediatorModule

register classmethod
register(
    config: MediatorConfig | None = None,
) -> DynamicModule

Application-level module for Mediator setup.

PARAMETER DESCRIPTION
config

Configuration for the Mediator extension.

TYPE: MediatorConfig | None DEFAULT: None

Source code in src/waku/cqrs/modules.py
@classmethod
def register(cls, config: MediatorConfig | None = None, /) -> DynamicModule:
    """Build the application-level module that sets up the Mediator.

    The returned module is marked global and carries both the mediator
    providers and the pipeline-behavior providers derived from the config.

    Args:
        config: Configuration for the Mediator extension; a default
            ``MediatorConfig()`` is used when omitted.

    Returns:
        A ``DynamicModule`` whose parent module is this class.
    """
    effective_config = config or MediatorConfig()
    module_providers = [
        *cls._create_mediator_providers(effective_config),
        *cls._create_pipeline_behavior_providers(effective_config),
    ]
    return DynamicModule(parent_module=cls, providers=module_providers, is_global=True)

MediatorExtension

MediatorExtension()

Bases: OnModuleConfigure

Source code in src/waku/cqrs/modules.py
def __init__(self) -> None:
    """Create the extension with fresh request, event and pipeline-behavior maps."""
    # Filled by bind_request(): request type -> handler type.
    self._request_map = RequestMap()
    # Filled by bind_event(): event type -> handler types.
    self._event_map = EventMap()
    # Filled by bind_request() when per-request behaviors are supplied.
    self._behavior_map = PipelineBehaviorMap()
bind_request
bind_request(
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, Any]],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, Any]]]
    | None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_request(
    self,
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, Any]],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, Any]]] | None = None,
) -> Self:
    """Bind a request type to its handler and, optionally, to pipeline behaviors.

    Args:
        request_type: The request class being registered.
        handler_type: The handler class that processes ``request_type``.
        behaviors: Behaviors to attach to this request type; ``None`` or an
            empty list leaves the behavior map untouched.

    Returns:
        This extension, so calls can be chained.
    """
    self._request_map.bind(request_type, handler_type)
    if not behaviors:
        return self
    self._behavior_map.bind(request_type, behaviors)
    return self
bind_event
bind_event(
    event_type: type[EventT],
    handler_types: list[type[EventHandler[EventT]]],
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_event(
    self,
    event_type: type[EventT],
    handler_types: list[type[EventHandler[EventT]]],
) -> Self:
    """Bind an event type to the handler classes that should receive it.

    Args:
        event_type: The event class being registered.
        handler_types: Handler classes to register for ``event_type``.

    Returns:
        This extension, so calls can be chained.
    """
    event_map = self._event_map
    event_map.bind(event_type, handler_types)
    return self
on_module_configure
on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Append this extension's providers to the module being configured.

    Request-handler providers come first, then event-handler providers, then
    pipeline-behavior providers.

    Args:
        metadata: Metadata of the module; its ``providers`` list is extended in place.
    """
    module_providers = metadata.providers
    module_providers.extend(
        chain.from_iterable(
            (
                self._create_request_handler_providers(),
                self._create_event_handler_providers(),
                self._create_pipeline_behavior_providers(),
            ),
        ),
    )

pipeline

PipelineBehaviorWrapper

PipelineBehaviorWrapper(
    behaviors: Sequence[
        IPipelineBehavior[RequestT, ResponseT]
    ],
)

Bases: Generic[RequestT, ResponseT]

Composes pipeline behaviors into a processing chain.

Source code in src/waku/cqrs/pipeline/chain.py
def __init__(self, behaviors: Sequence[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    """Store the behaviors that ``wrap`` chains around a handler.

    Args:
        behaviors: Pipeline behaviors, in the order they should execute.
    """
    # Copied into a tuple, so later changes to the caller's sequence do not affect this wrapper.
    self._behaviors = tuple(behaviors)
wrap

Create a pipeline that wraps the handler function with behaviors.

Pipeline behaviors are executed in the order they are provided.

PARAMETER DESCRIPTION
handle

The handler function to wrap with behaviors

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
NextHandlerType[RequestT, ResponseT]

A function that executes the entire pipeline

Source code in src/waku/cqrs/pipeline/chain.py
def wrap(self, handle: NextHandlerType[RequestT, ResponseT]) -> NextHandlerType[RequestT, ResponseT]:
    """Wrap a handler function with the configured pipeline behaviors.

    Behaviors run in the order they were provided; each one receives a
    ``next_handler`` callable that continues with the following behavior and,
    after the last behavior, with ``handle`` itself.

    Args:
        handle: The handler function to wrap with behaviors

    Returns:
        ``handle`` unchanged when no behaviors are configured, otherwise a
        function that executes the entire pipeline
    """
    steps = self._behaviors
    if not steps:
        return handle

    step_count = len(steps)

    async def run_from(position: int, current: RequestT) -> ResponseT:
        if position < step_count:

            def proceed(forwarded: RequestT) -> Awaitable[ResponseT]:
                # Hands control to the next behavior (or to the handler once the chain is exhausted).
                return run_from(position + 1, forwarded)

            return await steps[position].handle(current, next_handler=proceed)
        return await handle(current)

    async def pipeline(request: RequestT) -> ResponseT:
        return await run_from(0, request)

    return pipeline

chain

PipelineBehaviorWrapper
PipelineBehaviorWrapper(
    behaviors: Sequence[
        IPipelineBehavior[RequestT, ResponseT]
    ],
)

Bases: Generic[RequestT, ResponseT]

Composes pipeline behaviors into a processing chain.

Source code in src/waku/cqrs/pipeline/chain.py
def __init__(self, behaviors: Sequence[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    """Store the behaviors that ``wrap`` chains around a handler.

    Args:
        behaviors: Pipeline behaviors, in the order they should execute.
    """
    # Copied into a tuple, so later changes to the caller's sequence do not affect this wrapper.
    self._behaviors = tuple(behaviors)
wrap

Create a pipeline that wraps the handler function with behaviors.

Pipeline behaviors are executed in the order they are provided.

PARAMETER DESCRIPTION
handle

The handler function to wrap with behaviors

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
NextHandlerType[RequestT, ResponseT]

A function that executes the entire pipeline

Source code in src/waku/cqrs/pipeline/chain.py
def wrap(self, handle: NextHandlerType[RequestT, ResponseT]) -> NextHandlerType[RequestT, ResponseT]:
    """Wrap a handler function with the configured pipeline behaviors.

    Behaviors run in the order they were provided; each one receives a
    ``next_handler`` callable that continues with the following behavior and,
    after the last behavior, with ``handle`` itself.

    Args:
        handle: The handler function to wrap with behaviors

    Returns:
        ``handle`` unchanged when no behaviors are configured, otherwise a
        function that executes the entire pipeline
    """
    steps = self._behaviors
    if not steps:
        return handle

    step_count = len(steps)

    async def run_from(position: int, current: RequestT) -> ResponseT:
        if position < step_count:

            def proceed(forwarded: RequestT) -> Awaitable[ResponseT]:
                # Hands control to the next behavior (or to the handler once the chain is exhausted).
                return run_from(position + 1, forwarded)

            return await steps[position].handle(current, next_handler=proceed)
        return await handle(current)

    async def pipeline(request: RequestT) -> ResponseT:
        return await run_from(0, request)

    return pipeline

map

PipelineBehaviorMapRegistry module-attribute
PipelineBehaviorMapRegistry = MutableMapping[
    type[RequestT],
    list[type[IPipelineBehavior[RequestT, ResponseT]]],
]
PipelineBehaviorMap
PipelineBehaviorMap()
Source code in src/waku/cqrs/pipeline/map.py
def __init__(self) -> None:
    """Create an empty request-type -> behavior-types registry."""
    # defaultdict(list): indexing an unknown request type yields (and stores) an empty list.
    self._registry: PipelineBehaviorMapRegistry[Any, Any] = defaultdict(list)
registry property
bind
bind(
    request_type: type[RequestT],
    behavior_types: list[
        type[IPipelineBehavior[RequestT, ResponseT]]
    ],
) -> Self
Source code in src/waku/cqrs/pipeline/map.py
def bind(
    self,
    request_type: type[RequestT],
    behavior_types: list[type[IPipelineBehavior[RequestT, ResponseT]]],
) -> Self:
    """Register pipeline behaviors for a request type, keeping their order.

    The whole batch is validated before anything is stored, so a call that
    raises leaves the registry exactly as it was.

    Args:
        request_type: The request class the behaviors apply to.
        behavior_types: Behavior classes to append for ``request_type``.

    Returns:
        This map, so calls can be chained.

    Raises:
        PipelineBehaviorAlreadyRegistered: If a behavior is already bound to
            ``request_type`` or appears more than once in ``behavior_types``.
    """
    # Look up with .get() rather than indexing: the registry is created as a
    # defaultdict, and indexing would insert an empty entry even when the batch
    # is rejected or empty.
    already_bound = self._registry.get(request_type, ())
    accepted: list[type[IPipelineBehavior[RequestT, ResponseT]]] = []
    for behavior_type in behavior_types:
        if behavior_type in already_bound or behavior_type in accepted:
            raise PipelineBehaviorAlreadyRegistered(request_type, behavior_type)
        accepted.append(behavior_type)

    if accepted:
        self._registry[request_type].extend(accepted)
    return self
merge
merge(other: PipelineBehaviorMap) -> Self
Source code in src/waku/cqrs/pipeline/map.py
def merge(self, other: PipelineBehaviorMap) -> Self:
    """Bind every request-type/behaviors entry of another map into this one.

    Each entry goes through ``bind``, so its duplicate check applies.

    Args:
        other: The map whose registry entries are copied in.

    Returns:
        This map, so calls can be chained.
    """
    for request_type, behavior_types in other.registry.items():
        self.bind(request_type, behavior_types)
    return self

requests

RequestHandler

Bases: ABC, Generic[RequestT, ResponseT]

The request handler interface.

The request handler is an object, which gets a request as input and may return a response as a result.

Command handler example:

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: JoinMeetingCommand, /) -> None:
        await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example:

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handle abstractmethod async
handle(request: RequestT) -> ResponseT
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT, /) -> ResponseT:
    """Process ``request`` and return its response.

    Abstract: concrete handlers override this coroutine.

    Args:
        request: The request to handle (positional-only).

    Raises:
        NotImplementedError: Always, when this base implementation is invoked.
    """
    raise NotImplementedError

handler

RequestHandler

Bases: ABC, Generic[RequestT, ResponseT]

The request handler interface.

The request handler is an object, which gets a request as input and may return a response as a result.

Command handler example:

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: JoinMeetingCommand, /) -> None:
        await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example:

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery, /) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handle abstractmethod async
handle(request: RequestT) -> ResponseT
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT, /) -> ResponseT:
    """Process ``request`` and return its response.

    Abstract: concrete handlers override this coroutine.

    Args:
        request: The request to handle (positional-only).

    Raises:
        NotImplementedError: Always, when this base implementation is invoked.
    """
    raise NotImplementedError

map

RequestMapRegistry module-attribute
RequestMap
RequestMap()
Source code in src/waku/cqrs/requests/map.py
def __init__(self) -> None:
    """Create an empty request-type -> handler-type registry."""
    # Plain dict: each request type maps to exactly one handler type (see bind()).
    self._registry: RequestMapRegistry[Any, Any] = {}
registry property
bind
bind(
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, ResponseT]],
) -> Self
Source code in src/waku/cqrs/requests/map.py
def bind(
    self,
    request_type: type[RequestT],
    handler_type: type[RequestHandler[RequestT, ResponseT]],
) -> Self:
    """Register the single handler class for a request type.

    Args:
        request_type: The request class being registered.
        handler_type: The handler class that processes ``request_type``.

    Returns:
        This map, so calls can be chained.

    Raises:
        RequestHandlerAlreadyRegistered: If ``request_type`` already has a handler.
    """
    registry = self._registry
    if request_type in registry:
        raise RequestHandlerAlreadyRegistered(request_type, handler_type)

    registry[request_type] = handler_type
    return self
merge
merge(other: RequestMap) -> Self
Source code in src/waku/cqrs/requests/map.py
def merge(self, other: RequestMap) -> Self:
    """Bind every request-type/handler entry of another map into this one.

    Each entry goes through ``bind``, so its duplicate check applies.

    Args:
        other: The map whose registry entries are copied in.

    Returns:
        This map, so calls can be chained.
    """
    for entry in other.registry.items():
        self.bind(*entry)
    return self

utils

get_request_response_type cached

get_request_response_type(
    request_type: type[Request[ResponseT]],
) -> type[ResponseT]
Source code in src/waku/cqrs/utils.py
@functools.cache
def get_request_response_type(request_type: type[Request[ResponseT]]) -> type[ResponseT]:
    """Return the response type a request class was parametrized with.

    The value is the first type argument of the first original base of
    ``request_type``. Results are memoized per request class.

    Args:
        request_type: The request class to inspect.

    Returns:
        The response type taken from the request's generic base.
    """
    first_base = get_original_bases(request_type)[0]
    response_type = typing.get_args(first_base)[0]
    return typing.cast(type[ResponseT], response_type)

di

Activator module-attribute

ProviderSpec module-attribute

ProviderSpec: TypeAlias = Provider | ConditionalProvider

ActivationBuilder

Bases: Protocol

has_active abstractmethod

has_active(type_: Any) -> bool
Source code in src/waku/di/_activation.py
@abstractmethod
def has_active(self, type_: Any) -> bool:
    """Report whether ``type_`` has an active provider.

    NOTE(review): the exact meaning of "active" is defined by implementers and
    is not visible from this declaration — confirm against the concrete builder.

    Args:
        type_: The type to look up.

    Raises:
        NotImplementedError: Always, when this base declaration is invoked.
    """
    raise NotImplementedError

ActivationContext

Bases: NamedTuple

Context passed to activators for provider activation decisions.

container_context instance-attribute

container_context: dict[Any, Any] | None

module_type instance-attribute

module_type: ModuleType | DynamicModule

provided_type instance-attribute

provided_type: Any

builder instance-attribute

ConditionalProvider dataclass

ConditionalProvider(
    provider: Provider, when: Activator, provided_type: Any
)

Provider with activation condition.

provider instance-attribute

provider: Provider

when instance-attribute

when: Activator

provided_type instance-attribute

provided_type: Any

Has dataclass

Has(type_: Any)

Activator that checks if a provider for a type is registered.

type_ instance-attribute

type_: Any

IProviderFilter

Bases: Protocol

Strategy for filtering providers based on activation context.

filter

filter(
    providers: list[ProviderSpec],
    context: dict[Any, Any] | None,
    module_type: ModuleType | DynamicModule,
    builder: ActivationBuilder,
) -> list[Provider]
Source code in src/waku/di/_activation.py
def filter(
    self,
    providers: list[ProviderSpec],
    context: dict[Any, Any] | None,
    module_type: ModuleType | DynamicModule,
    builder: ActivationBuilder,
) -> list[Provider]:
    """Select the providers to keep for a module.

    Protocol member: it declares the expected signature only, and
    implementations supply the filtering logic.

    Args:
        providers: Plain or conditional providers to filter.
        context: Container context, if any.
        module_type: The module the providers belong to.
        builder: Activation builder made available to activators.

    Returns:
        The providers that remain after filtering.
    """

ProviderFilter dataclass

ProviderFilter(on_skip: OnSkipCallback | None = None)

Default provider filter implementation.

on_skip class-attribute instance-attribute

on_skip: OnSkipCallback | None = field(default=None)

filter

filter(
    providers: list[ProviderSpec],
    context: dict[Any, Any] | None,
    module_type: ModuleType | DynamicModule,
    builder: ActivationBuilder,
) -> list[Provider]
Source code in src/waku/di/_activation.py
def filter(
    self,
    providers: list[ProviderSpec],
    context: dict[Any, Any] | None,
    module_type: ModuleType | DynamicModule,
    builder: ActivationBuilder,
) -> list[Provider]:
    """Keep unconditional providers and the conditional ones whose activator accepts.

    Args:
        providers: Plain or conditional providers, processed in order.
        context: Container context passed on to activators.
        module_type: The module the providers belong to.
        builder: Activation builder passed on to activators.

    Returns:
        The plain providers plus the wrapped provider of every activated
        ``ConditionalProvider``, in input order.
    """
    kept: list[Provider] = []

    for spec in providers:
        if not isinstance(spec, ConditionalProvider):
            # Unconditional providers always pass through.
            kept.append(spec)
            continue

        activation_ctx = ActivationContext(
            container_context=context,
            module_type=module_type,
            provided_type=spec.provided_type,
            builder=builder,
        )
        if spec.when(activation_ctx):
            kept.append(spec.provider)
        elif self.on_skip:
            # Report the rejected provider to the optional skip callback.
            self.on_skip(spec, activation_ctx)

    return kept

contextual

contextual(
    provided_type: Any, *, scope: Scope = REQUEST
) -> Provider
contextual(
    provided_type: Any,
    *,
    scope: Scope = REQUEST,
    when: Activator,
) -> ConditionalProvider
contextual(
    provided_type: Any,
    *,
    scope: Scope = REQUEST,
    when: Activator | None = None,
) -> ProviderSpec

Provide a dependency from the current context (e.g., app/request).

PARAMETER DESCRIPTION
provided_type

The type to resolve from context.

TYPE: Any

scope

Scope of the context variable (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

when

Optional predicate to conditionally activate the provider.

TYPE: Activator | None DEFAULT: None

RETURNS DESCRIPTION
ProviderSpec

Provider or ConditionalProvider if when is specified.

Source code in src/waku/di/_providers.py
def contextual(
    provided_type: Any,
    *,
    scope: Scope = Scope.REQUEST,
    when: Activator | None = None,
) -> ProviderSpec:
    """Declare a dependency that is supplied by the current context (e.g., app/request).

    Args:
        provided_type: The type to resolve from context.
        scope: Scope of the context variable (default: Scope.REQUEST).
        when: Optional predicate to conditionally activate the provider.

    Returns:
        A plain ``Provider``, or a ``ConditionalProvider`` wrapping it when
        ``when`` is given.
    """
    context_provider = Provider()
    context_provider.from_context(provided_type, scope=scope)

    if when is not None:
        return ConditionalProvider(provider=context_provider, when=when, provided_type=provided_type)
    return context_provider

many

many(
    interface: Any,
    *implementations: Any,
    scope: Scope = REQUEST,
    cache: bool = True,
) -> Provider
many(
    interface: Any,
    *implementations: Any,
    scope: Scope = REQUEST,
    cache: bool = True,
    when: Activator,
) -> ConditionalProvider
many(
    interface: Any,
    *implementations: Any,
    scope: Scope = REQUEST,
    cache: bool = True,
    when: Activator | None = None,
) -> ProviderSpec

Register multiple implementations as a collection.

PARAMETER DESCRIPTION
interface

Interface type for the collection.

TYPE: Any

*implementations

Implementation types or factory functions to include in collection.

TYPE: Any DEFAULT: ()

scope

Scope of the collection (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

cache

Whether to cache the resolve results within scope.

TYPE: bool DEFAULT: True

when

Optional predicate to conditionally activate the provider.

TYPE: Activator | None DEFAULT: None

RETURNS DESCRIPTION
ProviderSpec

Provider or ConditionalProvider if when is specified.

RAISES DESCRIPTION
ValueError

If no implementations are provided.

TypeError

If a factory function lacks a return type annotation.

Source code in src/waku/di/_providers.py
def many(
    interface: Any,
    *implementations: Any,
    scope: Scope = Scope.REQUEST,
    cache: bool = True,
    when: Activator | None = None,
) -> ProviderSpec:
    """Register multiple implementations as a collection.

    The collection is provided as ``list[interface]`` and, through an alias,
    as ``Sequence[interface]``. It starts as an empty list and is extended by
    one decorator per implementation.

    Args:
        interface: Interface type for the collection.
        *implementations: Implementation types or factory functions to include in collection.
        scope: Scope of the collection (default: Scope.REQUEST).
        cache: Whether to cache the resolve results within scope.
        when: Optional predicate to conditionally activate the provider.

    Returns:
        Provider or ConditionalProvider if `when` is specified.

    Raises:
        ValueError: If no implementations are provided.
        TypeError: If a factory function lacks a return type annotation.
    """
    if not implementations:
        msg = 'At least one implementation must be provided'
        raise ValueError(msg)

    # Resolve the provided type of every implementation up front, before the provider is built.
    # NOTE(review): presumably this is where the documented TypeError originates — confirm in _get_provided_type.
    provided_types = [_get_provided_type(impl) for impl in implementations]

    provider_ = Provider(scope=scope)
    # Each implementation is also registered on its own, so the decorators below can request it.
    provider_.provide_all(*implementations, cache=cache)

    # Seed the collection with an empty list; the decorators below build the final list from it.
    provider_.provide(
        lambda: [],  # noqa: PIE807
        provides=list[interface],
        cache=cache,
    )
    # Expose the same collection under Sequence[interface] as well.
    provider_.alias(list[interface], provides=Sequence[interface], cache=cache)

    for provided_type in provided_types:

        # The annotations use the loop variable, so each pass registers a decorator
        # for a different implementation type; it returns a new list with that
        # implementation appended.
        # NOTE(review): final element order presumably follows `implementations` —
        # confirm against Dishka's decorator application order.
        @provider_.decorate
        def _(many_: list[interface], one: provided_type) -> list[interface]:  # type: ignore[valid-type]
            return [*many_, one]

    if when is None:
        return provider_
    # The conditional wrapper is keyed on the Sequence[interface] view of the collection.
    return ConditionalProvider(provider=provider_, when=when, provided_type=Sequence[interface])

object_

object_(
    obj: Any, *, provided_type: Any | None = None
) -> Provider
object_(
    obj: Any,
    *,
    provided_type: Any | None = None,
    when: Activator,
) -> ConditionalProvider
object_(
    obj: Any,
    *,
    provided_type: Any | None = None,
    when: Activator | None = None,
) -> ProviderSpec

Provide the exact object passed at creation time as a singleton dependency.

The provider always returns the same object instance, without instantiation or copying.

PARAMETER DESCRIPTION
obj

The instance to provide as-is.

TYPE: Any

provided_type

Explicit type to provide (default: inferred).

TYPE: Any | None DEFAULT: None

when

Optional predicate to conditionally activate the provider.

TYPE: Activator | None DEFAULT: None

RETURNS DESCRIPTION
ProviderSpec

Provider or ConditionalProvider if when is specified.

Source code in src/waku/di/_providers.py
def object_(
    obj: Any,
    *,
    provided_type: Any | None = None,
    when: Activator | None = None,
) -> ProviderSpec:
    """Expose an already-created object as an app-scoped dependency.

    The same ``obj`` is returned on every resolution; it is never instantiated
    or copied.

    Args:
        obj: The instance to provide as-is.
        provided_type: Explicit type to provide; defaults to ``type(obj)``.
        when: Optional predicate to conditionally activate the provider.

    Returns:
        A plain ``Provider``, or a ``ConditionalProvider`` wrapping it when
        ``when`` is given.
    """
    actual_type = type(obj) if provided_type is None else provided_type
    instance_provider = provider(lambda: obj, scope=Scope.APP, provided_type=actual_type, cache=True)

    if when is not None:
        return ConditionalProvider(provider=instance_provider, when=when, provided_type=actual_type)
    return instance_provider

provider

provider(
    source: Callable[..., Any] | type[Any],
    *,
    scope: Scope = REQUEST,
    provided_type: Any | None = None,
    cache: bool = True,
) -> Provider

Create a Dishka provider for a callable or type.

PARAMETER DESCRIPTION
source

Callable or type to provide as a dependency.

TYPE: Callable[..., Any] | type[Any]

scope

Scope of the dependency (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

provided_type

Explicit type to provide (default: inferred).

TYPE: Any | None DEFAULT: None

cache

Whether to cache the instance in the scope.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Provider

Configured provider instance.

TYPE: Provider

Source code in src/waku/di/_providers.py
def provider(
    source: Callable[..., Any] | type[Any],
    *,
    scope: Scope = Scope.REQUEST,
    provided_type: Any | None = None,
    cache: bool = True,
) -> Provider:
    """Wrap a callable or type in a Dishka provider.

    Args:
        source: Callable or type to provide as a dependency.
        scope: Scope of the dependency (default: Scope.REQUEST).
        provided_type: Explicit type to provide (default: inferred).
        cache: Whether to cache the instance in the scope.

    Returns:
        Provider: A new provider with ``source`` registered on it.
    """
    configured = Provider(scope=scope)
    configured.provide(source, provides=provided_type, cache=cache)
    return configured

scoped

scoped(source: type[_T] | Callable[..., _T]) -> Provider
scoped(
    source: type[_T] | Callable[..., _T],
    /,
    *,
    when: Activator,
) -> ConditionalProvider
scoped(
    interface: Any,
    implementation: type[_T] | Callable[..., _T],
) -> Provider
scoped(
    interface: Any,
    implementation: type[_T] | Callable[..., _T],
    /,
    *,
    when: Activator,
) -> ConditionalProvider
scoped(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any]
    | Callable[..., Any]
    | None = None,
    /,
    *,
    when: Activator | None = None,
) -> ProviderSpec

Create a scoped provider (lifetime: request).

PARAMETER DESCRIPTION
interface_or_source

Interface type or source if no separate implementation.

TYPE: type[Any] | Callable[..., Any]

implementation

Implementation type if interface is provided.

TYPE: type[Any] | Callable[..., Any] | None DEFAULT: None

when

Optional predicate to conditionally activate the provider.

TYPE: Activator | None DEFAULT: None

RETURNS DESCRIPTION
ProviderSpec

Provider or ConditionalProvider if when is specified.

Source code in src/waku/di/_providers.py
def scoped(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any] | Callable[..., Any] | None = None,
    /,
    *,
    when: Activator | None = None,
) -> ProviderSpec:
    """Build a request-scoped provider.

    Accepts either a single source, or an interface followed by its implementation.

    Args:
        interface_or_source: Interface type or source if no separate implementation.
        implementation: Implementation type if interface is provided.
        when: Optional predicate to conditionally activate the provider.

    Returns:
        A plain ``Provider``, or a ``ConditionalProvider`` wrapping it when
        ``when`` is given.
    """
    if implementation is None:
        # Single-argument form: the provided type is derived from the source itself.
        target_type = _get_provided_type(interface_or_source)
        request_provider = provider(interface_or_source, scope=Scope.REQUEST)
    else:
        target_type = interface_or_source
        request_provider = provider(implementation, scope=Scope.REQUEST, provided_type=target_type)

    if when is not None:
        return ConditionalProvider(provider=request_provider, when=when, provided_type=target_type)
    return request_provider

singleton

singleton(source: type[_T] | Callable[..., _T]) -> Provider
singleton(
    source: type[_T] | Callable[..., _T],
    /,
    *,
    when: Activator,
) -> ConditionalProvider
singleton(
    interface: Any,
    implementation: type[_T] | Callable[..., _T],
) -> Provider
singleton(
    interface: Any,
    implementation: type[_T] | Callable[..., _T],
    /,
    *,
    when: Activator,
) -> ConditionalProvider
singleton(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any]
    | Callable[..., Any]
    | None = None,
    /,
    *,
    when: Activator | None = None,
) -> ProviderSpec

Create a singleton provider (lifetime: app).

PARAMETER DESCRIPTION
interface_or_source

Interface type or source if no separate implementation.

TYPE: type[Any] | Callable[..., Any]

implementation

Implementation type if interface is provided.

TYPE: type[Any] | Callable[..., Any] | None DEFAULT: None

when

Optional predicate to conditionally activate the provider.

TYPE: Activator | None DEFAULT: None

RETURNS DESCRIPTION
ProviderSpec

Provider or ConditionalProvider if when is specified.

Source code in src/waku/di/_providers.py
def singleton(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any] | Callable[..., Any] | None = None,
    /,
    *,
    when: Activator | None = None,
) -> ProviderSpec:
    """Build an app-scoped (singleton) provider.

    Accepts either a single source, or an interface followed by its implementation.

    Args:
        interface_or_source: Interface type or source if no separate implementation.
        implementation: Implementation type if interface is provided.
        when: Optional predicate to conditionally activate the provider.

    Returns:
        A plain ``Provider``, or a ``ConditionalProvider`` wrapping it when
        ``when`` is given.
    """
    if implementation is None:
        # Single-argument form: the provided type is derived from the source itself.
        target_type = _get_provided_type(interface_or_source)
        app_provider = provider(interface_or_source, scope=Scope.APP)
    else:
        target_type = interface_or_source
        app_provider = provider(implementation, scope=Scope.APP, provided_type=target_type)

    if when is not None:
        return ConditionalProvider(provider=app_provider, when=when, provided_type=target_type)
    return app_provider

transient

transient(source: type[_T] | Callable[..., _T]) -> Provider
transient(
    source: type[_T] | Callable[..., _T],
    /,
    *,
    when: Activator,
) -> ConditionalProvider
transient(
    interface: Any,
    implementation: type[_T] | Callable[..., _T],
) -> Provider
transient(
    interface: Any,
    implementation: type[_T] | Callable[..., _T],
    /,
    *,
    when: Activator,
) -> ConditionalProvider
transient(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any]
    | Callable[..., Any]
    | None = None,
    /,
    *,
    when: Activator | None = None,
) -> ProviderSpec

Create a transient provider (new instance per injection).

PARAMETER DESCRIPTION
interface_or_source

Interface type or source if no separate implementation.

TYPE: type[Any] | Callable[..., Any]

implementation

Implementation type if interface is provided.

TYPE: type[Any] | Callable[..., Any] | None DEFAULT: None

when

Optional predicate to conditionally activate the provider.

TYPE: Activator | None DEFAULT: None

RETURNS DESCRIPTION
ProviderSpec

Provider or ConditionalProvider if when is specified.

Source code in src/waku/di/_providers.py
def transient(
    interface_or_source: type[Any] | Callable[..., Any],
    implementation: type[Any] | Callable[..., Any] | None = None,
    /,
    *,
    when: Activator | None = None,
) -> ProviderSpec:
    """Build a transient provider: request scope with caching turned off.

    Accepts either a single source, or an interface followed by its implementation.

    Args:
        interface_or_source: Interface type or source if no separate implementation.
        implementation: Implementation type if interface is provided.
        when: Optional predicate to conditionally activate the provider.

    Returns:
        A plain ``Provider``, or a ``ConditionalProvider`` wrapping it when
        ``when`` is given.
    """
    if implementation is None:
        # Single-argument form: the provided type is derived from the source itself.
        target_type = _get_provided_type(interface_or_source)
        uncached_provider = provider(interface_or_source, scope=Scope.REQUEST, cache=False)
    else:
        target_type = interface_or_source
        uncached_provider = provider(implementation, scope=Scope.REQUEST, provided_type=target_type, cache=False)

    if when is not None:
        return ConditionalProvider(provider=uncached_provider, when=when, provided_type=target_type)
    return uncached_provider

exceptions

WakuError

Bases: Exception

extensions

ApplicationExtension module-attribute

ModuleExtension module-attribute

DEFAULT_EXTENSIONS module-attribute

DEFAULT_EXTENSIONS: Sequence[ApplicationExtension] = (
    ValidationExtension(
        [DependenciesAccessibleRule()], strict=True
    ),
)

AfterApplicationInit

Bases: Protocol

Extension for application post-initialization actions.

after_app_init async

after_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
async def after_app_init(self, app: WakuApplication) -> None:
    """Perform application post-initialization actions.

    Args:
        app: The application instance passed to the hook.
    """
    ...

OnApplicationInit

Bases: Protocol

Extension for application pre-initialization actions.

on_app_init async

on_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
async def on_app_init(self, app: WakuApplication) -> None:
    """Perform application pre-initialization actions.

    Args:
        app: The application instance passed to the hook.
    """
    ...

OnApplicationShutdown

Bases: Protocol

Extension for application shutdown actions.

on_app_shutdown async

on_app_shutdown(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
async def on_app_shutdown(self, app: WakuApplication) -> None:
    """Perform application shutdown actions.

    Args:
        app: The application instance passed to the hook.
    """
    ...

OnBeforeContainerBuild

Bases: Protocol

Extension for aggregating configuration across modules before container build.

This hook runs after all modules are collected but before the DI container is created. Use this for cross-module configuration aggregation that needs to produce injectable registries.

Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).

Execution order
  1. Application-level extensions (in registration order)
  2. Module-level extensions (in topological order)

on_before_container_build

on_before_container_build(
    modules: Sequence[Module],
    context: Mapping[Any, Any] | None,
) -> Sequence[ProviderSpec]

Aggregate configuration from modules and return providers to register.

PARAMETER DESCRIPTION
modules

All application modules in topological order (dependencies first).

TYPE: Sequence[Module]

context

Application context passed to WakuFactory (read-only).

TYPE: Mapping[Any, Any] | None

RETURNS DESCRIPTION
Sequence[ProviderSpec]

Sequence of providers to add to the container.

Source code in src/waku/extensions/protocols.py
def on_before_container_build(
    self,
    modules: Sequence[Module],
    context: Mapping[Any, Any] | None,
) -> Sequence[ProviderSpec]:
    """Aggregate configuration from modules and return providers to register.

    This hook runs after all modules are collected but before the DI
    container is created.

    Args:
        modules: All application modules in topological order (dependencies first).
        context: Application context passed to WakuFactory (read-only).

    Returns:
        Sequence of providers to add to the container.
    """
    # Protocol stub: implementations supply the body.
    ...

OnModuleConfigure

Bases: Protocol

Extension for module configuration.

on_module_configure

on_module_configure(metadata: ModuleMetadata) -> None

Perform actions before the module metadata is transformed into a module.

Source code in src/waku/extensions/protocols.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Perform actions before the module metadata is transformed into a module.

    Args:
        metadata: The metadata of the module being configured.
    """
    ...

OnModuleDestroy

Bases: Protocol

Extension for module destroying.

on_module_destroy async

on_module_destroy(module: Module) -> None
Source code in src/waku/extensions/protocols.py
async def on_module_destroy(self, module: Module) -> None:
    """Perform actions when a module is destroyed.

    Args:
        module: The module passed to the hook.
    """
    ...

OnModuleInit

Bases: Protocol

Extension for module initialization.

on_module_init async

on_module_init(module: Module) -> None
Source code in src/waku/extensions/protocols.py
async def on_module_init(self, module: Module) -> None:
    """Perform actions when a module is initialized.

    Args:
        module: The module passed to the hook.
    """
    ...

ExtensionRegistry

ExtensionRegistry()

Registry for extensions.

This registry maintains references to all extensions in the application, allowing for centralized management and discovery.

Source code in src/waku/extensions/registry.py
def __init__(self) -> None:
    """Create an empty registry."""
    # Application extensions grouped by type key; defaultdict creates the
    # list on first indexed access.
    self._app_extensions: dict[type[ApplicationExtension], list[ApplicationExtension]] = defaultdict(list)
    # Module extensions grouped by the module type they were registered for.
    self._module_extensions: dict[ModuleType, list[ModuleExtension]] = defaultdict(list)

register_application_extension

register_application_extension(
    extension: ApplicationExtension,
) -> Self

Register an application extension under each matching base type in its class hierarchy.

Source code in src/waku/extensions/registry.py
def register_application_extension(self, extension: ApplicationExtension) -> Self:
    """Register an application extension under each matching base of its class.

    The extension is appended to the list of every class in its MRO that
    passes the ``ApplicationExtension`` check, excluding its own concrete
    class. If no base matches, nothing is stored. No priority or tag
    information is accepted.

    Args:
        extension: The extension instance to register.

    Returns:
        This registry, so calls can be chained.
    """
    ext_type = type(extension)
    extension_bases = [
        base
        for base in inspect.getmro(ext_type)
        # NOTE(review): isinstance is applied to the class objects from the MRO,
        # not to instances; presumably ApplicationExtension is a union of
        # runtime-checkable protocols, making this a structural check -- confirm.
        if (isinstance(base, ApplicationExtension) and base != ext_type)  # type: ignore[unreachable]
    ]
    for base in extension_bases:
        self._app_extensions[cast(type[ApplicationExtension], base)].append(extension)
    return self

register_module_extension

register_module_extension(
    module_type: ModuleType, extension: ModuleExtension
) -> Self
Source code in src/waku/extensions/registry.py
def register_module_extension(self, module_type: ModuleType, extension: ModuleExtension) -> Self:
    """Append an extension to the list kept for the given module type.

    Args:
        module_type: The module type the extension belongs to.
        extension: The extension instance to store.

    Returns:
        This registry, so calls can be chained.
    """
    # Indexing (not .get) is deliberate: the defaultdict creates the list on demand.
    bucket = self._module_extensions[module_type]
    bucket.append(extension)
    return self

get_application_extensions

get_application_extensions(
    extension_type: type[_AppExtT],
) -> list[_AppExtT]
Source code in src/waku/extensions/registry.py
def get_application_extensions(self, extension_type: type[_AppExtT]) -> list[_AppExtT]:
    """Return the application extensions registered under ``extension_type``.

    Args:
        extension_type: The type key to look up.

    Returns:
        The registry's stored list for that key (not a copy), or a new empty
        list when nothing is registered under it.
    """
    lookup_key = cast(type[ApplicationExtension], extension_type)
    # .get() avoids creating an empty entry in the defaultdict for unknown keys.
    registered = self._app_extensions.get(lookup_key, [])
    return cast(list[_AppExtT], registered)

get_module_extensions

get_module_extensions(
    module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
def get_module_extensions(self, module_type: ModuleType, extension_type: type[_ModExtT]) -> list[_ModExtT]:
    """Return the extensions of a module that are instances of ``extension_type``.

    Args:
        module_type: The module type whose extensions are inspected.
        extension_type: The type each returned extension must be an instance of.

    Returns:
        A new list of matching extensions, in registration order.
    """
    # .get() avoids creating an empty entry in the defaultdict for unknown modules.
    registered = self._module_extensions.get(module_type, [])
    candidates = cast(list[_ModExtT], registered)
    return [candidate for candidate in candidates if isinstance(candidate, extension_type)]

protocols

Extension protocols for application and module lifecycle hooks.

ApplicationExtension module-attribute

ModuleExtension module-attribute

OnApplicationInit

Bases: Protocol

Extension for application pre-initialization actions.

on_app_init async
on_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
async def on_app_init(self, app: WakuApplication) -> None:
    """Perform application pre-initialization actions.

    Args:
        app: The application instance passed to the hook.
    """
    ...

AfterApplicationInit

Bases: Protocol

Extension for application post-initialization actions.

after_app_init async
after_app_init(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
async def after_app_init(self, app: WakuApplication) -> None:
    """Perform application post-initialization actions.

    Args:
        app: The application instance passed to the hook.
    """
    ...

OnApplicationShutdown

Bases: Protocol

Extension for application shutdown actions.

on_app_shutdown async
on_app_shutdown(app: WakuApplication) -> None
Source code in src/waku/extensions/protocols.py
async def on_app_shutdown(self, app: WakuApplication) -> None:
    """Perform application shutdown actions.

    Args:
        app: The application instance passed to the hook.
    """
    ...

OnBeforeContainerBuild

Bases: Protocol

Extension for aggregating configuration across modules before container build.

This hook runs after all modules are collected but before the DI container is created. Use this for cross-module configuration aggregation that needs to produce injectable registries.

Can be declared at both application level (passed to WakuFactory) and module level (in module's extensions list).

Execution order
  1. Application-level extensions (in registration order)
  2. Module-level extensions (in topological order)
on_before_container_build
on_before_container_build(
    modules: Sequence[Module],
    context: Mapping[Any, Any] | None,
) -> Sequence[ProviderSpec]

Aggregate configuration from modules and return providers to register.

PARAMETER DESCRIPTION
modules

All application modules in topological order (dependencies first).

TYPE: Sequence[Module]

context

Application context passed to WakuFactory (read-only).

TYPE: Mapping[Any, Any] | None

RETURNS DESCRIPTION
Sequence[ProviderSpec]

Sequence of providers to add to the container.

Source code in src/waku/extensions/protocols.py
def on_before_container_build(
    self,
    modules: Sequence[Module],
    context: Mapping[Any, Any] | None,
) -> Sequence[ProviderSpec]:
    """Aggregate configuration from modules and return providers to register.

    This hook runs after all modules are collected but before the DI
    container is created.

    Args:
        modules: All application modules in topological order (dependencies first).
        context: Application context passed to WakuFactory (read-only).

    Returns:
        Sequence of providers to add to the container.
    """
    # Protocol stub: implementations supply the body.
    ...

OnModuleConfigure

Bases: Protocol

Extension for module configuration.

on_module_configure
on_module_configure(metadata: ModuleMetadata) -> None

Perform actions before the module metadata is transformed into a module.

Source code in src/waku/extensions/protocols.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Perform actions before the module metadata is transformed into a module.

    Args:
        metadata: The metadata of the module being configured.
    """
    ...

OnModuleInit

Bases: Protocol

Extension for module initialization.

on_module_init async
on_module_init(module: Module) -> None
Source code in src/waku/extensions/protocols.py
async def on_module_init(self, module: Module) -> None:
    """Perform actions when a module is initialized.

    Args:
        module: The module passed to the hook.
    """
    ...

OnModuleDestroy

Bases: Protocol

Extension for module destroying.

on_module_destroy async
on_module_destroy(module: Module) -> None
Source code in src/waku/extensions/protocols.py
async def on_module_destroy(self, module: Module) -> None:
    """Perform actions when a module is destroyed.

    Args:
        module: The module passed to the hook.
    """
    ...

registry

Extension registry for centralized management of extensions.

ExtensionRegistry

ExtensionRegistry()

Registry for extensions.

This registry maintains references to all extensions in the application, allowing for centralized management and discovery.

Source code in src/waku/extensions/registry.py
def __init__(self) -> None:
    """Create an empty registry."""
    # Application extensions grouped by type key; defaultdict creates the
    # list on first indexed access.
    self._app_extensions: dict[type[ApplicationExtension], list[ApplicationExtension]] = defaultdict(list)
    # Module extensions grouped by the module type they were registered for.
    self._module_extensions: dict[ModuleType, list[ModuleExtension]] = defaultdict(list)
register_application_extension
register_application_extension(
    extension: ApplicationExtension,
) -> Self

Register an application extension under each matching base type in its class hierarchy.

Source code in src/waku/extensions/registry.py
def register_application_extension(self, extension: ApplicationExtension) -> Self:
    """Register an application extension under each matching base of its class.

    The extension is appended to the list of every class in its MRO that
    passes the ``ApplicationExtension`` check, excluding its own concrete
    class. If no base matches, nothing is stored. No priority or tag
    information is accepted.

    Args:
        extension: The extension instance to register.

    Returns:
        This registry, so calls can be chained.
    """
    ext_type = type(extension)
    extension_bases = [
        base
        for base in inspect.getmro(ext_type)
        # NOTE(review): isinstance is applied to the class objects from the MRO,
        # not to instances; presumably ApplicationExtension is a union of
        # runtime-checkable protocols, making this a structural check -- confirm.
        if (isinstance(base, ApplicationExtension) and base != ext_type)  # type: ignore[unreachable]
    ]
    for base in extension_bases:
        self._app_extensions[cast(type[ApplicationExtension], base)].append(extension)
    return self
register_module_extension
register_module_extension(
    module_type: ModuleType, extension: ModuleExtension
) -> Self
Source code in src/waku/extensions/registry.py
def register_module_extension(self, module_type: ModuleType, extension: ModuleExtension) -> Self:
    """Append an extension to the list kept for the given module type.

    Args:
        module_type: The module type the extension belongs to.
        extension: The extension instance to store.

    Returns:
        This registry, so calls can be chained.
    """
    # Indexing (not .get) is deliberate: the defaultdict creates the list on demand.
    bucket = self._module_extensions[module_type]
    bucket.append(extension)
    return self
get_application_extensions
get_application_extensions(
    extension_type: type[_AppExtT],
) -> list[_AppExtT]
Source code in src/waku/extensions/registry.py
def get_application_extensions(self, extension_type: type[_AppExtT]) -> list[_AppExtT]:
    """Return the application extensions registered under ``extension_type``.

    Args:
        extension_type: The type key to look up.

    Returns:
        The registry's stored list for that key (not a copy), or a new empty
        list when nothing is registered under it.
    """
    lookup_key = cast(type[ApplicationExtension], extension_type)
    # .get() avoids creating an empty entry in the defaultdict for unknown keys.
    registered = self._app_extensions.get(lookup_key, [])
    return cast(list[_AppExtT], registered)
get_module_extensions
get_module_extensions(
    module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
def get_module_extensions(self, module_type: ModuleType, extension_type: type[_ModExtT]) -> list[_ModExtT]:
    """Return the extensions of a module that are instances of ``extension_type``.

    Args:
        module_type: The module type whose extensions are inspected.
        extension_type: The type each returned extension must be an instance of.

    Returns:
        A new list of matching extensions, in registration order.
    """
    # .get() avoids creating an empty entry in the defaultdict for unknown modules.
    registered = self._module_extensions.get(module_type, [])
    candidates = cast(list[_ModExtT], registered)
    return [candidate for candidate in candidates if isinstance(candidate, extension_type)]

factory

ContainerConfig dataclass

ContainerConfig(
    *,
    lock_factory: _LockFactory = Lock,
    start_scope: Scope | None = None,
    skip_validation: bool = False,
)

lock_factory class-attribute instance-attribute

lock_factory: _LockFactory = Lock

start_scope class-attribute instance-attribute

start_scope: Scope | None = None

skip_validation class-attribute instance-attribute

skip_validation: bool = False

WakuFactory

WakuFactory(
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[
        ApplicationExtension
    ] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
    provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/factory.py
def __init__(
    self,
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
    provider_filter: IProviderFilter | None = None,
) -> None:
    """Store the configuration used later by ``create()``.

    Args:
        root_module_type: Root module of the application (positional-only).
        context: Optional context mapping; stored as given.
        lifespan: Lifespan functions handed to the created application.
        extensions: Application extensions; defaults to ``DEFAULT_EXTENSIONS``.
        container_config: Container settings; a default ``ContainerConfig()``
            is used when omitted.
        provider_filter: Optional provider filter forwarded to the module
            registry builder.
    """
    self._root_module_type = root_module_type

    self._context = context
    self._lifespan = lifespan
    self._extensions = extensions
    # Fall back to the default container configuration when none is supplied.
    self._container_config = container_config or ContainerConfig()
    self._provider_filter = provider_filter

create

create() -> WakuApplication
Source code in src/waku/factory.py
def create(self) -> WakuApplication:
    """Assemble a ``WakuApplication`` from the factory's configuration.

    Builds the module registry, runs the before-build hooks to collect extra
    providers, builds the container from the registry's providers plus those
    extras, and finally wires everything into the application object.

    Returns:
        The newly created application.
    """
    registry_builder = ModuleRegistryBuilder(
        self._root_module_type,
        context=self._context,
        provider_filter=self._provider_filter,
    )
    registry = registry_builder.build()

    # Hook-supplied providers are appended after the registry's own providers.
    hook_providers = self._execute_before_build_hooks(registry.modules)
    container = self._build_container((*registry.providers, *hook_providers))
    extension_registry = self._build_extension_registry(registry.modules)

    return WakuApplication(
        container=container,
        registry=registry,
        lifespan=self._lifespan,
        extension_registry=extension_registry,
    )

lifespan

LifespanFunc module-attribute

LifespanFunc: TypeAlias = (
    Callable[
        ['WakuApplication'],
        AbstractAsyncContextManager[None],
    ]
    | AbstractAsyncContextManager[None]
)

LifespanWrapper

LifespanWrapper(lifespan_func: LifespanFunc)
Source code in src/waku/lifespan.py
def __init__(self, lifespan_func: LifespanFunc) -> None:
    """Wrap a lifespan function or async context manager.

    Args:
        lifespan_func: Either a callable taking the application and returning
            an async context manager, or an async context manager itself.
    """
    self._lifespan_func = lifespan_func

lifespan async

lifespan(app: WakuApplication) -> AsyncIterator[None]
Source code in src/waku/lifespan.py
@asynccontextmanager
async def lifespan(self, app: WakuApplication) -> AsyncIterator[None]:
    """Enter the wrapped lifespan for the duration of the ``async with`` body.

    Args:
        app: The application; passed to the wrapped callable when the wrapped
            object is not already an async context manager.

    Yields:
        None, while the wrapped context manager is active.
    """
    target = self._lifespan_func
    if isinstance(target, AbstractAsyncContextManager):
        # Already a context manager instance: use it directly, ``app`` is unused.
        ctx_manager = target
    else:
        ctx_manager = target(app)

    async with ctx_manager:
        yield

modules

ModuleType module-attribute

DynamicModule dataclass

DynamicModule(
    *,
    providers: list[ProviderSpec] = list(),
    imports: list[ModuleType | DynamicModule] = list(),
    exports: list[
        type[object] | ModuleType | DynamicModule
    ] = list(),
    extensions: list[ModuleExtension] = list(),
    is_global: bool = False,
    id: UUID = uuid4(),
    parent_module: ModuleType,
)

Bases: ModuleMetadata

providers class-attribute instance-attribute

providers: list[ProviderSpec] = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports: list[ModuleType | DynamicModule] = field(
    default_factory=list
)

List of modules imported by this module.

exports class-attribute instance-attribute

exports: list[type[object] | ModuleType | DynamicModule] = (
    field(default_factory=list)
)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions: list[ModuleExtension] = field(
    default_factory=list
)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global: bool = False

Whether this module is global or not.

id class-attribute instance-attribute

id: UUID = field(default_factory=uuid4)

parent_module instance-attribute

parent_module: ModuleType

HasModuleMetadata

Bases: Protocol

ModuleCompiler

extract_metadata

extract_metadata(
    module_type: ModuleType | DynamicModule,
) -> tuple[ModuleType, ModuleMetadata]
Source code in src/waku/modules/_metadata.py
def extract_metadata(self, module_type: ModuleType | DynamicModule) -> tuple[ModuleType, ModuleMetadata]:
    """Return the module type and its metadata for a module or dynamic module.

    Args:
        module_type: The module class or dynamic module to inspect.

    Returns:
        The result of the internal extraction: a ``(module type, metadata)`` pair.

    Raises:
        ValueError: If the internal extraction raises ``AttributeError``.
    """
    try:
        extracted = self._extract_metadata(cast(Hashable, module_type))
    except AttributeError:
        msg = f'{type(module_type).__name__} is not module'
        # The original AttributeError is deliberately hidden from the traceback.
        raise ValueError(msg) from None
    else:
        return extracted

ModuleMetadata dataclass

ModuleMetadata(
    *,
    providers: list[ProviderSpec] = list(),
    imports: list[ModuleType | DynamicModule] = list(),
    exports: list[
        type[object] | ModuleType | DynamicModule
    ] = list(),
    extensions: list[ModuleExtension] = list(),
    is_global: bool = False,
    id: UUID = uuid4(),
)

providers class-attribute instance-attribute

providers: list[ProviderSpec] = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports: list[ModuleType | DynamicModule] = field(
    default_factory=list
)

List of modules imported by this module.

exports class-attribute instance-attribute

exports: list[type[object] | ModuleType | DynamicModule] = (
    field(default_factory=list)
)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions: list[ModuleExtension] = field(
    default_factory=list
)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global: bool = False

Whether this module is global or not.

id class-attribute instance-attribute

id: UUID = field(default_factory=uuid4)

Module

Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
def __init__(self, module_type: ModuleType, metadata: ModuleMetadata) -> None:
    """Create a module from its type and metadata.

    Args:
        module_type: The class the module is defined on; stored as ``target``.
        metadata: Metadata whose fields are copied onto this module by reference.
    """
    self.id: Final[UUID] = metadata.id
    self.target: Final[ModuleType] = module_type

    # The metadata lists are referenced directly, not copied.
    self.providers: Final[Sequence[ProviderSpec]] = metadata.providers
    self.imports: Final[Sequence[ModuleType | DynamicModule]] = metadata.imports
    self.exports: Final[Sequence[type[object] | ModuleType | DynamicModule]] = metadata.exports
    self.extensions: Final[Sequence[ModuleExtension]] = metadata.extensions
    self.is_global: Final[bool] = metadata.is_global

    # Remains None until create_provider() assigns the aggregated provider.
    self._provider: BaseProvider | None = None

id instance-attribute

id: Final[UUID] = id

target instance-attribute

target: Final[ModuleType] = module_type

providers instance-attribute

providers: Final[Sequence[ProviderSpec]] = providers

imports instance-attribute

imports: Final[Sequence[ModuleType | DynamicModule]] = (
    imports
)

exports instance-attribute

exports: Final[
    Sequence[type[object] | ModuleType | DynamicModule]
] = exports

extensions instance-attribute

extensions: Final[Sequence[ModuleExtension]] = extensions

is_global instance-attribute

is_global: Final[bool] = is_global

name property

name: str

provider property

provider: BaseProvider

Get the aggregated provider for this module.

This property returns the provider created by create_provider(). Must be called after create_provider() has been invoked.

RAISES DESCRIPTION
RuntimeError

If create_provider() has not been called yet.

create_provider

create_provider(
    context: dict[Any, Any] | None,
    builder: ActivationBuilder,
    provider_filter: IProviderFilter,
) -> BaseProvider

Create aggregated provider with activation filtering applied.

PARAMETER DESCRIPTION
context

Context dict for activation decisions.

TYPE: dict[Any, Any] | None

builder

Activation builder for checking if types are registered.

TYPE: ActivationBuilder

provider_filter

Filter strategy for conditional provider activation.

TYPE: IProviderFilter

RETURNS DESCRIPTION
BaseProvider

BaseProvider with only active providers aggregated.

Source code in src/waku/modules/_module.py
def create_provider(
    self,
    context: dict[Any, Any] | None,
    builder: ActivationBuilder,
    provider_filter: IProviderFilter,
) -> BaseProvider:
    """Build and remember the module's aggregated provider.

    The module's providers are first passed through ``provider_filter``; the
    ones it returns are aggregated into a provider whose class is created on
    the fly and named after the module.

    Args:
        context: Context dict for activation decisions.
        builder: Activation builder for checking if types are registered.
        provider_filter: Filter strategy for conditional provider activation.

    Returns:
        The aggregated provider, also stored on the module.
    """
    # A copy of the provider list is handed to the filter.
    enabled = provider_filter.filter(
        list(self.providers),
        context=context,
        module_type=self.target,
        builder=builder,
    )

    # A dedicated subclass per module, named "<module name>Provider".
    provider_cls = cast(type[_ModuleProvider], type(f'{self.name}Provider', (_ModuleProvider,), {}))
    aggregated = provider_cls(enabled)
    self._provider = aggregated
    return aggregated

ModuleRegistry

ModuleRegistry(
    *,
    compiler: ModuleCompiler,
    root_module: Module,
    modules: dict[UUID, Module],
    providers: list[BaseProvider],
    adjacency: AdjacencyMatrix,
)

Immutable registry and graph for module queries, traversal, and lookups.

Source code in src/waku/modules/_registry.py
def __init__(
    self,
    *,
    compiler: ModuleCompiler,
    root_module: Module,
    modules: dict[UUID, Module],
    providers: list[BaseProvider],
    adjacency: AdjacencyMatrix,
) -> None:
    """Store the pre-built pieces of the module graph.

    Args:
        compiler: Compiler used to extract metadata from module types.
        root_module: Default starting point for traversal.
        modules: Registered modules keyed by ID; stored by reference.
        providers: Providers to expose; copied into a tuple.
        adjacency: Neighbor IDs per module ID, used by ``traverse()``.
    """
    self._compiler = compiler
    self._root_module = root_module
    self._modules = modules
    # Snapshot as a tuple so later changes to the caller's list are not reflected.
    self._providers = tuple(providers)
    self._adjacency = adjacency

root_module property

root_module: Module

modules property

modules: tuple[Module, ...]

providers property

providers: tuple[BaseProvider, ...]

compiler property

compiler: ModuleCompiler

get

get(module_type: ModuleType | DynamicModule) -> Module
Source code in src/waku/modules/_registry.py
def get(self, module_type: ModuleType | DynamicModule) -> Module:
    """Look up the registered module for a module type or dynamic module.

    Args:
        module_type: The module class or dynamic module to resolve.

    Returns:
        The module registered under the ID found in the extracted metadata.
    """
    _, metadata = self.compiler.extract_metadata(module_type)
    return self.get_by_id(metadata.id)

get_by_id

get_by_id(module_id: UUID) -> Module
Source code in src/waku/modules/_registry.py
def get_by_id(self, module_id: UUID) -> Module:
    """Return the module registered under ``module_id``.

    Args:
        module_id: ID of the module to fetch.

    Returns:
        The registered module.

    Raises:
        KeyError: If no module is registered under ``module_id``.
    """
    if (found := self._modules.get(module_id)) is not None:
        return found

    msg = f'Module with ID {module_id} is not registered in the graph.'
    raise KeyError(msg)

traverse

traverse(from_: Module | None = None) -> Iterator[Module]

Traverse the module graph in depth-first post-order (children before parent) recursively.

PARAMETER DESCRIPTION
from_

Start module (default: root)

TYPE: Module | None DEFAULT: None

YIELDS DESCRIPTION
Module

Each traversed module (post-order)

TYPE: Module

Source code in src/waku/modules/_registry.py
def traverse(self, from_: Module | None = None) -> Iterator[Module]:
    """Walk the module graph depth-first, yielding children before their parent.

    Each module is yielded at most once; self-references in the adjacency
    data are skipped.

    Args:
        from_: Module to start from; the root module is used when omitted.

    Yields:
        Module: Each reachable module in post-order.
    """
    seen: set[UUID] = set()

    def _walk(current: Module) -> Iterator[Module]:
        if current.id not in seen:
            # Mark before descending so cycles terminate.
            seen.add(current.id)

            # Children first, in adjacency order.
            for child_id in self._adjacency[current.id]:
                if child_id != current.id:
                    yield from _walk(self.get_by_id(child_id))

            # The module itself comes after everything beneath it.
            yield current

    yield from _walk(from_ or self._root_module)

ModuleRegistryBuilder

ModuleRegistryBuilder(
    root_module_type: ModuleType,
    compiler: ModuleCompiler | None = None,
    context: dict[Any, Any] | None = None,
    provider_filter: IProviderFilter | None = None,
)
Source code in src/waku/modules/_registry_builder.py
def __init__(
    self,
    root_module_type: ModuleType,
    compiler: ModuleCompiler | None = None,
    context: dict[Any, Any] | None = None,
    provider_filter: IProviderFilter | None = None,
) -> None:
    """Prepare a builder for the module graph rooted at ``root_module_type``.

    Args:
        root_module_type: Root module of the graph to build.
        compiler: Metadata compiler; a new ``ModuleCompiler()`` is used when omitted.
        context: Optional context mapping; stored as given.
        provider_filter: Provider filter; a new ``ProviderFilter()`` is used
            when omitted.
    """
    self._compiler: Final = compiler or ModuleCompiler()
    self._root_module_type: Final = root_module_type
    self._context: Final = context
    self._provider_filter: Final[IProviderFilter] = provider_filter or ProviderFilter()
    # Accumulators, empty until build() runs.
    self._modules: dict[UUID, Module] = {}
    self._providers: list[BaseProvider] = []

    # NOTE(review): presumably memoizes metadata extraction per module -- confirm against its users.
    self._metadata_cache: dict[ModuleType | DynamicModule, tuple[ModuleType, ModuleMetadata]] = {}
    self._builder: Final = _ActivationBuilder()

build

build() -> ModuleRegistry
Source code in src/waku/modules/_registry_builder.py
def build(self) -> ModuleRegistry:
    """Build the module registry for the configured root module.

    Runs the builder's private steps in a fixed order: collect the modules
    and their adjacency, build the type registry, register the modules, then
    assemble the registry from the root module and adjacency.

    Returns:
        The assembled ``ModuleRegistry``.
    """
    modules, adjacency = self._collect_modules()
    # NOTE(review): presumably has to precede registration so activation
    # checks can see every type -- confirm.
    self._build_type_registry(modules)
    root_module = self._register_modules(modules)
    return self._build_registry(root_module, adjacency)

module

module(
    *,
    providers: Sequence[ProviderSpec] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[
        type[object] | ModuleType | DynamicModule
    ] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]

Decorator to define a module.

PARAMETER DESCRIPTION
providers

Sequence of providers for dependency injection.

TYPE: Sequence[ProviderSpec] DEFAULT: ()

imports

Sequence of modules imported by this module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

exports

Sequence of types or modules exported by this module.

TYPE: Sequence[type[object] | ModuleType | DynamicModule] DEFAULT: ()

extensions

Sequence of module extensions for lifecycle hooks.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

is_global

Whether this module is global or not.

TYPE: bool DEFAULT: False

Source code in src/waku/modules/_metadata.py
def module(
    *,
    providers: Sequence[ProviderSpec] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[type[object] | ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    """Class decorator that attaches module metadata to the decorated class.

    Args:
        providers: Sequence of providers for dependency injection.
        imports: Sequence of modules imported by this module.
        exports: Sequence of types or modules exported by this module.
        extensions: Sequence of module extensions for lifecycle hooks.
        is_global: Whether this module is global or not.

    Returns:
        A decorator that stores the metadata on the class and returns the
        same class.
    """

    def decorator(cls: type[_T]) -> type[_T]:
        # Each sequence is copied into a fresh list owned by the metadata.
        metadata = ModuleMetadata(
            providers=[*providers],
            imports=[*imports],
            exports=[*exports],
            extensions=[*extensions],
            is_global=is_global,
        )

        # Extensions implementing OnModuleConfigure see the metadata before it is attached.
        configurers = (ext for ext in metadata.extensions if isinstance(ext, OnModuleConfigure))
        for configurer in configurers:
            configurer.on_module_configure(metadata)

        setattr(cls, _MODULE_METADATA_KEY, metadata)
        return cls

    return decorator

testing

override

override(
    container: AsyncContainer,
    *providers: BaseProvider,
    context: dict[Any, Any] | None = None,
) -> Iterator[None]

Temporarily override providers and/or context in an AsyncContainer for testing.

PARAMETER DESCRIPTION
container

The container whose providers/context will be overridden.

TYPE: AsyncContainer

*providers

Providers to override in the container.

TYPE: BaseProvider DEFAULT: ()

context

Context values to override.

TYPE: dict[Any, Any] | None DEFAULT: None

YIELDS DESCRIPTION
None

Context in which the container uses the overridden providers/context.

TYPE: None

Example
from waku import WakuFactory, module
from waku.di import Scope, singleton
from waku.testing import override


class Service: ...


class ServiceOverride(Service): ...


# Override providers
with override(application.container, singleton(ServiceOverride, provided_type=Service)):
    service = await application.container.get(Service)
    assert isinstance(service, ServiceOverride)

# Override context
with override(application.container, context={int: 123}):
    ...
Source code in src/waku/testing.py
@contextmanager
def override(
    container: AsyncContainer,
    *providers: BaseProvider,
    context: dict[Any, Any] | None = None,
) -> Iterator[None]:
    """Temporarily override providers and/or context in an AsyncContainer for testing.

    A replacement container is built from the original one plus the overrides
    and swapped in for the duration of the ``with`` block. The original
    container is swapped back when the block exits, including when the block
    raises.

    Args:
        container: The container whose providers/context will be overridden.
        *providers: Providers to override in the container.
        context: Context values to override.

    Yields:
        None: Context in which the container uses the overridden providers/context.

    Example:
        ```python
        from waku import WakuFactory, module
        from waku.di import Scope, singleton
        from waku.testing import override


        class Service: ...


        class ServiceOverride(Service): ...


        # Override providers
        with override(application.container, singleton(ServiceOverride, provided_type=Service)):
            service = await application.container.get(Service)
            assert isinstance(service, ServiceOverride)

        # Override context
        with override(application.container, context={int: 123}):
            ...
        ```
    """
    _mark_as_overrides(providers)

    # Override values win over the container's existing context entries.
    original_context = cast(dict[Any, Any], container._context)  # noqa: SLF001
    merged_context = {**original_context, **(context or {})}
    context_override_types = frozenset(context.keys()) if context else frozenset()

    new_container = make_async_container(
        _container_provider(container, context_override_types),
        *providers,
        context=merged_context,
        start_scope=container.scope,
        validation_settings=STRICT_VALIDATION,
    )

    _swap(container, new_container)
    try:
        yield
    finally:
        # Swap back even if the with-body raised (e.g. a failing assertion),
        # so the overrides cannot leak into later users of the container.
        _swap(new_container, container)

create_test_app async

create_test_app(
    *,
    base: ModuleType | DynamicModule | None = None,
    providers: Sequence[ProviderSpec] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    app_extensions: Sequence[
        ApplicationExtension
    ] = DEFAULT_EXTENSIONS,
    context: dict[Any, Any] | None = None,
) -> AsyncIterator[WakuApplication]

Create a minimal test application with given configuration.

Useful for testing extensions and module configurations in isolation without needing to set up a full application structure.

PARAMETER DESCRIPTION
base

Base module to build upon. When provided, the test module imports this module and providers act as overrides.

TYPE: ModuleType | DynamicModule | None DEFAULT: None

providers

Providers to register in the test module. When base is provided, these override existing providers.

TYPE: Sequence[ProviderSpec] DEFAULT: ()

imports

Additional modules to import into the test module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

extensions

Module extensions to register.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

app_extensions

Application extensions to register (default: DEFAULT_EXTENSIONS).

TYPE: Sequence[ApplicationExtension] DEFAULT: DEFAULT_EXTENSIONS

context

Context values to pass to the container.

TYPE: dict[Any, Any] | None DEFAULT: None

YIELDS DESCRIPTION
AsyncIterator[WakuApplication]

Initialized WakuApplication.

Example
from waku.testing import create_test_app
from waku.di import singleton


class IRepository(Protocol):
    async def get(self, id: str) -> Entity: ...


class FakeRepository(IRepository):
    async def get(self, id: str) -> Entity:
        return Entity(id=id)


# Create test app from scratch
async def test_my_extension():
    extension = MyExtension().bind(SomeEvent, SomeHandler)

    async with create_test_app(
        extensions=[extension],
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        service = await app.container.get(MyService)
        result = await service.do_something()
        assert result == expected


# Create test app based on existing module with overrides
async def test_with_base_module():
    async with create_test_app(
        base=AppModule,
        providers=[singleton(IRepository, FakeRepository)],
    ) as app:
        # FakeRepository replaces the real one from AppModule
        repo = await app.container.get(IRepository)
        assert isinstance(repo, FakeRepository)
Source code in src/waku/testing.py
@asynccontextmanager
async def create_test_app(
    *,
    base: ModuleType | DynamicModule | None = None,
    providers: Sequence[ProviderSpec] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    app_extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    context: dict[Any, Any] | None = None,
) -> AsyncIterator[WakuApplication]:
    """Create a minimal test application with given configuration.

    Useful for testing extensions and module configurations in isolation
    without needing to set up a full application structure.

    Args:
        base: Base module to build upon. When provided, the test module
            imports this module and providers act as overrides.
        providers: Providers to register in the test module.
            When `base` is provided, these override existing providers.
        imports: Additional modules to import into the test module.
        extensions: Module extensions to register.
        app_extensions: Application extensions to register (default: DEFAULT_EXTENSIONS).
        context: Context values to pass to the container.

    Yields:
        Initialized WakuApplication.

    Example:
        ```python
        from waku.testing import create_test_app
        from waku.di import singleton


        class IRepository(Protocol):
            async def get(self, id: str) -> Entity: ...


        class FakeRepository(IRepository):
            async def get(self, id: str) -> Entity:
                return Entity(id=id)


        # Create test app from scratch
        async def test_my_extension():
            extension = MyExtension().bind(SomeEvent, SomeHandler)

            async with create_test_app(
                extensions=[extension],
                providers=[singleton(IRepository, FakeRepository)],
            ) as app:
                service = await app.container.get(MyService)
                result = await service.do_something()
                assert result == expected


        # Create test app based on existing module with overrides
        async def test_with_base_module():
            async with create_test_app(
                base=AppModule,
                providers=[singleton(IRepository, FakeRepository)],
            ) as app:
                # FakeRepository replaces the real one from AppModule
                repo = await app.container.get(IRepository)
                assert isinstance(repo, FakeRepository)
        ```
    """
    # Work on copies of the caller's sequences.
    test_imports = list(imports)
    test_providers = list(providers)

    if base is not None:
        # The base module is placed first in the import list, and the given
        # providers are marked as overrides of what it already registers.
        test_imports.insert(0, base)
        _mark_as_overrides(test_providers)

    @module(
        providers=test_providers,
        imports=test_imports,
        extensions=list(extensions),
    )
    class _TestModule:
        pass

    factory = WakuFactory(_TestModule, context=context, extensions=app_extensions)
    app = factory.create()
    # The application is used as an async context manager for the duration of
    # the caller's ``async with`` body.
    async with app:
        yield app

validation

ValidationRule

Bases: Protocol

validate

validate(
    context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/_abc.py
def validate(self, context: ValidationContext) -> list[ValidationError]:
    """Run this rule against ``context`` and return the errors it found.

    Args:
        context: Validation context handed to the rule.

    Returns:
        The validation errors detected by this rule. ``ValidationExtension``
        collects the lists from all rules and only reports when the combined
        result is non-empty.
    """
    ...

ValidationError

Bases: WakuError

ValidationExtension

ValidationExtension(
    rules: Sequence[ValidationRule], *, strict: bool = True
)

Bases: AfterApplicationInit

Source code in src/waku/validation/_extension.py
def __init__(self, rules: Sequence[ValidationRule], *, strict: bool = True) -> None:
    """Store the validation rules and the strictness flag on the extension.

    Args:
        rules: Rules to run against the application.
        strict: Strictness flag, kept on the instance as a ``Final`` attribute.
    """
    self.strict: Final = strict
    self.rules = rules

rules instance-attribute

rules = rules

strict instance-attribute

strict: Final = strict

after_app_init async

after_app_init(app: WakuApplication) -> None
Source code in src/waku/validation/_extension.py
async def after_app_init(self, app: WakuApplication) -> None:
    """Run every configured rule against ``app`` and report any errors.

    Args:
        app: The application to validate; it is wrapped in a
            ``ValidationContext`` that is passed to each rule.
    """
    context = ValidationContext(app=app)

    # Flatten the per-rule error lists, preserving rule order.
    errors = [error for rule in self.rules for error in rule.validate(context)]
    if errors:
        self._raise(errors)

rules

DependenciesAccessibleRule

DependenciesAccessibleRule(cache_size: int = 1000)

Bases: ValidationRule

Validates that all dependencies required by providers are accessible.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, cache_size: int = 1000) -> None:
    """Create the rule with a cache that is shared with its types extractor.

    Args:
        cache_size: Size argument passed to the ``LRUCache`` constructor.
    """
    types_cache = LRUCache[set[type[object]]](cache_size)
    self._cache = types_cache
    # The extractor receives the very same cache object kept on the rule.
    self._types_extractor = ModuleTypesExtractor(types_cache)
validate
validate(
    context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def validate(self, context: ValidationContext) -> list[ValidationError]:
    """Report every provider dependency that its owning module cannot access.

    Args:
        context: Validation context giving access to the application's
            module registry and container.

    Returns:
        One ``DependencyInaccessibleError`` per inaccessible dependency, in
        module, then factory, then dependency order. Empty when nothing is
        inaccessible.
    """
    # Each validation run starts from an empty cache.
    self._cache.clear()

    registry = context.app.registry
    modules = list(registry.modules)
    extractor = self._types_extractor

    strategies: list[AccessibilityStrategy] = [
        GlobalProvidersStrategy(modules, context.app.container, extractor, registry),
        LocalProvidersStrategy(extractor),
        ContextVarsStrategy(extractor),
        ImportedModulesStrategy(registry, extractor),
    ]
    checker = DependencyAccessChecker(strategies)

    errors: list[ValidationError] = [
        DependencyInaccessibleError(
            required_type=dep_type,
            required_by=factory.source,
            from_module=module,
        )
        for module in modules
        for factory in module.provider.factories
        for dep_type in checker.find_inaccessible_dependencies(
            dependencies=factory.dependencies,
            module=module,
        )
    ]
    return errors

DependencyInaccessibleError

DependencyInaccessibleError(
    required_type: type[object],
    required_by: object,
    from_module: Module,
)

Bases: ValidationError

Error indicating a dependency is not accessible to a provider/module.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    required_type: type[object],
    required_by: object,
    from_module: Module,
) -> None:
    """Record which dependency is inaccessible, who requires it, and from where.

    Args:
        required_type: The dependency type that could not be accessed.
        required_by: The object that requires ``required_type``.
        from_module: The module in which the requirement originates.
    """
    self.from_module = from_module
    self.required_by = required_by
    self.required_type = required_type
    # The exception message is this error's own string form, so it is built
    # only after the attributes above are set (presumably __str__ reads them).
    message = str(self)
    super().__init__(message)
required_type instance-attribute
required_type = required_type
required_by instance-attribute
required_by = required_by
from_module instance-attribute
from_module = from_module

dependency_accessible

DependencyInaccessibleError
DependencyInaccessibleError(
    required_type: type[object],
    required_by: object,
    from_module: Module,
)

Bases: ValidationError

Error indicating a dependency is not accessible to a provider/module.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    required_type: type[object],
    required_by: object,
    from_module: Module,
) -> None:
    """Record which dependency is inaccessible, who requires it, and from where.

    Args:
        required_type: The dependency type that could not be accessed.
        required_by: The object that requires ``required_type``.
        from_module: The module in which the requirement originates.
    """
    self.from_module = from_module
    self.required_by = required_by
    self.required_type = required_type
    # The exception message is this error's own string form, so it is built
    # only after the attributes above are set (presumably __str__ reads them).
    message = str(self)
    super().__init__(message)
required_type instance-attribute
required_type = required_type
required_by instance-attribute
required_by = required_by
from_module instance-attribute
from_module = from_module
AccessibilityStrategy

Bases: ABC

Strategy for checking if a type is accessible to a module.

is_accessible abstractmethod
is_accessible(
    required_type: type[object], module: Module
) -> bool

Check if the required type is accessible to the given module.

Source code in src/waku/validation/rules/dependency_accessible.py
@abstractmethod
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    """Check if the required type is accessible to the given module.

    Args:
        required_type: The type whose accessibility is being checked.
        module: The module that needs access to ``required_type``.

    Returns:
        ``True`` if the type is accessible to the module, ``False`` otherwise.
    """
GlobalProvidersStrategy
GlobalProvidersStrategy(
    modules: Sequence[Module],
    container: AsyncContainer,
    types_extractor: ModuleTypesExtractor,
    registry: ModuleRegistry,
)

Bases: AccessibilityStrategy

Check if type is provided by a global module or APP-scoped context.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    modules: Sequence[Module],
    container: AsyncContainer,
    types_extractor: ModuleTypesExtractor,
    registry: ModuleRegistry,
) -> None:
    """Precompute the collection of globally available types.

    All four arguments are forwarded unchanged to ``_build_global_types``;
    its result is what ``is_accessible`` later tests membership against.

    Args:
        modules: Modules of the application.
        container: The application's container.
        types_extractor: Helper used to extract types from modules.
        registry: The application's module registry.
    """
    global_types = self._build_global_types(modules, container, types_extractor, registry)
    self._global_types = global_types
is_accessible
is_accessible(
    required_type: type[object], module: Module
) -> bool
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    return required_type in self._global_types
LocalProvidersStrategy
LocalProvidersStrategy(
    types_extractor: ModuleTypesExtractor,
)

Bases: AccessibilityStrategy

Check if type is provided by the module itself.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, types_extractor: ModuleTypesExtractor) -> None:
    """Keep the types extractor used to look up a module's provided types.

    Args:
        types_extractor: Extractor whose ``get_provided_types`` is queried
            by ``is_accessible``.
    """
    self._types_extractor = types_extractor
is_accessible
is_accessible(
    required_type: type[object], module: Module
) -> bool
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    return required_type in self._types_extractor.get_provided_types(module)
ContextVarsStrategy
ContextVarsStrategy(types_extractor: ModuleTypesExtractor)

Bases: AccessibilityStrategy

Check if type is provided by application or request container context.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, types_extractor: ModuleTypesExtractor) -> None:
    """Keep the types extractor used to look up a module's context variables.

    Args:
        types_extractor: Extractor whose ``get_context_vars`` is queried by
            ``is_accessible``.
    """
    self._types_extractor = types_extractor
is_accessible
is_accessible(
    required_type: type[object], module: Module
) -> bool
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    return required_type in self._types_extractor.get_context_vars(module)
ImportedModulesStrategy
ImportedModulesStrategy(
    registry: ModuleRegistry,
    types_extractor: ModuleTypesExtractor,
)

Bases: AccessibilityStrategy

Check if type is accessible via imported modules (direct export or re-export).

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, registry: ModuleRegistry, types_extractor: ModuleTypesExtractor) -> None:
    """Keep the collaborators needed to inspect a module's imports.

    Args:
        registry: Registry used to resolve each imported module.
        types_extractor: Helper used to extract types from modules.
    """
    self._types_extractor = types_extractor
    self._registry = registry
is_accessible
is_accessible(
    required_type: type[object], module: Module
) -> bool
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def is_accessible(self, required_type: type[object], module: Module) -> bool:
    for imported in module.imports:
        imported_module = self._registry.get(imported)
        if self._is_directly_exported(required_type, imported_module):
            return True
        if self._is_reexported(required_type, imported_module):
            return True
    return False
DependencyAccessChecker
DependencyAccessChecker(
    strategies: Sequence[AccessibilityStrategy],
)

Handles dependency accessibility checks between modules.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, strategies: Sequence[AccessibilityStrategy]) -> None:
    """Keep the accessibility strategies used by this checker.

    Args:
        strategies: Strategies stored on the instance for later
            accessibility checks.
    """
    self._strategies = strategies
find_inaccessible_dependencies
find_inaccessible_dependencies(
    dependencies: Sequence[DependencyKey], module: Module
) -> Iterable[type[object]]
Source code in src/waku/validation/rules/dependency_accessible.py
def find_inaccessible_dependencies(
    self,
    dependencies: Sequence[DependencyKey],
    module: Module,
) -> Iterable[type[object]]:
    """Lazily yield the type hints of dependencies ``module`` cannot access.

    Args:
        dependencies: Dependency keys to check; only ``type_hint`` is read.
        module: The module on whose behalf accessibility is checked.

    Yields:
        The ``type_hint`` of each dependency for which ``_is_accessible``
        returns a falsy value, in input order.
    """
    yield from (
        dep.type_hint
        for dep in dependencies
        if not self._is_accessible(dep.type_hint, module)
    )
DependenciesAccessibleRule
DependenciesAccessibleRule(cache_size: int = 1000)

Bases: ValidationRule

Validates that all dependencies required by providers are accessible.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, cache_size: int = 1000) -> None:
    """Create the rule with a cache that is shared with its types extractor.

    Args:
        cache_size: Size argument passed to the ``LRUCache`` constructor.
    """
    types_cache = LRUCache[set[type[object]]](cache_size)
    self._cache = types_cache
    # The extractor receives the very same cache object kept on the rule.
    self._types_extractor = ModuleTypesExtractor(types_cache)
validate
validate(
    context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def validate(self, context: ValidationContext) -> list[ValidationError]:
    """Report every provider dependency that its owning module cannot access.

    Args:
        context: Validation context giving access to the application's
            module registry and container.

    Returns:
        One ``DependencyInaccessibleError`` per inaccessible dependency, in
        module, then factory, then dependency order. Empty when nothing is
        inaccessible.
    """
    # Each validation run starts from an empty cache.
    self._cache.clear()

    registry = context.app.registry
    modules = list(registry.modules)
    extractor = self._types_extractor

    strategies: list[AccessibilityStrategy] = [
        GlobalProvidersStrategy(modules, context.app.container, extractor, registry),
        LocalProvidersStrategy(extractor),
        ContextVarsStrategy(extractor),
        ImportedModulesStrategy(registry, extractor),
    ]
    checker = DependencyAccessChecker(strategies)

    errors: list[ValidationError] = [
        DependencyInaccessibleError(
            required_type=dep_type,
            required_by=factory.source,
            from_module=module,
        )
        for module in modules
        for factory in module.provider.factories
        for dep_type in checker.find_inaccessible_dependencies(
            dependencies=factory.dependencies,
            module=module,
        )
    ]
    return errors