Coverage for src/lilbee/providers/worker/embed_worker.py: 100%
46 statements
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Long-lived embed worker subprocess body."""
3from __future__ import annotations
5import contextlib
6from typing import Any
8from lilbee.providers.worker.transport import RoleConfig
9from lilbee.providers.worker.transport_pipe import _serialize_exception
10from lilbee.providers.worker.wire_kinds import WireKind
11from lilbee.providers.worker.worker_runtime import Reply, WorkerLoopState, run_worker
class _EmbedSession:
    """Lazy-loaded Llama embedder, kept alive for the worker's lifetime.

    The model loads on the first ``embed`` request rather than at spawn
    time, so the parent's lazy-spawn cost stays bounded by spawn itself
    plus the pickle of the role config; the heavy llama-cpp init is paid
    on first real use.
    """

    def __init__(self, role_config: RoleConfig) -> None:
        self._role_config = role_config
        self._llm: Any = None

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed *texts*, loading the model on first call."""
        if self._llm is None:
            self._llm = self._load()
        return self._embed_batch(self._llm, texts)

    def _load(self) -> Any:
        from lilbee.providers.llama_cpp.provider import load_llama
        from lilbee.providers.model_cache import LoaderMode

        return load_llama(self._role_config.model_path, mode=LoaderMode.EMBED)

    @staticmethod
    def _embed_batch(llm: Any, texts: list[str]) -> list[list[float]]:
        # Circular: lilbee.providers.llama_cpp.__init__ eagerly imports
        # provider.py, which imports this worker module. A function-local
        # import keeps that cycle from firing at module-load time.
        from lilbee.providers.llama_cpp.batching import embed_batch

        return embed_batch(llm, texts)

    def close(self) -> None:
        """Release the loaded model, if any. Idempotent."""
        if self._llm is None:
            return
        with contextlib.suppress(Exception):
            self._llm.close()
        self._llm = None
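
# A minimal sketch of the lazy-load contract above (hypothetical usage;
# the RoleConfig constructor signature is an assumption):
#
#     session = _EmbedSession(RoleConfig(model_path="/models/embed.gguf"))
#     # Nothing is loaded at construction; the first call pays the
#     # llama-cpp init, later calls reuse the loaded model.
#     vectors = session.embed(["hello", "world"])
#     vectors = session.embed(["again"])
#     session.close()
#     session.close()  # idempotent: the second close is a no-op

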
def _handle_embed(reply: Reply, payload: Any, state: WorkerLoopState) -> None:
    """Run one embed request and send the typed reply (or error)."""
    if not isinstance(payload, list):
        # Raise-and-catch so the TypeError carries a traceback when it is
        # serialized for the parent.
        try:
            raise TypeError(f"embed payload must be list[str], got {type(payload).__name__}")
        except TypeError as exc:
            reply.send(WireKind.ERROR, _serialize_exception(exc))
        return
    session: _EmbedSession = state.session
    try:
        vectors = session.embed(payload)
    except Exception as exc:
        reply.send(WireKind.ERROR, _serialize_exception(exc))
        return
    reply.send(WireKind.RESULT, vectors)
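
# A test-style sketch of the error path, assuming a Reply stand-in only
# needs a ``send(kind, payload)`` method (the recorder is hypothetical):
#
#     class _RecordingReply:
#         def __init__(self) -> None:
#             self.sent: list[tuple[WireKind, Any]] = []
#
#         def send(self, kind: WireKind, payload: Any) -> None:
#             self.sent.append((kind, payload))
#
#     reply = _RecordingReply()
#     _handle_embed(reply, "not-a-list", state)  # non-list payload
#     assert reply.sent[0][0] is WireKind.ERROR

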
def embed_worker_main(
    data_conn: Any, health_conn: Any, abort_flag: Any, role_config: RoleConfig
) -> None:
    """Embed worker entrypoint: load llama-cpp lazily, serve requests until shutdown."""
    run_worker(
        data_conn,
        health_conn,
        abort_flag,
        role_config,
        session_factory=lambda role_cfg, _abort: _EmbedSession(role_cfg),
        kind_handlers={WireKind.EMBED: _handle_embed},
    )
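
# Hypothetical parent-side spawn (lilbee's real transport setup may differ;
# the Pipe/Event plumbing here is an assumption, not the library's API):
#
#     import multiprocessing as mp
#
#     parent_data, child_data = mp.Pipe()
#     parent_health, child_health = mp.Pipe()
#     abort = mp.Event()
#     proc = mp.Process(
#         target=embed_worker_main,
#         args=(child_data, child_health, abort, role_config),
#     )
#     proc.start()

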
__all__ = ["embed_worker_main"]