Coverage for src/lilbee/app/services.py: 100%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Typed service container: single point of access for all singletons. 

2 

3All runtime dependencies (provider, store, embedder, reranker, concepts, 

4clusterer, searcher, worker pool) are created lazily on first call to 

5``get_services()`` and cached for the process lifetime. Tests call 

6``reset_services()`` between runs. 

7""" 

8 

9from __future__ import annotations 

10 

11import asyncio 

12import atexit 

13import threading 

14from collections.abc import Callable 

15from dataclasses import dataclass, field 

16from typing import TYPE_CHECKING 

17 

18from lilbee.providers.worker.pool import shutdown_pool_runtime 

19 

_RELOAD_CLOSE_TIMEOUT_S = 5.0
"""Wall-clock budget, in seconds, for closing a detached worker channel during reload_role.

Matches ``_DEFAULT_SHUTDOWN_TIMEOUT_S`` in ``providers.worker.pool``: a worker
that does not ack SHUTDOWN within this window is terminated so the new model
load is not blocked.
"""

27 

28if TYPE_CHECKING: 

29 from lilbee.catalog.hf_client import HfClient 

30 from lilbee.data.store import Store 

31 from lilbee.modelhub.model_manager import ModelManager 

32 from lilbee.modelhub.registry import ModelRegistry 

33 from lilbee.providers.base import LLMProvider 

34 from lilbee.providers.worker.health_ticker import HealthTickerHandle 

35 from lilbee.providers.worker.pool import PoolRuntime, WorkerPool 

36 from lilbee.providers.worker.transport import WorkerRole 

37 from lilbee.retrieval.clustering import Clusterer 

38 from lilbee.retrieval.concepts import ConceptGraph 

39 from lilbee.retrieval.embedder import Embedder 

40 from lilbee.retrieval.query import Searcher 

41 from lilbee.retrieval.reranker import Reranker 

42 from lilbee.runtime.ingest_lock import IngestLockRegistry 

43 

44 

@dataclass
class CrawlerSyncState:
    """Process-wide sync coordination state (lock + last-run timestamp)."""

    # Guards crawler sync runs within this process; a fresh Lock per container.
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Timestamp of the most recent sync run; 0.0 means "never run".
    # (Clock source is whatever the sync driver writes here — presumably
    # time.time(); confirm against the caller.)
    last_run: float = 0.0

51 

52 

@dataclass(frozen=True)
class Services:
    """Immutable container for every long-lived runtime service instance.

    The worker pool hangs off Services rather than off the provider so
    that cancellation, health checks, and diagnostics can reach it from
    any subsystem without touching ``LlamaCppProvider``'s private API.
    ``cancel_inference()`` is the single canonical entry point wired to
    Ctrl+C and the chat-stream cancel action.
    """

    provider: LLMProvider
    store: Store
    embedder: Embedder
    reranker: Reranker
    concepts: ConceptGraph
    clusterer: Clusterer
    searcher: Searcher
    registry: ModelRegistry
    hf_client: HfClient
    ingest_lock_registry: IngestLockRegistry
    model_manager: ModelManager
    crawler_semaphore: asyncio.Semaphore | None
    crawler_sync_state: CrawlerSyncState
    worker_pool: WorkerPool
    pool_runtime: PoolRuntime
    pool_health_ticker: HealthTickerHandle

    def cancel_inference(self) -> None:
        """Raise the abort flag on every registered worker-pool role. Idempotent."""
        pool = self.worker_pool
        for role in pool.registered_roles:
            pool.accessor(role).cancel()

    def reload_role(self, role_name: WorkerRole) -> None:
        """Discard *role_name*'s current worker so the next call respawns with cfg.

        The channel is detached synchronously — subsequent calls immediately
        see no live worker for the role — while the actual close of the old
        channel runs in the background on the pool runtime, keeping the
        caller's event loop responsive. Workers bound to other roles (and
        any in-flight stream they own) are left alone. Use this after a
        single role-bound model setting changes (e.g. ``embedding_model``).
        """
        stale = self.worker_pool.detach_channel(role_name)
        if stale is None:
            return

        async def _close_stale() -> None:
            await stale.close(timeout=_RELOAD_CLOSE_TIMEOUT_S)

        self.pool_runtime.submit(_close_stale())

    def add_pool_listener(
        self,
        *,
        on_spawning: Callable[[WorkerRole], None] | None = None,
        on_spawned: Callable[[WorkerRole], None] | None = None,
    ) -> None:
        """Register callbacks for worker spawn lifecycle events.

        Thin passthrough to :meth:`WorkerPool.add_listener`. The TUI relies
        on this to surface "Starting <role> worker..." / "<role> worker ready"
        notifications during the cold-start window.
        """
        self.worker_pool.add_listener(on_spawning=on_spawning, on_spawned=on_spawned)

117 

118 

_svc: Services | None = None
"""Cached singleton, set on first ``get_services()`` call.

Concurrency contract: lilbee runs the asyncio loop on a single worker
thread + Textual's main thread. ``get_services()`` is idempotent (the
``if _svc is not None: return`` early-out covers re-entry from a
background thread). Tests that need a custom container call
``set_services(make_mock_services(...))`` explicitly; ``peek_services()``
is the read-only inspector for cleanup fixtures. The Services dataclass
itself is logically immutable post-construction (it is frozen, and its
fields are references to long-lived service objects), so concurrent
reads are safe without a lock.
"""

132 

133 

def get_services() -> Services:
    """Return the cached Services singleton, creating on first call.

    Service modules are imported inside the function to keep CLI
    startup fast: ``services`` is on every CLI import path, and the
    concrete service modules transitively pull in heavy libraries
    (llama-cpp, lancedb, kreuzberg). Deferring the loads until first
    ``get_services()`` call makes ``lilbee --help`` and TUI splash
    render in milliseconds instead of seconds.
    """
    global _svc
    if _svc is not None:
        return _svc

    # Deferred heavy imports — see docstring.
    from lilbee.catalog.hf_client import HfClient
    from lilbee.core.config import cfg
    from lilbee.data.store import Store
    from lilbee.modelhub.model_manager import ModelManager
    from lilbee.modelhub.registry import ModelRegistry
    from lilbee.providers.factory import create_provider
    from lilbee.providers.worker.health_ticker import start_health_ticker
    from lilbee.providers.worker.pool import PoolRuntime, WorkerPool
    from lilbee.providers.worker.transport import default_spawner
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.embedder import Embedder
    from lilbee.retrieval.query import Searcher
    from lilbee.retrieval.reranker import Reranker
    from lilbee.runtime.asyncio_loop import get_loop
    from lilbee.runtime.ingest_lock import IngestLockRegistry

    worker_pool = WorkerPool(
        spawner=default_spawner(),
        max_idle_s=cfg.worker_pool_max_idle_s,
    )
    pool_runtime = PoolRuntime()
    provider = create_provider(cfg)
    store = Store(cfg)
    embedder = Embedder(cfg, provider)
    reranker = Reranker(cfg)
    concepts = ConceptGraph(cfg, store)
    clusterer = Clusterer(cfg, store)
    registry = ModelRegistry(cfg.models_dir)
    searcher = Searcher(cfg, provider, store, embedder, reranker, concepts)
    hf_client = HfClient()
    ingest_lock_registry = IngestLockRegistry()
    model_manager = ModelManager(cfg.models_dir, cfg.remote_base_url)
    # crawl_max_concurrent <= 0 disables the semaphore (no concurrency cap).
    crawler_semaphore = (
        asyncio.Semaphore(cfg.crawl_max_concurrent) if cfg.crawl_max_concurrent > 0 else None
    )
    crawler_sync_state = CrawlerSyncState()
    pool_health_ticker: HealthTickerHandle = start_health_ticker(
        worker_pool, pool_runtime, get_loop()
    )
    # _svc is assigned *before* the eager-start branch below, so any
    # re-entrant get_services() call during warm-up hits the early-out and
    # returns the finished container instead of rebuilding it.
    _svc = Services(
        provider=provider,
        store=store,
        embedder=embedder,
        reranker=reranker,
        concepts=concepts,
        clusterer=clusterer,
        searcher=searcher,
        registry=registry,
        hf_client=hf_client,
        ingest_lock_registry=ingest_lock_registry,
        model_manager=model_manager,
        crawler_semaphore=crawler_semaphore,
        crawler_sync_state=crawler_sync_state,
        worker_pool=worker_pool,
        pool_runtime=pool_runtime,
        pool_health_ticker=pool_health_ticker,
    )
    # Eager start is the default: pay 1-3 s per worker at TUI mount so the
    # first user action lands on a warm pool. Roles whose model is unset are
    # skipped, so a setup with only chat + embed never spawns rerank or
    # vision. Set ``cfg.worker_pool_eager_start = false`` for headless
    # scripts where mount time matters more than first-call latency.
    if cfg.worker_pool_eager_start:
        from contextlib import suppress

        # Best-effort warm-up: any failure here is swallowed so startup
        # still succeeds; workers are then spawned lazily on first use.
        with suppress(Exception):
            provider.warm_up_pool()
            pool_runtime.start()
            pool_runtime.run_sync(worker_pool.start_eager(), timeout=30.0)
    return _svc

219 

220 

def set_services(services: Services | None) -> None:
    """Replace the cached Services singleton (for testing).

    Pass ``None`` to clear the cache so the next ``get_services()`` call
    rebuilds from scratch. The previous container is NOT shut down here;
    callers that need cleanup use ``reset_services()`` instead.
    """
    global _svc
    _svc = services

225 

226 

def peek_services() -> Services | None:
    """Return the cached Services container, or None if not yet initialized.

    Public read-only accessor for test cleanup helpers that need to
    inspect the singleton without forcing initialization. Unlike
    ``get_services()``, this never constructs anything.
    """
    return _svc

234 

235 

def reset_services() -> None:
    """Shut down and discard all cached instances. Idempotent; atexit-safe.

    Cleanup is staged with ``try``/``finally`` so that the store handle is
    closed and the singleton cleared even when an earlier shutdown step
    raises. Previously a failing ``shutdown_pool_runtime`` or
    ``provider.shutdown()`` leaked the store handle and left the stale
    container cached, so a later ``get_services()`` returned half-dead
    services instead of rebuilding.
    """
    global _svc
    svc = _svc
    if svc is None:
        return
    try:
        shutdown_pool_runtime(svc.worker_pool, svc.pool_runtime, svc.pool_health_ticker)
        svc.provider.shutdown()
    finally:
        try:
            svc.store.close()
        finally:
            # Always drop the cache so the next get_services() starts clean.
            _svc = None

244 

245 

def reset_store() -> None:
    """Rebuild the Store and everything derived from it; keep models loaded.

    After a data-dir wipe (``/reset``) the LanceDB handle is invalid while
    the loaded llama-cpp/embedder/reranker models are still good, so this
    swaps in a fresh Store (plus ConceptGraph, Clusterer, Searcher) without
    paying the multi-second full reload of ``reset_services()``.
    """
    global _svc
    svc = _svc
    if svc is None:
        return
    from dataclasses import replace

    from lilbee.core.config import cfg
    from lilbee.data.store import Store
    from lilbee.retrieval.clustering import Clusterer
    from lilbee.retrieval.concepts import ConceptGraph
    from lilbee.retrieval.query import Searcher

    # Close the dead handle first, then build the replacement graph on top
    # of the fresh store.
    svc.store.close()
    fresh_store = Store(cfg)
    fresh_concepts = ConceptGraph(cfg, fresh_store)
    fresh_clusterer = Clusterer(cfg, fresh_store)
    fresh_searcher = Searcher(
        cfg, svc.provider, fresh_store, svc.embedder, svc.reranker, fresh_concepts
    )
    _svc = replace(
        svc,
        store=fresh_store,
        concepts=fresh_concepts,
        clusterer=fresh_clusterer,
        searcher=fresh_searcher,
    )

276 

277 

# Release worker processes, the provider, and the store handle at interpreter
# exit; reset_services() is a no-op if nothing was ever initialized.
atexit.register(reset_services)