Coverage for src / lilbee / providers / worker / worker_runtime.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Cross-role helpers for worker subprocesses. 

2 

3Bootstraps the per-role workers (embed, chat, rerank, vision) and runs 

4the recv loop. Health pings and graceful shutdown live on a dedicated 

5daemon thread so a long inference call cannot starve liveness or stop 

6the parent from terminating the worker. 

7""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import logging 

13import os 

14import sys 

15import threading 

16from collections.abc import Callable 

17from dataclasses import dataclass 

18from typing import Any 

19 

20from lilbee.providers.worker.transport import RoleConfig, WorkerRole 

21from lilbee.providers.worker.transport_pipe import _serialize_exception 

22from lilbee.providers.worker.wire_kinds import WireKind 

23 

log = logging.getLogger(__name__)

# Interval, in seconds, at which the main data loop returns from poll() to
# look at the shutdown flag again. 100 ms sits well below any user-visible
# cancel budget while staying infrequent enough not to register as CPU noise.
_DATA_POLL_INTERVAL_S = 0.1

#: Name of the directory under ``cfg.data_root`` that receives the per-role
#: worker logs. :mod:`lilbee.providers.worker.transport_pipe` shares this
#: constant so the parent's :class:`WorkerCrashError` points at the exact
#: file the worker wrote.
WORKER_LOGS_DIR_NAME = "logs"

35 

36 

def redirect_stdio_to_devnull() -> None:  # pragma: no cover - subprocess fd swap
    """Point stdout and stderr at the null device.

    Both the process-level descriptors (1 and 2) and the Python-level
    ``sys.stdout`` / ``sys.stderr`` objects are replaced, so llama-cpp's
    C-level prints stay quiet.
    """
    null_fd = os.open(os.devnull, os.O_RDWR)
    # Descriptor 1 is stdout, descriptor 2 is stderr.
    for std_fd in (1, 2):
        os.dup2(null_fd, std_fd)
    # The duplicates keep the null device open; the original fd is not needed.
    os.close(null_fd)
    sys.stdout = open(os.devnull, "w")  # noqa: SIM115
    sys.stderr = open(os.devnull, "w")  # noqa: SIM115

45 

46 

def configure_worker_logging(role: WorkerRole) -> None:
    """Append worker logs to ``$LILBEE_DATA/logs/worker-<role>.log``.

    ``LILBEE_DATA`` is canonicalized at cfg construction
    (:func:`lilbee.core.config.model._build_cfg`) and at every write
    site that switches the data root, so the env is the single source
    of truth.

    File logging is best-effort. When ``LILBEE_DATA`` is unset or empty,
    or when the logs directory or the log file cannot be created or
    opened, the function returns without touching the root logger so a
    read-only or missing data root cannot abort worker start-up.

    ``role`` is interpolated into the log file name. On success a UTF-8
    :class:`logging.FileHandler` is attached to the root logger and the
    root level is set to ``INFO``.
    """
    data_dir = os.environ.get("LILBEE_DATA")
    if not data_dir:
        return
    logs_dir = os.path.join(data_dir, WORKER_LOGS_DIR_NAME)
    log_path = os.path.join(logs_dir, f"worker-{role}.log")
    try:
        os.makedirs(logs_dir, exist_ok=True)
        # FileHandler opens the file eagerly, so an unusable location
        # surfaces here as OSError rather than on the first log call.
        handler = logging.FileHandler(log_path, encoding="utf-8")
    except OSError:
        return
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)

67 

68 

class Reply:
    """Outbound side of the data pipe for the call currently being served.

    Every handler is handed a Reply and pushes its response frames
    through ``reply.send(kind, payload)``. Because the parent's
    :class:`PipeChannel` keeps the call lock for the whole
    request/reply or request/stream window, any frame sent through this
    object belongs, by construction, to the call the worker is
    servicing right now.
    """

    def __init__(self, conn: Any) -> None:
        # Connection object whose ``send`` carries frames to the parent.
        self._conn = conn

    def send(self, kind: WireKind, payload: Any) -> None:
        """Write a single ``(kind, payload)`` response frame to the data pipe."""
        frame = (kind, payload)
        self._conn.send(frame)

85 

86 

@dataclass
class WorkerLoopState:
    """State for one worker loop, visible to the dispatcher and to handlers.

    Carries the role's lazy-loaded session. A handler is passed this
    object and obtains its model through ``state.session``.
    """

    # Role-specific session object; run_worker closes it on shutdown.
    session: Any

96 

97 

#: Per-role handler signature: ``(reply, payload, state) -> None``.
#:
#: A handler reaches its role-specific session through ``state.session``
#: and emits response frames with ``reply.send(kind, payload)``.
KindHandler = Callable[[Reply, Any, WorkerLoopState], None]

104 

105 

def run_worker(
    data_conn: Any,
    health_conn: Any,
    abort_flag: Any,
    role_config: RoleConfig,
    *,
    session_factory: Callable[[RoleConfig, Any], Any],
    kind_handlers: dict[WireKind, KindHandler],
) -> None:
    """Bootstrap stdio + logging, then run the recv loop until shutdown.

    The control plane (ping, shutdown) travels on the health pipe and is
    served by a daemon thread. The main loop polls the data pipe with a
    short timeout so the shutdown flag set by the heartbeat thread fires
    promptly even when no data frame is pending. Unknown data-pipe kinds
    reply with a serialized ``ValueError``.

    ``session_factory`` is called once with ``(role_config, abort_flag)``
    to build the session stored on the loop state. ``kind_handlers`` maps
    each data-pipe kind to the handler that services it.

    On exit the shutdown event is set, the session is closed, both pipes
    are closed (errors suppressed) and the heartbeat thread is joined for
    up to one second. Pipe cleanup runs even when ``session.close()``
    raises; that exception still propagates to the caller.
    """
    redirect_stdio_to_devnull()
    configure_worker_logging(role_config.role)
    log.info(
        "%s worker online (pid=%s, model=%s)",
        role_config.role,
        os.getpid(),
        role_config.model_path,
    )
    state = WorkerLoopState(session=session_factory(role_config, abort_flag))
    shutdown_event = threading.Event()
    heartbeat = _start_heartbeat_thread(health_conn, role_config.role, shutdown_event)
    try:
        while not shutdown_event.is_set():
            if not _handle_data_frame(
                data_conn,
                state,
                kind_handlers,
                role_config.role,
                shutdown_event,
            ):
                break
    finally:
        shutdown_event.set()
        try:
            state.session.close()
        finally:
            # Runs even when session.close() raises, so neither pipe end
            # leaks and the heartbeat thread still gets its join.
            with contextlib.suppress(Exception):
                data_conn.close()
            with contextlib.suppress(Exception):
                health_conn.close()
            # Bounded wait: the heartbeat is a daemon thread and may still
            # be blocked in recv(), so never hang process exit on it.
            heartbeat.join(timeout=1.0)

152 

153 

def _start_heartbeat_thread(
    health_conn: Any, role: WorkerRole, shutdown_event: threading.Event
) -> threading.Thread:
    """Start and return the daemon thread that services the health pipe."""
    thread_name = f"lilbee-worker-{role}-heartbeat"
    loop_args = (health_conn, role, shutdown_event)
    heartbeat = threading.Thread(
        name=thread_name,
        target=_heartbeat_loop,
        args=loop_args,
        daemon=True,
    )
    heartbeat.start()
    return heartbeat

166 

167 

def _heartbeat_loop(health_conn: Any, role: WorkerRole, shutdown_event: threading.Event) -> None:
    """Answer ping/shutdown frames on the health pipe until it goes away.

    A SHUTDOWN frame sets the shutdown event, which lets the main thread
    leave its poll loop within ``_DATA_POLL_INTERVAL_S`` whether or not
    a data frame is pending. The ACK is sent on a best-effort basis: the
    parent's close() suppresses pipe errors, so a health connection that
    is already torn down does not turn into a noisy supervisor warning.

    The shutdown event is also set when receiving fails with
    ``EOFError``/``OSError`` or when a PONG cannot be sent. Frames of any
    other kind are logged and dropped.
    """
    while True:
        try:
            frame_kind, _payload = health_conn.recv()
        except (EOFError, OSError):
            break
        if frame_kind == WireKind.PING:
            try:
                health_conn.send((WireKind.PONG, None))
            except (BrokenPipeError, OSError):
                break
        elif frame_kind == WireKind.SHUTDOWN:
            with contextlib.suppress(BrokenPipeError, OSError):
                health_conn.send((WireKind.ACK, None))
            break
        else:
            log.warning("%s worker dropped unexpected health-pipe kind %r", role, frame_kind)
    # Every handled exit path above funnels here.
    shutdown_event.set()

196 

197 

def _handle_data_frame(
    data_conn: Any,
    state: WorkerLoopState,
    kind_handlers: dict[WireKind, KindHandler],
    role: WorkerRole,
    shutdown_event: threading.Event,
) -> bool:
    """Read and dispatch one data-pipe frame. Return False to stop the loop.

    Uses ``poll()`` with a short timeout so a shutdown flag set by the
    heartbeat thread takes effect within one poll interval even when the
    data pipe is idle. A long-running handler (a multi-second chat stream)
    is interrupted only after the handler returns; the join in
    ``PipeChannel.close`` falls back to ``terminate()`` past the close
    timeout for that case.

    Returns ``False`` when the shutdown event is set while the pipe is
    idle, when the pipe reports end-of-file, or when polling/receiving
    fails with ``OSError`` (for example a reset or already-closed pipe).
    The ``OSError`` case matches how the health-pipe loop treats a broken
    connection and is logged before stopping. Returns ``True`` after a
    frame was dispatched to its handler or answered with an ERROR frame
    for an unknown kind.
    """
    try:
        while not data_conn.poll(_DATA_POLL_INTERVAL_S):
            if shutdown_event.is_set():
                return False
        kind, payload = data_conn.recv()
    except EOFError:
        return False
    except OSError as exc:
        # A broken data pipe cannot be serviced any further; stop the loop
        # so the caller's cleanup runs instead of dying on a traceback.
        log.warning("%s worker data pipe failed, stopping: %s", role, exc)
        return False
    reply = Reply(data_conn)
    handler = kind_handlers.get(kind)
    if handler is not None:
        handler(reply, payload, state)
        return True
    # Raised and caught rather than merely constructed; presumably so the
    # serialized exception carries a traceback. Confirm against
    # _serialize_exception.
    try:
        raise ValueError(f"{role} worker received unknown kind {kind!r}")
    except ValueError as exc:
        reply.send(WireKind.ERROR, _serialize_exception(exc))
    return True

231 

232 

# Public API of this module. WORKER_LOGS_DIR_NAME is listed because it is
# a public constant shared with lilbee.providers.worker.transport_pipe.
__all__ = [
    "WORKER_LOGS_DIR_NAME",
    "KindHandler",
    "Reply",
    "WorkerLoopState",
    "configure_worker_logging",
    "redirect_stdio_to_devnull",
    "run_worker",
]