Asyncio sink backend

Overview

AsyncioLogSink is the recommended base for custom sinks that need slow or asynchronous backend delivery.

It preserves the normal sink boundary:

sink.log(event)

but moves actual delivery into an internal asyncio runtime.

Use it when a custom sink needs to keep log(event) fast and thread-safe while backend work happens elsewhere.

Typical backends:

network collectors
message queues
database writers
Redis streams
HTTP endpoints
custom async file or storage layers

What the base provides

AsyncioLogSink provides the reusable runtime shell:

thread-safe log(event)
lazy startup
pending-event accounting
overflow policy
async dispatcher
start/stop lifecycle
wait handles
package-managed create()
idempotent terminator

A subclass provides backend-specific behavior:

build_descriptor(...)
_dispatch_core(event)
optional _on_starting()
optional _on_stopped()

This split keeps custom backend sinks small. The subclass should focus on backend identity, connection lifecycle, and event delivery.

Minimal subclass shape

A package-managed async sink usually has this shape:

from typing import Any

from mvx.common.logger import AsyncioLogSink, LogEvent, LogSinkDescriptor


class BackendSink(AsyncioLogSink):
    @classmethod
    def build_descriptor(cls, **kwargs: Any) -> LogSinkDescriptor:
        ...

    async def _on_starting(self) -> None:
        ...

    async def _dispatch_core(self, event: LogEvent) -> None:
        ...

    async def _on_stopped(self) -> None:
        ...

build_descriptor() and _dispatch_core() are the important required pieces for a concrete package-managed backend sink.

_on_starting() and _on_stopped() are optional hooks for backend resources.

Descriptor responsibility

AsyncioLogSink provides create(), but it cannot provide a real descriptor for every backend.

The subclass must implement build_descriptor().

The descriptor should describe:

which backend resource this sink targets
which configuration values make this sink compatible or conflicting

Example:

from typing import Any

from mvx.common.logger import LogSinkDescriptor


@classmethod
def build_descriptor(cls, **kwargs: Any) -> LogSinkDescriptor:
    endpoint = kwargs["endpoint"]
    stream_name = kwargs["stream_name"]

    return LogSinkDescriptor(
        sink_type="backend",
        resource_key=("backend", endpoint, stream_name),
        config_key=(),
    )

A stable descriptor makes package-level registration safe:

same name + same descriptor
    return existing sink

same name + different descriptor
    raise conflict

Constructor responsibility

The constructor stores backend configuration and passes runtime options to AsyncioLogSink.

from mvx.common.logger import AsyncioLogSink


class BackendSink(AsyncioLogSink):
    def __init__(
        self,
        *,
        endpoint: str,
        stream_name: str,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self._endpoint = endpoint
        self._stream_name = stream_name
        self._client = None

Do not perform slow backend connection work inside log(event).

If the backend needs startup work, use _on_starting().

Startup hook

_on_starting() runs before the dispatcher starts.

Use it to prepare backend resources:

open connection
create async client
authenticate
prepare stream or topic
initialize backend handles

Example:

async def _on_starting(self) -> None:
    self._client = await open_backend_client(self._endpoint)

If _on_starting() raises, startup fails and the sink enters a failure state.

The error is reported through the start wait handle or through package-managed creation.

Dispatch hook

_dispatch_core(event) is called by the dispatcher for each accepted event.

This is where backend delivery happens.

async def _dispatch_core(self, event: LogEvent) -> None:
    assert self._client is not None
    await self._client.write(
        stream=self._stream_name,
        payload={
            "namespace": event.meta.event_namespace,
            "name": event.meta.event_name,
            "outcome": event.event_outcome,
            "level": event.level,
            "timestamp": event.timestamp,
            "payload": dict(event.payload),
        },
    )

The dispatcher awaits this method.

If it raises, the dispatcher fails and the sink moves into an error state.

The base class does not retry or reconnect automatically.

Stop hook

_on_stopped() runs during normal shutdown after the dispatcher has stopped.

Use it to release backend resources:

async def _on_stopped(self) -> None:
    if self._client is not None:
        await self._client.close()
        self._client = None

If _on_stopped() raises during normal stop, the stop operation reports a hook failure and the sink enters a failure state.

Toy backend example

This example uses a small async in-memory backend. It shows the structure without depending on an external service.

from typing import Any

from mvx.common.logger import AsyncioLogSink, LogEvent, LogSinkDescriptor


class MemoryBackend:
    def __init__(self) -> None:
        self.records: list[dict[str, Any]] = []
        self.closed = False

    async def write(self, record: dict[str, Any]) -> None:
        if self.closed:
            raise RuntimeError("backend is closed")
        self.records.append(record)

    async def close(self) -> None:
        self.closed = True


class MemoryBackendSink(AsyncioLogSink):
    def __init__(
        self,
        *,
        backend: MemoryBackend,
        stream_name: str,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self._backend = backend
        self._stream_name = stream_name

    @classmethod
    def build_descriptor(cls, **kwargs: Any) -> LogSinkDescriptor:
        stream_name = kwargs["stream_name"]
        return LogSinkDescriptor(
            sink_type="memory_backend",
            resource_key=("memory_backend", stream_name),
            config_key=(),
        )

    async def _dispatch_core(self, event: LogEvent) -> None:
        await self._backend.write(
            {
                "stream": self._stream_name,
                "event_name": event.meta.event_name,
                "event_outcome": event.event_outcome,
                "payload": dict(event.payload),
            }
        )

    async def _on_stopped(self) -> None:
        await self._backend.close()

This sink inherits:

log(event)
start()
stop()
create(...)
thread-safe event acceptance
queueing
dispatcher lifecycle
terminator creation

It implements only backend identity and backend delivery.

Package-level registration

A subclass can be registered through the package-level facade.

from mvx.common.logger import configure_log_sink


backend = MemoryBackend()

sink = configure_log_sink(
    name="memory_backend",
    sink_cls=MemoryBackendSink,
    backend=backend,
    stream_name="events",
)

Package-level registration calls build_descriptor() before creating the sink.

If registration creates a new sink, the inherited create() method builds the runtime and starts the sink.

The package-level registry stores the returned terminator and calls it when the sink is closed or the logger is reset.

Direct construction

Direct construction is also possible, but it requires a running event loop.

sink = MemoryBackendSink(
    backend=backend,
    stream_name="events",
)

await sink.start()
sink.log(event)
await sink.stop()

This form is useful when the caller owns the event loop and does not want a dedicated runtime thread.

Package-level registration usually uses create() instead.

What not to override

Most subclasses should not override log(event).

The base implementation owns:

state validation
lazy startup
pending counter
overflow behavior
thread-safe queue handoff

Most subclasses should not override create().

The base implementation owns:

event loop thread creation
sink bootstrap
startup wait
terminator creation
runtime shutdown

Override these methods only if the sink needs a fundamentally different runtime model.

What the base does not do

AsyncioLogSink does not implement backend policy.

It does not provide:

retry policy
reconnect strategy
batching
dead-letter queue
durability guarantee
exactly-once delivery
schema migration

If the backend needs those behaviors, the concrete sink must implement them explicitly.

A simple example sink should not pretend to provide production-grade delivery semantics.

Testing checklist

Test the subclass at the backend boundary.

At minimum, check:

build_descriptor() is stable for equivalent config
build_descriptor() changes for conflicting config
log(event) accepts events quickly
_dispatch_core(event) writes the expected backend record
stop() closes backend resources
terminator is idempotent
backend delivery failure moves the sink into failure behavior

For package-level registration, also test:

configure_log_sink returns existing sink for same descriptor
configure_log_sink raises conflict for different descriptor
close_log_sink calls the terminator

Design summary

Use AsyncioLogSink when a sink needs asynchronous backend delivery but must preserve a fast, thread-safe log(event) boundary.

The base class owns runtime mechanics.

The subclass owns:

backend identity through build_descriptor()
backend delivery through _dispatch_core(event)
optional startup and shutdown resource handling

This keeps custom async sinks focused on backend behavior instead of reimplementing lifecycle, queueing, thread handoff, and package-managed runtime creation.