Coverage for src/lilbee/providers/worker/embed_worker.py: 100% (46 statements)

1"""Long-lived embed worker subprocess body.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6from typing import Any 

7 

8from lilbee.providers.worker.transport import RoleConfig 

9from lilbee.providers.worker.transport_pipe import _serialize_exception 

10from lilbee.providers.worker.wire_kinds import WireKind 

11from lilbee.providers.worker.worker_runtime import Reply, WorkerLoopState, run_worker 

12 

13 

class _EmbedSession:
    """Lazy-loaded Llama embedder, kept alive for the worker's lifetime.

    The model loads on the first ``embed`` request rather than at spawn
    time, so the parent's lazy-spawn cost stays bounded by spawn itself
    plus the pickle of the role config; the heavy llama-cpp init is paid
    on first real use.
    """

    def __init__(self, role_config: RoleConfig) -> None:
        self._role_config = role_config
        self._llm: Any = None

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed *texts*, loading the model on first call."""
        if self._llm is None:
            self._llm = self._load()
        return self._embed_batch(self._llm, texts)

    def _load(self) -> Any:
        from lilbee.providers.llama_cpp.provider import load_llama
        from lilbee.providers.model_cache import LoaderMode

        return load_llama(self._role_config.model_path, mode=LoaderMode.EMBED)

    @staticmethod
    def _embed_batch(llm: Any, texts: list[str]) -> list[list[float]]:
        # Circular import: lilbee.providers.llama_cpp.__init__ eagerly imports
        # provider.py, which imports this worker module. A function-local
        # import keeps that cycle from firing at module-load time.
        from lilbee.providers.llama_cpp.batching import embed_batch

        return embed_batch(llm, texts)

    def close(self) -> None:
        """Release the loaded model, if any. Idempotent."""
        if self._llm is None:
            return
        with contextlib.suppress(Exception):
            self._llm.close()
        self._llm = None
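

# Usage sketch (illustrative, not part of the module): driving _EmbedSession
# directly, without the worker loop. Assumes RoleConfig can be constructed
# with just ``model_path``; real configs may need more fields.
#
#     session = _EmbedSession(RoleConfig(model_path="/models/embed.gguf"))
#     vectors = session.embed(["first chunk", "second chunk"])  # model loads here
#     assert len(vectors) == 2
#     session.close()  # idempotent; safe to call again
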

def _handle_embed(reply: Reply, payload: Any, state: WorkerLoopState) -> None:
    """Run one embed request and send the typed reply (or error)."""
    if not isinstance(payload, list):
        # Raise and immediately catch so the TypeError carries a live
        # traceback for _serialize_exception, matching the error path below.
        try:
            raise TypeError(f"embed payload must be list[str], got {type(payload).__name__}")
        except TypeError as exc:
            reply.send(WireKind.ERROR, _serialize_exception(exc))
        return
    session: _EmbedSession = state.session
    try:
        vectors = session.embed(payload)
    except Exception as exc:
        reply.send(WireKind.ERROR, _serialize_exception(exc))
        return
    reply.send(WireKind.RESULT, vectors)
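

# Wire shape (sketch, inferred from this handler): one EMBED request maps to
# exactly one typed reply. The (kind, payload) framing and the vector values
# are illustrative; the error payload is whatever _serialize_exception emits.
#
#     request:  (WireKind.EMBED, ["first chunk", "second chunk"])
#     success:  (WireKind.RESULT, [[0.1, 0.2, ...], [0.3, 0.4, ...]])
#     failure:  (WireKind.ERROR, <_serialize_exception(exc) payload>)
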

def embed_worker_main(
    data_conn: Any, health_conn: Any, abort_flag: Any, role_config: RoleConfig
) -> None:
    """Embed worker entrypoint: load llama-cpp lazily, serve requests until shutdown."""
    run_worker(
        data_conn,
        health_conn,
        abort_flag,
        role_config,
        session_factory=lambda role_cfg, _abort: _EmbedSession(role_cfg),
        kind_handlers={WireKind.EMBED: _handle_embed},
    )
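

# Spawn sketch (illustrative; the real parent-side transport lives elsewhere
# in lilbee): one plausible way to launch this entrypoint in a subprocess.
# Pipe connections and an Event for the abort flag are assumptions about
# what run_worker expects, not confirmed by this module.
#
#     import multiprocessing as mp
#
#     parent_data, child_data = mp.Pipe()
#     parent_health, child_health = mp.Pipe()
#     abort = mp.Event()
#     proc = mp.Process(
#         target=embed_worker_main,
#         args=(child_data, child_health, abort, role_config),
#     )
#     proc.start()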


__all__ = ["embed_worker_main"]