Coverage for src / lilbee / modelhub / registry.py: 100%

245 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Manifest store keyed by ``(hf_repo, gguf_filename)`` over the HF cache. 

2 

3Canonical ref: ``<hf_repo>/<gguf_filename>``. Two quants of the same 

4repo are two distinct installations. Manifests live at 

5``manifests/<repo--repo>/<filename>.json``; blobs at 

6``models--<repo--repo>/blobs/<sha>``. 

7""" 

8 

9from __future__ import annotations 

10 

11import hashlib 

12import json 

13import logging 

14import os 

15import re 

16import shutil 

17import tempfile 

18from dataclasses import asdict, dataclass 

19from pathlib import Path 

20from typing import TYPE_CHECKING 

21 

22from lilbee.catalog.refs import format_native_gguf_ref 

23from lilbee.core.config.model import cfg 

24from lilbee.core.security import validate_path_within 

25 

26if TYPE_CHECKING: 

27 from lilbee.catalog.models import CatalogModel 

28 from lilbee.catalog.types import ModelTask 

29 

log = logging.getLogger(__name__)

_HASH_CHUNK_SIZE = 8192  # bytes read per iteration when hashing

# Both patterns are anchored with ``\Z`` rather than ``$``: ``$`` also matches
# just before a trailing newline, which would let ``"org/name\n"`` or
# ``"model.gguf\n"`` pass validation and end up inside an on-disk path.
_REPO_SEGMENT_RE = re.compile(r"^[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+\Z")
_FILENAME_RE = re.compile(r"^[a-zA-Z0-9._-]+\.gguf\Z")

# Replaces "/" when a repo id is turned into a directory name.
REPO_DIR_SEPARATOR = "--"

37 

38 

def _validate_hf_repo(hf_repo: str) -> str:
    """Return *hf_repo* unchanged when it is a well-formed ``org/name`` id.

    Raises:
        ValueError: if the id is empty, does not match ``_REPO_SEGMENT_RE``,
            or contains ``..``.
    """
    shaped_ok = bool(hf_repo) and _REPO_SEGMENT_RE.match(hf_repo) is not None
    if shaped_ok and ".." not in hf_repo:
        return hf_repo
    raise ValueError(f"Invalid hf_repo: {hf_repo!r}")

44 

45 

def _validate_gguf_filename(filename: str) -> str:
    """Return *filename* unchanged when it is a safe ``.gguf`` basename.

    Raises:
        ValueError: if the name is empty, does not match ``_FILENAME_RE``
            (which admits no path separators), or contains ``..``.
    """
    shaped_ok = bool(filename) and _FILENAME_RE.match(filename) is not None
    if shaped_ok and ".." not in filename:
        return filename
    raise ValueError(f"Invalid gguf_filename: {filename!r}")

51 

52 

_REF_SHAPE_HINT = "Use '<org>/<repo>/<filename>.gguf'."


def parse_hf_ref(ref: str) -> tuple[str, str]:
    """Break a ``<org>/<repo>/<file>.gguf`` ref into ``(hf_repo, gguf_filename)``.

    The ref is cut at its last ``/``; the left part is checked as a repo id
    and the right part as a GGUF filename, in that order.

    Raises:
        ValueError: if *ref* does not end in ``.gguf``, contains no ``/``, or
            either half fails its validator.
    """
    looks_like_ref = ref.endswith(".gguf") and "/" in ref
    if not looks_like_ref:
        raise ValueError(f"Model ref {ref!r} is not a HuggingFace ref. {_REF_SHAPE_HINT}")
    repo_part, _, file_part = ref.rpartition("/")
    repo = _validate_hf_repo(repo_part)
    filename = _validate_gguf_filename(file_part)
    return repo, filename

62 

63 

def repo_to_dir(hf_repo: str) -> str:
    """Turn an HF repo id into a directory name by swapping ``/`` for ``--``."""
    segments = hf_repo.split("/")
    return REPO_DIR_SEPARATOR.join(segments)

67 

68 

@dataclass
class ModelManifest:
    """Metadata record for a single installed model.

    A manifest is identified by the pair ``(hf_repo, gguf_filename)``.
    """

    hf_repo: str
    gguf_filename: str
    size_bytes: int
    task: ModelTask
    downloaded_at: str  # ISO 8601 timestamp
    blob: str | None = None  # SHA-256 hex of the blob in the HF cache; None pre-install

    @property
    def ref(self) -> str:
        """Ref string for this manifest, as built by ``format_native_gguf_ref``."""
        repo, filename = self.hf_repo, self.gguf_filename
        return format_native_gguf_ref(repo, filename)

83 

84 

85def _sha256_file(path: Path) -> str: 

86 """Compute SHA-256 hex digest of a file.""" 

87 h = hashlib.sha256() 

88 with path.open("rb") as f: 

89 while True: 

90 chunk = f.read(_HASH_CHUNK_SIZE) 

91 if not chunk: 

92 break 

93 h.update(chunk) 

94 return h.hexdigest() 

95 

96 

class ModelRegistry:
    """Read/write manifests and resolve refs to blobs in the HF cache.

    Under ``models_dir`` the registry keeps manifests at
    ``manifests/<org--repo>/<filename>.json`` and expects blobs at
    ``models--<org--repo>/blobs/<digest>``.
    """

    def __init__(self, models_dir: Path) -> None:
        """Bind the registry to *models_dir*; nothing is created on disk here."""
        self._root = models_dir
        self._manifests_dir = models_dir / "manifests"

    def _repo_cache_dir(self, hf_repo: str) -> Path:
        """The HuggingFace cache directory for *hf_repo* under this registry root."""
        return self._root / f"models--{repo_to_dir(hf_repo)}"

    def _blob_path(self, hf_repo: str, digest: str) -> Path:
        """Path of the blob named *digest* inside the cache directory of *hf_repo*."""
        return self._repo_cache_dir(hf_repo) / "blobs" / digest

    def resolve(self, ref: str) -> Path:
        """Return the blob path for *ref*; ``KeyError`` if not installed.

        The canonical *ref* is ``<org>/<repo>/<file>.gguf`` resolved via the
        lilbee manifest. Two other shapes are accepted as a backwards-compat
        concession for builds already published (whose on-disk layout differs),
        not as the intended contract: a bare ``<org>/<repo>`` (older builds
        persisted these into ``config.toml``) resolves to the one quant of that
        repo that's installed, and a manifest that's missing / unparseable /
        blob-less falls back to whatever GGUF ``huggingface_hub`` reports the
        cache holds for that ref. The HF cache layout is stable, so this lets an
        upgrade keep working without anyone purging their lilbee data dir; it is
        deliberately the exception here, not a pattern to follow elsewhere.

        Raises:
            KeyError: the model is not installed, or its manifest exists but no
                blob can be found for it.
            ValueError: *ref* is not a well-formed ref.
        """
        if not ref.endswith(".gguf") and ref.count("/") == 1:
            return self._resolve_repo_only(_validate_hf_repo(ref))
        hf_repo, gguf_filename = parse_hf_ref(ref)
        manifest = self._read_manifest(hf_repo, gguf_filename)
        if manifest is not None and manifest.blob is not None:
            blob_file = self._blob_path(manifest.hf_repo, manifest.blob)
            if blob_file.exists():
                return blob_file
        recovered = self._find_cached_gguf(hf_repo, gguf_filename)
        if recovered is not None:
            self._reregister_from_cache(hf_repo, gguf_filename, recovered)
            return recovered
        if manifest is None:
            raise KeyError(f"Model {ref} not installed")
        # Manifest present but neither it nor the cache yields a blob; keep the
        # specific diagnostic so a corrupted cache stays debuggable.
        cache_path = self._repo_cache_dir(manifest.hf_repo)
        if not cache_path.exists():
            raise KeyError(f"Cache folder missing for {ref}: {cache_path.name}")
        if manifest.blob is None:
            raise KeyError(f"Manifest for {ref} has no blob hash; install incomplete")
        raise KeyError(f"Blob file missing for {ref}: {manifest.blob}")

    def _resolve_repo_only(self, hf_repo: str) -> Path:
        """Resolve a bare ``<org>/<repo>`` ref to the GGUF of that repo on disk.

        Older builds persisted bare repo refs for the chat / embedding model.
        Prefers a current-format manifest under the repo; otherwise asks
        ``huggingface_hub`` what GGUFs the cache holds for the repo and returns
        the first one (alphabetical for determinism if more than one quant is
        installed).

        Raises:
            KeyError: no manifest-backed or cached GGUF exists for *hf_repo*.
        """
        manifest_dir = self._manifests_dir / repo_to_dir(hf_repo)
        if manifest_dir.is_dir():
            for mf in sorted(manifest_dir.glob("*.gguf.json")):
                manifest = self._load_manifest_file(mf)
                if manifest is None or manifest.blob is None:
                    continue
                blob = self._blob_path(hf_repo, manifest.blob)
                if blob.exists():
                    return blob
        for filename in sorted(self._cached_gguf_names(hf_repo)):
            recovered = self._find_cached_gguf(hf_repo, filename)
            if recovered is not None:
                self._reregister_from_cache(hf_repo, filename, recovered)
                return recovered
        raise KeyError(f"Model {hf_repo} not installed")

    def _cached_gguf_names(self, hf_repo: str) -> set[str]:
        """``.gguf`` filenames the HuggingFace cache holds for *hf_repo*."""
        if not self._root.is_dir():
            return set()
        from huggingface_hub import scan_cache_dir

        info = scan_cache_dir(self._root)
        return {
            f.file_name
            for repo in info.repos
            if repo.repo_id == hf_repo
            for rev in repo.revisions
            for f in rev.files
            if f.file_name.endswith(".gguf")
        }

    def _find_cached_gguf(self, hf_repo: str, gguf_filename: str) -> Path | None:
        """Return the cached blob path for ``hf_repo``/``gguf_filename``, or None.

        Uses ``huggingface_hub.try_to_load_from_cache`` so we honor whatever
        cache layout HF uses, then resolves the returned snapshot symlink to the
        blob, bounded to the cache directory.
        """
        from huggingface_hub import try_to_load_from_cache

        hit = try_to_load_from_cache(
            repo_id=hf_repo, filename=gguf_filename, cache_dir=str(self._root)
        )
        if not isinstance(hit, str):  # None (not cached) or the _CACHED_NO_EXIST sentinel
            return None
        resolved = Path(hit).resolve()
        try:
            validate_path_within(resolved, self._root)
        except ValueError:
            # The resolved target is not accepted as inside the registry root.
            return None
        return resolved

    def _reregister_from_cache(self, hf_repo: str, gguf_filename: str, blob_path: Path) -> None:
        """Write a fresh manifest for a model just recovered from the HF cache.

        ``list_installed`` only walks ``manifests/``, so a cache-recovered model
        is resolvable but otherwise invisible (``lilbee model list``, the TUI
        catalog, the pull command's "already installed" check) until a manifest
        exists. The ``task`` comes from the featured catalog; for a non-catalog
        ref it's unknown, so the rewrite is skipped. Best-effort: a read-only
        models dir or a write race must not break the resolve that succeeded.
        """
        from datetime import UTC, datetime

        ref = format_native_gguf_ref(hf_repo, gguf_filename)
        try:
            from lilbee.catalog import find_catalog_entry  # deferred: heavy import

            entry = find_catalog_entry(ref)
            if entry is None:
                return
            self._write_manifest(
                ModelManifest(
                    hf_repo=hf_repo,
                    gguf_filename=gguf_filename,
                    size_bytes=blob_path.stat().st_size,
                    task=entry.task,
                    downloaded_at=datetime.now(UTC).isoformat(),
                    blob=blob_path.name,  # the blob's filename is its sha in the HF cache
                )
            )
            log.info("Recovered manifest for %s from the model cache", ref)
        except Exception:  # cache-warming write; the resolve already returned a path
            log.debug("Could not re-register %s from the model cache", ref, exc_info=True)

    def is_installed(self, ref: str) -> bool:
        """Return True if a model is installed and its blob is present.

        Delegates to ``resolve``, so a malformed ref (``ValueError``) and a
        missing model (``KeyError``) both report ``False``.
        """
        try:
            self.resolve(ref)
        except (KeyError, ValueError):
            return False
        return True

    def install(
        self,
        hf_repo: str,
        gguf_filename: str,
        source_path: Path,
        manifest: ModelManifest,
    ) -> Path:
        """Write a manifest, copying *source_path* into the HF cache if needed.

        The blob is stored under its SHA-256 digest; when a blob with that
        digest already exists the copy is skipped. ``size_bytes``, ``task`` and
        ``downloaded_at`` are taken from *manifest*; identity and digest come
        from the arguments and the hashed file.

        Returns:
            The path of the blob inside the cache.

        Raises:
            ValueError: *hf_repo* or *gguf_filename* is not well-formed.
            OSError: hashing, copying or the manifest write failed.
        """
        # Validate the identity before touching any bytes: the same check runs
        # again when the manifest is written, but failing only there would leave
        # an orphan blob behind after a potentially large copy.
        self._manifest_path(hf_repo, gguf_filename)

        digest = _sha256_file(source_path)
        blob_path = self._blob_path(hf_repo, digest)
        if not blob_path.exists():
            blob_path.parent.mkdir(parents=True, exist_ok=True)
            self._copy_atomically(source_path, blob_path)

        updated = ModelManifest(
            hf_repo=hf_repo,
            gguf_filename=gguf_filename,
            size_bytes=manifest.size_bytes,
            task=manifest.task,
            downloaded_at=manifest.downloaded_at,
            blob=digest,
        )
        self._write_manifest(updated)
        return blob_path

    @staticmethod
    def _copy_atomically(source: Path, destination: Path) -> None:
        """Copy *source* to *destination* so a partial file never appears there.

        The bytes go to a temporary sibling first and are moved into place with
        ``os.replace``. Blobs are addressed by digest and an existing blob is
        never re-copied, so a truncated file at the final path would otherwise
        be treated as a valid install from then on.
        """
        fd, tmp_name = tempfile.mkstemp(dir=destination.parent, suffix=".tmp")
        os.close(fd)
        try:
            shutil.copy2(source, tmp_name)
            os.replace(tmp_name, destination)
        except BaseException:
            Path(tmp_name).unlink(missing_ok=True)
            raise

    def remove(self, ref: str) -> bool:
        """Remove a manifest and its backing blob.

        The blob is shared via SHA-256 digest, so it only goes away
        when no other installed manifest references the same digest.
        Empty cache directories (``blobs/``, the per-repo ``models--``
        folder, and the per-repo manifest folder) are pruned so a
        deleted model leaves no orphan bytes behind.

        Returns:
            ``True`` when a manifest was removed; ``False`` when *ref* is
            malformed or has no readable manifest.
        """
        try:
            hf_repo, gguf_filename = parse_hf_ref(ref)
        except ValueError:
            return False
        manifest_path = self._manifest_path(hf_repo, gguf_filename)
        manifest = self._load_manifest_file(manifest_path)
        if manifest is None:
            return False
        manifest_path.unlink()
        repo_dir = manifest_path.parent
        if repo_dir.exists() and not any(repo_dir.iterdir()):
            repo_dir.rmdir()
        if manifest.blob is not None:
            self._gc_blob(manifest.hf_repo, manifest.blob)
        log.info("Removed model %s", ref)
        return True

    def _gc_blob(self, hf_repo: str, digest: str) -> None:
        """Drop blob bytes and HuggingFace cache cruft now that *digest*
        and possibly the whole repo are unused.

        When the per-repo ``models--<repo>/`` directory has no installed
        manifests left, the whole directory is wiped so HF's ``refs/``,
        ``snapshots/``, and stale ``blobs/`` all go with it. Otherwise
        only the specific blob file is removed when no remaining
        manifest still references its digest.
        """
        cache_path = self._repo_cache_dir(hf_repo)
        try:
            validate_path_within(cache_path, self._root)
        except ValueError:
            log.warning("Refusing to remove cache outside models_dir: %s", cache_path)
            return
        siblings = [m for m in self.list_installed() if m.hf_repo == hf_repo]
        if not siblings:
            if cache_path.exists():
                shutil.rmtree(cache_path)
            return
        if any(m.blob == digest for m in siblings):
            return
        blob_file = self._blob_path(hf_repo, digest)
        if blob_file.exists():
            blob_file.unlink()

    def list_installed(self) -> list[ModelManifest]:
        """Return manifests for models whose blob is fully present on disk.

        A manifest with a null blob field or a missing blob file is the
        residue of a canceled or partial download. Surfacing it would
        let the picker offer an unusable selection, so the read filter
        lives here at the source instead of in every UI caller.
        """
        manifests: list[ModelManifest] = []
        if not self._manifests_dir.is_dir():
            return manifests
        for repo_dir in sorted(self._manifests_dir.iterdir()):
            if not repo_dir.is_dir():
                continue
            for tag_file in sorted(repo_dir.glob("*.gguf.json")):
                manifest = self._load_manifest_file(tag_file)
                if manifest is not None and self._blob_present(manifest):
                    manifests.append(manifest)
        return manifests

    def _blob_present(self, manifest: ModelManifest) -> bool:
        """True iff *manifest* points at an existing blob file."""
        if manifest.blob is None:
            return False
        return self._blob_path(manifest.hf_repo, manifest.blob).exists()

    def get_manifest(self, ref: str) -> ModelManifest | None:
        """Return the manifest for *ref*, or None if *ref* is malformed or has none."""
        try:
            hf_repo, gguf_filename = parse_hf_ref(ref)
        except ValueError:
            return None
        return self._read_manifest(hf_repo, gguf_filename)

    def _manifest_path(self, hf_repo: str, gguf_filename: str) -> Path:
        """Manifest path for ``(hf_repo, gguf_filename)``, validated on the way.

        Raises:
            ValueError: either part is malformed, or the resulting path is
                rejected by ``validate_path_within`` for the manifests dir.
        """
        repo = _validate_hf_repo(hf_repo)
        filename = _validate_gguf_filename(gguf_filename)
        path = self._manifests_dir / repo_to_dir(repo) / f"{filename}.json"
        validate_path_within(path, self._manifests_dir)
        return path

    def _read_manifest(self, hf_repo: str, gguf_filename: str) -> ModelManifest | None:
        """Load the manifest for ``(hf_repo, gguf_filename)``; None if absent or corrupt."""
        return self._load_manifest_file(self._manifest_path(hf_repo, gguf_filename))

    def _write_manifest(self, manifest: ModelManifest) -> None:
        """Serialize *manifest* to its JSON file via a temp file and ``os.replace``.

        The temporary file is created next to the target so the final rename
        stays within one directory; it is removed again if anything fails.
        """
        path = self._manifest_path(manifest.hf_repo, manifest.gguf_filename)
        path.parent.mkdir(parents=True, exist_ok=True)
        data = json.dumps(asdict(manifest), indent=2)
        tmp_path: str | None = None
        try:
            with tempfile.NamedTemporaryFile(
                dir=path.parent, suffix=".tmp", mode="w", encoding="utf-8", delete=False
            ) as tmp:
                tmp_path = tmp.name
                tmp.write(data)
            os.replace(tmp_path, path)
        except BaseException:
            if tmp_path is not None:
                Path(tmp_path).unlink(missing_ok=True)
            raise

    def _load_manifest_file(self, path: Path) -> ModelManifest | None:
        """Parse the manifest at *path*; None when it is absent or corrupt.

        A file that is not valid UTF-8 JSON, does not map onto the
        ``ModelManifest`` fields, or carries unusable path fields is logged as
        corrupt and ignored. Other read errors (e.g. permissions) propagate.
        """
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
            manifest = ModelManifest(**data)
        except (FileNotFoundError, NotADirectoryError):
            return None
        except (ValueError, TypeError, KeyError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            log.warning("Corrupt manifest: %s", path)
            return None
        if not self._fields_are_usable(manifest):
            log.warning("Corrupt manifest: %s", path)
            return None
        return manifest

    @staticmethod
    def _fields_are_usable(manifest: ModelManifest) -> bool:
        """True when the fields that get joined into filesystem paths are sane.

        ``hf_repo`` and ``gguf_filename`` must be strings, and ``blob`` must be
        either ``None`` or a bare file name. Anything else would raise
        ``TypeError`` when the blob path is built, or point outside ``blobs/``.
        """
        if not isinstance(manifest.hf_repo, str):
            return False
        if not isinstance(manifest.gguf_filename, str):
            return False
        blob = manifest.blob
        if blob is None:
            return True
        return (
            isinstance(blob, str)
            and blob not in ("", ".", "..")
            and "/" not in blob
            and "\\" not in blob
        )

403 

404 

def register_downloaded_model(entry: CatalogModel, file_path: Path) -> None:
    """Record a freshly downloaded GGUF in the registry under ``cfg.models_dir``.

    If installing fails but the model still resolves (``is_installed`` is
    true for its ref), the failure is logged as a warning and swallowed.
    If it does not resolve, the original exception propagates so the caller
    can report it.
    """
    from datetime import UTC, datetime

    registry = ModelRegistry(cfg.models_dir)
    repo = entry.hf_repo
    filename = file_path.name
    record = ModelManifest(
        hf_repo=repo,
        gguf_filename=filename,
        size_bytes=file_path.stat().st_size,
        task=entry.task,
        downloaded_at=datetime.now(UTC).isoformat(),
    )
    try:
        registry.install(repo, filename, file_path, record)
        log.info("Registered %s/%s in manifest", repo, filename)
    except Exception:
        ref = format_native_gguf_ref(repo, filename)
        if registry.is_installed(ref):
            log.warning(
                "Manifest write failed for %s; recovered via the model cache", ref, exc_info=True
            )
            return
        raise