Coverage for src / lilbee / providers / worker / pool.py: 100%

324 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Lifecycle for per-role inference subprocess workers. 

2 

3Owns the embed, chat, rerank, and vision worker processes. Lifecycle 

4contract, restart-budget policy, idle reaping, and health pings are 

5documented in ``docs/architecture.md`` under "Inference worker pool". 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import contextlib 

12import logging 

13import threading 

14import time 

15from collections import deque 

16from collections.abc import AsyncIterator, Callable, Coroutine 

17from concurrent.futures import Future 

18from dataclasses import dataclass, field 

19from typing import TYPE_CHECKING, Any, TypeVar 

20 

21from lilbee.providers.worker.transport import ( 

22 RoleConfig, 

23 WorkerChannel, 

24 WorkerEntrypoint, 

25 WorkerRole, 

26 WorkerSpawner, 

27) 

28from lilbee.providers.worker.transport_pipe import ( 

29 PipeSpawner, 

30 WorkerCrashError, 

31 WorkerError, 

32) 

33from lilbee.providers.worker.wire_kinds import WireKind 

34 

35if TYPE_CHECKING: 

36 # circular: health_ticker -> pool via WorkerPool/PoolRuntime 

37 from lilbee.providers.worker.health_ticker import HealthTickerHandle 

38 

log = logging.getLogger(__name__)

_T = TypeVar("_T")  # result type threaded through PoolRuntime.run_sync/submit


# Grace given to a worker to exit cleanly before close() terminates it.
_DEFAULT_SHUTDOWN_TIMEOUT_S = 5.0
# Per-request budget for RoleAccessor.call when the caller passes none.
_DEFAULT_CALL_TIMEOUT_S = 300.0
_DEFAULT_MAX_IDLE_S = 0.0  # 0 = no idle reaping by default
# Budget for a ping round-trip in ping_role when the caller passes none.
_HEALTH_TIMEOUT_S = 5.0
# More than this many crashes inside _RESTART_WINDOW_S trips the cooldown
# (see WorkerPool._on_crash).
_RESTART_BUDGET = 3
_RESTART_WINDOW_S = 60.0
# After tripping the crash budget the role enters a cooldown instead of a
# permanent disable. When the cooldown expires the next call gets one
# "half-open" attempt; success clears the crash history, failure re-arms the
# cooldown. Matches the classic circuit-breaker pattern.
_DEGRADED_COOLDOWN_S = 60.0
# Name of the PoolRuntime background thread (shows up in thread dumps).
_RUNTIME_THREAD_NAME = "lilbee-worker-pool-loop"

56 

57 

class PoolShutdownError(WorkerError):
    """Signals that the pool has already been shut down and refuses new work."""

    def __init__(self) -> None:
        # WorkerError takes (kind, message, detail); detail is unused here.
        message = "Inference pool is shutting down. Please wait for current tasks to finish."
        super().__init__("PoolShutdownError", message, "")

67 

68 

class RoleDegradedError(WorkerError):
    """Signals that a role is cooling down after a burst of crashes.

    Exposes ``role`` and ``retry_in_s`` so callers can tell the user when
    the pool will accept another attempt. Older builds said "Restart lilbee
    to recover"; today the pool auto-retries after ``_DEGRADED_COOLDOWN_S``.
    """

    def __init__(self, role: WorkerRole, attempts: int, window_s: float, retry_in_s: float) -> None:
        message = (
            f"The {role} worker crashed {attempts} times in the last "
            f"{window_s:.0f}s and is cooling down. Retry in "
            f"{retry_in_s:.0f}s."
        )
        super().__init__("RoleDegradedError", message, "")
        self.role = role
        self.retry_in_s = retry_in_s

89 

90 

@dataclass
class _Role:
    """Per-role registration: how to spawn it plus its live channel (if any).

    ``degraded_until`` is the monotonic timestamp at which the role exits
    cooldown; 0.0 means "not degraded". Compared to ``time.monotonic()``
    inside ``_ensure_channel`` to decide whether the next call gets the
    half-open attempt.
    """

    # Role identifier; also the key in WorkerPool._roles.
    name: WorkerRole
    # Entrypoint handed to the spawner on every (re)spawn.
    worker_main: WorkerEntrypoint
    # Re-evaluated at every spawn so config changes apply on the next respawn.
    config_factory: Callable[[], RoleConfig]
    # Live channel to the worker process; None when unspawned or dropped.
    channel: WorkerChannel | None = None
    # Serializes spawn / crash / reap transitions for this role only.
    spawn_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    # time.monotonic() of the last successful call/ping/spawn; 0.0 = never.
    last_used: float = 0.0
    # Monotonic timestamps of recent crashes, pruned to _RESTART_WINDOW_S.
    crash_history: deque[float] = field(default_factory=deque)
    # Cooldown deadline (monotonic); 0.0 means not degraded.
    degraded_until: float = 0.0

109 

110 

class RoleAccessor:
    """Thin per-role handle handed out by ``pool.<role>``.

    Hides the lazy-spawn step so call sites read as
    ``await pool.embed.call(...)`` rather than spelling out the role on
    every pool call. Typed per-role convenience wrappers (for example
    ``await pool.embed.batch(texts)``) layer on top of the generic
    ``call`` / ``stream`` defined here in later commits.
    """

    def __init__(self, pool: WorkerPool, role: WorkerRole) -> None:
        self._pool = pool
        self._role = role

    async def call(
        self,
        kind: WireKind,
        payload: object,
        *,
        timeout: float = _DEFAULT_CALL_TIMEOUT_S,
    ) -> object:
        """Dispatch one request, spawning the worker first if it is not running."""
        channel = await self._pool._ensure_channel(self._role)
        try:
            outcome = await channel.call(kind, payload, timeout=timeout)
        except WorkerCrashError:
            # Let the pool record the crash (and maybe arm a cooldown)
            # before the error reaches the caller.
            await self._pool._on_crash(self._role)
            raise
        else:
            self._pool._stamp_used(self._role)
            return outcome

    def stream(self, kind: WireKind, payload: object) -> object:
        """Return an async iterator over the worker's stream output.

        The worker spawn (when needed) happens lazily inside the
        generator's first ``__anext__`` — see :func:`_spawn_and_stream`.
        """
        return _spawn_and_stream(self._pool, self._role, kind, payload)

    async def ping(self, *, timeout: float) -> None:
        """Round-trip a health ping, spawning the worker first if needed.

        The health pipe is separate from the data pipe, so a ping can
        never interleave with in-flight stream chunks; no skip logic is
        required for busy workers.
        """
        channel = await self._pool._ensure_channel(self._role)
        try:
            await channel.ping(timeout=timeout)
        except WorkerCrashError:
            await self._pool._on_crash(self._role)
            raise
        else:
            self._pool._stamp_used(self._role)

    def cancel(self) -> None:
        """Set the live worker's abort flag; do nothing when the worker is down."""
        live = self._pool._channel_if_alive(self._role)
        if live is not None:
            live.cancel()

    def clear_abort(self) -> None:
        """Clear the live worker's abort flag; do nothing when the worker is down."""
        live = self._pool._channel_if_alive(self._role)
        if live is not None:
            live.clear_abort()

    @property
    def is_alive(self) -> bool:
        """Whether this role's worker has been spawned and its process is alive."""
        return self._pool._channel_if_alive(self._role) is not None

181 

182 

async def _spawn_and_stream(
    pool: WorkerPool,
    role: WorkerRole,
    kind: WireKind,
    payload: object,
) -> AsyncIterator[object]:
    """Relay stream chunks from *role*'s worker, spawning it on first iteration."""
    channel = await pool._ensure_channel(role)
    try:
        async for item in channel.stream(kind, payload):
            yield item
    except WorkerCrashError:
        # Record the crash with the pool before propagating to the consumer.
        await pool._on_crash(role)
        raise
    else:
        # Only a stream that completed without a crash counts as "used"
        # for idle accounting.
        pool._stamp_used(role)

198 

199 

class WorkerPool:
    """Owns every long-lived worker process.

    Constructor takes an explicit *spawner* so tests can plug in an
    in-process fake (see ``tests/test_worker_pool.py``). Production callers
    pass nothing and get the default :class:`PipeSpawner`.

    Roles are registered with :meth:`register` before the first call; the
    pool itself does not import the per-role worker entrypoint modules so
    we keep the worker-side and parent-side code paths clearly separated.
    Registration is intentionally explicit: callers (typically ``Services``)
    decide which roles exist in this process.
    """

    def __init__(
        self,
        *,
        spawner: WorkerSpawner | None = None,
        max_idle_s: float = _DEFAULT_MAX_IDLE_S,
    ) -> None:
        """Create an empty pool; ``max_idle_s == 0`` disables idle reaping."""
        self._spawner: WorkerSpawner = spawner if spawner is not None else PipeSpawner()
        self._roles: dict[WorkerRole, _Role] = {}
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._max_idle_s = max_idle_s
        # Listeners fire around every spawn, including respawns after a
        # crash. Both default to empty so headless callers (CLI, server,
        # tests) pay nothing.
        self._on_role_spawning: list[Callable[[WorkerRole], None]] = []
        self._on_role_spawned: list[Callable[[WorkerRole], None]] = []

    def add_listener(
        self,
        *,
        on_spawning: Callable[[WorkerRole], None] | None = None,
        on_spawned: Callable[[WorkerRole], None] | None = None,
    ) -> None:
        """Register callbacks fired immediately before / after a role spawn.

        Both arguments are optional; pass either or both. The callbacks are
        invoked from the pool runtime thread, so consumers wanting to touch a
        UI must marshal to their own loop (e.g. ``app.call_from_thread``).
        """
        if on_spawning is not None:
            self._on_role_spawning.append(on_spawning)
        if on_spawned is not None:
            self._on_role_spawned.append(on_spawned)

    def register(
        self,
        role: WorkerRole,
        worker_main: WorkerEntrypoint,
        config_factory: Callable[[], RoleConfig],
    ) -> RoleAccessor:
        """Register a role's worker entrypoint and return its accessor.

        ``config_factory`` is called every time the role spawns (lazy or
        on restart) so model swaps in cfg propagate without an explicit
        invalidation: the next spawn picks up whatever
        ``config_factory()`` returns.

        Raises ``ValueError`` if *role* is already registered.
        """
        if role in self._roles:
            raise ValueError(f"Role {role!r} is already registered on this pool.")
        self._roles[role] = _Role(
            name=role,
            worker_main=worker_main,
            config_factory=config_factory,
        )
        return RoleAccessor(self, role)

    def accessor(self, role: WorkerRole) -> RoleAccessor:
        """Return the :class:`RoleAccessor` for *role*; must already be registered."""
        if role not in self._roles:
            raise KeyError(f"Role {role!r} is not registered on this pool.")
        return RoleAccessor(self, role)

    @property
    def registered_roles(self) -> tuple[WorkerRole, ...]:
        """Names of every role registered on this pool, in registration order."""
        return tuple(self._roles)

    async def start_eager(self) -> None:
        """Spawn every registered role concurrently; raise on first spawn failure.

        Optional. Most callers rely on lazy spawn via the accessors. Use
        this when you want to absorb the per-worker cold-start cost up
        front (e.g. just after the TUI mounts so the first user action
        does not also pay for spawn).
        """
        self._raise_if_shutdown()
        await asyncio.gather(*(self._ensure_channel(role) for role in self._roles))

    async def shutdown(self, *, timeout: float = _DEFAULT_SHUTDOWN_TIMEOUT_S) -> None:
        """Send shutdown to every live worker, terminate stragglers past *timeout*.

        Idempotent. Safe to register on ``atexit``.
        """
        async with self._shutdown_lock:
            if self._shutdown:
                return
            self._shutdown = True
            # Snapshot the live channels, then detach every role before any
            # await so latecomers observe a channel-less pool immediately.
            live: list[WorkerChannel] = [
                role.channel for role in self._roles.values() if role.channel is not None
            ]
            for role in self._roles.values():
                role.channel = None
            # Close concurrently; return_exceptions so one wedged worker
            # cannot keep the others from closing.
            await asyncio.gather(
                *(channel.close(timeout=timeout) for channel in live),
                return_exceptions=True,
            )

    async def _ensure_channel(self, role: WorkerRole) -> WorkerChannel:
        """Return the role's live channel, spawning on first use, cooldown, or crash.

        Half-open behavior: if a role is cooling down past its
        ``degraded_until`` deadline, one respawn attempt is allowed. A
        successful spawn clears the cooldown; a fresh crash extends it. See
        ``_DEGRADED_COOLDOWN_S``.
        """
        self._raise_if_shutdown()
        registration = self._roles.get(role)
        if registration is None:
            raise KeyError(f"Role {role!r} is not registered on this pool.")
        self._refuse_or_clear_cooldown(role, registration)
        if registration.channel is not None and registration.channel.is_alive:
            return registration.channel
        async with registration.spawn_lock:
            # Check -> lock -> re-check: another coroutine may have finished
            # spawning while we waited on the lock.
            if registration.channel is not None and registration.channel.is_alive:
                return registration.channel
            self._refuse_or_clear_cooldown(role, registration)
            self._raise_if_shutdown()
            self._fire_listeners(self._on_role_spawning, role)
            # Spawning is blocking process work; run it in the default
            # executor so the event loop stays responsive meanwhile.
            channel, _handle = await asyncio.get_running_loop().run_in_executor(
                None,
                self._spawner.spawn,
                registration.worker_main,
                registration.config_factory(),
            )
            registration.channel = channel
            registration.last_used = time.monotonic()
            self._fire_listeners(self._on_role_spawned, role)
            log.info("Worker pool spawned role=%s pid=%s", role, channel.pid)
            return channel

    def _fire_listeners(
        self, listeners: list[Callable[[WorkerRole], None]], role: WorkerRole
    ) -> None:
        """Invoke every registered listener; one bad listener does not break the pool."""
        for listener in listeners:
            try:
                listener(role)
            except Exception:
                # Deliberately broad: listeners are third-party code.
                log.exception("Worker pool listener for role=%s raised", role)

    def _stamp_used(self, role: WorkerRole) -> None:
        """Update *role*'s ``last_used`` timestamp; called by the accessor."""
        registration = self._roles.get(role)
        if registration is not None:
            registration.last_used = time.monotonic()

    def _channel_if_alive(self, role: WorkerRole) -> WorkerChannel | None:
        """Return the role's live channel without spawning; None if absent or dead."""
        registration = self._roles.get(role)
        if registration is None or registration.channel is None:
            return None
        if not registration.channel.is_alive:
            return None
        return registration.channel

    def detach_channel(self, role: WorkerRole) -> WorkerChannel | None:
        """Atomically clear *role*'s channel; return the prior live channel or None.

        Caller owns the returned channel and closes it (typically by submitting
        ``channel.close(...)`` on the pool runtime). Other roles are untouched.
        The next ``_ensure_channel`` call for this role lazy-respawns with the
        current ``config_factory()`` snapshot.
        """
        registration = self._roles.get(role)
        if registration is None:
            return None
        channel = registration.channel
        registration.channel = None
        return channel

    def _refuse_or_clear_cooldown(self, role: WorkerRole, registration: _Role) -> None:
        """Raise ``RoleDegradedError`` if still cooling down; clear if past deadline.

        Centralizes the half-open check so both early bail-outs in
        ``_ensure_channel`` apply identical logic.
        """
        if registration.degraded_until <= 0.0:
            return
        now = time.monotonic()
        if now >= registration.degraded_until:
            # Cooldown expired: half-open. Reset crash history so the next
            # spawn attempt is judged on its own merit; if it crashes the
            # role re-enters cooldown via ``_on_crash``.
            registration.degraded_until = 0.0
            registration.crash_history.clear()
            log.info("Worker pool role=%s cooldown elapsed; allowing one retry", role)
            return
        raise RoleDegradedError(
            role,
            len(registration.crash_history),
            _RESTART_WINDOW_S,
            registration.degraded_until - now,
        )

    async def _on_crash(self, role: WorkerRole) -> None:
        """Drop a crashed channel; arm a cooldown if the restart budget is exhausted."""
        registration = self._roles.get(role)
        if registration is None:
            return
        async with registration.spawn_lock:
            channel = registration.channel
            registration.channel = None
            now = time.monotonic()
            cutoff = now - _RESTART_WINDOW_S
            # Prune crashes that slid out of the window before judging the budget.
            while registration.crash_history and registration.crash_history[0] < cutoff:
                registration.crash_history.popleft()
            registration.crash_history.append(now)
            if len(registration.crash_history) > _RESTART_BUDGET:
                registration.degraded_until = now + _DEGRADED_COOLDOWN_S
                log.error(
                    "Worker pool role=%s cooling down for %.0fs after %d crashes in %.0fs",
                    role,
                    _DEGRADED_COOLDOWN_S,
                    len(registration.crash_history),
                    _RESTART_WINDOW_S,
                )
            # Suppress WorkerError so cleanup of a wedged process cannot
            # become a second failure on top of the crash being reported.
            if channel is not None:
                with contextlib.suppress(WorkerError):
                    await channel.close(timeout=_DEFAULT_SHUTDOWN_TIMEOUT_S)

    def reset_role_failures(self, role: WorkerRole) -> None:
        """Clear *role*'s crash history and any active cooldown.

        Public hook for an explicit "force a retry now" affordance. The
        circuit breaker already auto-recovers after ``_DEGRADED_COOLDOWN_S``;
        this is the manual override for users who don't want to wait.
        """
        registration = self._roles.get(role)
        if registration is None:
            return
        registration.crash_history.clear()
        registration.degraded_until = 0.0

    def is_degraded(self, role: WorkerRole) -> bool:
        """Return True iff *role* is currently inside a crash cooldown."""
        registration = self._roles.get(role)
        if registration is None:
            return False
        return registration.degraded_until > 0.0 and time.monotonic() < registration.degraded_until

    async def reap_idle(self) -> tuple[WorkerRole, ...]:
        """Close any role idle longer than ``max_idle_s`` with zero in-flight.

        Caller (typically a background async task) decides cadence; the
        pool does not own a recurring timer because the host TUI's loop
        is the authoritative scheduler.

        Returns the role names that were reaped (informational; useful
        for tests). No-op when ``max_idle_s == 0``.
        """
        if self._max_idle_s <= 0.0:
            return ()
        now = time.monotonic()
        reaped: list[WorkerRole] = []
        for role_name, registration in list(self._roles.items()):
            channel = registration.channel
            if channel is None or not channel.is_alive:
                continue
            if channel.in_flight > 0:
                continue
            if registration.last_used <= 0.0:
                continue
            if now - registration.last_used < self._max_idle_s:
                continue
            async with registration.spawn_lock:
                # Re-check inside the lock; another coroutine may have just used it.
                # NOTE(review): `channel` and `now` are captured before the
                # lock. A respawn in the interim refreshes last_used (set at
                # spawn), which makes the recheck bail out — confirm that
                # stays true if spawn ever stops stamping last_used.
                if channel.in_flight > 0:
                    continue
                if now - registration.last_used < self._max_idle_s:
                    continue
                registration.channel = None
                with contextlib.suppress(WorkerError):
                    await channel.close(timeout=_DEFAULT_SHUTDOWN_TIMEOUT_S)
                log.info(
                    "Worker pool reaped idle role=%s after %.0fs",
                    role_name,
                    now - registration.last_used,
                )
                reaped.append(role_name)
        return tuple(reaped)

    async def ping_role(
        self,
        role: WorkerRole,
        *,
        timeout: float | None = None,
    ) -> None:
        """Round-trip a ping/pong against *role*; raise on timeout / crash.

        ``timeout`` defaults to the module-level ``_HEALTH_TIMEOUT_S``.
        Spawns the worker on first use, same as a real call. Caller
        (typically a background health monitor) decides cadence and
        whether to respond by reaping/restarting; this method only
        propagates the round-trip outcome.
        """
        accessor = self.accessor(role)
        budget = timeout if timeout is not None else _HEALTH_TIMEOUT_S
        await accessor.ping(timeout=budget)

    async def release(self, role: WorkerRole) -> None:
        """Close *role*'s live worker and forget the registration entirely.

        Used by callers (notably ``LlamaCppProvider.invalidate_load_cache``)
        that want the next request to respawn with a fresh model picked
        from the current cfg. The role can be re-registered immediately
        after with :meth:`register`. No-op if the role is unregistered.
        """
        registration = self._roles.pop(role, None)
        if registration is None:
            return
        channel = registration.channel
        registration.channel = None
        if channel is not None:
            with contextlib.suppress(WorkerError):
                await channel.close(timeout=_DEFAULT_SHUTDOWN_TIMEOUT_S)

    def _raise_if_shutdown(self) -> None:
        # Guard used at every public entry; keeps the error type uniform.
        if self._shutdown:
            raise PoolShutdownError()

533 

534 

class PoolRuntime:
    """Background asyncio loop dedicated to a single :class:`WorkerPool`.

    Sync callers (``LlamaCppProvider.embed`` and friends) invoke pool
    coroutines via :meth:`run_sync`, which submits them onto this loop
    and blocks the caller's thread for the result. Because every pool
    operation runs on the same loop, the per-role asyncio.Lock instances
    inside :class:`_Role` retain their semantics across concurrent
    sync callers.

    Constructed once per pool. :meth:`shutdown` stops the loop and
    joins the thread; subsequent :meth:`run_sync` calls raise
    :class:`PoolShutdownError`.
    """

    def __init__(self) -> None:
        # Loop and thread are created lazily by start(); _ready signals
        # that the background thread has published its loop into _loop.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._ready = threading.Event()
        self._stopped = False
        self._lock = threading.Lock()

    def start(self) -> None:
        """Spin up the background thread + loop. Idempotent."""
        with self._lock:
            if self._thread is not None:
                return
            if self._stopped:
                raise PoolShutdownError()
            self._thread = threading.Thread(
                target=self._run_loop,
                name=_RUNTIME_THREAD_NAME,
                daemon=True,  # never block interpreter exit on this thread
            )
            self._thread.start()
            # Block until _run_loop has assigned self._loop so run_sync /
            # submit can rely on it being set after start() returns.
            self._ready.wait()

    def _run_loop(self) -> None:
        # Thread target: owns the loop for the life of the runtime.
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        self._ready.set()
        try:
            loop.run_forever()
        finally:
            # Cancel and drain pending tasks while the loop is still alive.
            # Otherwise an in-flight RoleAccessor.call awaiting an asyncio.Queue
            # gets GC'd after loop.close() and its CancelledError cleanup tries
            # to call_soon on a closed loop, spamming "Event loop is closed"
            # tracebacks during Ctrl-C shutdown.
            try:
                pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
                for task in pending:
                    task.cancel()
                if pending:
                    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception:
                log.exception("Pool runtime loop drain failed; closing anyway")
            finally:
                loop.close()

    def run_sync(self, coro: Coroutine[Any, Any, _T], *, timeout: float | None = None) -> _T:
        """Submit *coro* to the background loop and block for the result.

        On timeout, cancels the underlying asyncio task before raising so
        the loop does not log "Task was destroyed but it is pending".
        """
        if self._stopped:
            # Close the never-scheduled coroutine to avoid the
            # "coroutine ... was never awaited" RuntimeWarning.
            coro.close()
            raise PoolShutdownError()
        if self._thread is None:
            self.start()
        loop = self._loop
        assert loop is not None  # _ready signaled, loop is set  # noqa: S101
        future: Future[_T] = asyncio.run_coroutine_threadsafe(coro, loop)
        try:
            return future.result(timeout=timeout)
        except BaseException:
            # Covers TimeoutError and KeyboardInterrupt alike: cancel the
            # task so it does not linger after we stop waiting on it.
            future.cancel()
            raise

    def submit(self, coro: Coroutine[Any, Any, _T]) -> Future[_T]:
        """Schedule *coro* on the background loop without blocking the caller.

        Returns the :class:`concurrent.futures.Future` for the call so the
        caller can await it (via :func:`asyncio.wrap_future` from another
        loop) or cancel it. Used by the Services-owned health ticker so a
        long pool ping does not stall the bg-loop.
        """
        if self._stopped:
            coro.close()
            raise PoolShutdownError()
        if self._thread is None:
            self.start()
        loop = self._loop
        assert loop is not None  # _ready signaled, loop is set  # noqa: S101
        return asyncio.run_coroutine_threadsafe(coro, loop)

    def shutdown(self, *, timeout: float = _DEFAULT_SHUTDOWN_TIMEOUT_S) -> None:
        """Stop the loop and join the thread. Idempotent."""
        with self._lock:
            if self._stopped:
                return
            self._stopped = True
            loop = self._loop
            thread = self._thread
            if loop is not None:
                # Thread-safe: stop the loop from outside its thread.
                loop.call_soon_threadsafe(loop.stop)
            if thread is not None:
                thread.join(timeout=timeout)

646 

647 

def shutdown_pool_runtime(
    pool: WorkerPool,
    runtime: PoolRuntime,
    ticker: HealthTickerHandle,
    *,
    drain_timeout_s: float = 10.0,
    runtime_timeout_s: float = 5.0,
) -> None:
    """Stop the health ticker, drain the pool through the runtime, then stop the runtime.

    Order matters: cancel the ticker first so it cannot schedule a fresh
    pool op against a draining runtime; then drain the pool via the
    runtime; then stop the runtime thread. Idempotent.

    Args:
        pool: Pool whose workers are drained via :meth:`WorkerPool.shutdown`.
        runtime: Background loop the drain runs on; stopped last.
        ticker: Health-ticker handle cancelled before anything else.
        drain_timeout_s: Budget for the pool drain through ``run_sync``.
        runtime_timeout_s: Budget for joining the runtime thread.
    """
    # Local import mirrors the TYPE_CHECKING guard at the top of the file:
    # health_ticker imports from this module, so a top-level import cycles.
    from lilbee.providers.worker.health_ticker import stop_health_ticker

    stop_health_ticker(ticker)
    try:
        runtime.run_sync(pool.shutdown(), timeout=drain_timeout_s)
    except PoolShutdownError:
        # Second invocation: run_sync raises this once the runtime is
        # stopped. Swallow it so the function stays idempotent as
        # documented instead of leaking the error to the caller.
        pass
    except (TimeoutError, RuntimeError, OSError) as exc:
        log.warning("Pool shutdown raised %s; forcing runtime stop", exc)
    finally:
        # Always stop the runtime thread — even on an unexpected exception —
        # so a failed drain can never leave the background loop running.
        runtime.shutdown(timeout=runtime_timeout_s)

670 

671 

# Explicit public API of this module.
__all__ = [
    "PoolRuntime",
    "PoolShutdownError",
    "RoleAccessor",
    "WorkerPool",
    "shutdown_pool_runtime",
]