Coverage for src / lilbee / runtime / asyncio_loop.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Process-lifetime background asyncio loop for TUI workers. 

2 

3One loop on a daemon thread, used by every @work(thread=True) worker. 

4CLI one-shots and the server own their own loops: don't route them here. 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import atexit 

11import concurrent.futures 

12import contextlib 

13import threading 

14from collections.abc import Coroutine 

15from typing import Any, TypeVar 

16 

17T = TypeVar("T") 

18 

19# concurrent.futures raises this exact RuntimeError message when submitting to 

20# a shutdown executor (Python 3.11+). There is no dedicated exception class to 

21# catch, so callers have to string-match the message. The constant + helper 

22# below give every caller one place to depend on instead of duplicating the 

23# string + isinstance check inline. 

24_EXECUTOR_SHUTDOWN_MSG = "cannot schedule new futures after shutdown" 

25 

26 

27def is_executor_shutdown(exc: BaseException) -> bool: 

28 """True if ``exc`` is the concurrent.futures shutdown-race ``RuntimeError``.""" 

29 return isinstance(exc, RuntimeError) and _EXECUTOR_SHUTDOWN_MSG in str(exc) 

30 

31 

32_loop: asyncio.AbstractEventLoop | None = None 

33_thread: threading.Thread | None = None 

34_lock = threading.Lock() 

35_atexit_registered = False 

36 

37 

38def get_loop() -> asyncio.AbstractEventLoop: 

39 """Return the background loop, starting it on a daemon thread if needed.""" 

40 global _loop, _thread, _atexit_registered 

41 with _lock: 

42 if _loop is not None and not _loop.is_closed(): 

43 return _loop 

44 loop = asyncio.new_event_loop() 

45 thread = threading.Thread( 

46 target=loop.run_forever, 

47 name="lilbee-bg-loop", 

48 daemon=True, 

49 ) 

50 thread.start() 

51 _loop = loop 

52 _thread = thread 

53 if not _atexit_registered: 

54 # Register once per process; shutdown is idempotent, so restarting 

55 # the loop later doesn't need a second registration. 

56 atexit.register(shutdown) 

57 _atexit_registered = True 

58 return loop 

59 

60 

61def run(coro: Coroutine[Any, Any, T]) -> T: 

62 """Submit *coro* to the background loop from any thread; block for result. 

63 

64 Exceptions raised inside *coro* propagate unchanged, including 

65 asyncio.CancelledError. 

66 """ 

67 loop = get_loop() 

68 try: 

69 return asyncio.run_coroutine_threadsafe(coro, loop).result() 

70 except concurrent.futures.CancelledError as exc: 

71 # run_coroutine_threadsafe re-raises cancellation as the concurrent 

72 # flavour; rewrap so `except asyncio.CancelledError` still matches. 

73 raise asyncio.CancelledError(*exc.args) from None 

74 

75 

76def shutdown() -> None: 

77 """Cancel pending tasks, stop the loop, join the thread. Idempotent.""" 

78 global _loop, _thread 

79 with _lock: 

80 loop, _loop = _loop, None 

81 thread, _thread = _thread, None 

82 if loop is None or loop.is_closed(): 

83 return 

84 # Best-effort drain; always stop the loop even if drain raised. 

85 with contextlib.suppress(Exception): 

86 asyncio.run_coroutine_threadsafe(_drain(loop), loop).result(timeout=10.0) 

87 loop.call_soon_threadsafe(loop.stop) 

88 if thread is not None: 

89 thread.join(timeout=10.0) 

90 loop.close() 

91 

92 

93async def _drain(loop: asyncio.AbstractEventLoop) -> None: 

94 pending = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()] 

95 for task in pending: 

96 task.cancel() 

97 if pending: 

98 await asyncio.gather(*pending, return_exceptions=True) 

99 # Give scheduled close callbacks a chance to run before we stop the loop. 

100 await asyncio.sleep(0.05)