Coverage for src / lilbee / runtime / asyncio_loop.py: 100%
54 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Process-lifetime background asyncio loop for TUI workers.
3One loop on a daemon thread, used by every @work(thread=True) worker.
4CLI one-shots and the server own their own loops: don't route them here.
5"""
7from __future__ import annotations
9import asyncio
10import atexit
11import concurrent.futures
12import contextlib
13import threading
14from collections.abc import Coroutine
15from typing import Any, TypeVar
17T = TypeVar("T")
19# concurrent.futures raises this exact RuntimeError message when submitting to
20# a shutdown executor (Python 3.11+). There is no dedicated exception class to
21# catch, so callers have to string-match the message. The constant + helper
22# below give every caller one place to depend on instead of duplicating the
23# string + isinstance check inline.
24_EXECUTOR_SHUTDOWN_MSG = "cannot schedule new futures after shutdown"
27def is_executor_shutdown(exc: BaseException) -> bool:
28 """True if ``exc`` is the concurrent.futures shutdown-race ``RuntimeError``."""
29 return isinstance(exc, RuntimeError) and _EXECUTOR_SHUTDOWN_MSG in str(exc)
32_loop: asyncio.AbstractEventLoop | None = None
33_thread: threading.Thread | None = None
34_lock = threading.Lock()
35_atexit_registered = False
38def get_loop() -> asyncio.AbstractEventLoop:
39 """Return the background loop, starting it on a daemon thread if needed."""
40 global _loop, _thread, _atexit_registered
41 with _lock:
42 if _loop is not None and not _loop.is_closed():
43 return _loop
44 loop = asyncio.new_event_loop()
45 thread = threading.Thread(
46 target=loop.run_forever,
47 name="lilbee-bg-loop",
48 daemon=True,
49 )
50 thread.start()
51 _loop = loop
52 _thread = thread
53 if not _atexit_registered:
54 # Register once per process; shutdown is idempotent, so restarting
55 # the loop later doesn't need a second registration.
56 atexit.register(shutdown)
57 _atexit_registered = True
58 return loop
61def run(coro: Coroutine[Any, Any, T]) -> T:
62 """Submit *coro* to the background loop from any thread; block for result.
64 Exceptions raised inside *coro* propagate unchanged, including
65 asyncio.CancelledError.
66 """
67 loop = get_loop()
68 try:
69 return asyncio.run_coroutine_threadsafe(coro, loop).result()
70 except concurrent.futures.CancelledError as exc:
71 # run_coroutine_threadsafe re-raises cancellation as the concurrent
72 # flavour; rewrap so `except asyncio.CancelledError` still matches.
73 raise asyncio.CancelledError(*exc.args) from None
76def shutdown() -> None:
77 """Cancel pending tasks, stop the loop, join the thread. Idempotent."""
78 global _loop, _thread
79 with _lock:
80 loop, _loop = _loop, None
81 thread, _thread = _thread, None
82 if loop is None or loop.is_closed():
83 return
84 # Best-effort drain; always stop the loop even if drain raised.
85 with contextlib.suppress(Exception):
86 asyncio.run_coroutine_threadsafe(_drain(loop), loop).result(timeout=10.0)
87 loop.call_soon_threadsafe(loop.stop)
88 if thread is not None:
89 thread.join(timeout=10.0)
90 loop.close()
93async def _drain(loop: asyncio.AbstractEventLoop) -> None:
94 pending = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()]
95 for task in pending:
96 task.cancel()
97 if pending:
98 await asyncio.gather(*pending, return_exceptions=True)
99 # Give scheduled close callbacks a chance to run before we stop the loop.
100 await asyncio.sleep(0.05)