Coverage for src / lilbee / providers / worker / health_ticker.py: 100%
45 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Background ticker that drives WorkerPool.reap_idle and ping_role on a fixed cadence."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import logging
8from concurrent.futures import Future
9from dataclasses import dataclass
10from typing import TYPE_CHECKING
12if TYPE_CHECKING:
13 # circular: health_ticker -> pool via WorkerPool/PoolRuntime (annotation-only)
14 from lilbee.providers.worker.pool import PoolRuntime, WorkerPool
log = logging.getLogger(__name__)

# Seconds between background health passes. 30 s is the smallest cadence that
# still meaningfully cuts wall-clock idle accumulation without spending
# bg-loop wakeups while nothing is live (a pass with no live role returns
# immediately). Not a user setting: the per-role ``max_idle_s`` knob already
# covers the axis that matters. Tests may monkeypatch this value; note that a
# default argument bound directly to it captures the import-time value, so
# only code that reads the constant at call time observes the patch.
_TICK_INTERVAL_S: float = 30.0
@dataclass
class HealthTickerHandle:
    """Mutable holder for the running ticker's future.

    ``future`` is populated when the ticker is scheduled and reset to
    ``None`` once it has been stopped; it is what gets cancelled on shutdown.
    """

    # Thread-safe future wrapping the ticker coroutine; None when not running.
    future: Future[None] | None = None
def _live_roles(pool: WorkerPool) -> tuple:
    """Return the registered roles whose accessor currently reports ``is_alive``."""
    return tuple(role for role in pool.registered_roles if pool.accessor(role).is_alive)


async def _tick_once(pool: WorkerPool, runtime: PoolRuntime) -> None:
    """Run one reap + ping pass against the currently-live roles.

    Does nothing when no role is alive. Otherwise runs ``pool.reap_idle()``
    and then ``pool.ping_role(role)`` for every role that is *still* alive
    afterwards. The live set is re-read after the reap on purpose: pinging a
    role the reap just shut down is at best a wasted round-trip and,
    depending on how ``ping_role`` treats a stopped worker, could be recorded
    as a crash or bring the worker back and undo the reap.

    Bridges the bg-loop (where this coroutine runs) to the PoolRuntime loop
    (which owns the per-role asyncio.Lock instances): each pool coroutine is
    submitted to the runtime and awaited through ``asyncio.wrap_future``, so
    the bg-loop is never blocked while waiting. A failing reap or ping is
    logged at debug level and does not abort the pass.
    ``asyncio.CancelledError`` is a ``BaseException`` (not an ``Exception``),
    so it escapes the ``except Exception`` guards and unwinds the outer
    ``_ticker_loop`` cleanly.
    """
    if not _live_roles(pool):
        return
    try:
        await asyncio.wrap_future(runtime.submit(pool.reap_idle()))
    except Exception:
        log.debug("Pool reap_idle failed", exc_info=True)
    # Re-snapshot: the reap above may have stopped some of the roles that
    # were alive when this pass started.
    for role in _live_roles(pool):
        try:
            await asyncio.wrap_future(runtime.submit(pool.ping_role(role)))
        except Exception:
            # Pool's restart-on-crash policy already records the crash; the
            # next real call respawns the role lazily.
            log.debug("Health ping failed for role=%s", role, exc_info=True)
async def _ticker_loop(pool: WorkerPool, runtime: PoolRuntime, interval_s: float) -> None:
    """Sleep then tick forever. Cancellation is the normal exit path.

    ``_tick_once`` guards its own pool calls, but work it does outside those
    guards (e.g. reading ``pool.registered_roles``) can still raise. Such a
    failure is logged and the loop keeps going, so one bad pass cannot
    silently stop health checks for the rest of the process.
    ``asyncio.CancelledError`` is a ``BaseException`` and therefore bypasses
    the per-tick ``except Exception`` guard, ending the loop.

    Args:
        pool: Pool handed to each ``_tick_once`` pass.
        runtime: Runtime handed to each ``_tick_once`` pass.
        interval_s: Seconds to sleep before every pass.
    """
    try:
        while True:
            await asyncio.sleep(interval_s)
            try:
                await _tick_once(pool, runtime)
            except Exception:
                log.warning("Health ticker pass failed; will retry next tick", exc_info=True)
    except asyncio.CancelledError:
        return
def start_health_ticker(
    pool: WorkerPool,
    runtime: PoolRuntime,
    bg_loop: asyncio.AbstractEventLoop,
    *,
    interval_s: float | None = None,
) -> HealthTickerHandle:
    """Schedule the ticker on *bg_loop* and return a handle for cancellation.

    Args:
        pool: Pool passed to every ticker pass.
        runtime: Runtime passed to every ticker pass.
        bg_loop: Event loop that runs the ticker. The coroutine is submitted
            with ``asyncio.run_coroutine_threadsafe``, so the caller does not
            have to be running on this loop.
        interval_s: Seconds to sleep before each pass. ``None`` (the default)
            uses the module-level ``_TICK_INTERVAL_S`` as read at call time,
            so a monkeypatched value takes effect. A default argument bound
            directly to the constant would freeze its import-time value.

    Returns:
        A ``HealthTickerHandle`` whose ``future`` wraps the scheduled ticker.

    Raises:
        Whatever ``asyncio.run_coroutine_threadsafe`` raises (for example
        when *bg_loop* is already closed). The unscheduled coroutine is
        closed first so it does not emit a "never awaited" warning.
    """
    if interval_s is None:
        interval_s = _TICK_INTERVAL_S
    handle = HealthTickerHandle()
    coro = _ticker_loop(pool, runtime, interval_s)
    try:
        handle.future = asyncio.run_coroutine_threadsafe(coro, bg_loop)
    except BaseException:
        coro.close()
        raise
    return handle
def stop_health_ticker(handle: HealthTickerHandle, *, timeout: float = 5.0) -> None:
    """Cancel the ticker referenced by *handle*. Idempotent.

    Detaches ``handle.future`` and cancels it. For a future produced by
    ``asyncio.run_coroutine_threadsafe`` that is still pending, ``cancel()``
    succeeds immediately and only *requests* cancellation of the bg-loop
    task; this function does not wait for that task to finish unwinding.
    ``timeout`` only bounds the wait when the future cannot be cancelled
    because it reports itself as running. For an already-finished future,
    ``result()`` returns at once.

    A cancelled future is the expected outcome and is silent. Any other
    failure (a timeout, or an exception the ticker had already died with) is
    logged at debug level and otherwise ignored. The bg-loop owns the task
    lifetime past this point, so cleanup is best-effort.

    Args:
        handle: Handle returned when the ticker was started.
        timeout: Maximum seconds to wait on ``future.result``.
    """
    # Local import: only this function needs to distinguish a cancelled
    # concurrent future from a genuine failure.
    from concurrent.futures import CancelledError as FutureCancelledError

    future = handle.future
    if future is None:
        return
    # Detach first so a repeated call becomes a no-op.
    handle.future = None
    future.cancel()
    try:
        # A cancelled future raises immediately; that is the normal exit.
        with contextlib.suppress(FutureCancelledError, asyncio.CancelledError):
            future.result(timeout=timeout)
    except Exception:
        log.debug("Health ticker did not stop cleanly", exc_info=True)
# Exported names. ``_TICK_INTERVAL_S`` is listed despite its leading
# underscore, presumably so code that tunes the cadence (e.g. tests) can
# reach it through the module's declared surface.
__all__ = [
    "_TICK_INTERVAL_S",
    "HealthTickerHandle",
    "start_health_ticker",
    "stop_health_ticker",
]