Coverage for src/lilbee/app/services.py: 100%
115 statements
1"""Typed service container: single point of access for all singletons.
3All runtime dependencies (provider, store, embedder, reranker, concepts,
4clusterer, searcher, worker pool) are created lazily on first call to
5``get_services()`` and cached for the process lifetime. Tests call
6``reset_services()`` between runs.
7"""
9from __future__ import annotations
11import asyncio
12import atexit
13import threading
14from collections.abc import Callable
15from dataclasses import dataclass, field
16from typing import TYPE_CHECKING
18from lilbee.providers.worker.pool import shutdown_pool_runtime
20_RELOAD_CLOSE_TIMEOUT_S = 5.0
21"""Wall-clock budget for closing a detached worker channel during reload_role.
23Matches ``_DEFAULT_SHUTDOWN_TIMEOUT_S`` in ``providers.worker.pool``: a worker
24that does not ack SHUTDOWN within this window is terminated so the new model
25load is not blocked.
26"""

if TYPE_CHECKING:
    from lilbee.catalog.hf_client import HfClient
    from lilbee.data.store import Store
    from lilbee.modelhub.model_manager import ModelManager
    from lilbee.modelhub.registry import ModelRegistry
    from lilbee.providers.base import LLMProvider
    from lilbee.providers.worker.health_ticker import HealthTickerHandle
    from lilbee.providers.worker.pool import PoolRuntime, WorkerPool
    from lilbee.providers.worker.transport import WorkerRole
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.embedder import Embedder
    from lilbee.retrieval.query import Searcher
    from lilbee.retrieval.reranker import Reranker
    from lilbee.runtime.ingest_lock import IngestLockRegistry


@dataclass
class CrawlerSyncState:
    """Process-wide sync coordination state (lock + last-run timestamp)."""

    lock: threading.Lock = field(default_factory=threading.Lock)
    last_run: float = 0.0
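

# Illustrative sketch (not part of the original module): how a caller might use
# CrawlerSyncState to debounce a process-wide sync. The interval, the use of
# ``time.monotonic()``, and the helper name are assumptions, not lilbee code.
def _example_maybe_start_sync(state: CrawlerSyncState, min_interval_s: float = 300.0) -> bool:
    """Return True and stamp ``last_run`` if enough time has passed since the last run."""
    import time

    with state.lock:
        now = time.monotonic()
        if now - state.last_run < min_interval_s:
            return False  # debounced: a sync ran too recently
        state.last_run = now
    # ...the caller performs the actual sync outside the lock...
    return True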


@dataclass(frozen=True)
class Services:
    """Holds all runtime service instances.

    The worker pool sits on Services (not on the provider) so any
    subsystem can reach it for cancellation, health checks, or
    diagnostics without crossing into ``LlamaCppProvider``'s private
    API. ``cancel_inference()`` is the canonical entry point used by
    Ctrl+C and the chat-stream cancel action.
    """

    provider: LLMProvider
    store: Store
    embedder: Embedder
    reranker: Reranker
    concepts: ConceptGraph
    clusterer: Clusterer
    searcher: Searcher
    registry: ModelRegistry
    hf_client: HfClient
    ingest_lock_registry: IngestLockRegistry
    model_manager: ModelManager
    crawler_semaphore: asyncio.Semaphore | None
    crawler_sync_state: CrawlerSyncState
    worker_pool: WorkerPool
    pool_runtime: PoolRuntime
    pool_health_ticker: HealthTickerHandle

    def cancel_inference(self) -> None:
        """Flip the abort flag on every registered worker pool role. Idempotent."""
        for role_name in self.worker_pool.registered_roles:
            self.worker_pool.accessor(role_name).cancel()
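
    # Illustrative usage (a sketch, not part of the original module): wiring
    # cancel_inference() to Ctrl+C. The real binding lives in the app layer;
    # the handler below is an assumption about how it could look.
    #
    #     import signal
    #
    #     def _on_sigint(signum, frame):
    #         get_services().cancel_inference()
    #
    #     signal.signal(signal.SIGINT, _on_sigint)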

    def reload_role(self, role_name: WorkerRole) -> None:
        """Drop *role_name*'s current worker so the next call lazy-respawns with cfg.

        Detaches the channel synchronously (subsequent calls see no live worker),
        then closes the old channel in the background on the pool runtime so the
        caller's event loop is not stalled. Other roles' workers and any
        in-flight stream they own are untouched. Use when only one role-bound
        model setting has changed (e.g. ``embedding_model``).
        """
        channel = self.worker_pool.detach_channel(role_name)
        if channel is None:
            return

        async def _close() -> None:
            await channel.close(timeout=_RELOAD_CLOSE_TIMEOUT_S)

        self.pool_runtime.submit(_close())
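
    # Illustrative usage (a sketch, not part of the original module): after
    # changing a single role-bound model setting, drop just that worker. The
    # "embed" role literal and the ability to assign ``cfg.embedding_model``
    # at runtime are assumptions.
    #
    #     cfg.embedding_model = "new-embedder.gguf"
    #     get_services().reload_role("embed")  # next embed call respawns with cfg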

    def add_pool_listener(
        self,
        *,
        on_spawning: Callable[[WorkerRole], None] | None = None,
        on_spawned: Callable[[WorkerRole], None] | None = None,
    ) -> None:
        """Subscribe to worker spawn lifecycle events.

        Forwards directly to :meth:`WorkerPool.add_listener`. The TUI uses this
        to surface "Starting <role> worker..." / "<role> worker ready"
        notifications during the cold-start window.
        """
        self.worker_pool.add_listener(on_spawning=on_spawning, on_spawned=on_spawned)
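

# Illustrative sketch (not part of the original module): how a front end might
# subscribe to spawn events for cold-start notifications. ``notify`` is a
# stand-in for the TUI's notification callable.
def _example_subscribe_spawn_events(notify: Callable[[str], None]) -> None:
    svc = get_services()
    svc.add_pool_listener(
        on_spawning=lambda role: notify(f"Starting {role} worker..."),
        on_spawned=lambda role: notify(f"{role} worker ready"),
    )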


_svc: Services | None = None
"""Cached singleton, set on first ``get_services()`` call.

Concurrency contract: lilbee runs the asyncio loop on a single worker
thread + Textual's main thread. ``get_services()`` is idempotent (the
``if _svc is not None: return`` early-out covers re-entry from a
background thread). Tests that need a custom container call
``set_services(make_mock_services(...))`` explicitly; ``peek_services()``
is the read-only inspector for cleanup fixtures. The Services dataclass
itself is logically immutable post-construction (its fields are
references to long-lived service objects), so concurrent reads are safe
without a lock.
"""


def get_services() -> Services:
    """Return the cached Services singleton, creating on first call.

    Service modules are imported inside the function to keep CLI
    startup fast: ``services`` is on every CLI import path, and the
    concrete service modules transitively pull in heavy libraries
    (llama-cpp, lancedb, kreuzberg). Deferring the loads until the first
    ``get_services()`` call makes ``lilbee --help`` and the TUI splash
    render in milliseconds instead of seconds.
    """
    global _svc
    if _svc is not None:
        return _svc

    from lilbee.catalog.hf_client import HfClient
    from lilbee.core.config import cfg
    from lilbee.data.store import Store
    from lilbee.modelhub.model_manager import ModelManager
    from lilbee.modelhub.registry import ModelRegistry
    from lilbee.providers.factory import create_provider
    from lilbee.providers.worker.health_ticker import start_health_ticker
    from lilbee.providers.worker.pool import PoolRuntime, WorkerPool
    from lilbee.providers.worker.transport import default_spawner
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.embedder import Embedder
    from lilbee.retrieval.query import Searcher
    from lilbee.retrieval.reranker import Reranker
    from lilbee.runtime.asyncio_loop import get_loop
    from lilbee.runtime.ingest_lock import IngestLockRegistry

    worker_pool = WorkerPool(
        spawner=default_spawner(),
        max_idle_s=cfg.worker_pool_max_idle_s,
    )
    pool_runtime = PoolRuntime()
    provider = create_provider(cfg)
    store = Store(cfg)
    embedder = Embedder(cfg, provider)
    reranker = Reranker(cfg)
    concepts = ConceptGraph(cfg, store)
    clusterer = Clusterer(cfg, store)
    registry = ModelRegistry(cfg.models_dir)
    searcher = Searcher(cfg, provider, store, embedder, reranker, concepts)
    hf_client = HfClient()
    ingest_lock_registry = IngestLockRegistry()
    model_manager = ModelManager(cfg.models_dir, cfg.remote_base_url)
    crawler_semaphore = (
        asyncio.Semaphore(cfg.crawl_max_concurrent) if cfg.crawl_max_concurrent > 0 else None
    )
    crawler_sync_state = CrawlerSyncState()
    pool_health_ticker: HealthTickerHandle = start_health_ticker(
        worker_pool, pool_runtime, get_loop()
    )
    _svc = Services(
        provider=provider,
        store=store,
        embedder=embedder,
        reranker=reranker,
        concepts=concepts,
        clusterer=clusterer,
        searcher=searcher,
        registry=registry,
        hf_client=hf_client,
        ingest_lock_registry=ingest_lock_registry,
        model_manager=model_manager,
        crawler_semaphore=crawler_semaphore,
        crawler_sync_state=crawler_sync_state,
        worker_pool=worker_pool,
        pool_runtime=pool_runtime,
        pool_health_ticker=pool_health_ticker,
    )
    # Eager start is the default: pay 1-3 s per worker at TUI mount so the
    # first user action lands on a warm pool. Roles whose model is unset are
    # skipped, so a setup with only chat + embed never spawns rerank or
    # vision. Set ``cfg.worker_pool_eager_start = false`` for headless
    # scripts where mount time matters more than first-call latency.
    if cfg.worker_pool_eager_start:
        from contextlib import suppress

        with suppress(Exception):
            provider.warm_up_pool()
            pool_runtime.start()
            pool_runtime.run_sync(worker_pool.start_eager(), timeout=30.0)
    return _svc
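

# Illustrative usage (a sketch, not part of the original module): a headless
# script that prefers fast startup over a warm pool. Whether the eager-start
# flag can be toggled at runtime like this is an assumption about the config
# API.
#
#     from lilbee.core.config import cfg
#
#     cfg.worker_pool_eager_start = False  # skip the 1-3 s per-worker warm-up
#     svc = get_services()                 # workers now spawn lazily per call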


def set_services(services: Services | None) -> None:
    """Replace the cached Services singleton (for testing)."""
    global _svc
    _svc = services


def peek_services() -> Services | None:
    """Return the cached Services container, or ``None`` if not yet initialized.

    Public read-only accessor for test cleanup helpers that need to
    inspect the singleton without forcing initialization.
    """
    return _svc


def reset_services() -> None:
    """Shut down and discard all cached instances."""
    global _svc
    if _svc is not None:
        shutdown_pool_runtime(_svc.worker_pool, _svc.pool_runtime, _svc.pool_health_ticker)
        _svc.provider.shutdown()
        _svc.store.close()
    _svc = None


def reset_store() -> None:
    """Close and rebuild only the Store and its dependents; keep providers loaded.

    Used after a data-dir wipe (``/reset``) where the LanceDB handle is invalid
    but the loaded llama-cpp/embedder/reranker models are still good. Avoids the
    multi-second reload cost of ``reset_services()``.
    """
    global _svc
    if _svc is None:
        return
    from dataclasses import replace

    from lilbee.core.config import cfg
    from lilbee.data.store import Store
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.query import Searcher

    _svc.store.close()
    store = Store(cfg)
    concepts = ConceptGraph(cfg, store)
    clusterer = Clusterer(cfg, store)
    searcher = Searcher(cfg, _svc.provider, store, _svc.embedder, _svc.reranker, concepts)
    _svc = replace(
        _svc,
        store=store,
        concepts=concepts,
        clusterer=clusterer,
        searcher=searcher,
    )
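

# Illustrative sketch (not part of the original module): a ``/reset`` command
# handler would wipe the data dir and then rebuild only the Store-dependent
# services. ``wipe_data_dir`` is a hypothetical helper and ``cfg.data_dir`` is
# likewise an assumed config field.
#
#     wipe_data_dir(cfg.data_dir)  # invalidates the Store's LanceDB handle
#     reset_store()                # rebuilds Store/ConceptGraph/Clusterer/Searcher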


atexit.register(reset_services)