Coverage for src/lilbee/providers/worker/pool.py: 100%
324 statements
1"""Lifecycle for per-role inference subprocess workers.
3Owns the embed, chat, rerank, and vision worker processes. Lifecycle
4contract, restart-budget policy, idle reaping, and health pings are
5documented in ``docs/architecture.md`` under "Inference worker pool".
6"""
8from __future__ import annotations
10import asyncio
11import contextlib
12import logging
13import threading
14import time
15from collections import deque
16from collections.abc import AsyncIterator, Callable, Coroutine
17from concurrent.futures import Future
18from dataclasses import dataclass, field
19from typing import TYPE_CHECKING, Any, TypeVar
21from lilbee.providers.worker.transport import (
22 RoleConfig,
23 WorkerChannel,
24 WorkerEntrypoint,
25 WorkerRole,
26 WorkerSpawner,
27)
28from lilbee.providers.worker.transport_pipe import (
29 PipeSpawner,
30 WorkerCrashError,
31 WorkerError,
32)
33from lilbee.providers.worker.wire_kinds import WireKind
35if TYPE_CHECKING:
36 # circular: health_ticker -> pool via WorkerPool/PoolRuntime
37 from lilbee.providers.worker.health_ticker import HealthTickerHandle
39log = logging.getLogger(__name__)
41_T = TypeVar("_T")

_DEFAULT_SHUTDOWN_TIMEOUT_S = 5.0
_DEFAULT_CALL_TIMEOUT_S = 300.0
_DEFAULT_MAX_IDLE_S = 0.0  # 0 = no idle reaping by default
_HEALTH_TIMEOUT_S = 5.0
_RESTART_BUDGET = 3
_RESTART_WINDOW_S = 60.0
# After tripping the crash budget the role enters a cooldown instead of a
# permanent disable. When the cooldown expires the next call gets one
# "half-open" attempt; success clears the crash history, failure re-arms the
# cooldown. Matches the classic circuit-breaker pattern.
_DEGRADED_COOLDOWN_S = 60.0
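# Illustrative timeline under these defaults (a sketch, not normative):
# crashes at t=0s, 10s, and 20s stay within the budget of 3, so each one
# just lazy-respawns; a 4th crash at t=30s exceeds the budget inside the
# 60s window and arms a cooldown until t=90s. Calls before t=90s raise
# RoleDegradedError; the first call at or after t=90s gets the single
# half-open respawn attempt.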
_RUNTIME_THREAD_NAME = "lilbee-worker-pool-loop"


class PoolShutdownError(WorkerError):
    """Raised when a caller tries to use a pool that has been shut down."""

    def __init__(self) -> None:
        super().__init__(
            "PoolShutdownError",
            "Inference pool is shutting down. Please wait for current tasks to finish.",
            "",
        )


class RoleDegradedError(WorkerError):
    """Raised when a role is in cooldown after a burst of crashes.

    Carries the cooldown deadline so the caller can hint the user when the
    pool will accept another attempt. Older builds said "Restart lilbee to
    recover"; today the pool auto-retries after ``_DEGRADED_COOLDOWN_S``.
    """

    def __init__(self, role: WorkerRole, attempts: int, window_s: float, retry_in_s: float) -> None:
        super().__init__(
            "RoleDegradedError",
            (
                f"The {role} worker crashed {attempts} times in the last "
                f"{window_s:.0f}s and is cooling down. Retry in "
                f"{retry_in_s:.0f}s."
            ),
            "",
        )
        self.role = role
        self.retry_in_s = retry_in_s


@dataclass
class _Role:
    """Per-role registration: how to spawn it plus its live channel (if any).

    ``degraded_until`` is the monotonic timestamp at which the role exits
    cooldown; 0.0 means "not degraded". Compared to ``time.monotonic()``
    inside ``_ensure_channel`` to decide whether the next call gets the
    half-open attempt.
    """

    name: WorkerRole
    worker_main: WorkerEntrypoint
    config_factory: Callable[[], RoleConfig]
    channel: WorkerChannel | None = None
    spawn_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    last_used: float = 0.0
    crash_history: deque[float] = field(default_factory=deque)
    degraded_until: float = 0.0


class RoleAccessor:
    """Per-role facade returned by ``pool.<role>``.

    Wraps the lazy-spawn dance so callers write ``await pool.embed.call(...)``
    instead of ``await pool._spawn_then_call("embed", ...)``. Per-role
    workers in subsequent commits add typed convenience methods (e.g.
    ``await pool.embed.batch(texts)``) on top of the generic ``call`` /
    ``stream`` here.
    """

    def __init__(self, pool: WorkerPool, role: WorkerRole) -> None:
        self._pool = pool
        self._role = role

    async def call(
        self,
        kind: WireKind,
        payload: object,
        *,
        timeout: float = _DEFAULT_CALL_TIMEOUT_S,
    ) -> object:
        """Lazy-spawn the worker on first call, then dispatch one request."""
        channel = await self._pool._ensure_channel(self._role)
        try:
            result = await channel.call(kind, payload, timeout=timeout)
        except WorkerCrashError:
            await self._pool._on_crash(self._role)
            raise
        self._pool._stamp_used(self._role)
        return result

    def stream(self, kind: WireKind, payload: object) -> AsyncIterator[object]:
        """Return the channel's async stream iterator, lazy-spawning on first iteration.

        The returned object is the ``stream`` async iterator from the
        underlying :class:`WorkerChannel`; the spawn step is folded into
        the iterator's first ``__anext__`` via :func:`_spawn_and_stream`.
        """
        return _spawn_and_stream(self._pool, self._role, kind, payload)

    async def ping(self, *, timeout: float) -> None:
        """Health check: lazy-spawn if needed, then ping over the dedicated health pipe.

        Pings travel on a separate pipe from data so they cannot interleave
        with stream chunks; no in-flight skip needed.
        """
        channel = await self._pool._ensure_channel(self._role)
        try:
            await channel.ping(timeout=timeout)
        except WorkerCrashError:
            await self._pool._on_crash(self._role)
            raise
        self._pool._stamp_used(self._role)

    def cancel(self) -> None:
        """Flip the worker's abort flag if it is alive; no-op otherwise."""
        channel = self._pool._channel_if_alive(self._role)
        if channel is not None:
            channel.cancel()

    def clear_abort(self) -> None:
        """Reset the worker's abort flag if it is alive; no-op otherwise."""
        channel = self._pool._channel_if_alive(self._role)
        if channel is not None:
            channel.clear_abort()

    @property
    def is_alive(self) -> bool:
        """True iff the worker has been spawned and its process is alive."""
        return self._pool._channel_if_alive(self._role) is not None


async def _spawn_and_stream(
    pool: WorkerPool,
    role: WorkerRole,
    kind: WireKind,
    payload: object,
) -> AsyncIterator[object]:
    """Async generator that spawns the worker on first iteration, then streams."""
    channel = await pool._ensure_channel(role)
    try:
        async for chunk in channel.stream(kind, payload):
            yield chunk
    except WorkerCrashError:
        await pool._on_crash(role)
        raise
    pool._stamp_used(role)


class WorkerPool:
    """Owns every long-lived worker process.

    Constructor takes an explicit *spawner* so tests can plug in an
    in-process fake (see ``tests/test_worker_pool.py``). Production callers
    pass nothing and get the default :class:`PipeSpawner`.

    Roles are registered with :meth:`register` before the first call; the
    pool itself does not import the per-role worker entrypoint modules, so
    the worker-side and parent-side code paths stay clearly separated.
    Registration is intentionally explicit: callers (typically ``Services``)
    decide which roles exist in this process.
    """

    def __init__(
        self,
        *,
        spawner: WorkerSpawner | None = None,
        max_idle_s: float = _DEFAULT_MAX_IDLE_S,
    ) -> None:
        self._spawner: WorkerSpawner = spawner if spawner is not None else PipeSpawner()
        self._roles: dict[WorkerRole, _Role] = {}
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._max_idle_s = max_idle_s
        # Listeners fire around every spawn, including respawns after a
        # crash. Both default to empty so headless callers (CLI, server,
        # tests) pay nothing.
        self._on_role_spawning: list[Callable[[WorkerRole], None]] = []
        self._on_role_spawned: list[Callable[[WorkerRole], None]] = []

    def add_listener(
        self,
        *,
        on_spawning: Callable[[WorkerRole], None] | None = None,
        on_spawned: Callable[[WorkerRole], None] | None = None,
    ) -> None:
        """Register callbacks fired immediately before / after a role spawn.

        Both arguments are optional; pass either or both. The callbacks are
        invoked from the pool runtime thread, so consumers wanting to touch a
        UI must marshal to their own loop (e.g. ``app.call_from_thread``).
        """
        if on_spawning is not None:
            self._on_role_spawning.append(on_spawning)
        if on_spawned is not None:
            self._on_role_spawned.append(on_spawned)
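
    # Illustrative sketch: a TUI marshaling spawn events onto its own loop
    # (``app`` and the spinner helpers are assumptions about the host, not
    # part of this module):
    #
    #     pool.add_listener(
    #         on_spawning=lambda role: app.call_from_thread(show_spinner, role),
    #         on_spawned=lambda role: app.call_from_thread(hide_spinner, role),
    #     )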

    def register(
        self,
        role: WorkerRole,
        worker_main: WorkerEntrypoint,
        config_factory: Callable[[], RoleConfig],
    ) -> RoleAccessor:
        """Register a role's worker entrypoint and return its accessor.

        ``config_factory`` is called every time the role spawns (lazily or
        on restart) so model swaps in cfg propagate without an explicit
        invalidation: the next spawn picks up whatever
        ``config_factory()`` returns.
        """
        if role in self._roles:
            raise ValueError(f"Role {role!r} is already registered on this pool.")
        self._roles[role] = _Role(
            name=role,
            worker_main=worker_main,
            config_factory=config_factory,
        )
        return RoleAccessor(self, role)
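
    # Illustrative registration (``embed_worker_main`` and
    # ``make_embed_config`` are hypothetical stand-ins for the real
    # entrypoint and config factory a caller would wire up):
    #
    #     pool = WorkerPool()
    #     embed = pool.register("embed", embed_worker_main, make_embed_config)
    #     result = await embed.call(kind, payload)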

    def accessor(self, role: WorkerRole) -> RoleAccessor:
        """Return the :class:`RoleAccessor` for *role*; must already be registered."""
        if role not in self._roles:
            raise KeyError(f"Role {role!r} is not registered on this pool.")
        return RoleAccessor(self, role)

    @property
    def registered_roles(self) -> tuple[WorkerRole, ...]:
        """Names of every role registered on this pool, in registration order."""
        return tuple(self._roles)

    async def start_eager(self) -> None:
        """Spawn every registered role concurrently; raise on first spawn failure.

        Optional. Most callers rely on lazy spawn via the accessors. Use
        this when you want to absorb the per-worker cold-start cost up
        front (e.g. just after the TUI mounts so the first user action
        does not also pay for spawn).
        """
        self._raise_if_shutdown()
        await asyncio.gather(*(self._ensure_channel(role) for role in self._roles))

    async def shutdown(self, *, timeout: float = _DEFAULT_SHUTDOWN_TIMEOUT_S) -> None:
        """Send shutdown to every live worker; terminate stragglers past *timeout*.

        Idempotent. Safe to register on ``atexit``.
        """
        async with self._shutdown_lock:
            if self._shutdown:
                return
            self._shutdown = True
            live: list[WorkerChannel] = [
                role.channel for role in self._roles.values() if role.channel is not None
            ]
            for role in self._roles.values():
                role.channel = None
            await asyncio.gather(
                *(channel.close(timeout=timeout) for channel in live),
                return_exceptions=True,
            )

    async def _ensure_channel(self, role: WorkerRole) -> WorkerChannel:
        """Return the role's live channel, spawning on first use, after cooldown, or after a crash.

        Half-open behavior: if a role is cooling down past its
        ``degraded_until`` deadline, one respawn attempt is allowed. A
        successful spawn clears the cooldown; a fresh crash extends it. See
        ``_DEGRADED_COOLDOWN_S``.
        """
        self._raise_if_shutdown()
        registration = self._roles.get(role)
        if registration is None:
            raise KeyError(f"Role {role!r} is not registered on this pool.")
        self._refuse_or_clear_cooldown(role, registration)
        if registration.channel is not None and registration.channel.is_alive:
            return registration.channel
        async with registration.spawn_lock:
            if registration.channel is not None and registration.channel.is_alive:
                return registration.channel
            self._refuse_or_clear_cooldown(role, registration)
            self._raise_if_shutdown()
            self._fire_listeners(self._on_role_spawning, role)
            channel, _handle = await asyncio.get_running_loop().run_in_executor(
                None,
                self._spawner.spawn,
                registration.worker_main,
                registration.config_factory(),
            )
            registration.channel = channel
            registration.last_used = time.monotonic()
            self._fire_listeners(self._on_role_spawned, role)
            log.info("Worker pool spawned role=%s pid=%s", role, channel.pid)
            return channel

    def _fire_listeners(
        self, listeners: list[Callable[[WorkerRole], None]], role: WorkerRole
    ) -> None:
        """Invoke every registered listener; one bad listener does not break the pool."""
        for listener in listeners:
            try:
                listener(role)
            except Exception:
                log.exception("Worker pool listener for role=%s raised", role)

    def _stamp_used(self, role: WorkerRole) -> None:
        """Update *role*'s ``last_used`` timestamp; called by the accessor."""
        registration = self._roles.get(role)
        if registration is not None:
            registration.last_used = time.monotonic()

    def _channel_if_alive(self, role: WorkerRole) -> WorkerChannel | None:
        """Return the role's live channel without spawning; None if absent or dead."""
        registration = self._roles.get(role)
        if registration is None or registration.channel is None:
            return None
        if not registration.channel.is_alive:
            return None
        return registration.channel

    def detach_channel(self, role: WorkerRole) -> WorkerChannel | None:
        """Atomically clear *role*'s channel; return the prior live channel or None.

        Caller owns the returned channel and closes it (typically by submitting
        ``channel.close(...)`` on the pool runtime). Other roles are untouched.
        The next ``_ensure_channel`` call for this role lazy-respawns with the
        current ``config_factory()`` snapshot.
        """
        registration = self._roles.get(role)
        if registration is None:
            return None
        channel = registration.channel
        registration.channel = None
        return channel
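
    # Illustrative sketch of the respawn-with-fresh-config pattern described
    # above (``runtime`` is the pool's PoolRuntime; the role name is an
    # assumption):
    #
    #     stale = pool.detach_channel("embed")
    #     if stale is not None:
    #         runtime.submit(stale.close(timeout=5.0))  # caller owns the close
    #     # the next embed call lazy-respawns with config_factory()'s output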

    def _refuse_or_clear_cooldown(self, role: WorkerRole, registration: _Role) -> None:
        """Raise ``RoleDegradedError`` if still cooling down; clear if past deadline.

        Centralizes the half-open check so both early bail-outs in
        ``_ensure_channel`` apply identical logic.
        """
        if registration.degraded_until <= 0.0:
            return
        now = time.monotonic()
        if now >= registration.degraded_until:
            # Cooldown expired: half-open. Reset crash history so the next
            # spawn attempt is judged on its own merit; if it crashes the
            # role re-enters cooldown via ``_on_crash``.
            registration.degraded_until = 0.0
            registration.crash_history.clear()
            log.info("Worker pool role=%s cooldown elapsed; allowing one retry", role)
            return
        raise RoleDegradedError(
            role,
            len(registration.crash_history),
            _RESTART_WINDOW_S,
            registration.degraded_until - now,
        )

    async def _on_crash(self, role: WorkerRole) -> None:
        """Drop a crashed channel; arm a cooldown if the restart budget is exhausted."""
        registration = self._roles.get(role)
        if registration is None:
            return
        async with registration.spawn_lock:
            channel = registration.channel
            registration.channel = None
            now = time.monotonic()
            cutoff = now - _RESTART_WINDOW_S
            while registration.crash_history and registration.crash_history[0] < cutoff:
                registration.crash_history.popleft()
            registration.crash_history.append(now)
            if len(registration.crash_history) > _RESTART_BUDGET:
                registration.degraded_until = now + _DEGRADED_COOLDOWN_S
                log.error(
                    "Worker pool role=%s cooling down for %.0fs after %d crashes in %.0fs",
                    role,
                    _DEGRADED_COOLDOWN_S,
                    len(registration.crash_history),
                    _RESTART_WINDOW_S,
                )
            if channel is not None:
                with contextlib.suppress(WorkerError):
                    await channel.close(timeout=_DEFAULT_SHUTDOWN_TIMEOUT_S)

    def reset_role_failures(self, role: WorkerRole) -> None:
        """Clear *role*'s crash history and any active cooldown.

        Public hook for an explicit "force a retry now" affordance. The
        circuit breaker already auto-recovers after ``_DEGRADED_COOLDOWN_S``;
        this is the manual override for users who don't want to wait.
        """
        registration = self._roles.get(role)
        if registration is None:
            return
        registration.crash_history.clear()
        registration.degraded_until = 0.0

    def is_degraded(self, role: WorkerRole) -> bool:
        """Return True iff *role* is currently inside a crash cooldown."""
        registration = self._roles.get(role)
        if registration is None:
            return False
        return registration.degraded_until > 0.0 and time.monotonic() < registration.degraded_until

    async def reap_idle(self) -> tuple[WorkerRole, ...]:
        """Close any role idle longer than ``max_idle_s`` with zero in-flight.

        Caller (typically a background async task) decides cadence; the
        pool does not own a recurring timer because the host TUI's loop
        is the authoritative scheduler.

        Returns the role names that were reaped (informational; useful
        for tests). No-op when ``max_idle_s == 0``.
        """
        if self._max_idle_s <= 0.0:
            return ()
        now = time.monotonic()
        reaped: list[WorkerRole] = []
        for role_name, registration in list(self._roles.items()):
            channel = registration.channel
            if channel is None or not channel.is_alive:
                continue
            if channel.in_flight > 0:
                continue
            if registration.last_used <= 0.0:
                continue
            if now - registration.last_used < self._max_idle_s:
                continue
            async with registration.spawn_lock:
                # Re-check inside the lock; another coroutine may have just used it.
                if channel.in_flight > 0:
                    continue
                if now - registration.last_used < self._max_idle_s:
                    continue
                registration.channel = None
                with contextlib.suppress(WorkerError):
                    await channel.close(timeout=_DEFAULT_SHUTDOWN_TIMEOUT_S)
                log.info(
                    "Worker pool reaped idle role=%s after %.0fs",
                    role_name,
                    now - registration.last_used,
                )
                reaped.append(role_name)
        return tuple(reaped)
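
    # Illustrative cadence sketch: a host-owned background task sweeping
    # every 30s (the interval and task wiring are assumptions, not part of
    # the pool):
    #
    #     async def idle_sweeper(pool: WorkerPool) -> None:
    #         while True:
    #             await asyncio.sleep(30.0)
    #             await pool.reap_idle()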

    async def ping_role(
        self,
        role: WorkerRole,
        *,
        timeout: float | None = None,
    ) -> None:
        """Round-trip a ping/pong against *role*; raise on timeout / crash.

        ``timeout`` defaults to the module-level ``_HEALTH_TIMEOUT_S``.
        Spawns the worker on first use, same as a real call. Caller
        (typically a background health monitor) decides cadence and
        whether to respond by reaping/restarting; this method only
        propagates the round-trip outcome.
        """
        accessor = self.accessor(role)
        budget = timeout if timeout is not None else _HEALTH_TIMEOUT_S
        await accessor.ping(timeout=budget)

    async def release(self, role: WorkerRole) -> None:
        """Close *role*'s live worker and forget the registration entirely.

        Used by callers (notably ``LlamaCppProvider.invalidate_load_cache``)
        that want the next request to respawn with a fresh model picked
        from the current cfg. The role can be re-registered immediately
        afterwards with :meth:`register`. No-op if the role is unregistered.
        """
        registration = self._roles.pop(role, None)
        if registration is None:
            return
        channel = registration.channel
        registration.channel = None
        if channel is not None:
            with contextlib.suppress(WorkerError):
                await channel.close(timeout=_DEFAULT_SHUTDOWN_TIMEOUT_S)

    def _raise_if_shutdown(self) -> None:
        if self._shutdown:
            raise PoolShutdownError()


class PoolRuntime:
    """Background asyncio loop dedicated to a single :class:`WorkerPool`.

    Sync callers (``LlamaCppProvider.embed`` and friends) invoke pool
    coroutines via :meth:`run_sync`, which submits them onto this loop
    and blocks the caller's thread for the result. Because every pool
    operation runs on the same loop, the per-role asyncio.Lock instances
    inside :class:`_Role` retain their semantics across concurrent
    sync callers.

    Constructed once per pool. :meth:`shutdown` stops the loop and
    joins the thread; subsequent :meth:`run_sync` calls raise
    :class:`PoolShutdownError`.
    """

    def __init__(self) -> None:
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._ready = threading.Event()
        self._stopped = False
        self._lock = threading.Lock()

    def start(self) -> None:
        """Spin up the background thread + loop. Idempotent."""
        with self._lock:
            if self._thread is not None:
                return
            if self._stopped:
                raise PoolShutdownError()
            self._thread = threading.Thread(
                target=self._run_loop,
                name=_RUNTIME_THREAD_NAME,
                daemon=True,
            )
            self._thread.start()
            self._ready.wait()

    def _run_loop(self) -> None:
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        self._ready.set()
        try:
            loop.run_forever()
        finally:
            # Cancel and drain pending tasks while the loop is still alive.
            # Otherwise an in-flight RoleAccessor.call awaiting an asyncio.Queue
            # gets GC'd after loop.close() and its CancelledError cleanup tries
            # to call_soon on a closed loop, spamming "Event loop is closed"
            # tracebacks during Ctrl-C shutdown.
            try:
                pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
                for task in pending:
                    task.cancel()
                if pending:
                    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception:
                log.exception("Pool runtime loop drain failed; closing anyway")
            finally:
                loop.close()

    def run_sync(self, coro: Coroutine[Any, Any, _T], *, timeout: float | None = None) -> _T:
        """Submit *coro* to the background loop and block for the result.

        On timeout, cancels the underlying asyncio task before raising so
        the loop does not log "Task was destroyed but it is pending".
        """
        if self._stopped:
            coro.close()
            raise PoolShutdownError()
        if self._thread is None:
            self.start()
        loop = self._loop
        assert loop is not None  # _ready signaled, loop is set  # noqa: S101
        future: Future[_T] = asyncio.run_coroutine_threadsafe(coro, loop)
        try:
            return future.result(timeout=timeout)
        except BaseException:
            future.cancel()
            raise
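
    # Illustrative sync-caller sketch (``embed`` is a RoleAccessor; ``kind``
    # and ``payload`` are placeholders):
    #
    #     runtime = PoolRuntime()
    #     result = runtime.run_sync(embed.call(kind, payload), timeout=30.0)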

    def submit(self, coro: Coroutine[Any, Any, _T]) -> Future[_T]:
        """Schedule *coro* on the background loop without blocking the caller.

        Returns the :class:`concurrent.futures.Future` for the call so the
        caller can await it (via :func:`asyncio.wrap_future` from another
        loop) or cancel it. Used by the Services-owned health ticker so a
        long pool ping does not stall the bg-loop.
        """
        if self._stopped:
            coro.close()
            raise PoolShutdownError()
        if self._thread is None:
            self.start()
        loop = self._loop
        assert loop is not None  # _ready signaled, loop is set  # noqa: S101
        return asyncio.run_coroutine_threadsafe(coro, loop)
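
    # Illustrative sketch of awaiting a submitted coroutine from another
    # event loop, per the docstring above:
    #
    #     fut = runtime.submit(pool.ping_role("chat"))
    #     await asyncio.wrap_future(fut)  # from the host app's own loop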

    def shutdown(self, *, timeout: float = _DEFAULT_SHUTDOWN_TIMEOUT_S) -> None:
        """Stop the loop and join the thread. Idempotent."""
        with self._lock:
            if self._stopped:
                return
            self._stopped = True
            loop = self._loop
            thread = self._thread
        if loop is not None:
            loop.call_soon_threadsafe(loop.stop)
        if thread is not None:
            thread.join(timeout=timeout)


def shutdown_pool_runtime(
    pool: WorkerPool,
    runtime: PoolRuntime,
    ticker: HealthTickerHandle,
    *,
    drain_timeout_s: float = 10.0,
    runtime_timeout_s: float = 5.0,
) -> None:
    """Stop the health ticker, drain the pool through the runtime, then stop the runtime.

    Order matters: cancel the ticker first so it cannot schedule a fresh
    pool op against a draining runtime; then drain the pool via the
    runtime; then stop the runtime thread. Idempotent.
    """
    from lilbee.providers.worker.health_ticker import stop_health_ticker

    stop_health_ticker(ticker)
    try:
        runtime.run_sync(pool.shutdown(), timeout=drain_timeout_s)
    except (TimeoutError, RuntimeError, OSError) as exc:
        log.warning("Pool shutdown raised %s; forcing runtime stop", exc)
    runtime.shutdown(timeout=runtime_timeout_s)
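
# Illustrative exit wiring (a sketch; how the host obtains ``pool``,
# ``runtime``, and ``ticker`` is up to the app):
#
#     import atexit
#     atexit.register(shutdown_pool_runtime, pool, runtime, ticker)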


__all__ = [
    "PoolRuntime",
    "PoolShutdownError",
    "RoleAccessor",
    "RoleDegradedError",
    "WorkerPool",
    "shutdown_pool_runtime",
]