Skip to content

Mediator (CQRS)

Based on the CQRS pattern, the mediator is used to decouple the command and query logic from the domain model.

Implementation heavily inspired by C# MediatR library.

For full documentation, visit the Mediator (CQRS) section.

Code

import asyncio
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from uuid import UUID, uuid4

from waku import Application, ApplicationFactory
from waku.di import Injected, inject
from waku.di.contrib.aioinject import AioinjectDependencyProvider
from waku.mediator import (
    IMediator,
    MediatorConfig,
    MediatorModule,
    Request,
    RequestHandler,
    Response,
)
from waku.mediator.contracts.event import Event
from waku.mediator.contracts.request import RequestT, ResponseT
from waku.mediator.events.handler import EventHandler
from waku.mediator.middlewares import HandleType, Middleware
from waku.mediator.modules import MediatorExtension
from waku.modules import module

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class CreateMeetingResult(Response):
    meeting_id: UUID


@dataclass(frozen=True, kw_only=True)
class CreateMeetingCommand(Request[CreateMeetingResult]):
    user_id: UUID


@dataclass(frozen=True, kw_only=True)
class MeetingCreatedEvent(Event):
    user_id: UUID
    meeting_id: UUID


class CreatingMeetingCommandHandler(RequestHandler[CreateMeetingCommand, CreateMeetingResult]):
    def __init__(self, mediator: IMediator) -> None:
        self._mediator = mediator

    async def handle(self, request: CreateMeetingCommand) -> CreateMeetingResult:
        meeting_id = uuid4()
        logger.info('new meeting created user_id=%s', request.user_id)
        await self._mediator.publish(MeetingCreatedEvent(user_id=request.user_id, meeting_id=meeting_id))
        return CreateMeetingResult(meeting_id=meeting_id)


class MeetingCreatedEventHandler(EventHandler[MeetingCreatedEvent]):
    async def handle(self, event: MeetingCreatedEvent) -> None:
        logger.info('meeting created event handled user_id=%s', event.user_id)


class LogMiddleware(Middleware[RequestT, ResponseT]):
    async def __call__(self, request: RequestT, handle: HandleType[RequestT, ResponseT]) -> ResponseT:
        logger.info('request=%s', request)
        response = await handle(request)
        logger.info('response=%s', response)
        return response


@asynccontextmanager
async def lifespan(_: Application) -> AsyncIterator[None]:
    logger.info('Lifespan startup')
    yield
    logger.info('Lifespan shutdown')


@module(
    extensions=[
        (
            MediatorExtension()
            .bind_request(CreateMeetingCommand, CreatingMeetingCommandHandler)
            .bind_event(MeetingCreatedEvent, [MeetingCreatedEventHandler])
        ),
    ],
)
class SomeModule:
    pass


@module(
    imports=[
        SomeModule,
        MediatorModule.register(MediatorConfig(middlewares=[LogMiddleware])),
    ],
)
class AppModule:
    pass


@inject
async def handler(mediator: Injected[IMediator]) -> None:
    command = CreateMeetingCommand(user_id=uuid4())
    await mediator.send(command)


async def main() -> None:
    dp = AioinjectDependencyProvider()
    app = ApplicationFactory.create(
        AppModule,
        dependency_provider=dp,
        lifespan=[lifespan],
    )

    async with app, app.container.context():
        await handler()  # type: ignore[call-arg]


if __name__ == '__main__':
    asyncio.run(main())