Coverage for src / lilbee / modelhub / registry.py: 100%

265 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Manifest store keyed by ``(hf_repo, gguf_filename)`` over the HF cache. 

2 

3Canonical ref: ``<hf_repo>/<gguf_filename>``. Two quants of the same 

4repo are two distinct installations. Manifests live at 

5``manifests/<repo--repo>/<filename>.json``; blobs at 

6``models--<repo--repo>/blobs/<sha>``. 

7""" 

8 

9from __future__ import annotations 

10 

11import hashlib 

12import json 

13import logging 

14import os 

15import re 

16import shutil 

17import tempfile 

18from dataclasses import asdict, dataclass 

19from pathlib import Path 

20from typing import TYPE_CHECKING 

21 

22from lilbee.catalog.refs import format_native_gguf_ref, is_bare_hf_repo 

23from lilbee.core.config.model import cfg 

24from lilbee.core.security import validate_path_within 

25 

26if TYPE_CHECKING: 

27 from lilbee.catalog.models import CatalogModel 

28 from lilbee.catalog.types import ModelTask 

29 

30log = logging.getLogger(__name__) 

31 

32_HASH_CHUNK_SIZE = 8192 # bytes read per iteration when hashing 

33_REPO_SEGMENT_RE = re.compile(r"^[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+$") 

34_FILENAME_RE = re.compile(r"^[a-zA-Z0-9._-]+\.gguf$") 

35 

36REPO_DIR_SEPARATOR = "--" 

37 

38 

39def _validate_hf_repo(hf_repo: str) -> str: 

40 """Validate that a HuggingFace repo id has the form ``org/name``.""" 

41 if not hf_repo or not _REPO_SEGMENT_RE.match(hf_repo) or ".." in hf_repo: 

42 raise ValueError(f"Invalid hf_repo: {hf_repo!r}") 

43 return hf_repo 

44 

45 

46def _validate_gguf_filename(filename: str) -> str: 

47 """Validate that a filename is a safe ``.gguf`` basename (no path separators).""" 

48 if not filename or not _FILENAME_RE.match(filename) or ".." in filename: 

49 raise ValueError(f"Invalid gguf_filename: {filename!r}") 

50 return filename 

51 

52 

_REF_SHAPE_HINT = "Use '<org>/<repo>/<filename>.gguf'."


def parse_hf_ref(ref: str) -> tuple[str, str]:
    """Split ``<org>/<repo>/<file>.gguf`` into ``(hf_repo, gguf_filename)``.

    The text after the last ``/`` is the filename; everything before it is
    the repo id. Both halves are validated before being returned.

    Raises:
        ValueError: if *ref* does not end in ``.gguf``, contains no ``/``,
            or either half fails its validator.
    """
    has_ref_shape = ref.endswith(".gguf") and "/" in ref
    if not has_ref_shape:
        raise ValueError(f"Model ref {ref!r} is not a HuggingFace ref. {_REF_SHAPE_HINT}")
    repo_part, _, file_part = ref.rpartition("/")
    # Repo first, then filename: keeps the order in which errors surface.
    repo = _validate_hf_repo(repo_part)
    filename = _validate_gguf_filename(file_part)
    return repo, filename

62 

63 

def repo_to_dir(hf_repo: str) -> str:
    """Encode an HF repo id as a directory name (HF cache convention).

    Every ``/`` in *hf_repo* becomes ``REPO_DIR_SEPARATOR``.
    """
    segments = hf_repo.split("/")
    return REPO_DIR_SEPARATOR.join(segments)

67 

68 

@dataclass
class ModelManifest:
    """Metadata for a single installed model.

    The pair ``(hf_repo, gguf_filename)`` is the identity of an installation.
    """

    # Repository id the file belongs to.
    hf_repo: str
    # Basename of the GGUF file inside that repository.
    gguf_filename: str
    # Size in bytes recorded for the model file.
    size_bytes: int
    # Task the model serves.
    task: ModelTask
    # ISO 8601 timestamp.
    downloaded_at: str
    # SHA-256 hex of the blob in the HF cache; None pre-install.
    blob: str | None = None

    @property
    def ref(self) -> str:
        """Ref string built from ``hf_repo`` and ``gguf_filename``."""
        repo, filename = self.hf_repo, self.gguf_filename
        return format_native_gguf_ref(repo, filename)

83 

84 

85def _copy_atomic(source_path: Path, blob_path: Path) -> None: 

86 """Copy *source_path* to *blob_path* via a temp file + atomic rename. 

87 

88 A crash mid-copy leaves only the temp file, never a partial blob at 

89 the final digest path that callers would treat as complete. 

90 """ 

91 fd, tmp_name = tempfile.mkstemp(dir=str(blob_path.parent), suffix=".part") 

92 tmp_path = Path(tmp_name) 

93 try: 

94 with os.fdopen(fd, "wb") as dst, source_path.open("rb") as src: 

95 shutil.copyfileobj(src, dst) 

96 os.replace(tmp_path, blob_path) 

97 except BaseException: 

98 tmp_path.unlink(missing_ok=True) 

99 raise 

100 

101 

102def _blob_size_matches(blob_file: Path, expected_size: int) -> bool: 

103 """True iff *blob_file* exists and its byte size equals *expected_size*. 

104 

105 A blob shorter than the manifest's recorded size is a truncated / 

106 interrupted download and must not count as installed. 

107 """ 

108 try: 

109 return blob_file.stat().st_size == expected_size 

110 except OSError: 

111 return False 

112 

113 

def _sha256_file(path: Path) -> str:
    """Return the SHA-256 hex digest of the file at *path*.

    The file is read in ``_HASH_CHUNK_SIZE``-byte pieces rather than loaded
    whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(_HASH_CHUNK_SIZE):
            digest.update(block)
    return digest.hexdigest()

124 

125 

class ModelRegistry:
    """Read/write manifests and resolve refs to blobs in the HF cache.

    Under ``models_dir`` manifests live at
    ``manifests/<org>--<repo>/<file>.gguf.json`` and blobs at
    ``models--<org>--<repo>/blobs/<sha256>``.
    """

    def __init__(self, models_dir: Path) -> None:
        """Bind the registry to *models_dir*; nothing is created on disk here."""
        self._root = models_dir
        self._manifests_dir = models_dir / "manifests"

    def _repo_cache_dir(self, hf_repo: str) -> Path:
        """The HuggingFace cache directory for *hf_repo* under this registry root."""
        return self._root / f"models--{repo_to_dir(hf_repo)}"

    def resolve(self, ref: str) -> Path:
        """Return the blob path for *ref*; ``KeyError`` if not installed.

        The canonical *ref* is ``<org>/<repo>/<file>.gguf`` resolved via the
        lilbee manifest. Two other shapes are accepted as a backwards-compat
        concession for builds already published (whose on-disk layout differs),
        not as the intended contract: a bare ``<org>/<repo>`` (older builds
        persisted these into ``config.toml``) resolves to the one quant of that
        repo that's installed, and a manifest that's missing / unparseable /
        blob-less falls back to whatever GGUF ``huggingface_hub`` reports the
        cache holds for that ref. The HF cache layout is stable, so this lets an
        upgrade keep working without anyone purging their lilbee data dir; it is
        deliberately the exception here, not a pattern to follow elsewhere.

        Raises:
            KeyError: the model is not installed, or its manifest exists but
                no usable blob can be found (the message says which).
            ValueError: *ref* is not a well-formed ref.
        """
        if is_bare_hf_repo(ref):
            return self._resolve_repo_only(_validate_hf_repo(ref))
        hf_repo, gguf_filename = parse_hf_ref(ref)
        manifest = self._read_manifest(hf_repo, gguf_filename)
        if manifest is not None and manifest.blob is not None:
            blob_file = self._repo_cache_dir(manifest.hf_repo) / "blobs" / manifest.blob
            if _blob_size_matches(blob_file, manifest.size_bytes):
                return blob_file
        recovered = self._find_cached_gguf(hf_repo, gguf_filename)
        if recovered is not None:
            self._reregister_from_cache(hf_repo, gguf_filename, recovered)
            return recovered
        if manifest is None:
            raise KeyError(f"Model {ref} not installed")
        # Manifest present but neither it nor the cache yields a blob; keep the
        # specific diagnostic so a corrupted cache stays debuggable.
        cache_path = self._repo_cache_dir(manifest.hf_repo)
        if not cache_path.exists():
            raise KeyError(f"Cache folder missing for {ref}: {cache_path.name}")
        if manifest.blob is None:
            raise KeyError(f"Manifest for {ref} has no blob hash; install incomplete")
        blob_file = cache_path / "blobs" / manifest.blob
        if blob_file.exists():
            raise KeyError(
                f"Blob for {ref} is truncated: {blob_file.stat().st_size} of "
                f"{manifest.size_bytes} bytes; re-download required"
            )
        raise KeyError(f"Blob file missing for {ref}: {manifest.blob}")

    def _resolve_repo_only(self, hf_repo: str) -> Path:
        """Resolve a bare ``<org>/<repo>`` ref to the GGUF of that repo on disk.

        Older builds persisted bare repo refs for the chat / embedding model.
        Prefers a current-format manifest under the repo; otherwise asks
        ``huggingface_hub`` what GGUFs the cache holds for the repo and returns
        the first one (alphabetical for determinism if more than one quant is
        installed).

        Raises:
            KeyError: no manifest-backed or cached GGUF exists for *hf_repo*.
        """
        manifest_dir = self._manifests_dir / repo_to_dir(hf_repo)
        if manifest_dir.is_dir():
            for mf in sorted(manifest_dir.glob("*.gguf.json")):
                manifest = self._load_manifest_file(mf)
                if manifest is None or manifest.blob is None:
                    continue
                blob = self._repo_cache_dir(hf_repo) / "blobs" / manifest.blob
                # Same completeness rule as ``resolve`` and ``list_installed``:
                # a blob whose size disagrees with the manifest is a truncated
                # download, not an installed model.
                if _blob_size_matches(blob, manifest.size_bytes):
                    return blob
        for filename in sorted(self._cached_gguf_names(hf_repo)):
            recovered = self._find_cached_gguf(hf_repo, filename)
            if recovered is not None:
                self._reregister_from_cache(hf_repo, filename, recovered)
                return recovered
        raise KeyError(f"Model {hf_repo} not installed")

    def _cached_gguf_names(self, hf_repo: str) -> set[str]:
        """``.gguf`` filenames the HuggingFace cache holds for *hf_repo*.

        Returns an empty set when the registry root is not a directory.
        """
        if not self._root.is_dir():
            return set()
        from huggingface_hub import scan_cache_dir

        info = scan_cache_dir(self._root)
        return {
            f.file_name
            for repo in info.repos
            if repo.repo_id == hf_repo
            for rev in repo.revisions
            for f in rev.files
            if f.file_name.endswith(".gguf")
        }

    def _find_cached_gguf(self, hf_repo: str, gguf_filename: str) -> Path | None:
        """Return the cached blob path for ``hf_repo``/``gguf_filename``, or None.

        Uses ``huggingface_hub.try_to_load_from_cache`` so we honor whatever
        cache layout HF uses, then resolves the returned snapshot symlink to the
        blob, bounded to the cache directory.
        """
        from huggingface_hub import try_to_load_from_cache

        hit = try_to_load_from_cache(
            repo_id=hf_repo, filename=gguf_filename, cache_dir=str(self._root)
        )
        if not isinstance(hit, str):  # None (not cached) or the _CACHED_NO_EXIST sentinel
            return None
        resolved = Path(hit).resolve()
        try:
            validate_path_within(resolved, self._root)
        except ValueError:
            # The symlink target is outside the registry root: ignore it.
            return None
        return resolved

    def _reregister_from_cache(self, hf_repo: str, gguf_filename: str, blob_path: Path) -> None:
        """Write a fresh manifest for a model just recovered from the HF cache.

        ``list_installed`` only walks ``manifests/``, so a cache-recovered model
        is resolvable but otherwise invisible (``lilbee model list``, the TUI
        catalog, the pull command's "already installed" check) until a manifest
        exists. The ``task`` comes from the featured catalog; for a non-catalog
        ref it's unknown, so the rewrite is skipped. Best-effort: a read-only
        models dir or a write race must not break the resolve that succeeded.
        """
        from datetime import UTC, datetime

        ref = format_native_gguf_ref(hf_repo, gguf_filename)
        try:
            from lilbee.catalog import (
                find_catalog_entry,
            )  # deferred: lilbee.catalog is a heavy import

            entry = find_catalog_entry(ref)
            if entry is None:
                return
            self._write_manifest(
                ModelManifest(
                    hf_repo=hf_repo,
                    gguf_filename=gguf_filename,
                    size_bytes=blob_path.stat().st_size,
                    task=entry.task,
                    downloaded_at=datetime.now(UTC).isoformat(),
                    blob=blob_path.name,  # the blob's filename is its sha in the HF cache
                )
            )
            log.info("Recovered manifest for %s from the model cache", ref)
        except Exception:  # cache-warming write; the resolve already returned a path
            log.debug("Could not re-register %s from the model cache", ref, exc_info=True)

    def is_installed(self, ref: str) -> bool:
        """Return True if a model is installed and its blob is present.

        A malformed *ref* counts as not installed rather than raising.
        """
        try:
            self.resolve(ref)
            return True
        except (KeyError, ValueError):
            return False

    def install(
        self,
        hf_repo: str,
        gguf_filename: str,
        source_path: Path,
        manifest: ModelManifest,
    ) -> Path:
        """Write a manifest, copying *source_path* into the HF cache if needed.

        The blob is stored under the SHA-256 of *source_path*. Only ``task``
        and ``downloaded_at`` are taken from *manifest*; the repo, filename,
        size and blob digest are recorded from what this call actually
        installed.

        Returns:
            The path of the blob inside the cache.
        """
        digest = _sha256_file(source_path)
        source_size = source_path.stat().st_size
        blobs_dir = self._repo_cache_dir(hf_repo) / "blobs"
        blob_path = blobs_dir / digest
        # The path is content-addressed, so a blob there with a different size
        # is a truncated leftover. Replace it rather than recording a manifest
        # that could never resolve.
        if not _blob_size_matches(blob_path, source_size):
            blobs_dir.mkdir(parents=True, exist_ok=True)
            _copy_atomic(source_path, blob_path)

        updated = ModelManifest(
            hf_repo=hf_repo,
            gguf_filename=gguf_filename,
            # Record the size install actually wrote, not the caller's claim,
            # so the on-disk size check has a trustworthy reference.
            size_bytes=source_size,
            task=manifest.task,
            downloaded_at=manifest.downloaded_at,
            blob=digest,
        )
        self._write_manifest(updated)
        return blob_path

    def remove(self, ref: str) -> bool:
        """Remove a manifest and its backing blob.

        The blob is shared via SHA-256 digest, so it only goes away
        when no other installed manifest references the same digest.
        Empty cache directories (``blobs/``, the per-repo ``models--``
        folder, and the per-repo manifest folder) are pruned so a
        deleted model leaves no orphan bytes behind.

        Returns:
            True when a manifest was removed; False when *ref* is malformed
            or no readable manifest exists for it.
        """
        try:
            hf_repo, gguf_filename = parse_hf_ref(ref)
        except ValueError:
            return False
        manifest = self._read_manifest(hf_repo, gguf_filename)
        if manifest is None:
            return False
        manifest_path = self._manifest_path(hf_repo, gguf_filename)
        manifest_path.unlink()
        repo_dir = manifest_path.parent
        if repo_dir.exists() and not any(repo_dir.iterdir()):
            repo_dir.rmdir()
        if manifest.blob is not None:
            self._gc_blob(manifest.hf_repo, manifest.blob)
        log.info("Removed model %s", ref)
        return True

    def _gc_blob(self, hf_repo: str, digest: str) -> None:
        """Drop blob bytes and HuggingFace cache cruft now that *digest*
        and possibly the whole repo are unused.

        When the per-repo ``models--<repo>/`` directory has no installed
        manifests left, the whole directory is wiped so HF's ``refs/``,
        ``snapshots/``, and stale ``blobs/`` all go with it. Otherwise
        only the specific blob file is removed when no remaining
        manifest still references its digest.
        """
        cache_path = self._repo_cache_dir(hf_repo)
        try:
            validate_path_within(cache_path, self._root)
        except ValueError:
            log.warning("Refusing to remove cache outside models_dir: %s", cache_path)
            return
        siblings = [m for m in self.list_installed() if m.hf_repo == hf_repo]
        if not siblings:
            if cache_path.exists():
                shutil.rmtree(cache_path)
            return
        if any(m.blob == digest for m in siblings):
            return
        blob_file = cache_path / "blobs" / digest
        if blob_file.exists():
            blob_file.unlink()

    def list_installed(self) -> list[ModelManifest]:
        """Return manifests for models whose blob is fully present on disk.

        A manifest with a null blob field or a missing blob file is the
        residue of a canceled or partial download. Surfacing it would
        let the picker offer an unusable selection, so the read filter
        lives here at the source instead of in every UI caller.
        """
        manifests: list[ModelManifest] = []
        if not self._manifests_dir.exists():
            return manifests
        for repo_dir in sorted(self._manifests_dir.iterdir()):
            if not repo_dir.is_dir():
                continue
            for tag_file in sorted(repo_dir.glob("*.gguf.json")):
                manifest = self._load_manifest_file(tag_file)
                if manifest is not None and self._blob_present(manifest):
                    manifests.append(manifest)
        return manifests

    def _blob_present(self, manifest: ModelManifest) -> bool:
        """True iff *manifest* points at a blob whose on-disk size matches."""
        if manifest.blob is None:
            return False
        blob_file = self._repo_cache_dir(manifest.hf_repo) / "blobs" / manifest.blob
        return _blob_size_matches(blob_file, manifest.size_bytes)

    def get_manifest(self, ref: str) -> ModelManifest | None:
        """Return the manifest for *ref*, or None if *ref* is malformed or has
        no readable manifest. The blob is not checked here."""
        try:
            hf_repo, gguf_filename = parse_hf_ref(ref)
        except ValueError:
            return None
        return self._read_manifest(hf_repo, gguf_filename)

    def installed_ref_for_repo(self, hf_repo: str) -> str | None:
        """Full ``<repo>/<file>.gguf`` ref of an installed quant of *hf_repo*, or None.

        Alphabetical-first when several quants are installed, matching
        ``_resolve_repo_only``'s determinism.
        """
        refs = sorted(m.ref for m in self.list_installed() if m.hf_repo == hf_repo)
        return refs[0] if refs else None

    def _manifest_path(self, hf_repo: str, gguf_filename: str) -> Path:
        """Path of the manifest file for ``(hf_repo, gguf_filename)``.

        Both parts are validated and the result is checked to lie inside the
        manifests directory; ``ValueError`` is raised for invalid parts.
        """
        repo = _validate_hf_repo(hf_repo)
        filename = _validate_gguf_filename(gguf_filename)
        path = self._manifests_dir / repo_to_dir(repo) / f"{filename}.json"
        validate_path_within(path, self._manifests_dir)
        return path

    def _read_manifest(self, hf_repo: str, gguf_filename: str) -> ModelManifest | None:
        """Load the manifest for ``(hf_repo, gguf_filename)``, or None."""
        return self._load_manifest_file(self._manifest_path(hf_repo, gguf_filename))

    def _write_manifest(self, manifest: ModelManifest) -> None:
        """Serialize *manifest* to JSON and move it into place atomically.

        The JSON is written to a temp file in the destination directory and
        swapped in with ``os.replace``; on any failure the temp file is
        removed and the exception is re-raised.
        """
        path = self._manifest_path(manifest.hf_repo, manifest.gguf_filename)
        path.parent.mkdir(parents=True, exist_ok=True)
        data = json.dumps(asdict(manifest), indent=2)
        tmp_path: str | None = None
        try:
            with tempfile.NamedTemporaryFile(
                dir=path.parent, suffix=".tmp", mode="w", encoding="utf-8", delete=False
            ) as tmp:
                tmp_path = tmp.name
                tmp.write(data)
            os.replace(tmp_path, path)
        except BaseException:
            if tmp_path is not None:
                Path(tmp_path).unlink(missing_ok=True)
            raise

    def _load_manifest_file(self, path: Path) -> ModelManifest | None:
        """Parse the manifest at *path*; None when it is missing or corrupt.

        A missing file returns None silently. A file that cannot be decoded,
        is not valid JSON, or does not match the ``ModelManifest`` fields is
        logged as corrupt and also returns None.
        """
        try:
            raw = path.read_text(encoding="utf-8")
            return ModelManifest(**json.loads(raw))
        except (FileNotFoundError, NotADirectoryError):
            # Missing, or removed by another process after it was listed.
            return None
        except (ValueError, TypeError, KeyError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a manifest with invalid bytes.
            log.warning("Corrupt manifest: %s", path)
            return None

447 

448 

def register_downloaded_model(entry: CatalogModel, file_path: Path) -> None:
    """Record a freshly downloaded GGUF in the model registry.

    If installing fails but the model still resolves (``resolve`` can recover
    it from the HF cache), the failure is logged as a warning and swallowed.
    If the model does not resolve either, the download itself is broken and
    the original exception propagates so the caller reports it.
    """
    from datetime import UTC, datetime

    registry = ModelRegistry(cfg.models_dir)
    repo = entry.hf_repo
    filename = file_path.name
    pending = ModelManifest(
        hf_repo=repo,
        gguf_filename=filename,
        size_bytes=file_path.stat().st_size,
        task=entry.task,
        downloaded_at=datetime.now(UTC).isoformat(),
    )
    try:
        registry.install(repo, filename, file_path, pending)
        log.info("Registered %s/%s in manifest", repo, filename)
    except Exception:
        ref = format_native_gguf_ref(repo, filename)
        if registry.is_installed(ref):
            log.warning(
                "Manifest write failed for %s; recovered via the model cache", ref, exc_info=True
            )
        else:
            raise