Coverage for src / lilbee / providers / worker / transport.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Bidirectional channel and spawner protocols for worker IPC. 

2 

3Concrete impl lives in :mod:`lilbee.providers.worker.transport_pipe`. 

4""" 

5 

6from __future__ import annotations 

7 

8from collections.abc import AsyncIterator, Awaitable, Callable 

9from dataclasses import dataclass 

10from enum import StrEnum 

11from pathlib import Path 

12from typing import Any, Literal, Protocol, runtime_checkable 

13 

14from lilbee.providers.worker.wire_kinds import WireKind 

15 

16 

17class WorkerRole(StrEnum): 

18 """Worker pool role identifier; addresses one llama.cpp worker process.""" 

19 

20 EMBED = "embed" 

21 RERANK = "rerank" 

22 CHAT = "chat" 

23 VISION = "vision" 

24 

25 

OcrBackend = Literal["vision"]
"""Backend names the PDF-OCR worker accepts. Tesseract is not pooled; it runs inline."""

28 

WorkerEntrypoint = Callable[..., None]
"""Type of the main function executed inside a worker subprocess.

For the pipe transport the concrete signature is
``(child_conn, abort_flag, role_config) -> None``. The alias is deliberately
left as ``Callable[..., None]`` so that other transports (zmq, in-process for
tests) can pass their own argument shape and the Protocol does not leak
mp-specific types.
"""

37 

38 

@dataclass(frozen=True)
class RoleConfig:
    """Configuration given to a worker subprocess when it is spawned.

    The object crosses the process boundary at spawn time, so it has to be
    picklable.

    * ``role`` -- the short identifier (``embed``, ``chat``, ``rerank``,
      ``vision``).
    * ``model_path`` -- absolute on-disk path of the GGUF file the worker
      should load.
    * ``mode`` -- loader hint (``"embed"``, ``"chat"``, ``"vision"``) consumed
      by ``providers.model_cache``.
    * ``extras`` -- any further pickle-friendly payload a particular role
      needs; left open so that new fields do not change the Protocol
      signature.
    """

    role: WorkerRole
    model_path: Path
    mode: str
    extras: dict[str, Any] | None = None

56 

57 

@dataclass(frozen=True)
class ChatRequest:
    """Chat request sent over the parent->worker pipe; must stay pickle-friendly.

    It takes the place of the inline ``{"messages": ..., "stream": ..., ...}``
    dict, so that a typo on either side shows up as a type error (or an
    attribute miss) rather than being hidden by a silent
    ``payload.get("missing", default)``.

    * ``messages`` -- the standard llama-cpp message list.
    * ``stream`` -- selects between a single-result reply and chunked replies.
    * ``options`` -- the post-``filter_options`` kwarg dict that the worker
      forwards to ``create_chat_completion``.
    * ``model`` -- when it differs from the role-config model, the worker
      performs a transparent reload.
    """

    messages: list[dict[str, str]]
    stream: bool = False
    options: dict[str, Any] | None = None
    model: str | None = None

77 

78 

@dataclass(frozen=True)
class VisionRequest:
    """Vision-OCR request sent over the parent->worker pipe; must stay pickle-friendly.

    It takes the place of the inline
    ``{"png_bytes": ..., "model": ..., "prompt": ...}`` dict. ``model`` may be
    omitted: ``None`` means "use the role-config model unchanged".
    """

    png_bytes: bytes
    prompt: str = ""
    model: str | None = None

91 

92 

@dataclass(frozen=True)
class PdfOcrRequest:
    """Multi-page PDF-OCR request; must stay pickle-friendly.

    ``path`` is kept as a string so it pickles cheaply across the
    parent->worker pipe; the worker wraps it in a :class:`pathlib.Path` again.
    A non-empty ``model`` overrides ``cfg.vision_model`` for this call.
    Wall-clock budgets are enforced by the parent through the stream-drain
    timeout; the worker only validates the shape of the payload.
    """

    path: str
    backend: OcrBackend
    model: str = ""

107 

108 

@dataclass(frozen=True)
class RerankPayload:
    """Rerank request; must stay pickle-friendly.

    It takes the place of the bare ``(query, candidates)`` tuple, letting the
    worker check the shape through attribute access rather than length /
    index juggling.
    """

    query: str
    candidates: list[str]

120 

121 

@dataclass(frozen=True)
class WorkerHandle:
    """Opaque handle for a spawned worker, handed back together with its channel.

    It holds the bookkeeping the pool uses for restart-on-crash and idle
    reaping, while keeping transport-specific types (``mp.Process``,
    ``threading.Thread``, etc.) away from the pool. ``pid`` is informational
    only; transports with no single OS process (e.g. a hypothetical
    in-process test transport) may set it to ``None``.
    """

    pid: int | None
    role: WorkerRole

135 

136 

@runtime_checkable
class WorkerChannel(Protocol):
    """Two-way message channel to a single running worker.

    Lifecycle: a :class:`WorkerSpawner` builds the channel, it stays alive
    for as long as the worker does, and :meth:`close` tears it down. The
    methods appear in the order they are typically called (call/stream
    during inference, ping for health, cancel/clear_abort to interrupt,
    close on shutdown).

    Call-ordering contract
    ----------------------

    1. A channel returned by the spawner is immediately ready for ``call`` /
       ``stream`` / ``ping``. There is no ``initialize`` step.
    2. ``is_alive`` and ``in_flight`` may be read at any time, including
       concurrently with an in-flight ``call`` / ``stream``.
    3. ``call``, ``stream``, and ``ping`` may run concurrently with one
       another (each acquires its own send/recv lock internally), but the
       worker process serializes them on the wire.
    4. ``cancel`` / ``clear_abort`` are best-effort and never block on a
       lock; they are safe to call from any thread, including during a
       streaming ``__anext__`` so the consumer can interrupt itself.
    5. ``close`` is final. Once it has returned, ``call`` / ``stream`` /
       ``ping`` must raise :class:`WorkerError`. ``close`` is idempotent.
    """

    @property
    def is_alive(self) -> bool:
        """True iff the worker process is still running."""
        ...

    @property
    def pid(self) -> int | None:
        """The worker's OS process id, or None when the transport has none."""
        ...

    @property
    def in_flight(self) -> int:
        """Count of requests that were sent but are not yet fully replied to.

        The pool's idle reaper requires this to be zero before it times out
        a worker. A pending ``stream()`` stays in-flight until its
        terminator (``stream_end`` / ``error``) arrives.
        """
        ...

    def call(self, kind: WireKind, payload: Any, *, timeout: float) -> Awaitable[Any]:
        """Send a single request and await a single reply; raises on worker error or timeout."""
        ...

    def stream(self, kind: WireKind, payload: Any) -> AsyncIterator[Any]:
        """Send a single request and yield chunks until the worker terminates the stream."""
        ...

    def ping(self, *, timeout: float) -> Awaitable[None]:
        """Send ping and await pong; raises on timeout (the worker is considered hung)."""
        ...

    def cancel(self) -> None:
        """Set the worker's abort flag.

        This is best-effort: ``stream_chunk`` messages that are already in
        the pipe still drain (typically a few extra tokens). Until the
        worker confirms with a terminator, the user-facing toast should say
        "Cancelling...".
        """
        ...

    def clear_abort(self) -> None:
        """Set the abort flag back to 0 so that the next request runs to completion."""
        ...

    def close(self, *, timeout: float) -> Awaitable[None]:
        """Send shutdown, await a graceful exit, and terminate stragglers past *timeout*."""
        ...

212 

213 

@runtime_checkable
class WorkerSpawner(Protocol):
    """Starts worker subprocesses and hands back their channels.

    There is one spawner instance per :class:`WorkerPool`, and every call to
    :meth:`spawn` yields one new worker. Transport-specific knowledge (which
    mp.Pipe end the child gets, which port a zmq worker should bind, etc.)
    belongs to the spawner; the pool only sees Protocols.
    """

    def spawn(
        self,
        worker_main: WorkerEntrypoint,
        role_config: RoleConfig,
    ) -> tuple[WorkerChannel, WorkerHandle]:
        """Launch a worker process and return its channel together with its handle."""
        ...

231 

232 

def default_spawner() -> WorkerSpawner:
    """Create and return a new :class:`PipeSpawner`.

    ``transport_pipe`` is imported lazily, inside the function, to avoid an
    import cycle with that module.
    """
    from lilbee.providers.worker.transport_pipe import PipeSpawner

    spawner: WorkerSpawner = PipeSpawner()
    return spawner

238 

239 

# Public API of this module: request/config dataclasses, type aliases, the
# channel/spawner protocols, and the default spawner factory.
__all__ = [
    "ChatRequest",
    "OcrBackend",
    "PdfOcrRequest",
    "RerankPayload",
    "RoleConfig",
    "VisionRequest",
    "WorkerChannel",
    "WorkerEntrypoint",
    "WorkerHandle",
    "WorkerRole",
    "WorkerSpawner",
    "default_spawner",
]