Coverage for src / lilbee / providers / worker / health_ticker.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Background ticker that drives WorkerPool.reap_idle and ping_role on a fixed cadence.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import logging 

8from concurrent.futures import Future 

9from dataclasses import dataclass 

10from typing import TYPE_CHECKING 

11 

12if TYPE_CHECKING: 

13 # circular: health_ticker -> pool via WorkerPool/PoolRuntime (annotation-only) 

14 from lilbee.providers.worker.pool import PoolRuntime, WorkerPool 

15 

16log = logging.getLogger(__name__) 

17 

18# 30s is the smallest interval that meaningfully reduces wall-clock idle 

19# accumulation without burning bg-loop wakeups when nothing is live (the 

20# tick is a cheap no-op when no role channels are alive). Tunable via 

21# monkeypatch in tests; not exposed as a user setting because the per-role 

22# max_idle_s knob already covers the meaningful axis. 

23_TICK_INTERVAL_S = 30.0 

24 

25 

@dataclass
class HealthTickerHandle:
    """Handle returned by ``start_health_ticker`` and consumed by ``stop_health_ticker``.

    ``future`` tracks the scheduled ticker coroutine. It defaults to ``None``,
    is populated when the ticker is started, and is reset to ``None`` again
    once the ticker has been stopped.
    """

    future: Future[None] | None = None

31 

32 

async def _tick_once(pool: WorkerPool, runtime: PoolRuntime) -> None:
    """Perform a single health pass: reap idle workers, then ping each live role.

    The registered roles whose accessor reports ``is_alive`` are snapshotted
    first; when there are none the pass is a no-op. Otherwise
    ``pool.reap_idle()`` and then one ``pool.ping_role(role)`` per live role
    are handed to ``runtime.submit``, and each returned future is awaited
    through ``asyncio.wrap_future`` so the calling loop stays responsive
    while the pool works.

    A failing reap or ping is logged at debug level and otherwise ignored.
    ``asyncio.CancelledError`` derives from ``BaseException`` rather than
    ``Exception``, so the guards below do not intercept it and cancellation
    propagates to the caller.
    """
    alive = [name for name in pool.registered_roles if pool.accessor(name).is_alive]
    if not alive:
        return

    try:
        reap_future = runtime.submit(pool.reap_idle())
        await asyncio.wrap_future(reap_future)
    except Exception:
        log.debug("Pool reap_idle failed", exc_info=True)

    for name in alive:
        try:
            ping_future = runtime.submit(pool.ping_role(name))
            await asyncio.wrap_future(ping_future)
        except Exception:
            # The pool's restart-on-crash policy has already recorded the
            # crash; the role is respawned lazily by the next real call.
            log.debug("Health ping failed for role=%s", name, exc_info=True)

57 

58 

async def _ticker_loop(pool: WorkerPool, runtime: PoolRuntime, interval_s: float) -> None:
    """Sleep ``interval_s`` seconds, run one tick, and repeat until cancelled.

    Cancellation is the normal exit path: ``asyncio.CancelledError`` raised
    while sleeping or ticking is absorbed here and the coroutine returns
    ``None``.

    Any other exception escaping a tick is logged at debug level and the
    loop carries on with the next interval. Without that guard a single
    unexpected failure inside ``_tick_once`` (for example while it scans
    the pool for live roles, which happens outside its own ``try`` blocks)
    would end the ``while True`` loop for good and no further ticks would
    run.
    """
    try:
        while True:
            await asyncio.sleep(interval_s)
            try:
                await _tick_once(pool, runtime)
            except Exception:
                # CancelledError is a BaseException, so it is not caught
                # here and still reaches the outer handler below.
                log.debug("Health tick failed", exc_info=True)
    except asyncio.CancelledError:
        return

67 

68 

def start_health_ticker(
    pool: WorkerPool,
    runtime: PoolRuntime,
    bg_loop: asyncio.AbstractEventLoop,
    *,
    interval_s: float = _TICK_INTERVAL_S,
) -> HealthTickerHandle:
    """Start the periodic health ticker on *bg_loop*.

    The ticker coroutine is submitted to *bg_loop* with
    ``asyncio.run_coroutine_threadsafe``. The resulting future is wrapped
    in a ``HealthTickerHandle`` so the caller can later pass it to
    ``stop_health_ticker``.
    """
    ticker = _ticker_loop(pool, runtime, interval_s)
    scheduled = asyncio.run_coroutine_threadsafe(ticker, bg_loop)
    return HealthTickerHandle(future=scheduled)

82 

83 

def stop_health_ticker(handle: HealthTickerHandle, *, timeout: float = 5.0) -> None:
    """Cancel the ticker held by *handle* and wait up to *timeout* seconds for it.

    Safe to call repeatedly: the handle's future is cleared on the first
    call, so later calls find ``None`` and return immediately.

    The wait is best-effort. ``TimeoutError``, ``asyncio.CancelledError``
    (the expected outcome of a cancel) and any other ``Exception`` raised
    while collecting the result are suppressed, because the background loop
    owns the task's lifetime from here on.
    """
    pending = handle.future
    if pending is not None:
        # Detach before cancelling so a repeated call is a no-op.
        handle.future = None
        pending.cancel()
        ignored = (TimeoutError, asyncio.CancelledError, Exception)
        with contextlib.suppress(*ignored):
            pending.result(timeout=timeout)

98 

99 

100__all__ = [ 

101 "_TICK_INTERVAL_S", 

102 "HealthTickerHandle", 

103 "start_health_ticker", 

104 "stop_health_ticker", 

105]