Coverage for src / lilbee / providers / worker / worker_runtime.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Cross-role helpers for worker subprocesses.
3Bootstraps the per-role workers (embed, chat, rerank, vision) and runs
4the recv loop. Health pings and graceful shutdown live on a dedicated
5daemon thread so a long inference call cannot starve liveness or stop
6the parent from terminating the worker.
7"""
9from __future__ import annotations
11import contextlib
12import logging
13import os
14import sys
15import threading
16from collections.abc import Callable
17from dataclasses import dataclass
18from typing import Any
20from lilbee.providers.worker.transport import RoleConfig, WorkerRole
21from lilbee.providers.worker.transport_pipe import _serialize_exception
22from lilbee.providers.worker.wire_kinds import WireKind
# Module logger; workers attach a per-role file handler to the root logger
# in configure_worker_logging, so records emitted here land in that file.
log = logging.getLogger(__name__)

# How often the main data loop wakes from poll() to re-check the shutdown
# flag. 100 ms is well under any user-visible cancel budget but rare enough
# not to show up as CPU noise.
_DATA_POLL_INTERVAL_S: float = 0.1

#: Subdirectory of ``cfg.data_root`` where per-role worker logs land.
#: Shared with :mod:`lilbee.providers.worker.transport_pipe` so the parent's
#: :class:`WorkerCrashError` points at the exact file the worker wrote.
WORKER_LOGS_DIR_NAME: str = "logs"
def redirect_stdio_to_devnull() -> None:  # pragma: no cover - subprocess fd swap
    """Point stdout/stderr at /dev/null so llama-cpp's C-level prints stay quiet.

    Swaps both the OS-level file descriptors 1 and 2 (for native code
    writing directly to the fds) and the Python-level ``sys.stdout`` /
    ``sys.stderr`` objects (for Python-level prints).
    """
    null_fd = os.open(os.devnull, os.O_RDWR)
    for std_fd in (1, 2):
        os.dup2(null_fd, std_fd)
    os.close(null_fd)
    sys.stdout = open(os.devnull, "w")  # noqa: SIM115
    sys.stderr = open(os.devnull, "w")  # noqa: SIM115
def configure_worker_logging(role: WorkerRole) -> None:
    """Append worker logs to ``$LILBEE_DATA/logs/worker-<role>.log``.

    ``LILBEE_DATA`` is canonicalized at cfg construction
    (:func:`lilbee.core.config.model._build_cfg`) and at every write
    site that switches the data root, so the env is the single source
    of truth.

    Setup is best-effort: if the env var is unset, or the logs directory
    cannot be created or the log file cannot be opened (read-only data
    root, path collision with an existing file), the worker keeps running
    without a file handler instead of crashing during bootstrap.
    """
    data_dir = os.environ.get("LILBEE_DATA")
    if not data_dir:
        return
    logs_dir = os.path.join(data_dir, WORKER_LOGS_DIR_NAME)
    log_path = os.path.join(logs_dir, f"worker-{role}.log")
    try:
        os.makedirs(logs_dir, exist_ok=True)
        handler = logging.FileHandler(log_path)
    except OSError:
        # Previously only the makedirs failure was suppressed, so
        # FileHandler then raised on the missing/unwritable directory and
        # killed the worker before it could serve a single frame.
        return
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
class Reply:
    """Sender for the in-flight call's response frames on the data pipe.

    Each handler gets a Reply and emits its response frames through
    ``reply.send(kind, payload)``. Because the parent's
    :class:`PipeChannel` holds the call lock for the entire
    request/reply or request/stream window, any frame sent here is, by
    construction, part of the call the worker is currently servicing.
    """

    def __init__(self, conn: Any) -> None:
        self._conn = conn

    def send(self, kind: WireKind, payload: Any) -> None:
        """Send one response frame on the data pipe."""
        frame = (kind, payload)
        self._conn.send(frame)
@dataclass
class WorkerLoopState:
    """Per-loop state shared by the dispatcher and the role's handlers.

    Holds the role's lazy-loaded session. Each handler receives a
    reference and pulls its model via ``state.session``.
    """

    # Role-specific session; run_worker builds it via session_factory and
    # closes it during shutdown.
    session: Any
# Dispatch-table value type for run_worker's kind_handlers mapping.
KindHandler = Callable[[Reply, Any, WorkerLoopState], None]
"""Per-role handler signature: ``(reply, payload, state) -> None``.

Handlers reach their role-specific session via ``state.session`` and
emit response frames via ``reply.send(kind, payload)``.
"""
def run_worker(
    data_conn: Any,
    health_conn: Any,
    abort_flag: Any,
    role_config: RoleConfig,
    *,
    session_factory: Callable[[RoleConfig, Any], Any],
    kind_handlers: dict[WireKind, KindHandler],
) -> None:
    """Bootstrap stdio + logging, then run the recv loop until shutdown.

    The control plane (ping, shutdown) travels on the health pipe and is
    served by a daemon thread. The main loop polls the data pipe with a
    short timeout so the shutdown flag set by the heartbeat thread fires
    promptly even when no data frame is pending. Unknown data-pipe kinds
    reply with a serialized ``ValueError``.
    """
    redirect_stdio_to_devnull()
    configure_worker_logging(role_config.role)
    log.info(
        "%s worker online (pid=%s, model=%s)",
        role_config.role,
        os.getpid(),
        role_config.model_path,
    )
    state = WorkerLoopState(session=session_factory(role_config, abort_flag))
    shutdown_event = threading.Event()
    heartbeat = _start_heartbeat_thread(health_conn, role_config.role, shutdown_event)
    try:
        while not shutdown_event.is_set():
            if not _handle_data_frame(
                data_conn,
                state,
                kind_handlers,
                role_config.role,
                shutdown_event,
            ):
                break
    finally:
        shutdown_event.set()
        # Teardown is best-effort end to end: a session.close() that raises
        # must not leave the pipes open or skip the heartbeat join, or the
        # parent is left waiting on a half-dead worker.
        with contextlib.suppress(Exception):
            state.session.close()
        with contextlib.suppress(Exception):
            data_conn.close()
        with contextlib.suppress(Exception):
            health_conn.close()
        heartbeat.join(timeout=1.0)
def _start_heartbeat_thread(
    health_conn: Any, role: WorkerRole, shutdown_event: threading.Event
) -> threading.Thread:
    """Launch and return the daemon thread that owns the health pipe."""
    heartbeat = threading.Thread(
        name=f"lilbee-worker-{role}-heartbeat",
        target=_heartbeat_loop,
        args=(health_conn, role, shutdown_event),
        daemon=True,
    )
    heartbeat.start()
    return heartbeat
168def _heartbeat_loop(health_conn: Any, role: WorkerRole, shutdown_event: threading.Event) -> None:
169 """Serve ping/shutdown on the health pipe until the parent closes it.
171 On SHUTDOWN, sets the shutdown event so the main thread exits its
172 poll loop within ``_DATA_POLL_INTERVAL_S`` regardless of whether a
173 data frame is pending. The ACK reply is best-effort: the parent's
174 close() suppresses pipe errors so a torn-down health connection
175 does not surface as a noisy supervisor warning.
176 """
177 while True:
178 try:
179 kind, _ = health_conn.recv()
180 except (EOFError, OSError):
181 shutdown_event.set()
182 return
183 if kind == WireKind.PING:
184 try:
185 health_conn.send((WireKind.PONG, None))
186 except (BrokenPipeError, OSError):
187 shutdown_event.set()
188 return
189 continue
190 if kind == WireKind.SHUTDOWN:
191 with contextlib.suppress(BrokenPipeError, OSError):
192 health_conn.send((WireKind.ACK, None))
193 shutdown_event.set()
194 return
195 log.warning("%s worker dropped unexpected health-pipe kind %r", role, kind)
def _handle_data_frame(
    data_conn: Any,
    state: WorkerLoopState,
    kind_handlers: dict[WireKind, KindHandler],
    role: WorkerRole,
    shutdown_event: threading.Event,
) -> bool:
    """Read and dispatch one data-pipe frame. Return False to stop the loop.

    Polls in ``_DATA_POLL_INTERVAL_S`` slices so a shutdown flag set by
    the heartbeat thread is noticed within one interval even on an idle
    data pipe. A long-running handler (a multi-second chat stream) is
    interrupted only after the handler returns; the join in
    ``PipeChannel.close`` falls back to ``terminate()`` past the close
    timeout for that case.
    """
    # Wait for a pending frame, re-checking the shutdown flag each slice.
    while not data_conn.poll(_DATA_POLL_INTERVAL_S):
        if shutdown_event.is_set():
            return False
    try:
        kind, payload = data_conn.recv()
    except EOFError:
        # Parent closed its end of the data pipe.
        return False
    reply = Reply(data_conn)
    try:
        handler = kind_handlers[kind]
    except KeyError:
        # Unknown kind: answer with a serialized ValueError so the parent's
        # call site raises instead of hanging on a missing response.
        try:
            raise ValueError(f"{role} worker received unknown kind {kind!r}")
        except ValueError as exc:
            reply.send(WireKind.ERROR, _serialize_exception(exc))
        return True
    handler(reply, payload, state)
    return True
# Explicit public surface of this module.
__all__ = [
    "KindHandler",
    "Reply",
    "WorkerLoopState",
    "configure_worker_logging",
    "redirect_stdio_to_devnull",
    "run_worker",
]