AsyncioLogSink
This page documents AsyncioLogSink as a reusable base for custom sinks that need asynchronous delivery behind the synchronous sink interface.
For the internal runtime design, lifecycle details, queueing model, dispatcher behavior, and shutdown mechanics, see the architecture article. This page focuses on the public API and subclass extension points.
When to use it
Use AsyncioLogSink when a custom sink must keep log(event) fast and thread-safe, but actual backend delivery may require asynchronous work.
Typical use cases:
network log collectors
remote services
databases
message queues
Redis streams
file or backend sinks with buffered delivery
A subclass normally implements:
build_descriptor(...)
_dispatch_core(event)
and may override:
_on_starting()
_on_stopped()
AsyncioLogSink provides the common runtime and package-managed create() implementation. The subclass remains responsible for describing its own sink identity through build_descriptor() and for delivering one event through _dispatch_core(event).
Relationship to sink contracts
AsyncioLogSink is a base for package-managed custom sinks.
It already provides:
log(event)
create(...)
But it cannot provide a meaningful descriptor for every possible backend.
Therefore, subclasses that participate in package-level sink registration must implement:
build_descriptor(...)
The descriptor should describe the backend resource and relevant configuration for that concrete sink.
For example:
Redis sink
resource key: redis URL, stream name
config key: relevant delivery options
HTTP collector sink
resource key: collector endpoint
config key: headers, timeout policy, formatting options
The async base owns the runtime. The concrete subclass owns the sink identity and backend delivery.
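The split between resource key and config key can be illustrated with a small sketch. SketchDescriptor and build_redis_descriptor are hypothetical stand-ins written for this example, not the package's LogSinkDescriptor; the field names follow the subclass example on this page, and the argument names (url, stream, maxlen) are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Hashable


@dataclass(frozen=True)
class SketchDescriptor:
    """Hypothetical stand-in for a sink descriptor; not the package class."""

    sink_type: str
    resource_key: tuple[Hashable, ...]
    config_key: tuple[Hashable, ...]


def build_redis_descriptor(**kwargs: Any) -> SketchDescriptor:
    # Resource identity: which backend resource the sink writes to.
    resource_key = ("redis", kwargs["url"], kwargs["stream"])
    # Config identity: remaining delivery options, sorted so that
    # keyword order does not change the key.
    options = {k: v for k, v in kwargs.items() if k not in ("url", "stream")}
    config_key = tuple(sorted(options.items()))
    return SketchDescriptor("redis", resource_key, config_key)


first = build_redis_descriptor(url="redis://localhost:6379/0", stream="logs", maxlen=1000)
same = build_redis_descriptor(url="redis://localhost:6379/0", stream="logs", maxlen=1000)
changed = build_redis_descriptor(url="redis://localhost:6379/0", stream="logs", maxlen=50)

# Same resource and same options: descriptors compare equal.
print(first == same)
# Same resource, different options: only the config key differs.
print(first.resource_key == changed.resource_key, first.config_key != changed.config_key)
```

With keys built this way, a registry could treat equal descriptors as a repeated registration and an equal resource key with a different config key as a conflict.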
Lifecycle states
AsyncioLogSinkState describes the runtime state of an async sink instance.
- enum mvx.common.logger.AsyncioLogSinkState(value)
Lifecycle state of an AsyncioLogSink instance.
The state describes whether the sink has not been started yet, is starting, is running, is stopping, has stopped normally, or has reached a terminal error state.
- Member Type:
str
Valid values are as follows:
- VIRGIN = <AsyncioLogSinkState.VIRGIN: 'VIRGIN'>
The sink has been created but has not started yet.
- STARTING = <AsyncioLogSinkState.STARTING: 'STARTING'>
Startup is in progress.
- RUNNING = <AsyncioLogSinkState.RUNNING: 'RUNNING'>
The sink is running and can accept events.
- STOPPING = <AsyncioLogSinkState.STOPPING: 'STOPPING'>
Shutdown is in progress.
- STOPPED = <AsyncioLogSinkState.STOPPED: 'STOPPED'>
The sink has stopped normally.
- FAILURE = <AsyncioLogSinkState.FAILURE: 'FAILURE'>
The sink has entered a terminal failure state.
- CANCELLED = <AsyncioLogSinkState.CANCELLED: 'CANCELLED'>
The dispatching task was cancelled unexpectedly.
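The member type str suggests that members behave like plain strings. The sketch below assumes the enum is declared with a str mixin; SketchState is a stand-in that copies the documented names and values and is not imported from the package.

```python
import json
from enum import Enum


class SketchState(str, Enum):
    """Stand-in with the documented member names and values."""

    VIRGIN = "VIRGIN"
    STARTING = "STARTING"
    RUNNING = "RUNNING"
    STOPPING = "STOPPING"
    STOPPED = "STOPPED"
    FAILURE = "FAILURE"
    CANCELLED = "CANCELLED"


# Look up a member by its string value.
state = SketchState("RUNNING")

# Members compare equal to their string value and serialize as strings.
print(state is SketchState.RUNNING, state == "RUNNING", json.dumps({"state": state}))
```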
Queue overflow policy
AsyncioLogSinkQueueOverflowPolicy controls what happens when the limit on pending accepted events (queue_max_size) is reached.
- enum mvx.common.logger.AsyncioLogSinkQueueOverflowPolicy(value)
Queue overflow behavior for AsyncioLogSink.
- Member Type:
str
Valid values are as follows:
- DROP = <AsyncioLogSinkQueueOverflowPolicy.DROP: 'DROP'>
Drop an event when the pending-event limit is reached.
- RAISE_ERROR = <AsyncioLogSinkQueueOverflowPolicy.RAISE_ERROR: 'RAISE_ERROR'>
Raise AsyncioLogSinkQueueOverflowError when the pending-event limit is reached.
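The two policies can be sketched with a lock-protected counter. This is a minimal illustration of pending-event accounting under stated assumptions, not the package's implementation; OverflowPolicy, SketchOverflowError, and PendingCounter are hypothetical names.

```python
import threading
from enum import Enum


class OverflowPolicy(str, Enum):
    """Stand-in mirroring the two documented policy values."""

    DROP = "DROP"
    RAISE_ERROR = "RAISE_ERROR"


class SketchOverflowError(Exception):
    """Stand-in for the overflow error raised under RAISE_ERROR."""


class PendingCounter:
    """Counts accepted but not yet delivered events under a lock."""

    def __init__(self, max_size: int, policy: OverflowPolicy) -> None:
        self._max_size = max_size
        self._policy = policy
        self._pending = 0
        self._lock = threading.Lock()

    def try_accept(self) -> bool:
        """Reserve a slot; return False if the event is dropped."""
        with self._lock:
            if self._pending >= self._max_size:
                if self._policy is OverflowPolicy.RAISE_ERROR:
                    raise SketchOverflowError("pending-event limit reached")
                return False
            self._pending += 1
            return True

    def mark_delivered(self) -> None:
        """Release a slot after the event has been delivered."""
        with self._lock:
            self._pending -= 1


dropping = PendingCounter(max_size=2, policy=OverflowPolicy.DROP)
results = [dropping.try_accept() for _ in range(3)]
print(results)  # [True, True, False]
```

Under DROP the caller never sees an error and the third event is silently discarded; under RAISE_ERROR the same call would raise instead.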
Lifecycle operations
AsyncioLogSinkOp identifies lifecycle operations reported by wait handles.
- enum mvx.common.logger.AsyncioLogSinkOp(value)
Lifecycle operation names reported by AsyncioLogSinkOpResult.
- Member Type:
str
Valid values are as follows:
- START = <AsyncioLogSinkOp.START: 'START'>
Start operation.
- STOP = <AsyncioLogSinkOp.STOP: 'STOP'>
Stop operation.
AsyncioLogSinkOpResult is returned by AsyncioLogSinkWaitHandle.wait() and by awaiting a wait handle.
- class mvx.common.logger.AsyncioLogSinkOpResult(op_name, success, error=None)
Result of an AsyncioLogSink lifecycle operation.
- Parameters:
op_name (AsyncioLogSinkOp) – lifecycle operation name.
success (bool) – whether the operation completed successfully.
error (AsyncioLogSinkError | None) – operation error, or None if the operation succeeded.
Wait handle
AsyncioLogSinkWaitHandle is returned by start() and stop().
It can be used from synchronous code through wait() or awaited from async code.
- class mvx.common.logger.AsyncioLogSinkWaitHandle(operation)
Wait handle returned by AsyncioLogSink.start() and AsyncioLogSink.stop().
The handle can be used synchronously through wait() or awaited from async code. Both forms return AsyncioLogSinkOpResult.
Create a wait handle for a lifecycle operation.
- Parameters:
operation (AsyncioLogSinkOp) – lifecycle operation represented by this handle.
- wait()
Wait synchronously for the lifecycle operation to finish.
- Return type:
AsyncioLogSinkOpResult
- Returns:
lifecycle operation result.
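One way a handle can support both wait() and await is to wrap a thread-safe future. The sketch below shows that general technique only; SketchWaitHandle is a hypothetical class, its result is a plain string rather than AsyncioLogSinkOpResult, and the actual implementation may differ.

```python
import asyncio
import concurrent.futures
import threading


class SketchWaitHandle:
    """Hypothetical handle: blocks through wait(), or can be awaited."""

    def __init__(self) -> None:
        self._future: concurrent.futures.Future = concurrent.futures.Future()

    def set_result(self, result: str) -> None:
        # Called by whoever completes the operation, from any thread.
        self._future.set_result(result)

    def wait(self) -> str:
        # Blocks the calling thread until the operation finishes.
        return self._future.result()

    def __await__(self):
        # Bridges the thread-safe future into the awaiting event loop.
        return asyncio.wrap_future(self._future).__await__()


# Synchronous use: another thread completes the operation shortly after.
handle = SketchWaitHandle()
threading.Timer(0.05, handle.set_result, args=("START ok",)).start()
sync_result = handle.wait()


async def main() -> str:
    # Asynchronous use: the same kind of handle is awaited instead.
    other = SketchWaitHandle()
    threading.Timer(0.05, other.set_result, args=("STOP ok",)).start()
    return await other


async_result = asyncio.run(main())
print(sync_result, async_result)
```

In this sketch, calling wait() on the thread whose event loop must complete the operation would block that loop, so the awaitable form is the safer choice inside async code.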
Base class
AsyncioLogSink is the base class for asynchronous sink implementations.
It implements the synchronous log(event) sink boundary while moving delivery to an internal asyncio runtime.
- class mvx.common.logger.AsyncioLogSink(*, namespace=None, queue_max_size=None, queue_overflow_policy=AsyncioLogSinkQueueOverflowPolicy.RAISE_ERROR)
Base class for sinks with asynchronous delivery.
AsyncioLogSink preserves the synchronous LogSinkProto.log(event) boundary while moving actual delivery into an asyncio dispatcher.
The class provides lifecycle management, thread-safe event acceptance, lazy startup, pending-event accounting, overflow handling, flushing, dispatcher failure handling, and package-managed threaded runtime creation.
Subclasses must implement _dispatch_core(). They may override _on_starting() and _on_stopped() to manage backend resources.
Create an async sink bound to the current running event loop.
Direct construction requires a running event loop in the current thread. Package-managed creation through create() builds a dedicated event loop thread and constructs the sink inside it.
- Parameters:
namespace (str | None) – optional namespace used for internal runtime task names.
queue_max_size (int | None) – optional maximum number of pending accepted events. If omitted, DEFAULT_QUEUE_MAX_SIZE is used.
queue_overflow_policy (AsyncioLogSinkQueueOverflowPolicy) – behavior used when the pending-event limit is reached.
- Raises:
AsyncioLogSinkEventLoopUnavailableError – if no running event loop is available in the current thread.
- get_status()
Return the current lifecycle state.
- Return type:
AsyncioLogSinkState
- Returns:
current sink state.
- start()
Start the async sink runtime.
The method schedules startup on the sink event loop and returns immediately with a wait handle. If startup is already in progress, the returned handle is attached to the same startup operation.
- Return type:
AsyncioLogSinkWaitHandle
- Returns:
wait handle for the start operation.
- stop()
Stop the async sink runtime.
The method schedules shutdown on the sink event loop and returns immediately with a wait handle. Stop is valid only when the sink is running.
Shutdown flushes accepted events on a best-effort basis, stops the dispatcher, and runs the stop hook.
- Return type:
AsyncioLogSinkWaitHandle
- Returns:
wait handle for the stop operation.
- async _on_starting()
Run backend-specific startup logic.
This hook is called before the dispatcher is started. Subclasses may override it to open connections, create clients, or prepare backend resources.
- Return type:
None
- Returns:
None.
- async _on_stopped()
Run backend-specific shutdown logic.
This hook is called after the dispatcher has stopped during normal shutdown. Subclasses may override it to close connections, clients, handlers, or other backend resources.
- Return type:
None
- Returns:
None.
- abstractmethod async _dispatch_core(event)
Deliver one event to the backend.
Subclasses must implement this method. It is called by the dispatcher task for each accepted event.
- Parameters:
event (LogEvent) – prepared event to deliver.
- Return type:
None
- Returns:
None.
- log(event)
Accept a log event for asynchronous delivery.
This method is thread-safe and designed for the logging hot path. It performs state checks, pending-event accounting, optional lazy startup, and thread-safe queue handoff. It does not wait for actual backend delivery.
Accepted events are delivered later by the dispatcher task.
- Parameters:
event (LogEvent) – prepared event to accept.
- Return type:
None
- Returns:
None.
- Raises:
AsyncioLogSinkInvalidStateError – if the sink cannot accept events in its current state.
AsyncioLogSinkQueueOverflowError – if the pending-event limit is reached and overflow policy is RAISE_ERROR.
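The thread-safe handoff described for log(event) can be sketched with loop.call_soon_threadsafe and an asyncio.Queue. This is an illustration of the general pattern, not the package's code: events are plain strings, the None sentinel exists only to end the demo, and state checks, accounting, and lazy startup are left out.

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    delivered = []

    async def dispatcher() -> None:
        # Stand-in for the dispatcher task: one delivery step per event.
        while True:
            event = await queue.get()
            if event is None:  # sentinel used only by this sketch
                return
            delivered.append(event)

    def log(event) -> None:
        # Synchronous boundary: schedule the put on the loop thread and
        # return at once. asyncio.Queue is not thread-safe, so the put
        # must run on the loop rather than in the calling thread.
        loop.call_soon_threadsafe(queue.put_nowait, event)

    def producer() -> None:
        # Runs in a separate thread, like application code calling log().
        for i in range(3):
            log(f"event-{i}")
        log(None)

    task = asyncio.create_task(dispatcher())
    thread = threading.Thread(target=producer)
    thread.start()
    await task      # finishes once the sentinel has been consumed
    thread.join()   # the producer has already finished by now
    return delivered


delivered = asyncio.run(main())
print(delivered)  # ['event-0', 'event-1', 'event-2']
```

The producer never waits for delivery; it only pays the cost of scheduling a callback, which is the property the hot path relies on.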
- classmethod build_descriptor(**kwargs)
Build a descriptor for this async sink configuration.
Subclasses that participate in package-level sink registration must override this method.
- Parameters:
kwargs (Any) – sink-specific configuration arguments.
- Return type:
LogSinkDescriptor
- Returns:
sink descriptor.
- Raises:
NotImplementedError – always raised by the base class.
- classmethod create(**kwargs)
Create a package-managed async sink with a dedicated event loop thread.
The method starts a new thread, creates an event loop inside it, constructs the sink on that loop, starts the sink, and returns the sink with an idempotent terminator.
The returned terminator stops the sink runtime, schedules event loop shutdown, joins the runtime thread, and raises any termination error.
- Parameters:
kwargs (Any) – arguments passed to the sink constructor.
- Return type:
tuple[LogSinkProto, Callable[[], None]]
- Returns:
pair containing the created sink and its terminator.
- Raises:
RuntimeError – if runtime creation, sink bootstrap, sink startup, or runtime shutdown setup fails.
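The shape of a dedicated event loop thread with an idempotent terminator can be sketched with the standard library alone. start_loop_thread is a hypothetical helper; the sketch omits sink construction, startup, and error propagation, so it shows only the threading pattern, not what create() actually does internally.

```python
import asyncio
import threading
from typing import Callable


def start_loop_thread() -> tuple[asyncio.AbstractEventLoop, Callable[[], None]]:
    """Start an event loop in a new thread; return it with a terminator."""
    loop = asyncio.new_event_loop()
    ready = threading.Event()

    def run() -> None:
        asyncio.set_event_loop(loop)
        loop.call_soon(ready.set)  # signals once the loop is running
        loop.run_forever()
        loop.close()

    thread = threading.Thread(target=run, name="sketch-sink-loop", daemon=True)
    thread.start()
    ready.wait()

    lock = threading.Lock()
    terminated = False

    def terminate() -> None:
        nonlocal terminated
        with lock:
            if terminated:  # idempotent: later calls do nothing
                return
            terminated = True
        loop.call_soon_threadsafe(loop.stop)  # schedule loop shutdown
        thread.join()                         # wait for the runtime thread

    return loop, terminate


async def where_am_i() -> str:
    return threading.current_thread().name


loop, terminate = start_loop_thread()
# Run a coroutine on the dedicated loop from the calling thread.
thread_name = asyncio.run_coroutine_threadsafe(where_am_i(), loop).result(timeout=5)
terminate()
terminate()  # second call is a no-op
print(thread_name, loop.is_closed())
```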
Required subclass methods
A concrete package-managed async sink must implement two methods.
build_descriptor(...)
describe the sink resource and configuration for registry identity
_dispatch_core(event)
deliver one accepted event to the backend
build_descriptor() is a class-level method used before sink creation by the package-level registry.
The base implementation raises NotImplementedError. This is intentional: AsyncioLogSink cannot know which backend resource or configuration values define descriptor identity for a concrete sink.
_dispatch_core(event) is the per-event delivery hook called by the dispatcher task.
The base class cannot implement it because backend delivery is specific to the concrete sink.
Optional subclass hooks
Subclasses may override these lifecycle hooks:
_on_starting()
open backend resources before the dispatcher starts
_on_stopped()
close backend resources after the dispatcher stops
Use _on_starting() for backend setup such as opening connections, creating clients, authenticating, or preparing handles.
Use _on_stopped() for backend cleanup such as closing clients, handlers, files, or network connections.
If no backend setup or cleanup is needed, the inherited no-op hooks are sufficient.
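The ordering of the hooks relative to the dispatcher can be made concrete with a miniature runtime. SketchRuntime is a hypothetical class that mirrors only the documented order (start hook, dispatcher, flush, dispatcher stop, stop hook); it has no states, wait handles, or error handling.

```python
import asyncio


class SketchRuntime:
    """Hypothetical miniature of the documented hook ordering."""

    def __init__(self) -> None:
        self.trace: list[str] = []
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = None

    async def _on_starting(self) -> None:
        self.trace.append("on_starting")

    async def _on_stopped(self) -> None:
        self.trace.append("on_stopped")

    async def _dispatch_core(self, event: str) -> None:
        self.trace.append(f"dispatch:{event}")

    async def _dispatcher(self) -> None:
        while True:
            event = await self._queue.get()
            await self._dispatch_core(event)
            self._queue.task_done()

    def accept(self, event: str) -> None:
        self._queue.put_nowait(event)

    async def start(self) -> None:
        await self._on_starting()  # hook runs before the dispatcher exists
        self._task = asyncio.create_task(self._dispatcher())

    async def stop(self) -> None:
        await self._queue.join()   # flush accepted events
        self._task.cancel()        # then stop the dispatcher
        await asyncio.gather(self._task, return_exceptions=True)
        await self._on_stopped()   # hook runs after the dispatcher stopped


async def demo() -> list[str]:
    runtime = SketchRuntime()
    await runtime.start()
    runtime.accept("a")
    runtime.accept("b")
    await runtime.stop()
    return runtime.trace


trace = asyncio.run(demo())
print(trace)  # ['on_starting', 'dispatch:a', 'dispatch:b', 'on_stopped']
```

Because the start hook completes before any event is dispatched, a connection opened there is available to every _dispatch_core call, and it can be closed safely in the stop hook.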
Method ownership
A typical subclass should not override log(event).
The base implementation owns:
state checks
lazy startup
pending-event accounting
overflow handling
thread-safe queue handoff
A typical subclass should not override create(...) either.
The base create() implementation owns:
dedicated event loop thread creation
sink bootstrap inside that loop
startup wait
terminator creation
runtime shutdown
A concrete subclass normally supplies only:
build_descriptor(...)
_dispatch_core(event)
optional startup/shutdown hooks
Package-managed creation
AsyncioLogSink.create() creates a package-managed async sink runtime.
It returns:
sink
terminator
The sink is used through the normal LogSinkProto boundary.
The terminator stops the sink runtime and shuts down the dedicated event loop thread.
create() is common runtime machinery. build_descriptor() is still required on the subclass so that configure_log_sink() can detect idempotent registrations and configuration conflicts before creation.
Direct construction
Direct construction binds the sink to the currently running event loop.
sink = MyAsyncSink(...)
This form requires a running event loop in the current thread.
Direct construction is useful when the caller owns the event loop and wants the sink runtime to live there.
Package-level registration usually uses create() instead, because create() builds a dedicated event loop thread and returns an idempotent terminator.
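The running-loop requirement can be checked with asyncio.get_running_loop(), which raises RuntimeError when no loop is running in the current thread. The sketch below shows that standard-library behavior only; whether the base class uses this exact call is an assumption, and loop_available is a hypothetical helper.

```python
import asyncio


def loop_available() -> bool:
    """Return True if an event loop is running in the current thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def inside() -> bool:
    return loop_available()


outside_result = loop_available()      # called from plain synchronous code
inside_result = asyncio.run(inside())  # called while a loop is running
print(outside_result, inside_result)   # False True
```

Direct construction therefore belongs inside a coroutine or a callback running on the loop that should own the sink.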
Custom subclass shape
A concrete async sink usually has this shape:
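from typing import Any

from mvx.common.logger import AsyncioLogSink, LogEvent, LogSinkDescriptor


class CustomAsyncSink(AsyncioLogSink):
    @classmethod
    def build_descriptor(cls, **kwargs: Any) -> LogSinkDescriptor:
        # Describe the backend resource and configuration for registry identity.
        return LogSinkDescriptor(
            sink_type="custom",
            resource_key=("custom", kwargs["target"]),
            config_key=(),
        )

    async def _on_starting(self) -> None:
        # Optional: open backend resources before the dispatcher starts.
        ...

    async def _dispatch_core(self, event: LogEvent) -> None:
        # Required: deliver one accepted event to the backend.
        ...

    async def _on_stopped(self) -> None:
        # Optional: close backend resources after the dispatcher stops.
        ...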
Only _dispatch_core() and build_descriptor() are conceptually required for a concrete package-managed async sink.
The lifecycle hooks are optional and depend on backend needs.
Error hierarchy
AsyncioLogSink has its own error family.
- class mvx.common.logger.AsyncioLogSinkError(*, message, details=None, cause=None, reason=None)
Base class for AsyncioLogSink errors.
All errors raised by the async sink runtime inherit from this class and carry a reason code from AsyncioLogSinkErrorReason.
- Parameters:
message (str)
details (Optional[Mapping[str, Any]])
cause (Optional[Exception])
reason (Optional[str])
- enum mvx.common.logger.AsyncioLogSinkErrorReason(value)
Reason codes used by AsyncioLogSinkError subclasses.
- Member Type:
str
Valid values are as follows:
- EVENT_LOOP_UNAVAILABLE = <AsyncioLogSinkErrorReason.EVENT_LOOP_UNAVAILABLE: 'EVENT_LOOP_UNAVAILABLE'>
A sink was created without an available running event loop.
- INVALID_LOG_SINK_STATE = <AsyncioLogSinkErrorReason.INVALID_LOG_SINK_STATE: 'INVALID_LOG_SINK_STATE'>
An operation was requested while the sink was in an invalid state.
- ON_STARTING_HOOK_FAILED = <AsyncioLogSinkErrorReason.ON_STARTING_HOOK_FAILED: 'ON_STARTING_HOOK_FAILED'>
The _on_starting() hook failed.
- ON_STOPPED_HOOK_FAILED = <AsyncioLogSinkErrorReason.ON_STOPPED_HOOK_FAILED: 'ON_STOPPED_HOOK_FAILED'>
The _on_stopped() hook failed.
- QUEUE_OVERFLOW = <AsyncioLogSinkErrorReason.QUEUE_OVERFLOW: 'QUEUE_OVERFLOW'>
The sink queue limit was reached.
- DISPATCHER_UNEXPECTEDLY_CANCELLED = <AsyncioLogSinkErrorReason.DISPATCHER_UNEXPECTEDLY_CANCELLED: 'DISPATCHER_UNEXPECTEDLY_CANCELLED'>
The dispatcher task was cancelled unexpectedly.
- UNEXPECTED_ERROR = <AsyncioLogSinkErrorReason.UNEXPECTED_ERROR: 'UNEXPECTED_ERROR'>
An unexpected error occurred inside the sink runtime.
- class mvx.common.logger.AsyncioLogSinkEventLoopUnavailableError
Raised when direct sink construction cannot find a running event loop.
This applies to direct AsyncioLogSink construction. Package-managed creation through create() builds a dedicated event loop runtime.
- class mvx.common.logger.AsyncioLogSinkInvalidStateError(sink_state, expected_states, cause=None)
Raised when a sink operation is not valid for the current lifecycle state.
Create an invalid-state error.
- Parameters:
sink_state (AsyncioLogSinkState) – actual sink state.
expected_states (tuple[AsyncioLogSinkState, ...]) – states allowed for the requested operation.
cause (Exception | None) – optional underlying cause.
- class mvx.common.logger.AsyncioLogSinkOnStartingHookFailedError(cause)
Raised when the _on_starting() lifecycle hook fails.
Create a startup-hook failure error.
- Parameters:
cause (Exception) – exception raised by the startup hook.
- class mvx.common.logger.AsyncioLogSinkOnStoppedHookFailedError(cause)
Raised when the _on_stopped() lifecycle hook fails.
Create a stop-hook failure error.
- Parameters:
cause (Exception) – exception raised by the stop hook.
- class mvx.common.logger.AsyncioLogSinkQueueOverflowError
Raised when the limit on pending accepted events is reached.
This error is raised when queue overflow policy is RAISE_ERROR.
- class mvx.common.logger.AsyncioLogSinkDispatcherCancelledError
Raised when the dispatcher task is cancelled unexpectedly.
Normal shutdown cancels the dispatcher as part of stopping. This error represents cancellation outside the normal stopping path.
- class mvx.common.logger.AsyncioLogSinkUnexpectedError(cause)
Raised when an unexpected runtime error is wrapped by the async sink.
Create an unexpected-error wrapper.
- Parameters:
cause (Exception) – original unexpected exception.