Coverage for src / lilbee / app / services.py: 100%
118 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Typed service container: single point of access for all singletons.
3All runtime dependencies (provider, store, embedder, reranker, concepts,
4clusterer, searcher, worker pool) are created lazily on first call to
5``get_services()`` and cached for the process lifetime. Tests call
6``reset_services()`` between runs.
7"""
9from __future__ import annotations
11import asyncio
12import atexit
13import logging
14import threading
15from collections.abc import Callable
16from dataclasses import dataclass, field
17from typing import TYPE_CHECKING
19from lilbee.providers.worker.pool import shutdown_pool_runtime
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Passed as ``timeout`` when ``Services.reload_role`` closes a detached channel.
_RELOAD_CLOSE_TIMEOUT_S = 5.0
"""Wall-clock budget for closing a detached worker channel during reload_role.

Matches ``_DEFAULT_SHUTDOWN_TIMEOUT_S`` in ``providers.worker.pool``: a worker
that does not ack SHUTDOWN within this window is terminated so the new model
load is not blocked.
"""
31if TYPE_CHECKING:
32 from lilbee.catalog.hf_client import HfClient
33 from lilbee.data.store import Store
34 from lilbee.modelhub.model_manager import ModelManager
35 from lilbee.modelhub.registry import ModelRegistry
36 from lilbee.providers.base import LLMProvider
37 from lilbee.providers.worker.health_ticker import HealthTickerHandle
38 from lilbee.providers.worker.pool import PoolRuntime, WorkerPool
39 from lilbee.providers.worker.transport import WorkerRole
40 from lilbee.retrieval.clustering import Clusterer
41 from lilbee.retrieval.concepts import ConceptGraph
42 from lilbee.retrieval.embedder import Embedder
43 from lilbee.retrieval.query import Searcher
44 from lilbee.retrieval.reranker import Reranker
45 from lilbee.runtime.ingest_lock import IngestLockRegistry
@dataclass
class CrawlerSyncState:
    """Shared crawler-sync bookkeeping: a mutex plus the last-run timestamp.

    Every instance is created with its own fresh ``threading.Lock`` and a
    ``last_run`` of ``0.0``.
    """

    lock: threading.Lock = field(default_factory=threading.Lock)
    last_run: float = field(default=0.0)
@dataclass(frozen=True)
class Services:
    """Immutable bundle of every runtime service instance.

    The worker pool is a field of this container rather than of the
    provider, so cancellation, health checks, and diagnostics can reach it
    without going through ``LlamaCppProvider``'s private API.
    ``cancel_inference()`` is the canonical entry point used by Ctrl+C and
    the chat-stream cancel action.
    """

    provider: LLMProvider
    store: Store
    embedder: Embedder
    reranker: Reranker
    concepts: ConceptGraph
    clusterer: Clusterer
    searcher: Searcher
    registry: ModelRegistry
    hf_client: HfClient
    ingest_lock_registry: IngestLockRegistry
    model_manager: ModelManager
    crawler_semaphore: asyncio.Semaphore | None
    crawler_sync_state: CrawlerSyncState
    worker_pool: WorkerPool
    pool_runtime: PoolRuntime
    pool_health_ticker: HealthTickerHandle

    def cancel_inference(self) -> None:
        """Call ``cancel()`` on the accessor of every role registered with the pool."""
        pool = self.worker_pool
        for role in pool.registered_roles:
            pool.accessor(role).cancel()

    def reload_role(self, role_name: WorkerRole) -> None:
        """Detach *role_name*'s worker channel and close it in the background.

        The channel is detached from the worker pool synchronously; when the
        pool had one, a coroutine that closes it (bounded by
        ``_RELOAD_CLOSE_TIMEOUT_S``) is submitted to the pool runtime instead
        of being awaited here, so the caller is not held up by the close.
        Other roles are not touched. Intended for the case where a single
        role-bound model setting changed (e.g. ``embedding_model``).

        Does nothing when the pool has no channel for *role_name*.
        """
        detached = self.worker_pool.detach_channel(role_name)
        if detached is not None:

            async def _close() -> None:
                await detached.close(timeout=_RELOAD_CLOSE_TIMEOUT_S)

            self.pool_runtime.submit(_close())

    def add_pool_listener(
        self,
        *,
        on_spawning: Callable[[WorkerRole], None] | None = None,
        on_spawned: Callable[[WorkerRole], None] | None = None,
    ) -> None:
        """Register callbacks for worker spawn lifecycle events.

        A thin pass-through to :meth:`WorkerPool.add_listener`; both
        callbacks are optional and forwarded by keyword unchanged. The TUI
        uses this to show "Starting <role> worker..." / "<role> worker
        ready" notifications during the cold-start window.
        """
        pool = self.worker_pool
        pool.add_listener(on_spawning=on_spawning, on_spawned=on_spawned)
# Rebound by get_services(), set_services(), reset_services() and reset_store().
_svc: Services | None = None
"""Cached singleton, set on first ``get_services()`` call.

Concurrency contract: lilbee runs the asyncio loop on a single worker
thread + Textual's main thread. ``get_services()`` is idempotent (the
``if _svc is not None: return`` early-out covers re-entry from a
background thread). Tests that need a custom container call
``set_services(make_mock_services(...))`` explicitly; ``peek_services()``
is the read-only inspector for cleanup fixtures. The Services dataclass
itself is logically immutable post-construction (its fields are
references to long-lived service objects), so concurrent reads are safe
without a lock.
"""
def get_services() -> Services:
    """Return the process-wide Services container, building it on first use.

    When a container is already cached it is returned as-is. Otherwise
    every service is constructed, the container is cached, and, if
    ``cfg.worker_pool_eager_start`` is set, the worker pool is warmed up.

    The concrete service modules are imported here rather than at module
    top level because ``services`` sits on every CLI import path and those
    modules transitively pull in heavy libraries (llama-cpp, lancedb,
    kreuzberg). Deferring them keeps ``lilbee --help`` and the TUI splash
    fast.
    """
    global _svc
    cached = _svc
    if cached is not None:
        return cached

    from lilbee.catalog.hf_client import HfClient
    from lilbee.core.config import cfg
    from lilbee.data.store import Store
    from lilbee.modelhub.model_manager import ModelManager
    from lilbee.modelhub.registry import ModelRegistry
    from lilbee.providers.factory import create_provider
    from lilbee.providers.worker.health_ticker import start_health_ticker
    from lilbee.providers.worker.pool import PoolRuntime, WorkerPool
    from lilbee.providers.worker.transport import default_spawner
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.embedder import Embedder
    from lilbee.retrieval.query import Searcher
    from lilbee.retrieval.reranker import Reranker
    from lilbee.runtime.asyncio_loop import get_loop
    from lilbee.runtime.ingest_lock import IngestLockRegistry

    # Construction order is kept exactly as-is: later services take earlier
    # ones as constructor arguments.
    pool = WorkerPool(spawner=default_spawner(), max_idle_s=cfg.worker_pool_max_idle_s)
    runtime = PoolRuntime()
    llm = create_provider(cfg)
    db = Store(cfg)
    embed = Embedder(cfg, llm)
    rerank = Reranker(cfg)
    concept_graph = ConceptGraph(cfg, db)
    cluster = Clusterer(cfg, db)
    model_registry = ModelRegistry(cfg.models_dir)
    search = Searcher(cfg, llm, db, embed, rerank, concept_graph)
    hf = HfClient()
    ingest_locks = IngestLockRegistry()
    models = ModelManager(cfg.models_dir)
    # A non-positive limit means "no crawler concurrency cap": no semaphore.
    if cfg.crawl_max_concurrent > 0:
        crawl_gate = asyncio.Semaphore(cfg.crawl_max_concurrent)
    else:
        crawl_gate = None
    sync_state = CrawlerSyncState()
    ticker = start_health_ticker(pool, runtime, get_loop())

    # Cache the container before the eager start so it is available even if
    # warming the pool fails below.
    _svc = Services(
        provider=llm,
        store=db,
        embedder=embed,
        reranker=rerank,
        concepts=concept_graph,
        clusterer=cluster,
        searcher=search,
        registry=model_registry,
        hf_client=hf,
        ingest_lock_registry=ingest_locks,
        model_manager=models,
        crawler_semaphore=crawl_gate,
        crawler_sync_state=sync_state,
        worker_pool=pool,
        pool_runtime=runtime,
        pool_health_ticker=ticker,
    )

    # Eager start is the default: spend 1-3 s per worker at TUI mount so the
    # first user action hits a warm pool. Roles with no model configured are
    # skipped, so a chat + embed setup never spawns rerank or vision. Set
    # ``cfg.worker_pool_eager_start = false`` for headless scripts where
    # mount time matters more than first-call latency.
    if cfg.worker_pool_eager_start:
        try:
            llm.warm_up_pool()
            runtime.start()
            runtime.run_sync(pool.start_eager(), timeout=30.0)
        except Exception:
            # Best effort: a failed warm-up is logged, not raised.
            log.warning(
                "Eager worker-pool start failed; workers will retry on first use",
                exc_info=True,
            )
    return _svc
def set_services(services: Services | None) -> None:
    """Install *services* as the cached singleton (testing hook).

    Passing ``None`` clears the cache. The previously cached container, if
    any, is simply dropped; it is not shut down here.
    """
    global _svc
    _svc = services
def peek_services() -> Services | None:
    """Return whatever is currently cached, without creating anything.

    Read-only accessor for test cleanup helpers: yields the cached
    Services container, or ``None`` when none has been initialized yet.
    """
    return _svc
def reset_services() -> None:
    """Shut down and discard the cached Services container, if any.

    Tears down, in order: the worker pool runtime (together with its
    health ticker), the provider, and the store. Every step is attempted
    even when an earlier one raises, and the cached singleton is always
    cleared afterwards. A failing shutdown therefore cannot skip closing
    the store or leave a half-torn-down container cached for the next
    ``get_services()`` call.

    Does nothing when no container is cached.

    Raises:
        Exception: whatever a teardown step raised. It propagates once the
            remaining steps have run; if several steps fail, the last
            failure is raised with the earlier ones chained as its
            ``__context__``.
    """
    global _svc
    svc = _svc
    if svc is None:
        return
    try:
        shutdown_pool_runtime(svc.worker_pool, svc.pool_runtime, svc.pool_health_ticker)
    finally:
        try:
            svc.provider.shutdown()
        finally:
            try:
                svc.store.close()
            finally:
                # Discard only after every step has been attempted, keeping
                # the original "tear down, then forget" ordering.
                _svc = None
def reset_store() -> None:
    """Swap in a fresh Store and the services built on it; keep the rest.

    Closes the cached store, then builds a new ``Store`` plus the
    ``ConceptGraph``, ``Clusterer`` and ``Searcher`` that depend on it. The
    existing provider, embedder and reranker are reused. The cached
    container is replaced by a copy carrying the four new instances.

    Intended for use after a data-dir wipe (``/reset``), where the LanceDB
    handle is invalid but the loaded llama-cpp/embedder/reranker models are
    still good; this avoids the multi-second reload cost of
    ``reset_services()``.

    Does nothing when no container is cached.
    """
    global _svc
    current = _svc
    if current is None:
        return
    from dataclasses import replace

    from lilbee.core.config import cfg
    from lilbee.data.store import Store
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.query import Searcher

    # Release the old handle before opening a new one on the same data dir.
    current.store.close()
    fresh_store = Store(cfg)
    fresh_concepts = ConceptGraph(cfg, fresh_store)
    fresh_clusterer = Clusterer(cfg, fresh_store)
    fresh_searcher = Searcher(
        cfg,
        current.provider,
        fresh_store,
        current.embedder,
        current.reranker,
        fresh_concepts,
    )
    # Services is frozen, so build a modified copy rather than mutating it.
    rebuilt = {
        "store": fresh_store,
        "concepts": fresh_concepts,
        "clusterer": fresh_clusterer,
        "searcher": fresh_searcher,
    }
    _svc = replace(current, **rebuilt)
# Run reset_services() at interpreter exit so any cached container's worker
# pool, provider and store are shut down.
atexit.register(reset_services)