Coverage for src / lilbee / modelhub / registry.py: 100%
265 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Manifest store keyed by ``(hf_repo, gguf_filename)`` over the HF cache.
3Canonical ref: ``<hf_repo>/<gguf_filename>``. Two quants of the same
4repo are two distinct installations. Manifests live at
5``manifests/<repo--repo>/<filename>.json``; blobs at
6``models--<repo--repo>/blobs/<sha>``.
7"""
9from __future__ import annotations
11import hashlib
12import json
13import logging
14import os
15import re
16import shutil
17import tempfile
18from dataclasses import asdict, dataclass
19from pathlib import Path
20from typing import TYPE_CHECKING
22from lilbee.catalog.refs import format_native_gguf_ref, is_bare_hf_repo
23from lilbee.core.config.model import cfg
24from lilbee.core.security import validate_path_within
26if TYPE_CHECKING:
27 from lilbee.catalog.models import CatalogModel
28 from lilbee.catalog.types import ModelTask
log = logging.getLogger(__name__)

_HASH_CHUNK_SIZE = 8192  # bytes read per iteration when hashing
# Both patterns end in ``\Z`` rather than ``$``: ``$`` also matches just before
# a trailing newline, which would let ``"org/repo\n"`` or ``"model.gguf\n"``
# pass validation and end up inside a filesystem path.
_REPO_SEGMENT_RE = re.compile(r"^[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+\Z")
_FILENAME_RE = re.compile(r"^[a-zA-Z0-9._-]+\.gguf\Z")

# Replaces "/" when an HF repo id is used as a directory name.
REPO_DIR_SEPARATOR = "--"
def _validate_hf_repo(hf_repo: str) -> str:
    """Return *hf_repo* unchanged when it is a well-formed ``org/name`` repo id.

    Raises:
        ValueError: if the id is empty, does not match the repo pattern, or
            contains ``..``.
    """
    well_formed = bool(hf_repo) and _REPO_SEGMENT_RE.match(hf_repo) is not None
    if well_formed and ".." not in hf_repo:
        return hf_repo
    raise ValueError(f"Invalid hf_repo: {hf_repo!r}")
def _validate_gguf_filename(filename: str) -> str:
    """Return *filename* unchanged when it is a safe ``.gguf`` basename.

    Raises:
        ValueError: if the name is empty, does not match the filename pattern
            (which admits no path separators), or contains ``..``.
    """
    well_formed = bool(filename) and _FILENAME_RE.match(filename) is not None
    if well_formed and ".." not in filename:
        return filename
    raise ValueError(f"Invalid gguf_filename: {filename!r}")
_REF_SHAPE_HINT = "Use '<org>/<repo>/<filename>.gguf'."


def parse_hf_ref(ref: str) -> tuple[str, str]:
    """Split ``<org>/<repo>/<file>.gguf`` into ``(hf_repo, gguf_filename)``.

    The split happens at the last ``/``; each half is then validated.

    Raises:
        ValueError: if *ref* does not end in ``.gguf``, contains no ``/``, or
            either half fails its validator.
    """
    looks_like_ref = ref.endswith(".gguf") and "/" in ref
    if not looks_like_ref:
        raise ValueError(f"Model ref {ref!r} is not a HuggingFace ref. {_REF_SHAPE_HINT}")
    repo_part, _, file_part = ref.rpartition("/")
    return _validate_hf_repo(repo_part), _validate_gguf_filename(file_part)
def repo_to_dir(hf_repo: str) -> str:
    """Encode an HF repo id as a directory name by swapping ``/`` for the separator."""
    segments = hf_repo.split("/")
    return REPO_DIR_SEPARATOR.join(segments)
@dataclass
class ModelManifest:
    """Metadata for a single installed model, identified by ``(hf_repo, gguf_filename)``."""

    hf_repo: str
    gguf_filename: str
    size_bytes: int
    task: ModelTask
    downloaded_at: str  # ISO 8601
    blob: str | None = None  # SHA-256 hex of the blob in the HF cache; None pre-install

    @property
    def ref(self) -> str:
        """Canonical ref string built from the repo id and the GGUF filename."""
        repo, filename = self.hf_repo, self.gguf_filename
        return format_native_gguf_ref(repo, filename)
def _copy_atomic(source_path: Path, blob_path: Path) -> None:
    """Copy *source_path* to *blob_path* through a sibling temp file and a rename.

    The bytes are staged in a ``.part`` file created next to *blob_path* and
    moved into place with ``os.replace`` only once the copy has finished, so
    the final path never holds a partially written blob. On any failure the
    staging file is deleted and the exception is re-raised.
    """
    handle, staging_name = tempfile.mkstemp(suffix=".part", dir=str(blob_path.parent))
    staging = Path(staging_name)
    try:
        with os.fdopen(handle, "wb") as sink:
            with source_path.open("rb") as origin:
                shutil.copyfileobj(origin, sink)
        os.replace(staging, blob_path)
    except BaseException:
        # Includes KeyboardInterrupt: never leave the staging file behind.
        staging.unlink(missing_ok=True)
        raise
def _blob_size_matches(blob_file: Path, expected_size: int) -> bool:
    """Report whether *blob_file* can be stat'ed and is exactly *expected_size* bytes.

    A size mismatch marks a truncated / interrupted download, which must not
    count as installed. Any ``OSError`` from ``stat`` (for example a missing
    file) yields ``False``.
    """
    try:
        actual_size = blob_file.stat().st_size
    except OSError:
        return False
    return actual_size == expected_size
114def _sha256_file(path: Path) -> str:
115 """Compute SHA-256 hex digest of a file."""
116 h = hashlib.sha256()
117 with path.open("rb") as f:
118 while True:
119 chunk = f.read(_HASH_CHUNK_SIZE)
120 if not chunk:
121 break
122 h.update(chunk)
123 return h.hexdigest()
class ModelRegistry:
    """Read/write manifests and resolve refs to blobs in the HF cache.

    Under ``models_dir`` the registry keeps manifests at
    ``manifests/<org>--<repo>/<filename>.json`` and expects blobs at
    ``models--<org>--<repo>/blobs/<sha256>``.
    """

    def __init__(self, models_dir: Path) -> None:
        self._root = models_dir
        self._manifests_dir = models_dir / "manifests"

    def _repo_cache_dir(self, hf_repo: str) -> Path:
        """The HuggingFace cache directory for *hf_repo* under this registry root."""
        return self._root / f"models--{repo_to_dir(hf_repo)}"

    def resolve(self, ref: str) -> Path:
        """Return the blob path for *ref*; ``KeyError`` if not installed.

        The canonical *ref* is ``<org>/<repo>/<file>.gguf`` resolved via the
        lilbee manifest. Two other shapes are accepted as a backwards-compat
        concession for builds already published (whose on-disk layout differs),
        not as the intended contract: a bare ``<org>/<repo>`` (older builds
        persisted these into ``config.toml``) resolves to the one quant of that
        repo that's installed, and a manifest that's missing / unparseable /
        blob-less falls back to whatever GGUF ``huggingface_hub`` reports the
        cache holds for that ref. The HF cache layout is stable, so this lets an
        upgrade keep working without anyone purging their lilbee data dir; it is
        deliberately the exception here, not a pattern to follow elsewhere.

        Raises:
            KeyError: if no usable blob can be found for *ref*.
            ValueError: if *ref* is malformed.
        """
        if is_bare_hf_repo(ref):
            return self._resolve_repo_only(_validate_hf_repo(ref))
        hf_repo, gguf_filename = parse_hf_ref(ref)
        manifest = self._read_manifest(hf_repo, gguf_filename)
        if manifest is not None and manifest.blob is not None:
            blob_file = self._repo_cache_dir(manifest.hf_repo) / "blobs" / manifest.blob
            if _blob_size_matches(blob_file, manifest.size_bytes):
                return blob_file
        recovered = self._find_cached_gguf(hf_repo, gguf_filename)
        if recovered is not None:
            self._reregister_from_cache(hf_repo, gguf_filename, recovered)
            return recovered
        if manifest is None:
            raise KeyError(f"Model {ref} not installed")
        # Manifest present but neither it nor the cache yields a blob; keep the
        # specific diagnostic so a corrupted cache stays debuggable.
        cache_path = self._repo_cache_dir(manifest.hf_repo)
        if not cache_path.exists():
            raise KeyError(f"Cache folder missing for {ref}: {cache_path.name}")
        if manifest.blob is None:
            raise KeyError(f"Manifest for {ref} has no blob hash; install incomplete")
        blob_file = cache_path / "blobs" / manifest.blob
        if blob_file.exists():
            raise KeyError(
                f"Blob for {ref} is truncated: {blob_file.stat().st_size} of "
                f"{manifest.size_bytes} bytes; re-download required"
            )
        raise KeyError(f"Blob file missing for {ref}: {manifest.blob}")

    def _resolve_repo_only(self, hf_repo: str) -> Path:
        """Resolve a bare ``<org>/<repo>`` ref to the GGUF of that repo on disk.

        Older builds persisted bare repo refs for the chat / embedding model.
        Prefers a current-format manifest under the repo; otherwise asks
        ``huggingface_hub`` what GGUFs the cache holds for the repo and returns
        the first one (alphabetical for determinism if more than one quant is
        installed).

        Raises:
            KeyError: if neither a manifest nor the cache yields a blob.
        """
        manifest_dir = self._manifests_dir / repo_to_dir(hf_repo)
        if manifest_dir.is_dir():
            for mf in sorted(manifest_dir.glob("*.gguf.json")):
                manifest = self._load_manifest_file(mf)
                if manifest is None or manifest.blob is None:
                    continue
                blob = self._repo_cache_dir(hf_repo) / "blobs" / manifest.blob
                # Same size check ``resolve`` and ``list_installed`` apply, so a
                # truncated blob is never handed out for a bare-repo ref either.
                if _blob_size_matches(blob, manifest.size_bytes):
                    return blob
        for filename in sorted(self._cached_gguf_names(hf_repo)):
            recovered = self._find_cached_gguf(hf_repo, filename)
            if recovered is not None:
                self._reregister_from_cache(hf_repo, filename, recovered)
                return recovered
        raise KeyError(f"Model {hf_repo} not installed")

    def _cached_gguf_names(self, hf_repo: str) -> set[str]:
        """``.gguf`` filenames the HuggingFace cache holds for *hf_repo*."""
        if not self._root.is_dir():
            return set()
        from huggingface_hub import scan_cache_dir

        info = scan_cache_dir(self._root)
        return {
            f.file_name
            for repo in info.repos
            if repo.repo_id == hf_repo
            for rev in repo.revisions
            for f in rev.files
            if f.file_name.endswith(".gguf")
        }

    def _find_cached_gguf(self, hf_repo: str, gguf_filename: str) -> Path | None:
        """Return the cached blob path for ``hf_repo``/``gguf_filename``, or None.

        Uses ``huggingface_hub.try_to_load_from_cache`` so we honor whatever
        cache layout HF uses, then resolves the returned snapshot symlink to the
        blob, bounded to the cache directory.
        """
        from huggingface_hub import try_to_load_from_cache

        hit = try_to_load_from_cache(
            repo_id=hf_repo, filename=gguf_filename, cache_dir=str(self._root)
        )
        if not isinstance(hit, str):  # None (not cached) or the _CACHED_NO_EXIST sentinel
            return None
        resolved = Path(hit).resolve()
        try:
            validate_path_within(resolved, self._root)
        except ValueError:
            return None
        return resolved

    def _reregister_from_cache(self, hf_repo: str, gguf_filename: str, blob_path: Path) -> None:
        """Write a fresh manifest for a model just recovered from the HF cache.

        ``list_installed`` only walks ``manifests/``, so a cache-recovered model
        is resolvable but otherwise invisible (``lilbee model list``, the TUI
        catalog, the pull command's "already installed" check) until a manifest
        exists. The ``task`` comes from the featured catalog; for a non-catalog
        ref it's unknown, so the rewrite is skipped. Best-effort: a read-only
        models dir or a write race must not break the resolve that succeeded.
        """
        from datetime import UTC, datetime

        ref = format_native_gguf_ref(hf_repo, gguf_filename)
        try:
            from lilbee.catalog import (
                find_catalog_entry,
            )  # deferred: lilbee.catalog is a heavy import

            entry = find_catalog_entry(ref)
            if entry is None:
                return
            self._write_manifest(
                ModelManifest(
                    hf_repo=hf_repo,
                    gguf_filename=gguf_filename,
                    size_bytes=blob_path.stat().st_size,
                    task=entry.task,
                    downloaded_at=datetime.now(UTC).isoformat(),
                    blob=blob_path.name,  # the blob's filename is its sha in the HF cache
                )
            )
            log.info("Recovered manifest for %s from the model cache", ref)
        except Exception:  # cache-warming write; the resolve already returned a path
            log.debug("Could not re-register %s from the model cache", ref, exc_info=True)

    def is_installed(self, ref: str) -> bool:
        """Return True if a model is installed and its blob is present."""
        try:
            self.resolve(ref)
            return True
        except (KeyError, ValueError):
            return False

    def install(
        self,
        hf_repo: str,
        gguf_filename: str,
        source_path: Path,
        manifest: ModelManifest,
    ) -> Path:
        """Write a manifest, copying *source_path* into the HF cache if needed.

        The blob is stored under its SHA-256 digest; an existing blob at that
        path is reused instead of being copied again. ``task`` and
        ``downloaded_at`` are taken from *manifest*; the size and blob digest
        are recomputed from *source_path*.

        Returns:
            The path of the blob inside the repo's cache directory.

        Raises:
            ValueError: if *hf_repo* or *gguf_filename* is malformed.
            OSError: if *source_path* cannot be read or the cache cannot be written.
        """
        # Validate before hashing/copying: invalid names used to surface only
        # at manifest-write time, after the blob had already been written.
        _validate_hf_repo(hf_repo)
        _validate_gguf_filename(gguf_filename)
        digest = _sha256_file(source_path)
        cache_path = self._repo_cache_dir(hf_repo)
        blobs_dir = cache_path / "blobs"
        blob_path = blobs_dir / digest
        if not blob_path.exists():
            blobs_dir.mkdir(parents=True, exist_ok=True)
            _copy_atomic(source_path, blob_path)

        updated = ModelManifest(
            hf_repo=hf_repo,
            gguf_filename=gguf_filename,
            # Record the size install actually wrote, not the caller's claim,
            # so the on-disk size check has a trustworthy reference.
            size_bytes=source_path.stat().st_size,
            task=manifest.task,
            downloaded_at=manifest.downloaded_at,
            blob=digest,
        )
        self._write_manifest(updated)
        return blob_path

    def remove(self, ref: str) -> bool:
        """Remove a manifest and its backing blob.

        The blob is shared via SHA-256 digest, so it only goes away
        when no other installed manifest references the same digest.
        Empty cache directories (``blobs/``, the per-repo ``models--``
        folder, and the per-repo manifest folder) are pruned so a
        deleted model leaves no orphan bytes behind.

        Returns:
            True if a manifest was removed; False if *ref* is malformed or no
            readable manifest exists for it.
        """
        try:
            hf_repo, gguf_filename = parse_hf_ref(ref)
        except ValueError:
            return False
        manifest = self._read_manifest(hf_repo, gguf_filename)
        if manifest is None:
            return False
        manifest_path = self._manifest_path(hf_repo, gguf_filename)
        manifest_path.unlink()
        repo_dir = manifest_path.parent
        if repo_dir.exists() and not any(repo_dir.iterdir()):
            repo_dir.rmdir()
        if manifest.blob is not None:
            self._gc_blob(manifest.hf_repo, manifest.blob)
        log.info("Removed model %s", ref)
        return True

    def _gc_blob(self, hf_repo: str, digest: str) -> None:
        """Drop blob bytes and HuggingFace cache cruft now that *digest*
        and possibly the whole repo are unused.

        When the per-repo ``models--<repo>/`` directory has no installed
        manifests left, the whole directory is wiped so HF's ``refs/``,
        ``snapshots/``, and stale ``blobs/`` all go with it. Otherwise
        only the specific blob file is removed when no remaining
        manifest still references its digest.
        """
        cache_path = self._repo_cache_dir(hf_repo)
        try:
            validate_path_within(cache_path, self._root)
        except ValueError:
            log.warning("Refusing to remove cache outside models_dir: %s", cache_path)
            return
        siblings = [m for m in self.list_installed() if m.hf_repo == hf_repo]
        if not siblings:
            if cache_path.exists():
                shutil.rmtree(cache_path)
            return
        if any(m.blob == digest for m in siblings):
            return
        blob_file = cache_path / "blobs" / digest
        if blob_file.exists():
            blob_file.unlink()

    def list_installed(self) -> list[ModelManifest]:
        """Return manifests for models whose blob is fully present on disk.

        A manifest with a null blob field or a missing blob file is the
        residue of a canceled or partial download. Surfacing it would
        let the picker offer an unusable selection, so the read filter
        lives here at the source instead of in every UI caller.
        """
        manifests: list[ModelManifest] = []
        if not self._manifests_dir.exists():
            return manifests
        for repo_dir in sorted(self._manifests_dir.iterdir()):
            if not repo_dir.is_dir():
                continue
            for tag_file in sorted(repo_dir.glob("*.gguf.json")):
                manifest = self._load_manifest_file(tag_file)
                if manifest is not None and self._blob_present(manifest):
                    manifests.append(manifest)
        return manifests

    def _blob_present(self, manifest: ModelManifest) -> bool:
        """True iff *manifest* points at a blob whose on-disk size matches."""
        if manifest.blob is None:
            return False
        blob_file = self._repo_cache_dir(manifest.hf_repo) / "blobs" / manifest.blob
        return _blob_size_matches(blob_file, manifest.size_bytes)

    def get_manifest(self, ref: str) -> ModelManifest | None:
        """Return the stored manifest for *ref*, or None.

        None is returned when *ref* is malformed or no readable manifest file
        exists for it. The blob itself is not checked here.
        """
        try:
            hf_repo, gguf_filename = parse_hf_ref(ref)
        except ValueError:
            return None
        return self._read_manifest(hf_repo, gguf_filename)

    def installed_ref_for_repo(self, hf_repo: str) -> str | None:
        """Full ``<repo>/<file>.gguf`` ref of an installed quant of *hf_repo*, or None.

        Alphabetical-first when several quants are installed, matching
        ``_resolve_repo_only``'s determinism.
        """
        refs = sorted(m.ref for m in self.list_installed() if m.hf_repo == hf_repo)
        return refs[0] if refs else None

    def _manifest_path(self, hf_repo: str, gguf_filename: str) -> Path:
        """Build the manifest path for a repo/filename pair after validating both.

        Raises:
            ValueError: if either component is malformed or the resulting path
                falls outside the manifests directory.
        """
        repo = _validate_hf_repo(hf_repo)
        filename = _validate_gguf_filename(gguf_filename)
        path = self._manifests_dir / repo_to_dir(repo) / f"{filename}.json"
        validate_path_within(path, self._manifests_dir)
        return path

    def _read_manifest(self, hf_repo: str, gguf_filename: str) -> ModelManifest | None:
        """Load the manifest for a repo/filename pair, or None if absent or corrupt."""
        return self._load_manifest_file(self._manifest_path(hf_repo, gguf_filename))

    def _write_manifest(self, manifest: ModelManifest) -> None:
        """Serialize *manifest* as JSON and move it into place atomically.

        The JSON is written to a temp file in the destination directory and
        renamed over the final path; the temp file is removed on any failure.
        """
        path = self._manifest_path(manifest.hf_repo, manifest.gguf_filename)
        path.parent.mkdir(parents=True, exist_ok=True)
        data = json.dumps(asdict(manifest), indent=2)
        tmp_path: str | None = None
        try:
            # Explicit encoding so the write side matches the UTF-8 read in
            # ``_load_manifest_file`` regardless of the platform locale.
            with tempfile.NamedTemporaryFile(
                dir=path.parent, suffix=".tmp", mode="w", encoding="utf-8", delete=False
            ) as tmp:
                tmp_path = tmp.name
                tmp.write(data)
            os.replace(tmp_path, path)
        except BaseException:
            if tmp_path is not None:
                Path(tmp_path).unlink(missing_ok=True)
            raise

    def _load_manifest_file(self, path: Path) -> ModelManifest | None:
        """Parse the manifest at *path*; None if it is missing or corrupt.

        A missing file is silent. A file that cannot be decoded as UTF-8, is not
        valid JSON, or does not match the ``ModelManifest`` fields is logged as
        corrupt and treated as absent, so callers can fall back to the cache.
        """
        try:
            # EAFP instead of exists()+read: no window for the file to vanish
            # between the check and the read.
            raw = path.read_text(encoding="utf-8")
            return ModelManifest(**json.loads(raw))
        except FileNotFoundError:
            return None
        except (ValueError, TypeError, KeyError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            log.warning("Corrupt manifest: %s", path)
            return None
def register_downloaded_model(entry: CatalogModel, file_path: Path) -> None:
    """Record a freshly downloaded GGUF in the registry.

    If installing fails but the model still resolves afterwards (``resolve``
    can recover it from the HF cache), the failure is logged as a warning and
    swallowed. If the model does not resolve, the original exception is
    re-raised so the caller can report the broken download.
    """
    from datetime import UTC, datetime

    registry = ModelRegistry(cfg.models_dir)
    filename = file_path.name
    pending = ModelManifest(
        hf_repo=entry.hf_repo,
        gguf_filename=filename,
        size_bytes=file_path.stat().st_size,
        task=entry.task,
        downloaded_at=datetime.now(UTC).isoformat(),
    )
    try:
        registry.install(entry.hf_repo, filename, file_path, pending)
        log.info("Registered %s/%s in manifest", entry.hf_repo, filename)
    except Exception:
        ref = format_native_gguf_ref(entry.hf_repo, filename)
        if registry.is_installed(ref):
            log.warning(
                "Manifest write failed for %s; recovered via the model cache", ref, exc_info=True
            )
        else:
            raise