Coverage for src / lilbee / modelhub / model_manager / core.py: 100%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""ModelManager: native and SDK-backed model lifecycle operations.""" 

2 

3import logging 

4import time 

5from collections.abc import Callable 

6from pathlib import Path 

7 

8from lilbee.catalog.types import ModelSource 

9from lilbee.core.config import DEFAULT_HTTP_TIMEOUT 

10from lilbee.core.security import validate_path_within 

11from lilbee.modelhub.model_manager.types import ModelNotFoundError 

12from lilbee.modelhub.registry import ModelRegistry 

13from lilbee.providers.local_servers import LOCAL_SERVERS, local_server_for_key 

14from lilbee.providers.model_ref import parse_model_ref 

15 

16log = logging.getLogger(__name__) 

17 

18_INSTALLED_CACHE_TTL_SECONDS = 30.0 

19 

20 

21def _prefixed_source(model: str) -> ModelSource | None: 

22 """Map a provider-prefixed ref to its source, or ``None`` for a bare ref. 

23 

24 Local-server prefixes (``ollama/``, ``lm_studio/``) map to that server's 

25 source; API-provider prefixes are FRONTIER. A bare name returns ``None`` 

26 so the caller falls back to backend membership. 

27 """ 

28 for spec in LOCAL_SERVERS: 

29 if model.startswith(spec.wire_prefix): 

30 return ModelSource(spec.key) 

31 try: 

32 ref = parse_model_ref(model) 

33 except ValueError: 

34 return None 

35 return ModelSource.FRONTIER if ref.is_api else None 

36 

37 

38class ModelManager: 

39 """Manages model lifecycle with distinct sources.""" 

40 

41 def __init__(self, models_dir: Path) -> None: 

42 self._models_dir = models_dir 

43 self._registry = ModelRegistry(self._models_dir) 

44 # Memoize list_installed results to avoid walking the registry 

45 # filesystem and hitting the backend HTTP endpoint on every call. 

46 # The catalog filter path fires this per request. Time-based TTL 

47 # plus explicit invalidation on pull/remove keeps freshness. 

48 self._installed_cache: dict[ModelSource | None, tuple[float, list[str]]] = {} 

49 # Identity cache: refs + hf_repos of installed natives. The catalog 

50 # screen reads this to mark rows as installed without re-walking 

51 # the registry on every screen mount (~150-300 ms saved). 

52 self._native_identities_cache: tuple[float, frozenset[str]] | None = None 

53 

54 def list_installed(self, source: ModelSource | None = None) -> list[str]: 

55 """List installed model names. ``source=None`` lists all sources. 

56 

57 Memoized with a ``_INSTALLED_CACHE_TTL_SECONDS`` TTL and 

58 invalidated eagerly by ``pull``/``remove``. 

59 """ 

60 now = time.monotonic() 

61 cached = self._installed_cache.get(source) 

62 if cached is not None: 

63 cached_at, cached_result = cached 

64 if now - cached_at < _INSTALLED_CACHE_TTL_SECONDS: 

65 return cached_result 

66 

67 if source is None: 

68 native = set(self._list_native()) 

69 remote = set(self._list_remote()) 

70 result = sorted(native | remote) 

71 elif source is ModelSource.NATIVE: 

72 result = self._list_native() 

73 else: 

74 result = self._list_remote() 

75 

76 self._installed_cache[source] = (now, result) 

77 return result 

78 

79 def list_native_identities(self) -> frozenset[str]: 

80 """Return refs + hf_repos of installed native models. 

81 

82 Same TTL as ``list_installed``. The catalog screen reads this to 

83 mark catalog rows as installed without re-walking the registry 

84 on every screen mount. 

85 """ 

86 now = time.monotonic() 

87 if self._native_identities_cache is not None: 

88 cached_at, cached_result = self._native_identities_cache 

89 if now - cached_at < _INSTALLED_CACHE_TTL_SECONDS: 

90 return cached_result 

91 identities: set[str] = set() 

92 try: 

93 for m in self._registry.list_installed(): 

94 identities.add(m.ref) 

95 identities.add(m.hf_repo) 

96 except Exception: 

97 log.debug("ModelRegistry.list_installed failed", exc_info=True) 

98 result = frozenset(identities) 

99 self._native_identities_cache = (now, result) 

100 return result 

101 

102 def _invalidate_installed_cache(self) -> None: 

103 """Drop all cached list_installed results.""" 

104 self._installed_cache.clear() 

105 self._native_identities_cache = None 

106 

107 def _list_native(self) -> list[str]: 

108 """List native models from the registry only.""" 

109 return sorted(m.ref for m in self._registry.list_installed()) 

110 

111 def _list_remote(self) -> list[str]: 

112 """List model names across every configured local server (Ollama, LM Studio). 

113 

114 Reuses the discovery dispatch so each listing endpoint matches its 

115 server (Ollama ``/api/tags`` vs LM Studio ``/v1/models``). Returns 

116 ``[]`` when the backends are unreachable. 

117 """ 

118 # circular: discovery -> app.services -> model_manager.__init__ -> core 

119 from lilbee.modelhub.model_manager.discovery import classify_all_remote_models 

120 

121 models = classify_all_remote_models(timeout=DEFAULT_HTTP_TIMEOUT) 

122 return [m.name for m in models] 

123 

124 def is_installed(self, model: str, source: ModelSource | None = None) -> bool: 

125 """Check if model exists in specified source.""" 

126 if source is None: 

127 return self._is_native(model) or self._is_remote(model) 

128 if source is ModelSource.NATIVE: 

129 return self._is_native(model) 

130 return self._is_remote(model) 

131 

132 def _is_native(self, model: str) -> bool: 

133 if self._registry.is_installed(model): 

134 return True 

135 try: 

136 validate_path_within(self._models_dir / model, self._models_dir) 

137 except ValueError: 

138 return False 

139 return (self._models_dir / model).is_file() 

140 

141 def _is_remote(self, model: str) -> bool: 

142 return model in self.list_installed(ModelSource.REMOTE) 

143 

144 def get_source(self, model: str) -> ModelSource | None: 

145 """Return the granular source a model lives in. Native takes precedence. 

146 

147 A provider-prefixed ref classifies without a network call; a bare name 

148 that a backend reports installed is ``REMOTE`` (the prefix is what names 

149 the specific server). ``None`` when the model is in no known source. 

150 """ 

151 if self._is_native(model): 

152 return ModelSource.NATIVE 

153 prefixed = _prefixed_source(model) 

154 if prefixed is not None: 

155 return prefixed 

156 if self._is_remote(model): 

157 return ModelSource.REMOTE 

158 return None 

159 

160 def pull( 

161 self, 

162 model: str, 

163 source: ModelSource, 

164 *, 

165 on_bytes: Callable[[int, int], None] | None = None, 

166 allow_unsupported: bool = False, 

167 ) -> Path | None: 

168 """Download a native GGUF model and return its path. 

169 

170 lilbee pulls native models only. Local servers (Ollama, LM Studio) 

171 are read-only: their models are managed in their own app and surface 

172 here once present, so a non-native *source* is refused. 

173 

174 Native pulls of architectures the bundled llama.cpp doesn't support 

175 are refused with ``UnsupportedArchError`` unless *allow_unsupported* 

176 is True. *on_bytes* receives (downloaded_bytes, total_bytes) progress. 

177 """ 

178 if source is not ModelSource.NATIVE: 

179 spec = local_server_for_key(source.value) 

180 where = spec.display_name if spec is not None else "the configured server" 

181 raise ValueError( 

182 f"lilbee runs {where} models but doesn't download them. " 

183 f"Add the model in {where}, then pick it here." 

184 ) 

185 if not allow_unsupported: 

186 self._enforce_arch_compat(model) 

187 try: 

188 return self._pull_native(model, on_bytes=on_bytes) 

189 finally: 

190 self._invalidate_installed_cache() 

191 

192 def _enforce_arch_compat(self, ref: str) -> None: 

193 """Raise UnsupportedArchError if *ref*'s architecture isn't in the supported set.""" 

194 from lilbee.app.services import get_services 

195 from lilbee.catalog.compat import ( 

196 ModelCompat, 

197 UnsupportedArchError, 

198 classify, 

199 resolve_arch_for_pull, 

200 ) 

201 

202 arch = resolve_arch_for_pull(ref, get_services().hf_client) 

203 if classify(arch) is ModelCompat.UNSUPPORTED: 

204 raise UnsupportedArchError(ref, arch) 

205 

206 def _pull_native( 

207 self, 

208 model: str, 

209 *, 

210 on_bytes: Callable[[int, int], None] | None = None, 

211 ) -> Path: 

212 """Download a featured or ad-hoc HuggingFace model to the native GGUF directory.""" 

213 # heavy: lilbee.catalog (>50ms; huggingface_hub fanout) 

214 from lilbee.catalog import download_model, resolve_pull_target 

215 from lilbee.modelhub.registry import register_downloaded_model 

216 

217 entry = resolve_pull_target(model) 

218 if entry is None: 

219 raise ModelNotFoundError( 

220 f"Model '{model}' not recognized. " 

221 "Pass a HuggingFace repo id (owner/name) or a featured model name." 

222 ) 

223 path = download_model(entry, on_progress=on_bytes, on_complete=register_downloaded_model) 

224 log.info("Downloaded %s to %s", model, path) 

225 return path 

226 

227 def remove(self, model: str, source: ModelSource | None = None) -> bool: 

228 """Remove an installed native model. Returns True if removed. 

229 

230 lilbee removes only native GGUF models it downloaded. Local servers 

231 (Ollama, LM Studio) are read-only: a model that lives on one is refused 

232 (mirrors ``pull``), since its lifecycle is managed in that app. A bare 

233 ``source`` is resolved so a local-server ref is caught either way. 

234 """ 

235 effective = source if source is not None else self.get_source(model) 

236 if effective is not None and effective is not ModelSource.NATIVE: 

237 spec = local_server_for_key(effective.value) 

238 where = spec.display_name if spec is not None else "the configured server" 

239 raise ValueError( 

240 f"lilbee runs {where} models but doesn't remove them. " 

241 f"Manage them in {where} instead." 

242 ) 

243 try: 

244 return self._remove_native(model) 

245 finally: 

246 self._invalidate_installed_cache() 

247 

248 def _remove_native(self, model: str) -> bool: 

249 if self._registry.remove(model): 

250 log.info("Removed native model %s from registry", model) 

251 return True 

252 try: 

253 path = validate_path_within(self._models_dir / model, self._models_dir) 

254 except ValueError: 

255 log.warning("Path traversal blocked: %s escapes %s", model, self._models_dir) 

256 return False 

257 if path.is_file(): 

258 path.unlink() 

259 log.info("Removed native model %s", model) 

260 return True 

261 return False