Custom sink

Overview

A custom sink is the lowest-level delivery extension point of the logger.

It receives completed LogEvent objects and delivers or hands them off to a backend.

The minimal contract is small:

class MySink:
    def log(self, event: LogEvent) -> None:
        ...

But the behavior behind that method matters.

log(event) is called on the logging hot path and may be called concurrently. A custom sink should keep that method fast and thread-safe.

When to write a custom sink

Write a custom sink when ready-to-use sinks do not match the destination you need.

Good reasons:

send events to an internal collector
append events to an application-owned queue
write events into a custom in-memory store for tests
integrate with a proprietary logging backend
bridge LogEvent objects into another telemetry system

Do not write a custom sink just to change which events are emitted or how payloads are normalized.

Use event policy for event selection.

Use payload processing for payload representation.

Use a sink only for delivery.

Minimal direct sink

A direct sink can be created and passed directly to LogContext.

from threading import RLock

from mvx.common.logger import LogEvent


class ListSink:
    def __init__(self) -> None:
        self._lock = RLock()
        self.events: list[LogEvent] = []

    def log(self, event: LogEvent) -> None:
        with self._lock:
            self.events.append(event)

Use it directly:

from mvx.common.logger import LogContext, LogPayloadProcessor


sink = ListSink()
ctx = LogContext(
    namespace="test",
    log_sink=sink,
    payload_processor=LogPayloadProcessor(),
)

This is enough for tests and simple local integrations.

No descriptor is needed because the sink is not registered in the package-level sink registry.

No terminator is needed unless the sink owns resources that must be closed.

Why the lock matters

The lock in the example is not decorative.

log(event) may be called from multiple threads. If the sink mutates internal state, it must protect that state.

For example, this state needs synchronization:

in-memory event lists
closed flags
handler references
internal counters
queues that are not already thread-safe
backend clients shared across threads

If a sink uses only thread-safe primitives, an explicit lock may not be needed. But the implementation must still provide safe behavior under concurrent calls.

Hot path behavior

log(event) should be fast.

Avoid slow work inside this method:

network calls
database writes
remote acknowledgements
retry loops
reconnect logic
slow file flushes
large serialization work

If delivery may be slow, log(event) should hand the event off to another runtime.

Typical handoff strategies:

thread-safe queue
background thread
event loop callback
async dispatcher
existing non-blocking client

For asynchronous delivery, use AsyncioLogSink instead of building lifecycle machinery from scratch.

What the sink receives

A sink receives a fully prepared LogEvent.

By the time log(event) is called:

LogEventMeta has already been built
event policy has already accepted the event
payload has already been normalized unless explicitly skipped
LogEvent has already been created

The sink should not normally inspect raw application objects.

It should not normalize payload values.

It should not decide whether an event should be emitted.

It should deliver or hand off the event it receives.

Formatting inside a sink

A sink may perform backend-specific formatting.

For example, a sink may convert LogEvent into:

JSON object
logging.LogRecord
database row
message queue record
HTTP request body

Keep formatting small and predictable on the hot path.

If formatting is expensive, move it to the background delivery runtime together with backend I/O.

Direct sink with close()

If a direct sink owns resources, expose an explicit close method.

from threading import RLock

from mvx.common.logger import LogEvent


class ClosableListSink:
    def __init__(self) -> None:
        self._lock = RLock()
        self._closed = False
        self.events: list[LogEvent] = []

    def log(self, event: LogEvent) -> None:
        with self._lock:
            if self._closed:
                return
            self.events.append(event)

    def close(self) -> None:
        with self._lock:
            self._closed = True

Calls after close are ignored in this example.

A different sink may choose to raise instead. The important part is that the behavior is explicit and thread-safe.

Package-managed custom sink

A sink that should be registered through configure_log_sink() must provide the package-managed sink class contract.

That means:

log(event)
build_descriptor(...)
create(...)

Example:

from threading import RLock
from typing import Any

from mvx.common.logger import (
    LogEvent,
    LogSinkDescriptor,
    LogSinkProto,
    LogSinkTerminator,
)


class MemorySink:
    def __init__(self, *, bucket: str) -> None:
        self._lock = RLock()
        self._closed = False
        self._bucket = bucket
        self.events: list[LogEvent] = []

    @classmethod
    def build_descriptor(cls, **kwargs: Any) -> LogSinkDescriptor:
        bucket = kwargs["bucket"]
        return LogSinkDescriptor(
            sink_type="memory",
            resource_key=("memory", bucket),
            config_key=(),
        )

    @classmethod
    def create(cls, **kwargs: Any) -> tuple[LogSinkProto, LogSinkTerminator]:
        sink = cls(**kwargs)

        def terminator() -> None:
            sink.close()

        return sink, terminator

    def log(self, event: LogEvent) -> None:
        with self._lock:
            if self._closed:
                return
            self.events.append(event)

    def close(self) -> None:
        with self._lock:
            self._closed = True

Register it:

from mvx.common.logger import configure_log_sink


sink = configure_log_sink(
    name="memory",
    sink_cls=MemorySink,
    bucket="test-events",
)

This sink can now participate in package-level sink registration and reset flows.

Descriptor design

build_descriptor() should describe sink identity.

The descriptor has three parts:

sink_type
    logical sink type

resource_key
    backend resource identity

config_key
    relevant configuration that affects compatibility

The descriptor is used by the package-level registry before sink creation.

It decides whether a repeated registration request is idempotent or conflicting.

same name + same descriptor
    return existing sink

same name + different descriptor
    raise conflict

Use resource_key for the target resource.

Use config_key for behavior that would make the same named sink incompatible with an existing one.

Terminator design

create() returns a sink and a terminator.

The terminator is called when the package-level lifecycle closes or resets the sink.

A terminator should be:

idempotent
thread-safe
safe if the sink was only partially used

For simple sinks, the terminator can call close().

For async or background sinks, the terminator should stop the runtime and release backend resources.

When to use AsyncioLogSink instead

Do not build your own background runtime unless you need something unusual.

Use AsyncioLogSink when:

delivery may be slow
backend API is async
sink needs queueing
sink needs background dispatch
sink needs lifecycle start/stop hooks
sink must keep log(event) fast while delivery happens elsewhere

With AsyncioLogSink, the subclass normally implements:

build_descriptor(...)
_dispatch_core(event)
optional _on_starting()
optional _on_stopped()

The base class owns event acceptance, thread-safe handoff, lifecycle, and package-managed runtime creation.

What a custom sink should not do

A custom sink should not normally filter events.

Filtering belongs to event policy.

A custom sink should not normalize raw payload values.

Payload normalization belongs to payload processing.

A custom sink should not do slow backend delivery directly inside log(event).

Slow delivery belongs behind a queue, thread, event loop, or another runtime boundary.

A custom sink should not mutate shared state without synchronization.

Thread-safety is part of the expected sink behavior.

Testing a custom sink

A custom sink should be tested independently from the package-level facade.

At minimum, test:

log(event) accepts prepared events
internal state is protected under concurrent calls
close or terminator is idempotent
descriptor is stable for equivalent configuration
descriptor changes for conflicting configuration

For package-managed sinks, also test:

configure_log_sink returns existing sink for same descriptor
configure_log_sink raises conflict for different descriptor
close_log_sink calls terminator

Design summary

A custom sink is a delivery extension point.

The minimal direct sink implements log(event).

A package-managed sink also implements build_descriptor() and create().

log(event) should be fast and thread-safe.

The sink receives prepared LogEvent objects and should deliver or hand them off without taking over event selection or payload normalization.