AsyncioLogSink

This page documents AsyncioLogSink as a reusable base for custom sinks that need asynchronous delivery behind the synchronous sink interface.

For the internal runtime design, lifecycle details, queueing model, dispatcher behavior, and shutdown mechanics, see the architecture article. This page focuses on the public API and subclass extension points.

When to use it

Use AsyncioLogSink when a custom sink must keep log(event) fast and thread-safe, but actual backend delivery may require asynchronous work.

Typical use cases:

network log collectors
remote services
databases
message queues
Redis streams
file or backend sinks with buffered delivery

A subclass normally implements:

build_descriptor(...)
_dispatch_core(event)

and may override:

_on_starting()
_on_stopped()

AsyncioLogSink provides the common runtime and package-managed create() implementation. The subclass remains responsible for describing its own sink identity through build_descriptor() and for delivering one event through _dispatch_core(event).

Relationship to sink contracts

AsyncioLogSink is a base for package-managed custom sinks.

It already provides:

log(event)
create(...)

But it cannot provide a meaningful descriptor for every possible backend.

Therefore, subclasses that participate in package-level sink registration must implement:

build_descriptor(...)

The descriptor should describe the backend resource and relevant configuration for that concrete sink.

For example:

Redis sink
    resource key: redis URL, stream name
    config key: relevant delivery options

HTTP collector sink
    resource key: collector endpoint
    config key: headers, timeout policy, formatting options

The async base owns the runtime. The concrete subclass owns the sink identity and backend delivery.

Lifecycle states

AsyncioLogSinkState describes the runtime state of an async sink instance.

enum mvx.common.logger.AsyncioLogSinkState(value)

Lifecycle state of an AsyncioLogSink instance.

The state describes whether the sink has not been started yet, is starting, is running, is stopping, has stopped normally, or has reached a terminal error state.

Member Type:

str

Valid values are as follows:

VIRGIN = <AsyncioLogSinkState.VIRGIN: 'VIRGIN'>

The sink has been created but has not started yet.

STARTING = <AsyncioLogSinkState.STARTING: 'STARTING'>

Startup is in progress.

RUNNING = <AsyncioLogSinkState.RUNNING: 'RUNNING'>

The sink is running and can accept events.

STOPPING = <AsyncioLogSinkState.STOPPING: 'STOPPING'>

Shutdown is in progress.

STOPPED = <AsyncioLogSinkState.STOPPED: 'STOPPED'>

The sink has stopped normally.

FAILURE = <AsyncioLogSinkState.FAILURE: 'FAILURE'>

The sink has entered a terminal failure state.

CANCELLED = <AsyncioLogSinkState.CANCELLED: 'CANCELLED'>

The dispatching task was cancelled unexpectedly.

Queue overflow policy

AsyncioLogSinkQueueOverflowPolicy controls what happens when the accepted-event limit is reached.

enum mvx.common.logger.AsyncioLogSinkQueueOverflowPolicy(value)

Queue overflow behavior for AsyncioLogSink.

Member Type:

str

Valid values are as follows:

DROP = <AsyncioLogSinkQueueOverflowPolicy.DROP: 'DROP'>

Drop an event when the pending-event limit is reached.

RAISE_ERROR = <AsyncioLogSinkQueueOverflowPolicy.RAISE_ERROR: 'RAISE_ERROR'>

Raise AsyncioLogSinkQueueOverflowError when the pending-event limit is reached.

Lifecycle operations

AsyncioLogSinkOp identifies lifecycle operations reported by wait handles.

enum mvx.common.logger.AsyncioLogSinkOp(value)

Lifecycle operation names reported by AsyncioLogSinkOpResult.

Member Type:

str

Valid values are as follows:

START = <AsyncioLogSinkOp.START: 'START'>

Start operation.

STOP = <AsyncioLogSinkOp.STOP: 'STOP'>

Stop operation.

AsyncioLogSinkOpResult is returned by AsyncioLogSinkWaitHandle.wait() and by awaiting a wait handle.

class mvx.common.logger.AsyncioLogSinkOpResult(op_name, success, error=None)

Result of an AsyncioLogSink lifecycle operation.

Parameters:
  • op_name (AsyncioLogSinkOp) – lifecycle operation name.

  • success (bool) – whether the operation completed successfully.

  • error (AsyncioLogSinkError | None) – operation error, or None if the operation succeeded.

Wait handle

AsyncioLogSinkWaitHandle is returned by start() and stop().

It can be used from synchronous code through wait() or awaited from async code.

class mvx.common.logger.AsyncioLogSinkWaitHandle(operation)

Wait handle returned by AsyncioLogSink.start() and AsyncioLogSink.stop().

The handle can be used synchronously through wait() or awaited from async code. Both forms return AsyncioLogSinkOpResult.

Create a wait handle for a lifecycle operation.

Parameters:

operation (AsyncioLogSinkOp) – lifecycle operation represented by this handle.

wait()

Wait synchronously for the lifecycle operation to finish.

Return type:

AsyncioLogSinkOpResult

Returns:

lifecycle operation result.

Base class

AsyncioLogSink is the base class for asynchronous sink implementations.

It implements the synchronous log(event) sink boundary while moving delivery to an internal asyncio runtime.

class mvx.common.logger.AsyncioLogSink(*, namespace=None, queue_max_size=None, queue_overflow_policy=AsyncioLogSinkQueueOverflowPolicy.RAISE_ERROR)

Base class for sinks with asynchronous delivery.

AsyncioLogSink preserves the synchronous LogSinkProto.log(event) boundary while moving actual delivery into an asyncio dispatcher.

The class provides lifecycle management, thread-safe event acceptance, lazy startup, pending-event accounting, overflow handling, flushing, dispatcher failure handling, and package-managed threaded runtime creation.

Subclasses must implement _dispatch_core(). They may override _on_starting() and _on_stopped() to manage backend resources.

Create an async sink bound to the current running event loop.

Direct construction requires a running event loop in the current thread. Package-managed creation through create() builds a dedicated event loop thread and constructs the sink inside it.

Parameters:
  • namespace (str | None) – optional namespace used for internal runtime task names.

  • queue_max_size (int | None) – optional maximum number of pending accepted events. If omitted, DEFAULT_QUEUE_MAX_SIZE is used.

  • queue_overflow_policy (AsyncioLogSinkQueueOverflowPolicy) – behavior used when the pending-event limit is reached.

Raises:

AsyncioLogSinkEventLoopUnavailableError – if no running event loop is available in the current thread.

get_status()

Return the current lifecycle state.

Return type:

AsyncioLogSinkState

Returns:

current sink state.

start()

Start the async sink runtime.

The method schedules startup on the sink event loop and returns immediately with a wait handle. If startup is already in progress, the returned handle is attached to the same startup operation.

Return type:

AsyncioLogSinkWaitHandle

Returns:

wait handle for the start operation.

stop()

Stop the async sink runtime.

The method schedules shutdown on the sink event loop and returns immediately with a wait handle. Stop is valid only when the sink is running.

Shutdown flushes accepted events on a best-effort basis, stops the dispatcher, and runs the stop hook.

Return type:

AsyncioLogSinkWaitHandle

Returns:

wait handle for the stop operation.

async _on_starting()

Run backend-specific startup logic.

This hook is called before the dispatcher is started. Subclasses may override it to open connections, create clients, or prepare backend resources.

Return type:

None

Returns:

None.

async _on_stopped()

Run backend-specific shutdown logic.

This hook is called after the dispatcher has stopped during normal shutdown. Subclasses may override it to close connections, clients, handlers, or other backend resources.

Return type:

None

Returns:

None.

abstractmethod async _dispatch_core(event)

Deliver one event to the backend.

Subclasses must implement this method. It is called by the dispatcher task for each accepted event.

Parameters:

event (LogEvent) – prepared event to deliver.

Return type:

None

Returns:

None.

log(event)

Accept a log event for asynchronous delivery.

This method is thread-safe and designed for the logging hot path. It performs state checks, pending-event accounting, optional lazy startup, and thread-safe queue handoff. It does not wait for actual backend delivery.

Accepted events are delivered later by the dispatcher task.

Parameters:

event (LogEvent) – prepared event to accept.

Return type:

None

Returns:

None.

Raises:
classmethod build_descriptor(**kwargs)

Build a descriptor for this async sink configuration.

Subclasses that participate in package-level sink registration must override this method.

Parameters:

kwargs (Any) – sink-specific configuration arguments.

Return type:

LogSinkDescriptor

Returns:

sink descriptor.

Raises:

NotImplementedError – always raised by the base class.

classmethod create(**kwargs)

Create a package-managed async sink with a dedicated event loop thread.

The method starts a new thread, creates an event loop inside it, constructs the sink on that loop, starts the sink, and returns the sink with an idempotent terminator.

The returned terminator stops the sink runtime, schedules event loop shutdown, joins the runtime thread, and raises any termination error.

Parameters:

kwargs (Any) – arguments passed to the sink constructor.

Return type:

tuple[LogSinkProto, Callable[[], None]]

Returns:

pair containing the created sink and its terminator.

Raises:

RuntimeError – if runtime creation, sink bootstrap, sink startup, or runtime shutdown setup fails.

Required subclass methods

A concrete package-managed async sink must implement two methods.

build_descriptor(...)
    describe the sink resource and configuration for registry identity

_dispatch_core(event)
    deliver one accepted event to the backend

build_descriptor() is a class-level method used before sink creation by the package-level registry.

The base implementation raises NotImplementedError. This is intentional: AsyncioLogSink cannot know which backend resource or configuration values define descriptor identity for a concrete sink.

_dispatch_core(event) is the per-event delivery hook called by the dispatcher task.

The base class cannot implement it because backend delivery is specific to the concrete sink.

Optional subclass hooks

Subclasses may override these lifecycle hooks:

_on_starting()
    open backend resources before the dispatcher starts

_on_stopped()
    close backend resources after the dispatcher stops

Use _on_starting() for backend setup such as opening connections, creating clients, authenticating, or preparing handles.

Use _on_stopped() for backend cleanup such as closing clients, handlers, files, or network connections.

If no backend setup or cleanup is needed, the inherited no-op hooks are sufficient.

Method ownership

A typical subclass should not override log(event).

The base implementation owns:

state checks
lazy startup
pending-event accounting
overflow handling
thread-safe queue handoff

A typical subclass should not override create(...) either.

The base create() implementation owns:

dedicated event loop thread creation
sink bootstrap inside that loop
startup wait
terminator creation
runtime shutdown

A concrete subclass normally supplies only:

build_descriptor(...)
_dispatch_core(event)
optional startup/shutdown hooks

Package-managed creation

AsyncioLogSink.create() creates a package-managed async sink runtime.

It returns:

sink
terminator

The sink is used through the normal LogSinkProto boundary.

The terminator stops the sink runtime and shuts down the dedicated event loop thread.

create() is common runtime machinery. build_descriptor() is still required on the subclass so that configure_log_sink() can detect idempotent registrations and configuration conflicts before creation.

Direct construction

Direct construction binds the sink to the currently running event loop.

sink = MyAsyncSink(...)

This form requires a running event loop in the current thread.

Direct construction is useful when the caller owns the event loop and wants the sink runtime to live there.

Package-level registration usually uses create() instead, because create() builds a dedicated event loop thread and returns an idempotent terminator.

Custom subclass shape

A concrete async sink usually has this shape:

from typing import Any

from mvx.common.logger import AsyncioLogSink, LogEvent, LogSinkDescriptor


class CustomAsyncSink(AsyncioLogSink):
    @classmethod
    def build_descriptor(cls, **kwargs: Any) -> LogSinkDescriptor:
        return LogSinkDescriptor(
            sink_type="custom",
            resource_key=("custom", kwargs["target"]),
            config_key=(),
        )

    async def _on_starting(self) -> None:
        ...

    async def _dispatch_core(self, event: LogEvent) -> None:
        ...

    async def _on_stopped(self) -> None:
        ...

Only _dispatch_core() and build_descriptor() are conceptually required for a concrete package-managed async sink.

The lifecycle hooks are optional and depend on backend needs.

Error hierarchy

AsyncioLogSink has its own error family.

class mvx.common.logger.AsyncioLogSinkError(*, message, details=None, cause=None, reason=None)

Base class for AsyncioLogSink errors.

All errors raised by the async sink runtime inherit from this class and carry a reason code from AsyncioLogSinkErrorReason.

Parameters:
  • message (str)

  • details (Optional[Mapping[str, Any]])

  • cause (Optional[Exception])

  • reason (Optional[str])

enum mvx.common.logger.AsyncioLogSinkErrorReason(value)

Reason codes used by AsyncioLogSinkError subclasses.

Member Type:

str

Valid values are as follows:

EVENT_LOOP_UNAVAILABLE = <AsyncioLogSinkErrorReason.EVENT_LOOP_UNAVAILABLE: 'EVENT_LOOP_UNAVAILABLE'>

A sink was created without an available running event loop.

INVALID_LOG_SINK_STATE = <AsyncioLogSinkErrorReason.INVALID_LOG_SINK_STATE: 'INVALID_LOG_SINK_STATE'>

An operation was requested while the sink was in an invalid state.

ON_STARTING_HOOK_FAILED = <AsyncioLogSinkErrorReason.ON_STARTING_HOOK_FAILED: 'ON_STARTING_HOOK_FAILED'>

The _on_starting() hook failed.

ON_STOPPED_HOOK_FAILED = <AsyncioLogSinkErrorReason.ON_STOPPED_HOOK_FAILED: 'ON_STOPPED_HOOK_FAILED'>

The _on_stopped() hook failed.

QUEUE_OVERFLOW = <AsyncioLogSinkErrorReason.QUEUE_OVERFLOW: 'QUEUE_OVERFLOW'>

The sink queue limit was reached.

DISPATCHER_UNEXPECTEDLY_CANCELLED = <AsyncioLogSinkErrorReason.DISPATCHER_UNEXPECTEDLY_CANCELLED: 'DISPATCHER_UNEXPECTEDLY_CANCELLED'>

The dispatcher task was cancelled unexpectedly.

UNEXPECTED_ERROR = <AsyncioLogSinkErrorReason.UNEXPECTED_ERROR: 'UNEXPECTED_ERROR'>

An unexpected error occurred inside the sink runtime.

class mvx.common.logger.AsyncioLogSinkEventLoopUnavailableError

Raised when direct sink construction cannot find a running event loop.

This applies to direct AsyncioLogSink construction. Package-managed creation through create() builds a dedicated event loop runtime.

class mvx.common.logger.AsyncioLogSinkInvalidStateError(sink_state, expected_states, cause=None)

Raised when a sink operation is not valid for the current lifecycle state.

Create an invalid-state error.

Parameters:
  • sink_state (AsyncioLogSinkState) – actual sink state.

  • expected_states (tuple[AsyncioLogSinkState, ...]) – states allowed for the requested operation.

  • cause (Exception | None) – optional underlying cause.

class mvx.common.logger.AsyncioLogSinkOnStartingHookFailedError(cause)

Raised when the _on_starting() lifecycle hook fails.

Create a startup-hook failure error.

Parameters:

cause (Exception) – exception raised by the startup hook.

class mvx.common.logger.AsyncioLogSinkOnStoppedHookFailedError(cause)

Raised when the _on_stopped() lifecycle hook fails.

Create a stop-hook failure error.

Parameters:

cause (Exception) – exception raised by the stop hook.

class mvx.common.logger.AsyncioLogSinkQueueOverflowError

Raised when the accepted-event limit is reached.

This error is raised when queue overflow policy is RAISE_ERROR.

class mvx.common.logger.AsyncioLogSinkDispatcherCancelledError

Raised when the dispatcher task is cancelled unexpectedly.

Normal shutdown cancels the dispatcher as part of stopping. This error represents cancellation outside the normal stopping path.

class mvx.common.logger.AsyncioLogSinkUnexpectedError(cause)

Raised when an unexpected runtime error is wrapped by the async sink.

Create an unexpected-error wrapper.

Parameters:

cause (Exception) – original unexpected exception.