Coverage for src / lilbee / modelhub / registry.py: 100%
245 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Manifest store keyed by ``(hf_repo, gguf_filename)`` over the HF cache.
3Canonical ref: ``<hf_repo>/<gguf_filename>``. Two quants of the same
4repo are two distinct installations. Manifests live at
5``manifests/<repo--repo>/<filename>.json``; blobs at
6``models--<repo--repo>/blobs/<sha>``.
7"""
9from __future__ import annotations
11import hashlib
12import json
13import logging
14import os
15import re
16import shutil
17import tempfile
18from dataclasses import asdict, dataclass
19from pathlib import Path
20from typing import TYPE_CHECKING
22from lilbee.catalog.refs import format_native_gguf_ref
23from lilbee.core.config.model import cfg
24from lilbee.core.security import validate_path_within
26if TYPE_CHECKING:
27 from lilbee.catalog.models import CatalogModel
28 from lilbee.catalog.types import ModelTask
30log = logging.getLogger(__name__)
32_HASH_CHUNK_SIZE = 8192 # bytes read per iteration when hashing
33_REPO_SEGMENT_RE = re.compile(r"^[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+$")
34_FILENAME_RE = re.compile(r"^[a-zA-Z0-9._-]+\.gguf$")
36REPO_DIR_SEPARATOR = "--"
39def _validate_hf_repo(hf_repo: str) -> str:
40 """Validate that a HuggingFace repo id has the form ``org/name``."""
41 if not hf_repo or not _REPO_SEGMENT_RE.match(hf_repo) or ".." in hf_repo:
42 raise ValueError(f"Invalid hf_repo: {hf_repo!r}")
43 return hf_repo
46def _validate_gguf_filename(filename: str) -> str:
47 """Validate that a filename is a safe ``.gguf`` basename (no path separators)."""
48 if not filename or not _FILENAME_RE.match(filename) or ".." in filename:
49 raise ValueError(f"Invalid gguf_filename: {filename!r}")
50 return filename
53_REF_SHAPE_HINT = "Use '<org>/<repo>/<filename>.gguf'."
56def parse_hf_ref(ref: str) -> tuple[str, str]:
57 """Split ``<org>/<repo>/<file>.gguf`` into ``(hf_repo, gguf_filename)``."""
58 if not ref.endswith(".gguf") or "/" not in ref:
59 raise ValueError(f"Model ref {ref!r} is not a HuggingFace ref. {_REF_SHAPE_HINT}")
60 hf_repo, gguf_filename = ref.rsplit("/", 1)
61 return _validate_hf_repo(hf_repo), _validate_gguf_filename(gguf_filename)
64def repo_to_dir(hf_repo: str) -> str:
65 """Encode an HF repo for use as a directory name (HF cache convention)."""
66 return hf_repo.replace("/", REPO_DIR_SEPARATOR)
69@dataclass
70class ModelManifest:
71 """One installed model's metadata. Identity: ``(hf_repo, gguf_filename)``."""
73 hf_repo: str
74 gguf_filename: str
75 size_bytes: int
76 task: ModelTask
77 downloaded_at: str # ISO 8601
78 blob: str | None = None # SHA-256 hex of the blob in the HF cache; None pre-install
80 @property
81 def ref(self) -> str:
82 return format_native_gguf_ref(self.hf_repo, self.gguf_filename)
85def _sha256_file(path: Path) -> str:
86 """Compute SHA-256 hex digest of a file."""
87 h = hashlib.sha256()
88 with path.open("rb") as f:
89 while True:
90 chunk = f.read(_HASH_CHUNK_SIZE)
91 if not chunk:
92 break
93 h.update(chunk)
94 return h.hexdigest()
97class ModelRegistry:
98 """Read/write manifests and resolve refs to blobs in the HF cache."""
100 def __init__(self, models_dir: Path) -> None:
101 self._root = models_dir
102 self._manifests_dir = models_dir / "manifests"
104 def _repo_cache_dir(self, hf_repo: str) -> Path:
105 """The HuggingFace cache directory for *hf_repo* under this registry root."""
106 return self._root / f"models--{repo_to_dir(hf_repo)}"
108 def resolve(self, ref: str) -> Path:
109 """Return the blob path for *ref*; ``KeyError`` if not installed.
111 The canonical *ref* is ``<org>/<repo>/<file>.gguf`` resolved via the
112 lilbee manifest. Two other shapes are accepted as a backwards-compat
113 concession for builds already published (whose on-disk layout differs),
114 not as the intended contract: a bare ``<org>/<repo>`` (older builds
115 persisted these into ``config.toml``) resolves to the one quant of that
116 repo that's installed, and a manifest that's missing / unparseable /
117 blob-less falls back to whatever GGUF ``huggingface_hub`` reports the
118 cache holds for that ref. The HF cache layout is stable, so this lets an
119 upgrade keep working without anyone purging their lilbee data dir; it is
120 deliberately the exception here, not a pattern to follow elsewhere.
121 """
122 if not ref.endswith(".gguf") and ref.count("/") == 1:
123 return self._resolve_repo_only(_validate_hf_repo(ref))
124 hf_repo, gguf_filename = parse_hf_ref(ref)
125 manifest = self._read_manifest(hf_repo, gguf_filename)
126 if manifest is not None and manifest.blob is not None:
127 blob_file = self._repo_cache_dir(manifest.hf_repo) / "blobs" / manifest.blob
128 if blob_file.exists():
129 return blob_file
130 recovered = self._find_cached_gguf(hf_repo, gguf_filename)
131 if recovered is not None:
132 self._reregister_from_cache(hf_repo, gguf_filename, recovered)
133 return recovered
134 if manifest is None:
135 raise KeyError(f"Model {ref} not installed")
136 # Manifest present but neither it nor the cache yields a blob; keep the
137 # specific diagnostic so a corrupted cache stays debuggable.
138 cache_path = self._repo_cache_dir(manifest.hf_repo)
139 if not cache_path.exists():
140 raise KeyError(f"Cache folder missing for {ref}: {cache_path.name}")
141 if manifest.blob is None:
142 raise KeyError(f"Manifest for {ref} has no blob hash; install incomplete")
143 raise KeyError(f"Blob file missing for {ref}: {manifest.blob}")
145 def _resolve_repo_only(self, hf_repo: str) -> Path:
146 """Resolve a bare ``<org>/<repo>`` ref to the GGUF of that repo on disk.
148 Older builds persisted bare repo refs for the chat / embedding model.
149 Prefers a current-format manifest under the repo; otherwise asks
150 ``huggingface_hub`` what GGUFs the cache holds for the repo and returns
151 the first one (alphabetical for determinism if more than one quant is
152 installed).
153 """
154 manifest_dir = self._manifests_dir / repo_to_dir(hf_repo)
155 if manifest_dir.is_dir():
156 for mf in sorted(manifest_dir.glob("*.gguf.json")):
157 manifest = self._load_manifest_file(mf)
158 if manifest is None or manifest.blob is None:
159 continue
160 blob = self._repo_cache_dir(hf_repo) / "blobs" / manifest.blob
161 if blob.exists():
162 return blob
163 for filename in sorted(self._cached_gguf_names(hf_repo)):
164 recovered = self._find_cached_gguf(hf_repo, filename)
165 if recovered is not None:
166 self._reregister_from_cache(hf_repo, filename, recovered)
167 return recovered
168 raise KeyError(f"Model {hf_repo} not installed")
170 def _cached_gguf_names(self, hf_repo: str) -> set[str]:
171 """``.gguf`` filenames the HuggingFace cache holds for *hf_repo*."""
172 if not self._root.is_dir():
173 return set()
174 from huggingface_hub import scan_cache_dir
176 info = scan_cache_dir(self._root)
177 return {
178 f.file_name
179 for repo in info.repos
180 if repo.repo_id == hf_repo
181 for rev in repo.revisions
182 for f in rev.files
183 if f.file_name.endswith(".gguf")
184 }
186 def _find_cached_gguf(self, hf_repo: str, gguf_filename: str) -> Path | None:
187 """Return the cached blob path for ``hf_repo``/``gguf_filename``, or None.
189 Uses ``huggingface_hub.try_to_load_from_cache`` so we honor whatever
190 cache layout HF uses, then resolves the returned snapshot symlink to the
191 blob, bounded to the cache directory.
192 """
193 from huggingface_hub import try_to_load_from_cache
195 hit = try_to_load_from_cache(
196 repo_id=hf_repo, filename=gguf_filename, cache_dir=str(self._root)
197 )
198 if not isinstance(hit, str): # None (not cached) or the _CACHED_NO_EXIST sentinel
199 return None
200 resolved = Path(hit).resolve()
201 try:
202 validate_path_within(resolved, self._root)
203 except ValueError:
204 return None
205 return resolved
207 def _reregister_from_cache(self, hf_repo: str, gguf_filename: str, blob_path: Path) -> None:
208 """Write a fresh manifest for a model just recovered from the HF cache.
210 ``list_installed`` only walks ``manifests/``, so a cache-recovered model
211 is resolvable but otherwise invisible (``lilbee model list``, the TUI
212 catalog, the pull command's "already installed" check) until a manifest
213 exists. The ``task`` comes from the featured catalog; for a non-catalog
214 ref it's unknown, so the rewrite is skipped. Best-effort: a read-only
215 models dir or a write race must not break the resolve that succeeded.
216 """
217 from datetime import UTC, datetime
219 ref = format_native_gguf_ref(hf_repo, gguf_filename)
220 try:
221 from lilbee.catalog import (
222 find_catalog_entry,
223 ) # deferred: lilbee.catalog is a heavy import
225 entry = find_catalog_entry(ref)
226 if entry is None:
227 return
228 self._write_manifest(
229 ModelManifest(
230 hf_repo=hf_repo,
231 gguf_filename=gguf_filename,
232 size_bytes=blob_path.stat().st_size,
233 task=entry.task,
234 downloaded_at=datetime.now(UTC).isoformat(),
235 blob=blob_path.name, # the blob's filename is its sha in the HF cache
236 )
237 )
238 log.info("Recovered manifest for %s from the model cache", ref)
239 except Exception: # cache-warming write; the resolve already returned a path
240 log.debug("Could not re-register %s from the model cache", ref, exc_info=True)
242 def is_installed(self, ref: str) -> bool:
243 """Return True if a model is installed and its blob is present."""
244 try:
245 self.resolve(ref)
246 return True
247 except (KeyError, ValueError):
248 return False
250 def install(
251 self,
252 hf_repo: str,
253 gguf_filename: str,
254 source_path: Path,
255 manifest: ModelManifest,
256 ) -> Path:
257 """Write a manifest, copying *source_path* into the HF cache if needed."""
258 import shutil
260 digest = _sha256_file(source_path)
261 cache_path = self._repo_cache_dir(hf_repo)
262 blobs_dir = cache_path / "blobs"
263 blob_path = blobs_dir / digest
264 if not blob_path.exists():
265 blobs_dir.mkdir(parents=True, exist_ok=True)
266 shutil.copy2(source_path, blob_path)
268 updated = ModelManifest(
269 hf_repo=hf_repo,
270 gguf_filename=gguf_filename,
271 size_bytes=manifest.size_bytes,
272 task=manifest.task,
273 downloaded_at=manifest.downloaded_at,
274 blob=digest,
275 )
276 self._write_manifest(updated)
277 return blob_path
279 def remove(self, ref: str) -> bool:
280 """Remove a manifest and its backing blob.
282 The blob is shared via SHA-256 digest, so it only goes away
283 when no other installed manifest references the same digest.
284 Empty cache directories (``blobs/``, the per-repo ``models--``
285 folder, and the per-repo manifest folder) are pruned so a
286 deleted model leaves no orphan bytes behind.
287 """
288 try:
289 hf_repo, gguf_filename = parse_hf_ref(ref)
290 except ValueError:
291 return False
292 manifest = self._read_manifest(hf_repo, gguf_filename)
293 if manifest is None:
294 return False
295 manifest_path = self._manifest_path(hf_repo, gguf_filename)
296 manifest_path.unlink()
297 repo_dir = manifest_path.parent
298 if repo_dir.exists() and not any(repo_dir.iterdir()):
299 repo_dir.rmdir()
300 if manifest.blob is not None:
301 self._gc_blob(manifest.hf_repo, manifest.blob)
302 log.info("Removed model %s", ref)
303 return True
305 def _gc_blob(self, hf_repo: str, digest: str) -> None:
306 """Drop blob bytes and HuggingFace cache cruft now that *digest*
307 and possibly the whole repo are unused.
309 When the per-repo ``models--<repo>/`` directory has no installed
310 manifests left, the whole directory is wiped so HF's ``refs/``,
311 ``snapshots/``, and stale ``blobs/`` all go with it. Otherwise
312 only the specific blob file is removed when no remaining
313 manifest still references its digest.
314 """
315 cache_path = self._repo_cache_dir(hf_repo)
316 try:
317 validate_path_within(cache_path, self._root)
318 except ValueError:
319 log.warning("Refusing to remove cache outside models_dir: %s", cache_path)
320 return
321 siblings = [m for m in self.list_installed() if m.hf_repo == hf_repo]
322 if not siblings:
323 if cache_path.exists():
324 shutil.rmtree(cache_path)
325 return
326 if any(m.blob == digest for m in siblings):
327 return
328 blob_file = cache_path / "blobs" / digest
329 if blob_file.exists():
330 blob_file.unlink()
332 def list_installed(self) -> list[ModelManifest]:
333 """Return manifests for models whose blob is fully present on disk.
335 A manifest with a null blob field or a missing blob file is the
336 residue of a canceled or partial download. Surfacing it would
337 let the picker offer an unusable selection, so the read filter
338 lives here at the source instead of in every UI caller.
339 """
340 manifests: list[ModelManifest] = []
341 if not self._manifests_dir.exists():
342 return manifests
343 for repo_dir in sorted(self._manifests_dir.iterdir()):
344 if not repo_dir.is_dir():
345 continue
346 for tag_file in sorted(repo_dir.glob("*.gguf.json")):
347 manifest = self._load_manifest_file(tag_file)
348 if manifest is not None and self._blob_present(manifest):
349 manifests.append(manifest)
350 return manifests
352 def _blob_present(self, manifest: ModelManifest) -> bool:
353 """True iff *manifest* points at an existing blob file."""
354 if manifest.blob is None:
355 return False
356 blob_file = self._repo_cache_dir(manifest.hf_repo) / "blobs" / manifest.blob
357 return blob_file.exists()
359 def get_manifest(self, ref: str) -> ModelManifest | None:
360 """Return the manifest for *ref* or None if not installed."""
361 try:
362 hf_repo, gguf_filename = parse_hf_ref(ref)
363 except ValueError:
364 return None
365 return self._read_manifest(hf_repo, gguf_filename)
367 def _manifest_path(self, hf_repo: str, gguf_filename: str) -> Path:
368 repo = _validate_hf_repo(hf_repo)
369 filename = _validate_gguf_filename(gguf_filename)
370 path = self._manifests_dir / repo_to_dir(repo) / f"{filename}.json"
371 validate_path_within(path, self._manifests_dir)
372 return path
374 def _read_manifest(self, hf_repo: str, gguf_filename: str) -> ModelManifest | None:
375 return self._load_manifest_file(self._manifest_path(hf_repo, gguf_filename))
377 def _write_manifest(self, manifest: ModelManifest) -> None:
378 path = self._manifest_path(manifest.hf_repo, manifest.gguf_filename)
379 path.parent.mkdir(parents=True, exist_ok=True)
380 data = json.dumps(asdict(manifest), indent=2)
381 tmp_path: str | None = None
382 try:
383 with tempfile.NamedTemporaryFile(
384 dir=path.parent, suffix=".tmp", mode="w", delete=False
385 ) as tmp:
386 tmp_path = tmp.name
387 tmp.write(data)
388 os.replace(tmp_path, path)
389 except BaseException:
390 if tmp_path is not None:
391 Path(tmp_path).unlink(missing_ok=True)
392 raise
394 def _load_manifest_file(self, path: Path) -> ModelManifest | None:
395 if not path.exists():
396 return None
397 try:
398 data = json.loads(path.read_text())
399 return ModelManifest(**data)
400 except (json.JSONDecodeError, TypeError, KeyError):
401 log.warning("Corrupt manifest: %s", path)
402 return None
405def register_downloaded_model(entry: CatalogModel, file_path: Path) -> None:
406 """Write a registry manifest for a freshly downloaded GGUF.
408 A failed manifest write is logged, not raised, when the GGUF is still in the
409 HF cache (``resolve`` recovers from it); if it isn't, the download itself is
410 broken and the failure propagates so the caller reports it.
411 """
412 from datetime import UTC, datetime
414 registry = ModelRegistry(cfg.models_dir)
415 manifest = ModelManifest(
416 hf_repo=entry.hf_repo,
417 gguf_filename=file_path.name,
418 size_bytes=file_path.stat().st_size,
419 task=entry.task,
420 downloaded_at=datetime.now(UTC).isoformat(),
421 )
422 try:
423 registry.install(entry.hf_repo, file_path.name, file_path, manifest)
424 log.info("Registered %s/%s in manifest", entry.hf_repo, file_path.name)
425 except Exception:
426 ref = format_native_gguf_ref(entry.hf_repo, file_path.name)
427 if not registry.is_installed(ref):
428 raise
429 log.warning(
430 "Manifest write failed for %s; recovered via the model cache", ref, exc_info=True
431 )