Skip to content

waku

WakuApplication

WakuApplication(
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
def __init__(
    self,
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
) -> None:
    self._container = container
    self._registry = registry
    self._lifespan = tuple(
        LifespanWrapper(lifespan_func) if not isinstance(lifespan_func, LifespanWrapper) else lifespan_func
        for lifespan_func in lifespan
    )
    self._extension_registry = extension_registry

    self._exit_stack = AsyncExitStack()
    self._initialized = False

container property

container: AsyncContainer

registry property

registry: ModuleRegistry

initialize async

initialize() -> None
Source code in src/waku/application.py
async def initialize(self) -> None:
    if self._initialized:
        return
    await self._call_on_init_extensions()
    self._initialized = True
    await self._call_after_init_extensions()

close async

close() -> None
Source code in src/waku/application.py
async def close(self) -> None:
    if not self._initialized:
        return
    await self._call_on_shutdown_extensions()
    self._initialized = False

WakuFactory

WakuFactory(
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[
        ApplicationExtension
    ] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
)
Source code in src/waku/factory.py
def __init__(
    self,
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
) -> None:
    self._root_module_type = root_module_type

    self._context = context
    self._lifespan = lifespan
    self._extensions = extensions
    self._container_config = container_config or ContainerConfig()

create

create() -> WakuApplication
Source code in src/waku/factory.py
def create(self) -> WakuApplication:
    registry = ModuleRegistryBuilder(self._root_module_type).build()
    container = self._build_container(registry.providers)
    return WakuApplication(
        container=container,
        registry=registry,
        lifespan=self._lifespan,
        extension_registry=self._build_extension_registry(registry.modules),
    )

DynamicModule dataclass

DynamicModule(
    *,
    providers: list[BaseProvider] = list(),
    imports: list[ModuleType | DynamicModule] = list(),
    exports: list[
        type[object] | ModuleType | DynamicModule
    ] = list(),
    extensions: list[ModuleExtension] = list(),
    is_global: bool = False,
    id: UUID = uuid4(),
    parent_module: ModuleType,
)

Bases: ModuleMetadata

providers class-attribute instance-attribute

providers: list[BaseProvider] = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports: list[ModuleType | DynamicModule] = field(
    default_factory=list
)

List of modules imported by this module.

exports class-attribute instance-attribute

exports: list[type[object] | ModuleType | DynamicModule] = (
    field(default_factory=list)
)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions: list[ModuleExtension] = field(
    default_factory=list
)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global: bool = False

Whether this module is global or not.

id class-attribute instance-attribute

id: UUID = field(default_factory=uuid4)

parent_module instance-attribute

parent_module: ModuleType

Module

Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
def __init__(self, module_type: ModuleType, metadata: ModuleMetadata) -> None:
    self.id: Final[UUID] = metadata.id
    self.target: Final[ModuleType] = module_type

    self.providers: Final[Sequence[BaseProvider]] = metadata.providers
    self.imports: Final[Sequence[ModuleType | DynamicModule]] = metadata.imports
    self.exports: Final[Sequence[type[object] | ModuleType | DynamicModule]] = metadata.exports
    self.extensions: Final[Sequence[ModuleExtension]] = metadata.extensions
    self.is_global: Final[bool] = metadata.is_global

    self._provider: BaseProvider | None = None

id instance-attribute

id: Final[UUID] = id

target instance-attribute

target: Final[ModuleType] = module_type

providers instance-attribute

providers: Final[Sequence[BaseProvider]] = providers

imports instance-attribute

imports: Final[Sequence[ModuleType | DynamicModule]] = (
    imports
)

exports instance-attribute

exports: Final[
    Sequence[type[object] | ModuleType | DynamicModule]
] = exports

extensions instance-attribute

extensions: Final[Sequence[ModuleExtension]] = extensions

is_global instance-attribute

is_global: Final[bool] = is_global

name property

name: str

provider cached property

provider: BaseProvider

module

module(
    *,
    providers: Sequence[BaseProvider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[
        type[object] | ModuleType | DynamicModule
    ] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]

Decorator to define a module.

PARAMETER DESCRIPTION
providers

Sequence of providers for dependency injection.

TYPE: Sequence[BaseProvider] DEFAULT: ()

imports

Sequence of modules imported by this module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

exports

Sequence of types or modules exported by this module.

TYPE: Sequence[type[object] | ModuleType | DynamicModule] DEFAULT: ()

extensions

Sequence of module extensions for lifecycle hooks.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

is_global

Whether this module is global or not.

TYPE: bool DEFAULT: False

Source code in src/waku/modules/_metadata.py
def module(
    *,
    providers: Sequence[BaseProvider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[type[object] | ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    """Decorator to define a module.

    Args:
        providers: Sequence of providers for dependency injection.
        imports: Sequence of modules imported by this module.
        exports: Sequence of types or modules exported by this module.
        extensions: Sequence of module extensions for lifecycle hooks.
        is_global: Whether this module is global or not.
    """

    def decorator(cls: type[_T]) -> type[_T]:
        metadata = ModuleMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            extensions=list(extensions),
            is_global=is_global,
        )
        for extension in metadata.extensions:
            if isinstance(extension, OnModuleConfigure):
                extension.on_module_configure(metadata)

        setattr(cls, _MODULE_METADATA_KEY, metadata)
        return cls

    return decorator

application

WakuApplication

WakuApplication(
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
)
Source code in src/waku/application.py
def __init__(
    self,
    *,
    container: AsyncContainer,
    registry: ModuleRegistry,
    lifespan: Sequence[LifespanFunc | LifespanWrapper],
    extension_registry: ExtensionRegistry,
) -> None:
    self._container = container
    self._registry = registry
    self._lifespan = tuple(
        LifespanWrapper(lifespan_func) if not isinstance(lifespan_func, LifespanWrapper) else lifespan_func
        for lifespan_func in lifespan
    )
    self._extension_registry = extension_registry

    self._exit_stack = AsyncExitStack()
    self._initialized = False

container property

container: AsyncContainer

registry property

registry: ModuleRegistry

initialize async

initialize() -> None
Source code in src/waku/application.py
async def initialize(self) -> None:
    if self._initialized:
        return
    await self._call_on_init_extensions()
    self._initialized = True
    await self._call_after_init_extensions()

close async

close() -> None
Source code in src/waku/application.py
async def close(self) -> None:
    if not self._initialized:
        return
    await self._call_on_shutdown_extensions()
    self._initialized = False

cqrs

NextHandlerType module-attribute

NextHandlerType: TypeAlias = Callable[
    [RequestT], Awaitable[ResponseT]
]

Event dataclass

Event()

Base class for events.

IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async

handle(
    request: RequestT,
    next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

Request dataclass

Request(*, request_id: UUID = uuid4())

Bases: Generic[ResponseT]

Base class for request-type objects.

request_id class-attribute instance-attribute

request_id: UUID = field(default_factory=uuid4)

Response dataclass

Response()

Base class for response type objects.

EventHandler

Bases: ABC, Generic[EventT]

The event handler interface.

Usage::

class UserJoinedEventHandler(EventHandler[UserJoinedEvent]) def init(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api

  async def handle(self, event: UserJoinedEvent) -> None:
      await self._meetings_api.notify_room(event.meeting_id, "New user joined!")

handle abstractmethod async

handle(event: EventT) -> None
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: EventT) -> None:
    raise NotImplementedError

Mediator

Mediator(
    container: AsyncContainer,
    event_publisher: EventPublisher,
)

Bases: IMediator

Default CQRS implementation.

Initialize the mediator.

PARAMETER DESCRIPTION
container

Container used to resolve handlers and behaviors

TYPE: AsyncContainer

event_publisher

Function to publish events to handlers

TYPE: EventPublisher

Source code in src/waku/cqrs/impl.py
def __init__(self, container: AsyncContainer, event_publisher: EventPublisher) -> None:
    """Initialize the mediator.

    Args:
        container: Container used to resolve handlers and behaviors
        event_publisher: Function to publish events to handlers
    """
    self._container = container
    self._event_publisher = event_publisher

send async

send(request: Request[ResponseT]) -> ResponseT

Send a request through the CQRS pipeline chain.

PARAMETER DESCRIPTION
request

The request to process

TYPE: Request[ResponseT]

RETURNS DESCRIPTION
ResponseT

Response from the handler

RAISES DESCRIPTION
RequestHandlerNotFound

If no handler is registered for the request type

Source code in src/waku/cqrs/impl.py
@override
async def send(self, request: Request[ResponseT]) -> ResponseT:
    """Send a request through the CQRS pipeline chain.

    Args:
        request: The request to process

    Returns:
        Response from the handler

    Raises:
        RequestHandlerNotFound: If no handler is registered for the request type
    """
    request_type = type(request)
    handler = await self._resolve_request_handler(request_type)
    return await self._handle_request(handler, request)

publish async

publish(event: Event) -> None

Publish an event to all registered handlers.

PARAMETER DESCRIPTION
event

The event to publish

TYPE: Event

RAISES DESCRIPTION
EventHandlerNotFound

If no handlers are registered for the event type

Source code in src/waku/cqrs/impl.py
@override
async def publish(self, event: Event) -> None:
    """Publish an event to all registered handlers.

    Args:
        event: The event to publish

    Raises:
        EventHandlerNotFound: If no handlers are registered for the event type
    """
    event_type = type(event)
    handlers = await self._resolve_event_handlers(event_type)
    await self._event_publisher(handlers, event)

IMediator

Bases: ISender, IPublisher, ABC

Defines a cqrs to encapsulate request/response and publishing interaction patterns.

publish abstractmethod async

publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event) -> None:
    """Asynchronously send event to multiple handlers."""

send abstractmethod async

send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT]) -> ResponseT:
    """Asynchronously send a request to a single handler."""

IPublisher

Bases: ABC

Publish event through the cqrs to be handled by multiple handlers.

publish abstractmethod async

publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event) -> None:
    """Asynchronously send event to multiple handlers."""

ISender

Bases: ABC

Send a request through the cqrs middleware chain to be handled by a single handler.

send abstractmethod async

send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT]) -> ResponseT:
    """Asynchronously send a request to a single handler."""

MediatorConfig dataclass

MediatorConfig(
    *,
    mediator_implementation_type: type[
        IMediator
    ] = Mediator,
    event_publisher: type[
        EventPublisher
    ] = SequentialEventPublisher,
    pipeline_behaviors: Sequence[
        type[IPipelineBehavior[Any, Any]]
    ] = (),
)

Configuration for the Mediator extension.

This class defines the configuration options for setting up the cqrs pattern implementation in the application.

ATTRIBUTE DESCRIPTION
mediator_implementation_type

The concrete implementation class for the cqrs interface (IMediator). Defaults to the standard Mediator class.

TYPE: type[IMediator]

event_publisher

The implementation class for publishing events. Defaults to SequentialEventPublisher.

TYPE: type[EventPublisher]

pipeline_behaviors

A sequence of pipeline behavior configurations that will be applied to the cqrs pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence.

TYPE: Sequence[type[IPipelineBehavior[Any, Any]]]

Example
config = MediatorConfig(
    pipeline_behaviors=[
        LoggingBehavior,
        ValidationBehavior,
    ]
)

mediator_implementation_type class-attribute instance-attribute

mediator_implementation_type: type[IMediator] = Mediator

event_publisher class-attribute instance-attribute

pipeline_behaviors class-attribute instance-attribute

pipeline_behaviors: Sequence[
    type[IPipelineBehavior[Any, Any]]
] = ()

MediatorExtension

MediatorExtension()

Bases: OnModuleConfigure

Source code in src/waku/cqrs/modules.py
def __init__(self) -> None:
    self._request_map = RequestMap()
    self._event_map = EventMap()
    self._behavior_map = PipelineBehaviourMap()

bind_request

bind_request(
    request_type: type[RequestT],
    handler_type: RequestHandlerType[RequestT, ResponseT],
    *,
    behaviors: list[
        type[IPipelineBehavior[RequestT, ResponseT]]
    ]
    | None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_request(
    self,
    request_type: type[RequestT],
    handler_type: RequestHandlerType[RequestT, ResponseT],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, ResponseT]]] | None = None,
) -> Self:
    self._request_map.bind(request_type, handler_type)
    if behaviors:
        self._behavior_map.bind(request_type, behaviors)
    return self

bind_event

bind_event(
    event_type: type[EventT],
    handler_types: list[EventHandlerType[EventT]],
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_event(
    self,
    event_type: type[EventT],
    handler_types: list[EventHandlerType[EventT]],
) -> Self:
    self._event_map.bind(event_type, handler_types)
    return self

on_module_configure

on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    metadata.providers.extend(
        chain(
            self._create_request_handler_providers(),
            self._create_event_handler_providers(),
            self._create_pipeline_behavior_providers(),
        ),
    )

MediatorModule

register classmethod

register(
    config: MediatorConfig | None = None,
) -> DynamicModule

Application-level module for Mediator setup.

PARAMETER DESCRIPTION
config

Configuration for the Mediator extension.

TYPE: MediatorConfig | None DEFAULT: None

Source code in src/waku/cqrs/modules.py
@classmethod
def register(cls, config: MediatorConfig | None = None, /) -> DynamicModule:
    """Application-level module for Mediator setup.

    Args:
        config: Configuration for the Mediator extension.
    """
    config_ = config or MediatorConfig()
    return DynamicModule(
        parent_module=cls,
        providers=[
            *cls._create_mediator_providers(config_),
            *cls._create_pipeline_behavior_providers(config_),
        ],
        is_global=True,
    )

RequestHandler

Bases: ABC, Generic[RequestT, ResponseT]

The request handler interface.

The request handler is an object, which gets a request as input and may return a response as a result.

Command handler example::

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]) def init(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api

  async def handle(self, request: JoinMeetingCommand) -> None:
      await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]) def init(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api

  async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
      link = await self._meetings_api.get_link(request.meeting_id)
      return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)

handle abstractmethod async

handle(request: RequestT) -> ResponseT
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT) -> ResponseT:
    raise NotImplementedError

RequestMap

RequestMap()
Source code in src/waku/cqrs/requests/map.py
def __init__(self) -> None:
    self._registry: RequestMapRegistry[Any, Any] = {}

registry property

bind

bind(
    request_type: type[RequestT],
    handler_type: RequestHandlerType[RequestT, ResponseT],
) -> Self
Source code in src/waku/cqrs/requests/map.py
def bind(self, request_type: type[RequestT], handler_type: RequestHandlerType[RequestT, ResponseT]) -> Self:
    if request_type in self._registry:
        raise RequestHandlerAlreadyRegistered(request_type, handler_type)
    self._registry[request_type] = handler_type
    return self

merge

merge(other: RequestMap) -> Self
Source code in src/waku/cqrs/requests/map.py
def merge(self, other: RequestMap) -> Self:
    for request_type, handler_type in other.registry.items():
        self.bind(request_type, handler_type)
    return self

contracts

event

EventT module-attribute
EventT = TypeVar(
    'EventT', bound='Event', contravariant=True
)
Event dataclass
Event()

Base class for events.

pipeline

NextHandlerType module-attribute
NextHandlerType: TypeAlias = Callable[
    [RequestT], Awaitable[ResponseT]
]
IPipelineBehavior

Bases: ABC, Generic[RequestT, ResponseT]

Interface for pipeline behaviors that wrap request handling.

handle abstractmethod async
handle(
    request: RequestT,
    next_handler: NextHandlerType[RequestT, ResponseT],
) -> ResponseT

Handle the request and call the next handler in the pipeline.

PARAMETER DESCRIPTION
request

The request to handle

TYPE: RequestT

next_handler

Function to call the next handler in the pipeline

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
ResponseT

The response from the pipeline

Source code in src/waku/cqrs/contracts/pipeline.py
@abstractmethod
async def handle(self, request: RequestT, next_handler: NextHandlerType[RequestT, ResponseT]) -> ResponseT:
    """Handle the request and call the next handler in the pipeline.

    Args:
        request: The request to handle
        next_handler: Function to call the next handler in the pipeline

    Returns:
        The response from the pipeline
    """
    ...

request

RequestT module-attribute
RequestT = TypeVar(
    'RequestT', bound='Request[Any]', contravariant=True
)
ResponseT module-attribute
ResponseT = TypeVar(
    'ResponseT',
    bound='Response | None',
    default=None,
    covariant=True,
)
Request dataclass
Request(*, request_id: UUID = uuid4())

Bases: Generic[ResponseT]

Base class for request-type objects.

request_id class-attribute instance-attribute
request_id: UUID = field(default_factory=uuid4)
Response dataclass
Response()

Base class for response type objects.

events

handler

EventHandlerType module-attribute
EventHandlerType: TypeAlias = type[EventHandler[EventT]]
EventHandler

Bases: ABC, Generic[EventT]

The event handler interface.

Usage::

class UserJoinedEventHandler(EventHandler[UserJoinedEvent]) def init(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api

  async def handle(self, event: UserJoinedEvent) -> None:
      await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
handle abstractmethod async
handle(event: EventT) -> None
Source code in src/waku/cqrs/events/handler.py
@abc.abstractmethod
async def handle(self, event: EventT) -> None:
    raise NotImplementedError

map

EventMapRegistry module-attribute
EventMap
EventMap()
Source code in src/waku/cqrs/events/map.py
def __init__(self) -> None:
    self._registry: EventMapRegistry[Any] = defaultdict(list)
registry property
registry: EventMapRegistry[Any]
bind
bind(
    event_type: type[EventT],
    handler_types: list[EventHandlerType[EventT]],
) -> Self
Source code in src/waku/cqrs/events/map.py
def bind(self, event_type: type[EventT], handler_types: list[EventHandlerType[EventT]]) -> Self:
    for handler_type in handler_types:
        if handler_type in self._registry[event_type]:
            raise EventHandlerAlreadyRegistered(event_type, handler_type)
        self._registry[event_type].append(handler_type)
    return self
merge
merge(other: EventMap) -> Self
Source code in src/waku/cqrs/events/map.py
def merge(self, other: EventMap) -> Self:
    for event_type, handlers in other.registry.items():
        self.bind(event_type, handlers)
    return self

publish

EventPublisher

Bases: Protocol

SequentialEventPublisher
GroupEventPublisher

exceptions

MediatorError

Bases: WakuError

Base exception for all cqrs-related errors.

ImproperlyConfiguredError

Bases: MediatorError

Raised when cqrs configuration is invalid.

RequestHandlerAlreadyRegistered

RequestHandlerAlreadyRegistered(
    request_type: type[Request[Any]],
    handler_type: RequestHandlerType[Any, Any],
)

Bases: MediatorError, KeyError

Raised when a request handler is already registered.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

handler_type

The type of handler that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[Request[Any]], handler_type: RequestHandlerType[Any, Any]) -> None:
    self.request_type = request_type
    self.handler_type = handler_type
request_type instance-attribute
request_type = request_type
handler_type instance-attribute
handler_type = handler_type

RequestHandlerNotFound

RequestHandlerNotFound(request_type: type[Request[Any]])

Bases: MediatorError, TypeError

Raised when a request handler is not found.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[Request[Any]]) -> None:
    self.request_type = request_type
request_type instance-attribute
request_type = request_type

EventHandlerAlreadyRegistered

EventHandlerAlreadyRegistered(
    event_type: type[EventT],
    handler_type: EventHandlerType[EventT],
)

Bases: MediatorError, KeyError

Raised when an event handler is already registered.

ATTRIBUTE DESCRIPTION
event_type

The type of event that caused the error.

handler_type

The type of handler that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, event_type: type[EventT], handler_type: EventHandlerType[EventT]) -> None:
    self.event_type = event_type
    self.handler_type = handler_type
event_type instance-attribute
event_type = event_type
handler_type instance-attribute
handler_type = handler_type

EventHandlerNotFound

EventHandlerNotFound(event_type: type[Event])

Bases: MediatorError, TypeError

Raised when an event handler is not found.

ATTRIBUTE DESCRIPTION
event_type

The type of event that caused the error.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, event_type: type[Event]) -> None:
    self.event_type = event_type
event_type instance-attribute
event_type = event_type

PipelineBehaviorAlreadyRegistered

PipelineBehaviorAlreadyRegistered(
    request_type: type[Request],
    behavior_type: type[IPipelineBehavior[Any, Any]],
)

Bases: MediatorError, KeyError

Raised when a pipeline behavior is already registered.

ATTRIBUTE DESCRIPTION
request_type

The type of request that caused the error.

behavior_type

The type of behavior that was already registered.

Source code in src/waku/cqrs/exceptions.py
def __init__(self, request_type: type[Request], behavior_type: type[IPipelineBehavior[Any, Any]]) -> None:
    self.request_type = request_type
    self.behavior_type = behavior_type
request_type instance-attribute
request_type = request_type
behavior_type instance-attribute
behavior_type = behavior_type

impl

Mediator

Mediator(
    container: AsyncContainer,
    event_publisher: EventPublisher,
)

Bases: IMediator

Default CQRS implementation.

Initialize the mediator.

PARAMETER DESCRIPTION
container

Container used to resolve handlers and behaviors

TYPE: AsyncContainer

event_publisher

Function to publish events to handlers

TYPE: EventPublisher

Source code in src/waku/cqrs/impl.py
def __init__(self, container: AsyncContainer, event_publisher: EventPublisher) -> None:
    """Initialize the mediator.

    Args:
        container: Container used to resolve handlers and behaviors
        event_publisher: Function to publish events to handlers
    """
    self._container = container
    self._event_publisher = event_publisher
send async
send(request: Request[ResponseT]) -> ResponseT

Send a request through the CQRS pipeline chain.

PARAMETER DESCRIPTION
request

The request to process

TYPE: Request[ResponseT]

RETURNS DESCRIPTION
ResponseT

Response from the handler

RAISES DESCRIPTION
RequestHandlerNotFound

If no handler is registered for the request type

Source code in src/waku/cqrs/impl.py
@override
async def send(self, request: Request[ResponseT]) -> ResponseT:
    """Send a request through the CQRS pipeline chain.

    Args:
        request: The request to process

    Returns:
        Response from the handler

    Raises:
        RequestHandlerNotFound: If no handler is registered for the request type
    """
    request_type = type(request)
    handler = await self._resolve_request_handler(request_type)
    return await self._handle_request(handler, request)
publish async
publish(event: Event) -> None

Publish an event to all registered handlers.

PARAMETER DESCRIPTION
event

The event to publish

TYPE: Event

RAISES DESCRIPTION
EventHandlerNotFound

If no handlers are registered for the event type

Source code in src/waku/cqrs/impl.py
@override
async def publish(self, event: Event) -> None:
    """Publish an event to all registered handlers.

    Args:
        event: The event to publish

    Raises:
        EventHandlerNotFound: If no handlers are registered for the event type
    """
    event_type = type(event)
    handlers = await self._resolve_event_handlers(event_type)
    await self._event_publisher(handlers, event)

interfaces

ISender

Bases: ABC

Send a request through the cqrs middleware chain to be handled by a single handler.

send abstractmethod async
send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT]) -> ResponseT:
    """Asynchronously send a request to a single handler."""

IPublisher

Bases: ABC

Publish event through the cqrs to be handled by multiple handlers.

publish abstractmethod async
publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event) -> None:
    """Asynchronously send event to multiple handlers."""

IMediator

Bases: ISender, IPublisher, ABC

Defines a cqrs to encapsulate request/response and publishing interaction patterns.

publish abstractmethod async
publish(event: Event) -> None

Asynchronously send event to multiple handlers.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def publish(self, event: Event) -> None:
    """Asynchronously send event to multiple handlers."""
send abstractmethod async
send(request: Request[ResponseT]) -> ResponseT

Asynchronously send a request to a single handler.

Source code in src/waku/cqrs/interfaces.py
@abc.abstractmethod
async def send(self, request: Request[ResponseT]) -> ResponseT:
    """Asynchronously send a request to a single handler."""

modules

MediatorConfig dataclass

MediatorConfig(
    *,
    mediator_implementation_type: type[
        IMediator
    ] = Mediator,
    event_publisher: type[
        EventPublisher
    ] = SequentialEventPublisher,
    pipeline_behaviors: Sequence[
        type[IPipelineBehavior[Any, Any]]
    ] = (),
)

Configuration for the Mediator extension.

This class defines the configuration options for setting up the cqrs pattern implementation in the application.

ATTRIBUTE DESCRIPTION
mediator_implementation_type

The concrete implementation class for the cqrs interface (IMediator). Defaults to the standard Mediator class.

TYPE: type[IMediator]

event_publisher

The implementation class for publishing events. Defaults to SequentialEventPublisher.

TYPE: type[EventPublisher]

pipeline_behaviors

A sequence of pipeline behavior configurations that will be applied to the cqrs pipeline. Behaviors are executed in the order they are defined. Defaults to an empty sequence.

TYPE: Sequence[type[IPipelineBehavior[Any, Any]]]

Example
config = MediatorConfig(
    pipeline_behaviors=[
        LoggingBehavior,
        ValidationBehavior,
    ]
)
mediator_implementation_type class-attribute instance-attribute
mediator_implementation_type: type[IMediator] = Mediator
event_publisher class-attribute instance-attribute
pipeline_behaviors class-attribute instance-attribute
pipeline_behaviors: Sequence[
    type[IPipelineBehavior[Any, Any]]
] = ()

MediatorModule

register classmethod
register(
    config: MediatorConfig | None = None,
) -> DynamicModule

Application-level module for Mediator setup.

PARAMETER DESCRIPTION
config

Configuration for the Mediator extension.

TYPE: MediatorConfig | None DEFAULT: None

Source code in src/waku/cqrs/modules.py
@classmethod
def register(cls, config: MediatorConfig | None = None, /) -> DynamicModule:
    """Application-level module for Mediator setup.

    Args:
        config: Configuration for the Mediator extension.
    """
    config_ = config or MediatorConfig()
    return DynamicModule(
        parent_module=cls,
        providers=[
            *cls._create_mediator_providers(config_),
            *cls._create_pipeline_behavior_providers(config_),
        ],
        is_global=True,
    )

MediatorExtension

MediatorExtension()

Bases: OnModuleConfigure

Source code in src/waku/cqrs/modules.py
def __init__(self) -> None:
    self._request_map = RequestMap()
    self._event_map = EventMap()
    self._behavior_map = PipelineBehaviourMap()
bind_request
bind_request(
    request_type: type[RequestT],
    handler_type: RequestHandlerType[RequestT, ResponseT],
    *,
    behaviors: list[
        type[IPipelineBehavior[RequestT, ResponseT]]
    ]
    | None = None,
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_request(
    self,
    request_type: type[RequestT],
    handler_type: RequestHandlerType[RequestT, ResponseT],
    *,
    behaviors: list[type[IPipelineBehavior[RequestT, ResponseT]]] | None = None,
) -> Self:
    self._request_map.bind(request_type, handler_type)
    if behaviors:
        self._behavior_map.bind(request_type, behaviors)
    return self
bind_event
bind_event(
    event_type: type[EventT],
    handler_types: list[EventHandlerType[EventT]],
) -> Self
Source code in src/waku/cqrs/modules.py
def bind_event(
    self,
    event_type: type[EventT],
    handler_types: list[EventHandlerType[EventT]],
) -> Self:
    self._event_map.bind(event_type, handler_types)
    return self
on_module_configure
on_module_configure(metadata: ModuleMetadata) -> None
Source code in src/waku/cqrs/modules.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    metadata.providers.extend(
        chain(
            self._create_request_handler_providers(),
            self._create_event_handler_providers(),
            self._create_pipeline_behavior_providers(),
        ),
    )

pipeline

PipelineBehaviorWrapper

PipelineBehaviorWrapper(
    behaviors: Sequence[
        IPipelineBehavior[RequestT, ResponseT]
    ],
)

Bases: Generic[RequestT, ResponseT]

Composes pipeline behaviors into a processing chain.

Initialize the pipeline behavior chain.

PARAMETER DESCRIPTION
behaviors

Sequence of pipeline behaviors to execute in order

TYPE: Sequence[IPipelineBehavior[RequestT, ResponseT]]

Source code in src/waku/cqrs/pipeline/chain.py
def __init__(self, behaviors: Sequence[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    """Initialize the pipeline behavior chain.

    Args:
        behaviors: Sequence of pipeline behaviors to execute in order
    """
    self._behaviors = list(behaviors)  # Convert to list immediately
wrap

Create a pipeline that wraps the handler function with behaviors.

Pipeline behaviors are executed in the order they are provided.

PARAMETER DESCRIPTION
handle

The handler function to wrap with behaviors

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
NextHandlerType[RequestT, ResponseT]

A function that executes the entire pipeline

Source code in src/waku/cqrs/pipeline/chain.py
def wrap(self, handle: NextHandlerType[RequestT, ResponseT]) -> NextHandlerType[RequestT, ResponseT]:
    """Create a pipeline that wraps the handler function with behaviors.

    Pipeline behaviors are executed in the order they are provided.

    Args:
        handle: The handler function to wrap with behaviors

    Returns:
        A function that executes the entire pipeline
    """
    for behavior in reversed(self._behaviors):
        handle = functools.partial(behavior.handle, next_handler=handle)

    return handle

chain

PipelineBehaviorWrapper
PipelineBehaviorWrapper(
    behaviors: Sequence[
        IPipelineBehavior[RequestT, ResponseT]
    ],
)

Bases: Generic[RequestT, ResponseT]

Composes pipeline behaviors into a processing chain.

Initialize the pipeline behavior chain.

PARAMETER DESCRIPTION
behaviors

Sequence of pipeline behaviors to execute in order

TYPE: Sequence[IPipelineBehavior[RequestT, ResponseT]]

Source code in src/waku/cqrs/pipeline/chain.py
def __init__(self, behaviors: Sequence[IPipelineBehavior[RequestT, ResponseT]]) -> None:
    """Initialize the pipeline behavior chain.

    Args:
        behaviors: Sequence of pipeline behaviors to execute in order
    """
    self._behaviors = list(behaviors)  # Convert to list immediately
wrap

Create a pipeline that wraps the handler function with behaviors.

Pipeline behaviors are executed in the order they are provided.

PARAMETER DESCRIPTION
handle

The handler function to wrap with behaviors

TYPE: NextHandlerType[RequestT, ResponseT]

RETURNS DESCRIPTION
NextHandlerType[RequestT, ResponseT]

A function that executes the entire pipeline

Source code in src/waku/cqrs/pipeline/chain.py
def wrap(self, handle: NextHandlerType[RequestT, ResponseT]) -> NextHandlerType[RequestT, ResponseT]:
    """Create a pipeline that wraps the handler function with behaviors.

    Pipeline behaviors are executed in the order they are provided.

    Args:
        handle: The handler function to wrap with behaviors

    Returns:
        A function that executes the entire pipeline
    """
    for behavior in reversed(self._behaviors):
        handle = functools.partial(behavior.handle, next_handler=handle)

    return handle

map

PipelineBehaviorMapRegistry module-attribute
PipelineBehaviorMapRegistry = MutableMapping[
    type[RequestT],
    list[type[IPipelineBehavior[RequestT, ResponseT]]],
]
PipelineBehaviourMap
PipelineBehaviourMap()
Source code in src/waku/cqrs/pipeline/map.py
def __init__(self) -> None:
    self._registry: PipelineBehaviorMapRegistry[Any, Any] = defaultdict(list)
registry property
bind
bind(
    request_type: type[RequestT],
    behavior_types: list[
        type[IPipelineBehavior[RequestT, ResponseT]]
    ],
) -> Self
Source code in src/waku/cqrs/pipeline/map.py
def bind(
    self,
    request_type: type[RequestT],
    behavior_types: list[type[IPipelineBehavior[RequestT, ResponseT]]],
) -> Self:
    for behavior_type in behavior_types:
        if behavior_type in self._registry[request_type]:
            raise PipelineBehaviorAlreadyRegistered(request_type, behavior_type)
        self._registry[request_type].append(behavior_type)
    return self
merge
merge(other: PipelineBehaviourMap) -> Self
Source code in src/waku/cqrs/pipeline/map.py
def merge(self, other: PipelineBehaviourMap) -> Self:
    for event_type, handlers in other.registry.items():
        self.bind(event_type, handlers)
    return self

requests

handler

RequestHandlerType module-attribute
RequestHandlerType: TypeAlias = type[
    RequestHandler[RequestT, ResponseT]
]
RequestHandler

Bases: ABC, Generic[RequestT, ResponseT]

The request handler interface.

The request handler is an object, which gets a request as input and may return a response as a result.

Command handler example::

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]) def init(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api

  async def handle(self, request: JoinMeetingCommand) -> None:
      await self._meetings_api.join_user(request.user_id, request.meeting_id)

Query handler example::

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]) def init(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api

  async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
      link = await self._meetings_api.get_link(request.meeting_id)
      return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
handle abstractmethod async
handle(request: RequestT) -> ResponseT
Source code in src/waku/cqrs/requests/handler.py
@abc.abstractmethod
async def handle(self, request: RequestT) -> ResponseT:
    raise NotImplementedError

map

RequestMapRegistry module-attribute
RequestMap
RequestMap()
Source code in src/waku/cqrs/requests/map.py
def __init__(self) -> None:
    self._registry: RequestMapRegistry[Any, Any] = {}
registry property
bind
bind(
    request_type: type[RequestT],
    handler_type: RequestHandlerType[RequestT, ResponseT],
) -> Self
Source code in src/waku/cqrs/requests/map.py
def bind(self, request_type: type[RequestT], handler_type: RequestHandlerType[RequestT, ResponseT]) -> Self:
    if request_type in self._registry:
        raise RequestHandlerAlreadyRegistered(request_type, handler_type)
    self._registry[request_type] = handler_type
    return self
merge
merge(other: RequestMap) -> Self
Source code in src/waku/cqrs/requests/map.py
def merge(self, other: RequestMap) -> Self:
    for request_type, handler_type in other.registry.items():
        self.bind(request_type, handler_type)
    return self

utils

get_request_response_type cached

get_request_response_type(
    request_type: type[Request[ResponseT]],
) -> type[ResponseT]
Source code in src/waku/cqrs/utils.py
@functools.cache
def get_request_response_type(request_type: type[Request[ResponseT]]) -> type[ResponseT]:
    return typing.cast(type[ResponseT], typing.get_args(get_original_bases(request_type)[0])[0])

di

provider

provider(
    source: Callable[..., Any] | type[Any],
    *,
    scope: Scope = REQUEST,
    provided_type: Any = None,
    cache: bool = True,
) -> Provider

Create a Dishka provider for a callable or type.

PARAMETER DESCRIPTION
source

Callable or type to provide as a dependency.

TYPE: Callable[..., Any] | type[Any]

scope

Scope of the dependency (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

provided_type

Explicit type to provide (default: inferred).

TYPE: Any DEFAULT: None

cache

Whether to cache the instance in the scope.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Provider

Configured provider instance.

TYPE: Provider

Source code in src/waku/di.py
def provider(
    source: Callable[..., Any] | type[Any],
    *,
    scope: Scope = Scope.REQUEST,
    provided_type: Any = None,
    cache: bool = True,
) -> Provider:
    """Create a Dishka provider for a callable or type.

    Args:
        source: Callable or type to provide as a dependency.
        scope: Scope of the dependency (default: Scope.REQUEST).
        provided_type: Explicit type to provide (default: inferred).
        cache: Whether to cache the instance in the scope.

    Returns:
        Provider: Configured provider instance.
    """
    provider_ = Provider(scope=scope)
    provider_.provide(source, provides=provided_type, cache=cache)
    return provider_

singleton

singleton(
    source: Callable[..., Any] | type[Any],
    *,
    provided_type: Any = None,
) -> Provider

Create a singleton provider (lifetime: app).

PARAMETER DESCRIPTION
source

Callable or type to provide as a singleton.

TYPE: Callable[..., Any] | type[Any]

provided_type

Explicit type to provide (default: inferred).

TYPE: Any DEFAULT: None

RETURNS DESCRIPTION
Provider

Singleton provider instance.

TYPE: Provider

Source code in src/waku/di.py
def singleton(
    source: Callable[..., Any] | type[Any],
    *,
    provided_type: Any = None,
) -> Provider:
    """Create a singleton provider (lifetime: app).

    Args:
        source: Callable or type to provide as a singleton.
        provided_type: Explicit type to provide (default: inferred).

    Returns:
        Provider: Singleton provider instance.
    """
    return provider(source, scope=Scope.APP, provided_type=provided_type)

scoped

scoped(
    source: Callable[..., Any] | type[Any],
    *,
    provided_type: Any = None,
) -> Provider

Create a scoped provider (lifetime: request).

PARAMETER DESCRIPTION
source

Callable or type to provide as a scoped dependency.

TYPE: Callable[..., Any] | type[Any]

provided_type

Explicit type to provide (default: inferred).

TYPE: Any DEFAULT: None

RETURNS DESCRIPTION
Provider

Scoped provider instance.

TYPE: Provider

Source code in src/waku/di.py
def scoped(
    source: Callable[..., Any] | type[Any],
    *,
    provided_type: Any = None,
) -> Provider:
    """Create a scoped provider (lifetime: request).

    Args:
        source: Callable or type to provide as a scoped dependency.
        provided_type: Explicit type to provide (default: inferred).

    Returns:
        Provider: Scoped provider instance.
    """
    return provider(source, scope=Scope.REQUEST, provided_type=provided_type)

transient

transient(
    source: Callable[..., Any] | type[Any],
    *,
    provided_type: Any = None,
) -> Provider

Create a transient provider (new instance per injection).

PARAMETER DESCRIPTION
source

Callable or type to provide as a transient dependency.

TYPE: Callable[..., Any] | type[Any]

provided_type

Explicit type to provide (default: inferred).

TYPE: Any DEFAULT: None

RETURNS DESCRIPTION
Provider

Transient provider instance.

TYPE: Provider

Source code in src/waku/di.py
def transient(
    source: Callable[..., Any] | type[Any],
    *,
    provided_type: Any = None,
) -> Provider:
    """Create a transient provider (new instance per injection).

    Args:
        source: Callable or type to provide as a transient dependency.
        provided_type: Explicit type to provide (default: inferred).

    Returns:
        Provider: Transient provider instance.
    """
    return provider(source, scope=Scope.REQUEST, provided_type=provided_type, cache=False)

object_

object_(
    source: Any, *, provided_type: Any = None
) -> Provider

Provide the exact object passed at creation time as a singleton dependency.

The provider always returns the same object instance, without instantiation or copying.

PARAMETER DESCRIPTION
source

The object to provide as-is.

TYPE: Any

provided_type

Explicit type to provide (default: inferred).

TYPE: Any DEFAULT: None

RETURNS DESCRIPTION
Provider

Provider that always returns the given object.

TYPE: Provider

Source code in src/waku/di.py
def object_(
    source: Any,
    *,
    provided_type: Any = None,
) -> Provider:
    """Provide the exact object passed at creation time as a singleton dependency.

    The provider always returns the same object instance, without instantiation or copying.

    Args:
        source: The object to provide as-is.
        provided_type: Explicit type to provide (default: inferred).

    Returns:
        Provider: Provider that always returns the given object.
    """
    return provider(lambda: source, scope=Scope.APP, provided_type=provided_type, cache=True)

contextual

contextual(
    provided_type: Any, *, scope: Scope = REQUEST
) -> Provider

Provide a dependency from the current context (e.g., app/request).

PARAMETER DESCRIPTION
provided_type

The type to resolve from context.

TYPE: Any

scope

Scope of the context variable (default: Scope.REQUEST).

TYPE: Scope DEFAULT: REQUEST

RETURNS DESCRIPTION
Provider

Contextual provider instance.

TYPE: Provider

Source code in src/waku/di.py
def contextual(
    provided_type: Any,
    *,
    scope: Scope = Scope.REQUEST,
) -> Provider:
    """Provide a dependency from the current context (e.g., app/request).

    Args:
        provided_type: The type to resolve from context.
        scope: Scope of the context variable (default: Scope.REQUEST).

    Returns:
        Provider: Contextual provider instance.
    """
    provider_ = Provider()
    provider_.from_context(provided_type, scope=scope)
    return provider_

exceptions

WakuError

Bases: Exception

extensions

ApplicationExtension module-attribute

ModuleExtension module-attribute

DEFAULT_EXTENSIONS module-attribute

DEFAULT_EXTENSIONS: Sequence[ApplicationExtension] = (
    ValidationExtension(
        [DependenciesAccessibleRule()], strict=True
    ),
)

AfterApplicationInit

Bases: Protocol

Extension for application post-initialization actions.

after_app_init async

after_app_init(app: WakuApplication) -> None

Perform actions after application initialization.

Source code in src/waku/extensions/protocols.py
async def after_app_init(self, app: WakuApplication) -> None:
    """Perform actions after application initialization."""

OnApplicationInit

Bases: Protocol

Extension for application pre-initialization actions.

on_app_init async

on_app_init(app: WakuApplication) -> None

Perform actions before application initialization.

Source code in src/waku/extensions/protocols.py
async def on_app_init(self, app: WakuApplication) -> None:
    """Perform actions before application initialization."""

OnApplicationShutdown

Bases: Protocol

Extension for application shutdown actions.

on_app_shutdown async

on_app_shutdown(app: WakuApplication) -> None

Perform actions before application shutdown.

Source code in src/waku/extensions/protocols.py
async def on_app_shutdown(self, app: WakuApplication) -> None:
    """Perform actions before application shutdown."""

OnModuleConfigure

Bases: Protocol

Extension for module configuration.

on_module_configure

on_module_configure(metadata: ModuleMetadata) -> None

Perform actions before module metadata transformed to module.

Source code in src/waku/extensions/protocols.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Perform actions before module metadata transformed to module."""
    ...

OnModuleDestroy

Bases: Protocol

Extension for module destroying.

on_module_destroy async

on_module_destroy(module: Module) -> None

Perform actions before application shutdown.

Source code in src/waku/extensions/protocols.py
async def on_module_destroy(self, module: Module) -> None:
    """Perform actions before application shutdown."""
    ...

OnModuleInit

Bases: Protocol

Extension for module initialization.

on_module_init async

on_module_init(module: Module) -> None

Perform actions before application initialization.

Source code in src/waku/extensions/protocols.py
async def on_module_init(self, module: Module) -> None:
    """Perform actions before application initialization."""
    ...

ExtensionRegistry

ExtensionRegistry()

Registry for extensions.

This registry maintains references to all extensions in the application, allowing for centralized management and discovery.

Source code in src/waku/extensions/registry.py
def __init__(self) -> None:
    self._app_extensions: dict[type[ApplicationExtension], list[ApplicationExtension]] = defaultdict(list)
    self._module_extensions: dict[ModuleType, list[ModuleExtension]] = defaultdict(list)

register_application_extension

register_application_extension(
    extension: ApplicationExtension,
) -> Self

Register an application extension with optional priority and tags.

Source code in src/waku/extensions/registry.py
def register_application_extension(self, extension: ApplicationExtension) -> Self:
    """Register an application extension with optional priority and tags."""
    ext_type = type(extension)
    extension_bases = [
        base
        for base in inspect.getmro(ext_type)
        if (isinstance(base, ApplicationExtension) and base != ext_type)  # type: ignore[unreachable]
    ]
    for base in extension_bases:
        self._app_extensions[cast(type[ApplicationExtension], base)].append(extension)
    return self

register_module_extension

register_module_extension(
    module_type: ModuleType, extension: ModuleExtension
) -> Self
Source code in src/waku/extensions/registry.py
def register_module_extension(self, module_type: ModuleType, extension: ModuleExtension) -> Self:
    self._module_extensions[module_type].append(extension)
    return self

get_application_extensions

get_application_extensions(
    extension_type: type[_AppExtT],
) -> list[_AppExtT]
Source code in src/waku/extensions/registry.py
def get_application_extensions(self, extension_type: type[_AppExtT]) -> list[_AppExtT]:
    return cast(list[_AppExtT], self._app_extensions.get(cast(type[ApplicationExtension], extension_type), []))

get_module_extensions

get_module_extensions(
    module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
def get_module_extensions(self, module_type: ModuleType, extension_type: type[_ModExtT]) -> list[_ModExtT]:
    extensions = cast(list[_ModExtT], self._module_extensions.get(module_type, []))
    return [ext for ext in extensions if isinstance(ext, extension_type)]

protocols

Extension protocols for application and module lifecycle hooks.

ApplicationExtension module-attribute

ModuleExtension module-attribute

OnApplicationInit

Bases: Protocol

Extension for application pre-initialization actions.

on_app_init async
on_app_init(app: WakuApplication) -> None

Perform actions before application initialization.

Source code in src/waku/extensions/protocols.py
async def on_app_init(self, app: WakuApplication) -> None:
    """Perform actions before application initialization."""

AfterApplicationInit

Bases: Protocol

Extension for application post-initialization actions.

after_app_init async
after_app_init(app: WakuApplication) -> None

Perform actions after application initialization.

Source code in src/waku/extensions/protocols.py
async def after_app_init(self, app: WakuApplication) -> None:
    """Perform actions after application initialization."""

OnApplicationShutdown

Bases: Protocol

Extension for application shutdown actions.

on_app_shutdown async
on_app_shutdown(app: WakuApplication) -> None

Perform actions before application shutdown.

Source code in src/waku/extensions/protocols.py
async def on_app_shutdown(self, app: WakuApplication) -> None:
    """Perform actions before application shutdown."""

OnModuleConfigure

Bases: Protocol

Extension for module configuration.

on_module_configure
on_module_configure(metadata: ModuleMetadata) -> None

Perform actions before module metadata transformed to module.

Source code in src/waku/extensions/protocols.py
def on_module_configure(self, metadata: ModuleMetadata) -> None:
    """Perform actions before module metadata transformed to module."""
    ...

OnModuleInit

Bases: Protocol

Extension for module initialization.

on_module_init async
on_module_init(module: Module) -> None

Perform actions before application initialization.

Source code in src/waku/extensions/protocols.py
async def on_module_init(self, module: Module) -> None:
    """Perform actions before application initialization."""
    ...

OnModuleDestroy

Bases: Protocol

Extension for module destroying.

on_module_destroy async
on_module_destroy(module: Module) -> None

Perform actions before application shutdown.

Source code in src/waku/extensions/protocols.py
async def on_module_destroy(self, module: Module) -> None:
    """Perform actions before application shutdown."""
    ...

registry

Extension registry for centralized management of extensions.

ExtensionRegistry

ExtensionRegistry()

Registry for extensions.

This registry maintains references to all extensions in the application, allowing for centralized management and discovery.

Source code in src/waku/extensions/registry.py
def __init__(self) -> None:
    self._app_extensions: dict[type[ApplicationExtension], list[ApplicationExtension]] = defaultdict(list)
    self._module_extensions: dict[ModuleType, list[ModuleExtension]] = defaultdict(list)
register_application_extension
register_application_extension(
    extension: ApplicationExtension,
) -> Self

Register an application extension with optional priority and tags.

Source code in src/waku/extensions/registry.py
def register_application_extension(self, extension: ApplicationExtension) -> Self:
    """Register an application extension with optional priority and tags."""
    ext_type = type(extension)
    extension_bases = [
        base
        for base in inspect.getmro(ext_type)
        if (isinstance(base, ApplicationExtension) and base != ext_type)  # type: ignore[unreachable]
    ]
    for base in extension_bases:
        self._app_extensions[cast(type[ApplicationExtension], base)].append(extension)
    return self
register_module_extension
register_module_extension(
    module_type: ModuleType, extension: ModuleExtension
) -> Self
Source code in src/waku/extensions/registry.py
def register_module_extension(self, module_type: ModuleType, extension: ModuleExtension) -> Self:
    self._module_extensions[module_type].append(extension)
    return self
get_application_extensions
get_application_extensions(
    extension_type: type[_AppExtT],
) -> list[_AppExtT]
Source code in src/waku/extensions/registry.py
def get_application_extensions(self, extension_type: type[_AppExtT]) -> list[_AppExtT]:
    return cast(list[_AppExtT], self._app_extensions.get(cast(type[ApplicationExtension], extension_type), []))
get_module_extensions
get_module_extensions(
    module_type: ModuleType, extension_type: type[_ModExtT]
) -> list[_ModExtT]
Source code in src/waku/extensions/registry.py
def get_module_extensions(self, module_type: ModuleType, extension_type: type[_ModExtT]) -> list[_ModExtT]:
    extensions = cast(list[_ModExtT], self._module_extensions.get(module_type, []))
    return [ext for ext in extensions if isinstance(ext, extension_type)]

factory

ContainerConfig dataclass

ContainerConfig(
    *,
    lock_factory: _LockFactory = Lock,
    start_scope: Scope | None = None,
    skip_validation: bool = False,
)

lock_factory class-attribute instance-attribute

lock_factory: _LockFactory = Lock

start_scope class-attribute instance-attribute

start_scope: Scope | None = None

skip_validation class-attribute instance-attribute

skip_validation: bool = False

WakuFactory

WakuFactory(
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[
        ApplicationExtension
    ] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
)
Source code in src/waku/factory.py
def __init__(
    self,
    root_module_type: ModuleType,
    /,
    context: dict[Any, Any] | None = None,
    lifespan: Sequence[LifespanFunc] = (),
    extensions: Sequence[ApplicationExtension] = DEFAULT_EXTENSIONS,
    container_config: ContainerConfig | None = None,
) -> None:
    self._root_module_type = root_module_type

    self._context = context
    self._lifespan = lifespan
    self._extensions = extensions
    self._container_config = container_config or ContainerConfig()

create

create() -> WakuApplication
Source code in src/waku/factory.py
def create(self) -> WakuApplication:
    registry = ModuleRegistryBuilder(self._root_module_type).build()
    container = self._build_container(registry.providers)
    return WakuApplication(
        container=container,
        registry=registry,
        lifespan=self._lifespan,
        extension_registry=self._build_extension_registry(registry.modules),
    )

lifespan

LifespanFunc module-attribute

LifespanFunc: TypeAlias = (
    Callable[
        ['WakuApplication'],
        AbstractAsyncContextManager[None],
    ]
    | AbstractAsyncContextManager[None]
)

LifespanWrapper

LifespanWrapper(lifespan_func: LifespanFunc)
Source code in src/waku/lifespan.py
def __init__(self, lifespan_func: LifespanFunc) -> None:
    self._lifespan_func = lifespan_func

lifespan async

lifespan(app: WakuApplication) -> AsyncIterator[None]
Source code in src/waku/lifespan.py
@asynccontextmanager
async def lifespan(self, app: WakuApplication) -> AsyncIterator[None]:
    ctx_manager = (
        self._lifespan_func
        if isinstance(self._lifespan_func, AbstractAsyncContextManager)
        else self._lifespan_func(app)
    )
    async with ctx_manager:
        yield

modules

ModuleType module-attribute

DynamicModule dataclass

DynamicModule(
    *,
    providers: list[BaseProvider] = list(),
    imports: list[ModuleType | DynamicModule] = list(),
    exports: list[
        type[object] | ModuleType | DynamicModule
    ] = list(),
    extensions: list[ModuleExtension] = list(),
    is_global: bool = False,
    id: UUID = uuid4(),
    parent_module: ModuleType,
)

Bases: ModuleMetadata

providers class-attribute instance-attribute

providers: list[BaseProvider] = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports: list[ModuleType | DynamicModule] = field(
    default_factory=list
)

List of modules imported by this module.

exports class-attribute instance-attribute

exports: list[type[object] | ModuleType | DynamicModule] = (
    field(default_factory=list)
)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions: list[ModuleExtension] = field(
    default_factory=list
)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global: bool = False

Whether this module is global or not.

id class-attribute instance-attribute

id: UUID = field(default_factory=uuid4)

parent_module instance-attribute

parent_module: ModuleType

HasModuleMetadata

Bases: Protocol

ModuleCompiler

extract_metadata

extract_metadata(
    module_type: ModuleType | DynamicModule,
) -> tuple[ModuleType, ModuleMetadata]
Source code in src/waku/modules/_metadata.py
def extract_metadata(self, module_type: ModuleType | DynamicModule) -> tuple[ModuleType, ModuleMetadata]:
    try:
        return self._extract_metadata(cast(Hashable, module_type))
    except AttributeError:
        msg = f'{type(module_type).__name__} is not module'
        raise ValueError(msg) from None

ModuleMetadata dataclass

ModuleMetadata(
    *,
    providers: list[BaseProvider] = list(),
    imports: list[ModuleType | DynamicModule] = list(),
    exports: list[
        type[object] | ModuleType | DynamicModule
    ] = list(),
    extensions: list[ModuleExtension] = list(),
    is_global: bool = False,
    id: UUID = uuid4(),
)

providers class-attribute instance-attribute

providers: list[BaseProvider] = field(default_factory=list)

List of providers for dependency injection.

imports class-attribute instance-attribute

imports: list[ModuleType | DynamicModule] = field(
    default_factory=list
)

List of modules imported by this module.

exports class-attribute instance-attribute

exports: list[type[object] | ModuleType | DynamicModule] = (
    field(default_factory=list)
)

List of types or modules exported by this module.

extensions class-attribute instance-attribute

extensions: list[ModuleExtension] = field(
    default_factory=list
)

List of module extensions for lifecycle hooks.

is_global class-attribute instance-attribute

is_global: bool = False

Whether this module is global or not.

id class-attribute instance-attribute

id: UUID = field(default_factory=uuid4)

Module

Module(module_type: ModuleType, metadata: ModuleMetadata)
Source code in src/waku/modules/_module.py
def __init__(self, module_type: ModuleType, metadata: ModuleMetadata) -> None:
    self.id: Final[UUID] = metadata.id
    self.target: Final[ModuleType] = module_type

    self.providers: Final[Sequence[BaseProvider]] = metadata.providers
    self.imports: Final[Sequence[ModuleType | DynamicModule]] = metadata.imports
    self.exports: Final[Sequence[type[object] | ModuleType | DynamicModule]] = metadata.exports
    self.extensions: Final[Sequence[ModuleExtension]] = metadata.extensions
    self.is_global: Final[bool] = metadata.is_global

    self._provider: BaseProvider | None = None

id instance-attribute

id: Final[UUID] = id

target instance-attribute

target: Final[ModuleType] = module_type

providers instance-attribute

providers: Final[Sequence[BaseProvider]] = providers

imports instance-attribute

imports: Final[Sequence[ModuleType | DynamicModule]] = (
    imports
)

exports instance-attribute

exports: Final[
    Sequence[type[object] | ModuleType | DynamicModule]
] = exports

extensions instance-attribute

extensions: Final[Sequence[ModuleExtension]] = extensions

is_global instance-attribute

is_global: Final[bool] = is_global

name property

name: str

provider cached property

provider: BaseProvider

ModuleRegistry

ModuleRegistry(
    *,
    compiler: ModuleCompiler,
    root_module: Module,
    modules: dict[UUID, Module],
    providers: list[BaseProvider],
    adjacency: AdjacencyMatrix,
)

Immutable registry and graph for module queries, traversal, and lookups.

Source code in src/waku/modules/_registry.py
def __init__(
    self,
    *,
    compiler: ModuleCompiler,
    root_module: Module,
    modules: dict[UUID, Module],
    providers: list[BaseProvider],
    adjacency: AdjacencyMatrix,
) -> None:
    self._compiler = compiler
    self._root_module = root_module
    self._modules = modules
    self._providers = tuple(providers)
    self._adjacency = adjacency

root_module property

root_module: Module

modules property

modules: tuple[Module, ...]

providers property

providers: tuple[BaseProvider, ...]

compiler property

compiler: ModuleCompiler

get

get(module_type: ModuleType | DynamicModule) -> Module
Source code in src/waku/modules/_registry.py
def get(self, module_type: ModuleType | DynamicModule) -> Module:
    module_id = self.compiler.extract_metadata(module_type)[1].id
    return self.get_by_id(module_id)

get_by_id

get_by_id(module_id: UUID) -> Module
Source code in src/waku/modules/_registry.py
def get_by_id(self, module_id: UUID) -> Module:
    module = self._modules.get(module_id)
    if module is None:
        msg = f'Module with ID {module_id} is not registered in the graph.'
        raise KeyError(msg)
    return module

traverse

traverse(from_: Module | None = None) -> Iterator[Module]

Traverse the module graph in depth-first post-order (children before parent) recursively.

PARAMETER DESCRIPTION
from_

Start module (default: root)

TYPE: Module | None DEFAULT: None

YIELDS DESCRIPTION
Module

Each traversed module (post-order)

TYPE:: Module

Source code in src/waku/modules/_registry.py
def traverse(self, from_: Module | None = None) -> Iterator[Module]:
    """Traverse the module graph in depth-first post-order (children before parent) recursively.

    Args:
        from_: Start module (default: root)

    Yields:
        Module: Each traversed module (post-order)
    """
    start_module = from_ or self._root_module
    visited: set[UUID] = set()

    def _dfs(module: Module) -> Iterator[Module]:
        if module.id in visited:
            return

        visited.add(module.id)

        # Process children first (maintain original order)
        neighbor_ids = self._adjacency[module.id]
        for neighbor_id in neighbor_ids:
            if neighbor_id == module.id:
                continue
            neighbor = self.get_by_id(neighbor_id)
            yield from _dfs(neighbor)

        # Process current module after children (post-order)
        yield module

    yield from _dfs(start_module)

ModuleRegistryBuilder

ModuleRegistryBuilder(
    root_module_type: ModuleType,
    compiler: ModuleCompiler | None = None,
)
Source code in src/waku/modules/_registry_builder.py
def __init__(self, root_module_type: ModuleType, compiler: ModuleCompiler | None = None) -> None:
    self._compiler: Final = compiler or ModuleCompiler()
    self._root_module_type: Final = root_module_type
    self._modules: dict[UUID, Module] = {}
    self._providers: list[BaseProvider] = []

    self._metadata_cache: dict[ModuleType | DynamicModule, tuple[ModuleType, ModuleMetadata]] = {}

build

build() -> ModuleRegistry
Source code in src/waku/modules/_registry_builder.py
def build(self) -> ModuleRegistry:
    modules, adjacency = self._collect_modules()
    root_module = self._register_modules(modules)
    return self._build_registry(root_module, adjacency)

module

module(
    *,
    providers: Sequence[BaseProvider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[
        type[object] | ModuleType | DynamicModule
    ] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]

Decorator to define a module.

PARAMETER DESCRIPTION
providers

Sequence of providers for dependency injection.

TYPE: Sequence[BaseProvider] DEFAULT: ()

imports

Sequence of modules imported by this module.

TYPE: Sequence[ModuleType | DynamicModule] DEFAULT: ()

exports

Sequence of types or modules exported by this module.

TYPE: Sequence[type[object] | ModuleType | DynamicModule] DEFAULT: ()

extensions

Sequence of module extensions for lifecycle hooks.

TYPE: Sequence[ModuleExtension] DEFAULT: ()

is_global

Whether this module is global or not.

TYPE: bool DEFAULT: False

Source code in src/waku/modules/_metadata.py
def module(
    *,
    providers: Sequence[BaseProvider] = (),
    imports: Sequence[ModuleType | DynamicModule] = (),
    exports: Sequence[type[object] | ModuleType | DynamicModule] = (),
    extensions: Sequence[ModuleExtension] = (),
    is_global: bool = False,
) -> Callable[[type[_T]], type[_T]]:
    """Decorator to define a module.

    Args:
        providers: Sequence of providers for dependency injection.
        imports: Sequence of modules imported by this module.
        exports: Sequence of types or modules exported by this module.
        extensions: Sequence of module extensions for lifecycle hooks.
        is_global: Whether this module is global or not.
    """

    def decorator(cls: type[_T]) -> type[_T]:
        metadata = ModuleMetadata(
            providers=list(providers),
            imports=list(imports),
            exports=list(exports),
            extensions=list(extensions),
            is_global=is_global,
        )
        for extension in metadata.extensions:
            if isinstance(extension, OnModuleConfigure):
                extension.on_module_configure(metadata)

        setattr(cls, _MODULE_METADATA_KEY, metadata)
        return cls

    return decorator

testing

override

override(
    container: AsyncContainer, *providers: BaseProvider
) -> Iterator[None]

Temporarily override providers in an AsyncContainer for testing.

PARAMETER DESCRIPTION
container

The container whose providers will be overridden.

TYPE: AsyncContainer

*providers

Providers to override in the container.

TYPE: BaseProvider DEFAULT: ()

YIELDS DESCRIPTION
None

Context in which the container uses the overridden providers.

TYPE:: None

Example
from waku import WakuFactory, module
from waku.di import Scope, singleton
from waku.testing import override


class Service: ...


class ServiceOverride(Service): ...


with override(application.container, singleton(ServiceOverride, provided_type=Service)):
    service = await application.container.get(Service)
    assert isinstance(service, ServiceOverride)
Source code in src/waku/testing.py
@contextmanager
def override(container: AsyncContainer, *providers: BaseProvider) -> Iterator[None]:
    """Temporarily override providers in an AsyncContainer for testing.

    Args:
        container: The container whose providers will be overridden.
        *providers: Providers to override in the container.

    Yields:
        None: Context in which the container uses the overridden providers.

    Example:
        ```python
        from waku import WakuFactory, module
        from waku.di import Scope, singleton
        from waku.testing import override


        class Service: ...


        class ServiceOverride(Service): ...


        with override(application.container, singleton(ServiceOverride, provided_type=Service)):
            service = await application.container.get(Service)
            assert isinstance(service, ServiceOverride)
        ```
    """
    for provider in providers:
        for factory in chain(provider.factories, provider.aliases):
            cast(_Overrideable, factory).override = True

    new_container = make_async_container(
        _container_provider(container),
        *providers,
        context=container._context,  # noqa: SLF001
        start_scope=container.scope,
        validation_settings=STRICT_VALIDATION,
    )

    _swap(container, new_container)
    yield
    _swap(new_container, container)

validation

ValidationRule

Bases: Protocol

validate

validate(
    context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/_abc.py
def validate(self, context: ValidationContext) -> list[ValidationError]: ...

ValidationError

Bases: WakuError

ValidationExtension

ValidationExtension(
    rules: Sequence[ValidationRule], *, strict: bool = True
)

Bases: AfterApplicationInit

Source code in src/waku/validation/_extension.py
def __init__(self, rules: Sequence[ValidationRule], *, strict: bool = True) -> None:
    self.rules = rules
    self.strict: Final = strict

rules instance-attribute

rules = rules

strict instance-attribute

strict: Final = strict

after_app_init async

after_app_init(app: WakuApplication) -> None
Source code in src/waku/validation/_extension.py
async def after_app_init(self, app: WakuApplication) -> None:
    context = ValidationContext(app=app)

    errors_chain = chain.from_iterable(rule.validate(context) for rule in self.rules)
    if errors := list(errors_chain):
        self._raise(errors)

rules

DependenciesAccessibleRule

DependenciesAccessibleRule(cache_size: int = 1000)

Bases: ValidationRule

Validates that all dependencies required by providers are accessible.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, cache_size: int = 1000) -> None:
    self._cache = LRUCache[set[type[object]]](cache_size)
    self._types_extractor = ModuleTypesExtractor(self._cache)
validate
validate(
    context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def validate(self, context: ValidationContext) -> list[ValidationError]:
    self._cache.clear()  # Clear cache before validation

    registry = context.app.registry
    modules = list(registry.modules)

    checker = DependencyAccessChecker(modules, context, self._types_extractor)
    errors: list[ValidationError] = []

    for module in modules:
        module_provider = module.provider
        for factory in module_provider.factories:
            inaccessible_deps = checker.find_inaccessible_dependencies(
                dependencies=factory.dependencies,
                module=module,
            )
            errors.extend(
                DependencyInaccessibleError(
                    required_type=dep_type,
                    required_by=factory.source,
                    from_module=module,
                )
                for dep_type in inaccessible_deps
            )

    return errors

DependencyInaccessibleError

DependencyInaccessibleError(
    required_type: type[object],
    required_by: object,
    from_module: Module,
)

Bases: ValidationError

Error indicating a dependency is not accessible to a provider/module.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    required_type: type[object],
    required_by: object,
    from_module: Module,
) -> None:
    self.required_type = required_type
    self.required_by = required_by
    self.from_module = from_module
    super().__init__(str(self))
required_type instance-attribute
required_type = required_type
required_by instance-attribute
required_by = required_by
from_module instance-attribute
from_module = from_module

dependency_accessible

DependencyInaccessibleError
DependencyInaccessibleError(
    required_type: type[object],
    required_by: object,
    from_module: Module,
)

Bases: ValidationError

Error indicating a dependency is not accessible to a provider/module.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    required_type: type[object],
    required_by: object,
    from_module: Module,
) -> None:
    self.required_type = required_type
    self.required_by = required_by
    self.from_module = from_module
    super().__init__(str(self))
required_type instance-attribute
required_type = required_type
required_by instance-attribute
required_by = required_by
from_module instance-attribute
from_module = from_module
DependencyAccessChecker
DependencyAccessChecker(
    modules: list[Module],
    context: ValidationContext,
    types_extractor: ModuleTypesExtractor,
)

Handles dependency accessibility checks between modules.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(
    self,
    modules: list[Module],
    context: ValidationContext,
    types_extractor: ModuleTypesExtractor,
) -> None:
    self._modules = modules
    self._context = context
    self._registry = context.app.registry
    self._type_provider = types_extractor
find_inaccessible_dependencies
find_inaccessible_dependencies(
    dependencies: Sequence[Any], module: Module
) -> Iterable[type[object]]

Find dependencies that are not accessible to a module.

Source code in src/waku/validation/rules/dependency_accessible.py
def find_inaccessible_dependencies(self, dependencies: Sequence[Any], module: Module) -> Iterable[type[object]]:
    """Find dependencies that are not accessible to a module."""
    return (
        dependency.type_hint for dependency in dependencies if not self._is_accessible(dependency.type_hint, module)
    )
DependenciesAccessibleRule
DependenciesAccessibleRule(cache_size: int = 1000)

Bases: ValidationRule

Validates that all dependencies required by providers are accessible.

Source code in src/waku/validation/rules/dependency_accessible.py
def __init__(self, cache_size: int = 1000) -> None:
    self._cache = LRUCache[set[type[object]]](cache_size)
    self._types_extractor = ModuleTypesExtractor(self._cache)
validate
validate(
    context: ValidationContext,
) -> list[ValidationError]
Source code in src/waku/validation/rules/dependency_accessible.py
@override
def validate(self, context: ValidationContext) -> list[ValidationError]:
    self._cache.clear()  # Clear cache before validation

    registry = context.app.registry
    modules = list(registry.modules)

    checker = DependencyAccessChecker(modules, context, self._types_extractor)
    errors: list[ValidationError] = []

    for module in modules:
        module_provider = module.provider
        for factory in module_provider.factories:
            inaccessible_deps = checker.find_inaccessible_dependencies(
                dependencies=factory.dependencies,
                module=module,
            )
            errors.extend(
                DependencyInaccessibleError(
                    required_type=dep_type,
                    required_by=factory.source,
                    from_module=module,
                )
                for dep_type in inaccessible_deps
            )

    return errors