Coverage for src / lilbee / modelhub / model_manager / core.py: 100%
142 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""ModelManager: native and SDK-backed model lifecycle operations."""
3import logging
4import time
5from collections.abc import Callable
6from pathlib import Path
8from lilbee.catalog.types import ModelSource
9from lilbee.core.config import DEFAULT_HTTP_TIMEOUT
10from lilbee.core.security import validate_path_within
11from lilbee.modelhub.model_manager.types import ModelNotFoundError
12from lilbee.modelhub.registry import ModelRegistry
13from lilbee.providers.local_servers import LOCAL_SERVERS, local_server_for_key
14from lilbee.providers.model_ref import parse_model_ref
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Freshness window, in seconds, for the memoized results of
# ``ModelManager.list_installed`` and ``ModelManager.list_native_identities``.
# Compared against ``time.monotonic()`` deltas.
_INSTALLED_CACHE_TTL_SECONDS = 30.0
def _prefixed_source(model: str) -> ModelSource | None:
    """Classify *model* by its provider prefix alone, without any network call.

    A ref that starts with a local server's wire prefix (``ollama/``,
    ``lm_studio/``) resolves to that server's source. Otherwise the ref is
    parsed: API-provider refs are FRONTIER. Bare names, and refs that fail
    to parse, yield ``None`` so the caller can fall back to backend
    membership.
    """
    # First local server whose wire prefix matches wins (same order as LOCAL_SERVERS).
    matched = next(
        (server for server in LOCAL_SERVERS if model.startswith(server.wire_prefix)),
        None,
    )
    if matched is not None:
        return ModelSource(matched.key)

    try:
        parsed = parse_model_ref(model)
    except ValueError:
        # Unparseable refs are treated the same as bare names.
        return None

    if parsed.is_api:
        return ModelSource.FRONTIER
    return None
class ModelManager:
    """Manages model lifecycle with distinct sources.

    Native models (GGUF files tracked by a ``ModelRegistry`` rooted at
    ``models_dir``) can be listed, pulled and removed. Models served by
    local servers (Ollama, LM Studio) are listed only; ``pull`` and
    ``remove`` refuse them.
    """

    def __init__(self, models_dir: Path) -> None:
        """Bind the manager to *models_dir* and start with empty caches."""
        self._models_dir = models_dir
        self._registry = ModelRegistry(self._models_dir)
        # Memoize list_installed results to avoid walking the registry
        # filesystem and hitting the backend HTTP endpoint on every call.
        # The catalog filter path fires this per request. Time-based TTL
        # plus explicit invalidation on pull/remove keeps freshness.
        # Keyed by the ``source`` argument; value is (monotonic timestamp, names).
        self._installed_cache: dict[ModelSource | None, tuple[float, list[str]]] = {}
        # Identity cache: refs + hf_repos of installed natives. The catalog
        # screen reads this to mark rows as installed without re-walking
        # the registry on every screen mount (~150-300 ms saved).
        self._native_identities_cache: tuple[float, frozenset[str]] | None = None

    def list_installed(self, source: ModelSource | None = None) -> list[str]:
        """List installed model names. ``source=None`` lists all sources.

        ``ModelSource.NATIVE`` lists registry models only; any other source
        lists the models reported by the configured local servers.

        Memoized with a ``_INSTALLED_CACHE_TTL_SECONDS`` TTL and
        invalidated eagerly by ``pull``/``remove``. The returned list is
        always a fresh copy, so callers may mutate it without affecting the
        cached value seen by later callers.
        """
        now = time.monotonic()
        cached = self._installed_cache.get(source)
        if cached is not None:
            cached_at, cached_result = cached
            if now - cached_at < _INSTALLED_CACHE_TTL_SECONDS:
                # Copy: never hand out the cache's own list object.
                return list(cached_result)

        if source is None:
            native = set(self._list_native())
            remote = set(self._list_remote())
            result = sorted(native | remote)
        elif source is ModelSource.NATIVE:
            result = self._list_native()
        else:
            result = self._list_remote()

        self._installed_cache[source] = (now, result)
        # Copy on the miss path too, so the stored list stays private.
        return list(result)

    def list_native_identities(self) -> frozenset[str]:
        """Return refs + hf_repos of installed native models.

        Same TTL as ``list_installed``. The catalog screen reads this to
        mark catalog rows as installed without re-walking the registry
        on every screen mount.

        Best-effort: if the registry raises, the failure is logged at debug
        level and whatever was collected so far (possibly nothing) is
        returned. A failed walk is *not* memoized, so the next call retries
        instead of serving a stale "nothing installed" answer for the TTL.
        """
        now = time.monotonic()
        if self._native_identities_cache is not None:
            cached_at, cached_result = self._native_identities_cache
            if now - cached_at < _INSTALLED_CACHE_TTL_SECONDS:
                return cached_result
        identities: set[str] = set()
        try:
            for m in self._registry.list_installed():
                identities.add(m.ref)
                identities.add(m.hf_repo)
        except Exception:
            log.debug("ModelRegistry.list_installed failed", exc_info=True)
            # Do not cache a failed/partial walk; let the next call retry.
            return frozenset(identities)
        result = frozenset(identities)
        self._native_identities_cache = (now, result)
        return result

    def _invalidate_installed_cache(self) -> None:
        """Drop all cached list_installed and list_native_identities results."""
        self._installed_cache.clear()
        self._native_identities_cache = None

    def _list_native(self) -> list[str]:
        """List native models from the registry only, sorted by ref."""
        return sorted(m.ref for m in self._registry.list_installed())

    def _list_remote(self) -> list[str]:
        """List model names across every configured local server (Ollama, LM Studio).

        Reuses the discovery dispatch so each listing endpoint matches its
        server (Ollama ``/api/tags`` vs LM Studio ``/v1/models``). Returns
        ``[]`` when the backends are unreachable.
        """
        # circular: discovery -> app.services -> model_manager.__init__ -> core
        from lilbee.modelhub.model_manager.discovery import classify_all_remote_models

        models = classify_all_remote_models(timeout=DEFAULT_HTTP_TIMEOUT)
        return [m.name for m in models]

    def is_installed(self, model: str, source: ModelSource | None = None) -> bool:
        """Check if model exists in specified source.

        ``source=None`` checks native first, then remote. ``NATIVE`` checks
        native only; any other source checks the remote listing.
        """
        if source is None:
            return self._is_native(model) or self._is_remote(model)
        if source is ModelSource.NATIVE:
            return self._is_native(model)
        return self._is_remote(model)

    def _is_native(self, model: str) -> bool:
        """Return True if *model* is in the registry or is a file under the models dir."""
        if self._registry.is_installed(model):
            return True
        # Fallback: treat *model* as a path relative to the models dir, but
        # only if it stays inside that directory.
        try:
            validate_path_within(self._models_dir / model, self._models_dir)
        except ValueError:
            return False
        return (self._models_dir / model).is_file()

    def _is_remote(self, model: str) -> bool:
        """Return True if *model* appears in the (cached) remote listing."""
        return model in self.list_installed(ModelSource.REMOTE)

    def get_source(self, model: str) -> ModelSource | None:
        """Return the granular source a model lives in. Native takes precedence.

        A provider-prefixed ref classifies without a network call; a bare name
        that a backend reports installed is ``REMOTE`` (the prefix is what names
        the specific server). ``None`` when the model is in no known source.
        """
        if self._is_native(model):
            return ModelSource.NATIVE
        prefixed = _prefixed_source(model)
        if prefixed is not None:
            return prefixed
        if self._is_remote(model):
            return ModelSource.REMOTE
        return None

    def pull(
        self,
        model: str,
        source: ModelSource,
        *,
        on_bytes: Callable[[int, int], None] | None = None,
        allow_unsupported: bool = False,
    ) -> Path | None:
        """Download a native GGUF model and return its path.

        lilbee pulls native models only. Local servers (Ollama, LM Studio)
        are read-only: their models are managed in their own app and surface
        here once present, so a non-native *source* is refused.

        Native pulls of architectures the bundled llama.cpp doesn't support
        are refused with ``UnsupportedArchError`` unless *allow_unsupported*
        is True. *on_bytes* receives (downloaded_bytes, total_bytes) progress.

        Raises:
            ValueError: *source* is not ``ModelSource.NATIVE``.
            UnsupportedArchError: architecture check failed and
                *allow_unsupported* is False.
            ModelNotFoundError: *model* does not resolve to a pull target.
        """
        if source is not ModelSource.NATIVE:
            spec = local_server_for_key(source.value)
            where = spec.display_name if spec is not None else "the configured server"
            raise ValueError(
                f"lilbee runs {where} models but doesn't download them. "
                f"Add the model in {where}, then pick it here."
            )
        if not allow_unsupported:
            self._enforce_arch_compat(model)
        try:
            return self._pull_native(model, on_bytes=on_bytes)
        finally:
            # Invalidate even on failure: a partial pull may have changed state.
            self._invalidate_installed_cache()

    def _enforce_arch_compat(self, ref: str) -> None:
        """Raise UnsupportedArchError if *ref*'s architecture isn't in the supported set."""
        # Function-scope imports, as elsewhere in this class
        # (NOTE(review): presumably for import-cycle/startup-cost reasons — confirm).
        from lilbee.app.services import get_services
        from lilbee.catalog.compat import (
            ModelCompat,
            UnsupportedArchError,
            classify,
            resolve_arch_for_pull,
        )

        arch = resolve_arch_for_pull(ref, get_services().hf_client)
        if classify(arch) is ModelCompat.UNSUPPORTED:
            raise UnsupportedArchError(ref, arch)

    def _pull_native(
        self,
        model: str,
        *,
        on_bytes: Callable[[int, int], None] | None = None,
    ) -> Path:
        """Download a featured or ad-hoc HuggingFace model to the native GGUF directory.

        Raises ``ModelNotFoundError`` when *model* does not resolve to a
        pull target. The downloaded model is registered via the
        ``on_complete`` callback passed to ``download_model``.
        """
        # heavy: lilbee.catalog (>50ms; huggingface_hub fanout)
        from lilbee.catalog import download_model, resolve_pull_target
        from lilbee.modelhub.registry import register_downloaded_model

        entry = resolve_pull_target(model)
        if entry is None:
            raise ModelNotFoundError(
                f"Model '{model}' not recognized. "
                "Pass a HuggingFace repo id (owner/name) or a featured model name."
            )
        path = download_model(entry, on_progress=on_bytes, on_complete=register_downloaded_model)
        log.info("Downloaded %s to %s", model, path)
        return path

    def remove(self, model: str, source: ModelSource | None = None) -> bool:
        """Remove an installed native model. Returns True if removed.

        lilbee removes only native GGUF models it downloaded. Local servers
        (Ollama, LM Studio) are read-only: a model that lives on one is refused
        (mirrors ``pull``), since its lifecycle is managed in that app. When
        *source* is ``None`` it is resolved via ``get_source`` so a
        local-server ref is caught either way.

        Raises:
            ValueError: the effective source is known and is not
                ``ModelSource.NATIVE``.
        """
        effective = source if source is not None else self.get_source(model)
        # An unresolved source (None) falls through to the native removal attempt.
        if effective is not None and effective is not ModelSource.NATIVE:
            spec = local_server_for_key(effective.value)
            where = spec.display_name if spec is not None else "the configured server"
            raise ValueError(
                f"lilbee runs {where} models but doesn't remove them. "
                f"Manage them in {where} instead."
            )
        try:
            return self._remove_native(model)
        finally:
            self._invalidate_installed_cache()

    def _remove_native(self, model: str) -> bool:
        """Remove *model* from the registry, else delete it as a file under the models dir.

        Returns True if something was removed; False if nothing matched or
        the path would escape the models directory.
        """
        if self._registry.remove(model):
            log.info("Removed native model %s from registry", model)
            return True
        try:
            path = validate_path_within(self._models_dir / model, self._models_dir)
        except ValueError:
            log.warning("Path traversal blocked: %s escapes %s", model, self._models_dir)
            return False
        if path.is_file():
            path.unlink()
            log.info("Removed native model %s", model)
            return True
        return False