run_with_cancellation_policy

run_with_cancellation_policy runs one async operation under an explicit cancellation policy.

It is intended for lifecycle and cleanup code where caller cancellation should not always abort the protected operation immediately.

Why it exists

In asyncio, cancellation is delivered by injecting asyncio.CancelledError into the awaiting task.

For ordinary operations, this behavior is usually correct:

result = await operation()

If the caller task is cancelled, the await is interrupted.

But some operations are different. Cleanup, graceful disconnect, drain-to-idle, or teardown sequences may need to finish even if the caller task receives cancellation while waiting.

Examples:

disconnect
cleanup
drain pending work
close transport
wait until internal queues become idle

In those cases, the code often needs one of three behaviors:

  • propagate cancellation normally;

  • defer cancellation and return whether it happened;

  • defer cancellation and re-raise it after the protected operation completes.

run_with_cancellation_policy makes that choice explicit.

Cancellation policies

The behavior is controlled by CancellationPolicy.

from mvx.common.helpers import CancellationPolicy

PLAIN

CancellationPolicy.PLAIN behaves like a normal await.


from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def operation() -> None:
    ...

async def main():
    result = await run_with_cancellation_policy(
        core_func=operation,
        policy=CancellationPolicy.PLAIN,
    )

This is equivalent to:

result = await operation()

No separate task is created. No shielding is applied.

If the caller task is cancelled, cancellation propagates normally and the await is interrupted.

Use this policy when no special cancellation handling is needed.

DEFER_FLAG

CancellationPolicy.DEFER_FLAG protects the core operation from caller-task cancellation and reports whether cancellation was requested.

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def operation() -> None:
    ...
async def main():
    cancel_requested, result = await run_with_cancellation_policy(
        core_func=operation,
        policy=CancellationPolicy.DEFER_FLAG,
        op_name="disconnect",
    )

The core operation runs in its own task. The caller awaits it through asyncio.shield().

If the caller task is cancelled while waiting:

  • the core task is not cancelled;

  • the cancellation request is recorded;

  • the caller task cancellation state is cleared;

  • waiting continues until the core task finishes.

On success, the function returns:

cancel_requested, result

This policy is useful when higher-level code needs to know that cancellation was requested, but the protected operation still had to finish.

DEFER_RERAISE

CancellationPolicy.DEFER_RERAISE also protects the core operation from caller-task cancellation.

The difference is the final outcome.

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def operation() -> None:
    ...
async def main():
    result = await run_with_cancellation_policy(
        core_func=operation,
        policy=CancellationPolicy.DEFER_RERAISE,
        op_name="cleanup",
    )

If cancellation was requested while waiting, the function waits for the core operation to complete successfully and then raises asyncio.CancelledError.

This gives deferred cancellation semantics:

finish the protected operation first
then restore cancellation to the caller

Use this policy when cleanup must finish, but the caller should still observe cancellation after cleanup is complete.

Core task cancellation

Deferred policies protect the core task from caller-task cancellation.

They do not hide cancellation of the core task itself.

If the core task is cancelled directly, asyncio.CancelledError is propagated.

This distinction matters:

caller cancelled while waiting  → can be deferred
core task cancelled             → propagated

Core exceptions

Exceptions raised by the core operation are never masked.

async def operation() -> int:
    raise RuntimeError("operation failed")

All policies propagate that exception:

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def operation() -> None:
    ...
async def main():
    try:
        await run_with_cancellation_policy(
            core_func=operation,
            policy=CancellationPolicy.DEFER_FLAG,
        )
    except RuntimeError:
        handled = True

Deferred policies only affect caller-task cancellation while waiting. They do not turn core failures into successful results.

Operation invocation

The core_func callable is invoked exactly once.

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def operation() -> str:
    return "done"

async def main():
    result = await run_with_cancellation_policy(
        core_func=operation,
        policy=CancellationPolicy.PLAIN,
    )

In deferred policies, the awaitable returned by core_func is wrapped into a single internal task.

The operation is not retried, restarted, or supervised.

Passing arguments to the operation

core_func is a zero-argument callable. If the coroutine function needs arguments, wrap the call with lambda.

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def disconnect(reason: str, force: bool) -> None:
    ...
async def main():
    await run_with_cancellation_policy(
        core_func=lambda: disconnect(reason="shutdown", force=True),
        policy=CancellationPolicy.DEFER_RERAISE,
        op_name="disconnect",
    )

The lambda is not used for laziness as a trick. It is the explicit boundary where the helper receives a no-argument callable and remains responsible only for cancellation policy, not for knowing the operation signature.

Task name

Deferred policies create an internal task.

The op_name argument is used as that task’s name:

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def operation() -> str:
    return "done"

async def main():
    cancel_requested, result = await run_with_cancellation_policy(
        core_func=operation,
        policy=CancellationPolicy.DEFER_FLAG,
        op_name="transport-cleanup",
    )

This is useful for debugging, logs, and asyncio task introspection.

op_name is ignored by CancellationPolicy.PLAIN.

Cancellation accounting

Deferred policies use Task.cancelling() and Task.uncancel().

Multiple calls to Task.cancel() may leave multiple pending cancellation requests on the caller task. If those pending requests are not cleared, the next await may immediately raise CancelledError again.

The helper drains pending caller-task cancellations while it is deliberately deferring cancellation.

This is the mechanism that allows the helper to continue waiting for the core task after caller cancellation was requested.

Typical usage

The helper is most useful around lifecycle code:

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def disconnect() -> bool:
    return True

async def main():
    cancel_requested, _ = await run_with_cancellation_policy(
        core_func=disconnect,
        policy=CancellationPolicy.DEFER_FLAG,
        op_name="transport-disconnect",
    )

    if cancel_requested:
        cleanup_info = "disconnect completed after cancellation request"

For cleanup that must finish but should preserve cancellation semantics:

from mvx.common.helpers import run_with_cancellation_policy, CancellationPolicy

async def cleanup() -> bool:
    return True

async def main():
    await run_with_cancellation_policy(
        core_func=cleanup,
        policy=CancellationPolicy.DEFER_RERAISE,
        op_name="processor-cleanup",
    )

Design rule

Use run_with_cancellation_policy when cancellation behavior is part of the operation contract.

Do not use it as a generic wrapper around every async call. Most awaits should remain plain awaits.

This helper is for places where the code must explicitly choose between normal cancellation, deferred cancellation with a flag, and deferred cancellation with re-raise.

API

class mvx.common.helpers.CancellationPolicy(*values)

Cancellation handling policy for run_with_cancellation_policy().

async mvx.common.helpers.run_with_cancellation_policy(core_func, *, policy=CancellationPolicy.DEFER_FLAG, op_name='unknown')

Run one awaitable under the selected cancellation policy.

Parameters:
  • core_func (Callable[[], Awaitable[TypeVar(T)]]) – Zero-argument callable returning the awaitable to run.

  • policy (CancellationPolicy) – Cancellation policy applied while awaiting the operation.

  • op_name (str) – Name assigned to the internal task in deferred policies.

Return type:

Union[TypeVar(T), Tuple[bool, TypeVar(T)]]

Returns:

The operation result, or (cancel_requested, result) when using CancellationPolicy.DEFER_FLAG.

Raises:
  • asyncio.CancelledError – Raised according to the selected cancellation policy or when the core task itself is cancelled.

  • Exception – Propagates exceptions raised by the core awaitable.