Coverage for src / lilbee / providers / worker / transport.py: 100%
50 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Bidirectional channel and spawner protocols for worker IPC.
3Concrete impl lives in :mod:`lilbee.providers.worker.transport_pipe`.
4"""
6from __future__ import annotations
8from collections.abc import AsyncIterator, Awaitable, Callable
9from dataclasses import dataclass
10from enum import StrEnum
11from pathlib import Path
12from typing import Any, Literal, Protocol, runtime_checkable
14from lilbee.providers.worker.wire_kinds import WireKind
17class WorkerRole(StrEnum):
18 """Worker pool role identifier; addresses one llama.cpp worker process."""
20 EMBED = "embed"
21 RERANK = "rerank"
22 CHAT = "chat"
23 VISION = "vision"
# Closed set of backend names accepted by the pooled PDF-OCR worker.
OcrBackend = Literal["vision"]
"""Backends supported by the PDF-OCR worker. Tesseract runs inline, not pooled."""

# Intentionally loose callable type: each transport picks its own arguments.
WorkerEntrypoint = Callable[..., None]
"""Signature of a worker subprocess main function.

Concrete signature is ``(child_conn, abort_flag, role_config) -> None`` for
the pipe transport. Kept as ``Callable[..., None]`` here so other transports
(zmq, in-process for tests) can supply their own argument shape without the
Protocol leaking mp-specific types.
"""
@dataclass(frozen=True)
class RoleConfig:
    """Immutable spawn-time settings passed to a worker subprocess.

    Instances cross the process boundary when the worker is spawned, so
    every field value has to be picklable.

    Attributes:
        role: Short role identifier (``embed``, ``chat``, ``rerank``,
            ``vision``).
        model_path: Absolute on-disk path of the GGUF file the worker
            should load.
        mode: Loader hint (``"embed"``, ``"chat"``, ``"vision"``) consumed
            by ``providers.model_cache``.
        extras: Optional pickle-friendly payload for role-specific needs;
            left open-ended so new settings do not alter the Protocol
            signature.
    """

    role: WorkerRole
    model_path: Path
    mode: str
    # Defaults to None rather than an empty dict (no shared mutable default).
    extras: dict[str, Any] | None = None
@dataclass(frozen=True)
class ChatRequest:
    """Typed, pickle-friendly chat request sent over the parent->worker pipe.

    Stands in for the inline ``{"messages": ..., "stream": ..., ...}`` dict:
    a misspelled field on either side now shows up as a type error (or an
    attribute miss) rather than being hidden by a silent
    ``payload.get("missing", default)``.

    Attributes:
        messages: The standard llama-cpp message list.
        stream: Selects chunked replies (``True``) or a single result
            (``False``).
        options: The post-``filter_options`` kwarg dict that the worker
            forwards to ``create_chat_completion``.
        model: When it differs from the role-config model, the worker
            transparently reloads.
    """

    messages: list[dict[str, str]]
    stream: bool = False
    options: dict[str, Any] | None = None
    model: str | None = None
@dataclass(frozen=True)
class VisionRequest:
    """Typed, pickle-friendly vision-OCR request for the parent->worker pipe.

    Stands in for the inline ``{"png_bytes": ..., "model": ..., "prompt": ...}``
    dict.

    Attributes:
        png_bytes: Image payload, as named by the replaced dict key.
        prompt: Prompt text; empty by default.
        model: Optional model override. ``None`` means "use the role-config
            model unchanged".
    """

    png_bytes: bytes
    prompt: str = ""
    model: str | None = None
@dataclass(frozen=True)
class PdfOcrRequest:
    """Typed, pickle-friendly request for OCR over a multi-page PDF.

    Wall-clock budgets are enforced by the parent through the stream-drain
    timeout; the worker itself only validates the payload shape.

    Attributes:
        path: PDF location kept as a plain string so it pickles cheaply
            across the parent->worker pipe; the worker wraps it back into
            a :class:`pathlib.Path`.
        backend: Which OCR backend to use.
        model: When non-empty, overrides ``cfg.vision_model`` for this call.
    """

    path: str
    backend: OcrBackend
    model: str = ""
@dataclass(frozen=True)
class RerankPayload:
    """Typed, pickle-friendly rerank request.

    Supersedes the bare ``(query, candidates)`` tuple, letting the worker
    check the shape through attribute access rather than length / index
    juggling.

    Attributes:
        query: The query half of the former tuple.
        candidates: The candidate strings half of the former tuple.
    """

    query: str
    candidates: list[str]
@dataclass(frozen=True)
class WorkerHandle:
    """Opaque token for a spawned worker, handed back with its channel.

    Holds the bookkeeping the pool uses for restart-on-crash and idle
    reaping, while keeping transport-specific types (``mp.Process``,
    ``threading.Thread``, etc.) out of the pool's view.

    Attributes:
        pid: Informational only. May be ``None`` when the transport has no
            single OS process (e.g. a hypothetical in-process test
            transport).
        role: The role the worker was spawned for.
    """

    pid: int | None
    role: WorkerRole
@runtime_checkable
class WorkerChannel(Protocol):
    """Two-way message channel bound to a single running worker.

    A :class:`WorkerSpawner` builds the channel, it stays open for as long
    as the worker lives, and :meth:`close` tears it down. Members appear in
    the order they are typically used: ``call`` / ``stream`` for inference,
    ``ping`` for health checks, ``cancel`` / ``clear_abort`` to interrupt,
    and ``close`` at shutdown.

    Call-ordering contract
    ----------------------

    1. A channel returned by the spawner is immediately usable for
       ``call`` / ``stream`` / ``ping``; no ``initialize`` step exists.
    2. ``is_alive`` and ``in_flight`` may be read at any moment, even
       while a ``call`` / ``stream`` is in flight.
    3. ``call``, ``stream``, and ``ping`` are allowed to overlap (each
       takes its own send/recv lock internally); the worker process
       serializes them on the wire.
    4. ``cancel`` / ``clear_abort`` are best-effort and never wait on a
       lock. They are safe from any thread, including from inside a
       streaming ``__anext__`` so a consumer can interrupt itself.
    5. ``close`` is final and idempotent. Once it has returned, ``call`` /
       ``stream`` / ``ping`` must raise :class:`WorkerError`.
    """

    @property
    def is_alive(self) -> bool:
        """True exactly when the worker process is still running."""
        ...

    @property
    def pid(self) -> int | None:
        """The worker's OS process id; None if the transport has none."""
        ...

    @property
    def in_flight(self) -> int:
        """Count of requests that were sent but are not yet fully answered.

        The pool's idle reaper requires this to be zero before it times a
        worker out. An open ``stream()`` stays counted until its
        terminator (``stream_end`` / ``error``) arrives.
        """
        ...

    def call(self, kind: WireKind, payload: Any, *, timeout: float) -> Awaitable[Any]:
        """Send a single request and await its single reply.

        Raises on worker error or on timeout.
        """
        ...

    def stream(self, kind: WireKind, payload: Any) -> AsyncIterator[Any]:
        """Send a single request and yield chunks until the worker ends the stream."""
        ...

    def ping(self, *, timeout: float) -> Awaitable[None]:
        """Send a ping and await the pong.

        Raises on timeout, in which case the worker is considered hung.
        """
        ...

    def cancel(self) -> None:
        """Set the worker's abort flag.

        Best-effort only: ``stream_chunk`` messages already sitting in the
        pipe still drain (typically a few extra tokens). Until the worker
        confirms with a terminator, the user-facing toast should read
        "Cancelling...".
        """
        ...

    def clear_abort(self) -> None:
        """Put the abort flag back to 0 so the next request runs to completion."""
        ...

    def close(self, *, timeout: float) -> Awaitable[None]:
        """Send shutdown and await a graceful exit.

        Stragglers still running past *timeout* are terminated.
        """
        ...
@runtime_checkable
class WorkerSpawner(Protocol):
    """Factory that starts worker subprocesses and hands back their channels.

    Each :class:`WorkerPool` holds one spawner, and every :meth:`spawn`
    call yields exactly one new worker. Transport-specific knowledge stays
    inside the spawner (which mp.Pipe end the child gets, which port a zmq
    worker should bind, etc.), so the pool deals only in Protocols.
    """

    def spawn(
        self,
        worker_main: WorkerEntrypoint,
        role_config: RoleConfig,
    ) -> tuple[WorkerChannel, WorkerHandle]:
        """Launch one worker process; return its channel together with its handle."""
        ...
def default_spawner() -> WorkerSpawner:
    """Build and return a new :class:`PipeSpawner`.

    The import is deferred to call time to avoid an import cycle with
    ``transport_pipe``.
    """
    from lilbee.providers.worker.transport_pipe import PipeSpawner

    spawner: WorkerSpawner = PipeSpawner()
    return spawner
# Public API of this module, kept in alphabetical order.
__all__ = [
    "ChatRequest",
    "OcrBackend",
    "PdfOcrRequest",
    "RerankPayload",
    "RoleConfig",
    "VisionRequest",
    "WorkerChannel",
    "WorkerEntrypoint",
    "WorkerHandle",
    "WorkerRole",
    "WorkerSpawner",
    "default_spawner",
]