Coverage for src / lilbee / catalog / hf_client.py: 100%
129 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""HuggingFace API client with TTL cache."""
3from __future__ import annotations
5import logging
6import os
7import threading
8import time
9from http import HTTPStatus
11import httpx
12from huggingface_hub import ModelInfo
13from huggingface_hub.hf_api import RepoSibling
15from lilbee.catalog.compat import classify
16from lilbee.catalog.models import CatalogModel, HfGgufMeta, HfPage
17from lilbee.catalog.refs import GGUF_GLOB, pick_best_gguf
18from lilbee.core.config import cfg
20log = logging.getLogger(__name__)
22# Substrings dropped from huggingface_hub's request / file-download loggers.
23# These advisories aren't actionable in a local TUI: HF prints an
24# unauthenticated-requests notice on every public pull, and the file_download
25# logger re-warns on every retry the library schedules. The catalog surfaces
26# the final download failure with a clear message, so per-attempt warnings
27# are noise.
28_HF_SUPPRESS_SUBSTRINGS = (
29 "unauthenticated requests to the HF Hub",
30 "Error while downloading from",
31 "Trying to resume download",
32)
34_HF_FILTERED_LOGGER_NAMES = (
35 "huggingface_hub.utils._http",
36 "huggingface_hub.file_download",
37)
40class _HfSubstringFilter(logging.Filter):
41 """Drop huggingface_hub log records whose message contains a suppressed substring."""
43 def __init__(self, needles: tuple[str, ...]) -> None:
44 super().__init__()
45 self._needles = needles
47 def filter(self, record: logging.LogRecord) -> bool:
48 return not any(n in record.getMessage() for n in self._needles)
def install_hf_log_filter() -> None:
    """Attach the substring filter to huggingface_hub's chatty loggers.

    Called automatically when this module is imported (see the module-level
    invocation directly after this function) so the filter is in place
    before any catalog HTTP call can emit a warning. Exposed as a function
    so tests can re-apply it.

    The call is idempotent: a logger that already carries a
    ``_HfSubstringFilter`` with the same needles is left alone, so repeated
    calls do not stack duplicate filters on the same logger.
    """
    hf_filter = _HfSubstringFilter(_HF_SUPPRESS_SUBSTRINGS)
    for name in _HF_FILTERED_LOGGER_NAMES:
        logger = logging.getLogger(name)
        # ``addFilter`` only de-duplicates the very same object, and every
        # call here builds a fresh instance, so check for an equivalent
        # filter explicitly.
        already_installed = any(
            isinstance(existing, _HfSubstringFilter)
            and existing._needles == hf_filter._needles
            for existing in logger.filters
        )
        if not already_installed:
            logger.addFilter(hf_filter)


# Install the filter at module import. All HF HTTP traffic in lilbee
# routes through this module, so installing here always beats the first
# huggingface_hub warning to the punch.
install_hf_log_filter()
68HF_API_URL = "https://huggingface.co/api/models"
70DEFAULT_TIMEOUT = 30.0
72# Fields requested from the HF listing API via ``?expand=``. Without this
73# expand, the default response omits siblings, cardData, and gguf.
74_HF_EXPAND_FIELDS: list[str] = ["gguf", "siblings", "downloads", "pipeline_tag", "cardData"]
76# HF ``?search=`` is a single space-tokenized substring match on the model id.
77# Multiple ``search=`` params are silently ignored, so the user's query is
78# space-joined onto the GGUF filter into one param value.
79_HF_GGUF_SEARCH_TERM = "GGUF"
# Shared empty page handed back whenever a listing request fails.
_EMPTY_HF_PAGE = HfPage(has_more=False, models=[])

# Divisor that converts a byte count into the GB figure shown for a model.
_BYTES_PER_GB = 1024 ** 3
def hf_token() -> str | None:
    """Return the HuggingFace token, or ``None`` when none can be found.

    Sources are consulted in priority order: the ``LILBEE_HF_TOKEN`` and
    ``HF_TOKEN`` environment variables, then ``cfg.hf_token``, and finally
    whatever ``huggingface_hub.get_token`` reports.
    """
    for variable in ("LILBEE_HF_TOKEN", "HF_TOKEN"):
        from_env = os.environ.get(variable)
        if from_env:
            return from_env

    configured = cfg.hf_token
    if configured:
        return configured

    # Best effort only: any failure while asking huggingface_hub for its
    # stored token is treated the same as having no token at all.
    try:
        from huggingface_hub import get_token

        return get_token()
    except Exception:
        return None
def hf_headers() -> dict[str, str]:
    """Return the HTTP headers to send with HuggingFace API requests.

    A bearer ``Authorization`` header is included when ``hf_token()`` yields
    a token; otherwise the mapping is empty.
    """
    token = hf_token()
    return {"Authorization": f"Bearer {token}"} if token else {}
def _hf_search_value(search: str) -> str:
    """Return the HF ``search=`` value: the GGUF term followed by the user's tokens.

    *search* is split on whitespace and the pieces are re-joined with single
    spaces, so an empty or blank query produces the GGUF term on its own.
    """
    return " ".join((_HF_GGUF_SEARCH_TERM, *search.split()))
def _resolve_sibling_gguf(siblings: list[RepoSibling]) -> str:
    """Pick the concrete GGUF filename from a repo's siblings, else ``GGUF_GLOB``.

    The choice is delegated to the same quant picker the pull path uses, so
    the filename stored on a catalog row always names the file that pulling
    that row produces.
    """
    filenames = (sibling.rfilename for sibling in siblings)
    candidates = [filename for filename in filenames if filename.endswith(".gguf")]
    return pick_best_gguf(candidates) if candidates else GGUF_GLOB
127def _estimate_size_from_siblings(siblings: list[RepoSibling]) -> float:
128 """Estimate model size in GB from the largest GGUF file in siblings."""
129 max_bytes = 0
130 for sib in siblings:
131 if sib.rfilename.endswith(".gguf"):
132 max_bytes = max(max_bytes, sib.size or 0)
133 if max_bytes > 0:
134 return round(max_bytes / _BYTES_PER_GB, 1)
135 return 0.0 # unknown: display as "?" in UI
class HfClient:
    """Client for the HuggingFace catalog API with a per-instance TTL cache.

    Fetched catalog pages are cached under a key built from the query
    parameters. The TTL and the capacity are class attributes so a test can
    override them in a subclass; the cached pages themselves live on the
    instance, so ``reset_services()`` throws a stale instance away together
    with its cache.
    """

    CACHE_TTL: float = 300.0
    CACHE_MAX_ENTRIES: int = 50
    # Throttle for the "Failed to fetch models" warning so an offline user
    # is not shown one line per UI tick. The first failure is reported
    # immediately; repeats inside the window are logged at DEBUG only.
    FETCH_FAILURE_WARN_INTERVAL_S: float = 300.0

    def __init__(self) -> None:
        """Create a client with empty caches and no recorded fetch failure."""
        # Start at -inf rather than 0.0: on a freshly booted machine
        # ``time.monotonic()`` can be smaller than the window, which would
        # demote the very first failure to DEBUG.
        self._last_fetch_failure_warn: float = float("-inf")
        self._arch_cache: dict[str, str] = {}
        self._cache_lock = threading.Lock()
        self._cache: dict[str, tuple[float, HfPage]] = {}

    def get_cached_arch(self, ref: str) -> str | None:
        """Look up the `general.architecture` recorded for *ref*; ``None`` if absent."""
        return self._arch_cache.get(ref)

    def cache_arch(self, ref: str, architecture: str) -> None:
        """Store *architecture* under *ref* in this instance's architecture cache."""
        self._arch_cache[ref] = architecture

    def fetch_models(
        self,
        pipeline_tag: str = "text-generation",
        sort: str = "downloads",
        limit: int = 50,
        offset: int = 0,
        library: str | None = None,
        search: str = "",
    ) -> HfPage:
        """Fetch one page of GGUF models from the HuggingFace API, using the TTL cache.

        The arguments are forwarded as query parameters (*offset* is sent as
        ``skip``; *library* only when it is non-empty; *search* is combined
        with the GGUF search term). A page that is still fresh in the cache
        is returned without a request. An HTTP status of 400 or above, a
        transport error, or an undecodable body yields the shared empty page
        and is not cached.

        The ``has_more`` flag of the returned ``HfPage`` is derived from the
        ``Link: <...>; rel="next"`` response header (RFC 5988), the same
        mechanism the ``huggingface_hub`` library uses internally.
        """
        # Imported here rather than at module top to break an import cycle:
        # query imports hf_client (this module), and hf_client needs
        # pipeline_to_task from query.
        from lilbee.catalog.query import pipeline_to_task

        search_value = _hf_search_value(search)
        cache_key = f"{pipeline_tag}:{sort}:{limit}:{offset}:{library}:{search_value}"
        now = time.monotonic()
        ttl = self.CACHE_TTL
        with self._cache_lock:
            # Purge every expired entry before consulting the cache.
            stale_keys = [key for key, entry in self._cache.items() if now - entry[0] >= ttl]
            for key in stale_keys:
                self._cache.pop(key)

            hit = self._cache.get(cache_key)
            if hit and now - hit[0] < ttl:
                return hit[1]

        query = httpx.QueryParams(
            pipeline_tag=pipeline_tag,
            search=search_value,
            sort=sort,
            limit=limit,
            skip=offset,
            expand=_HF_EXPAND_FIELDS,
        )
        if library:
            query = query.add("library", library)

        try:
            response = httpx.get(
                HF_API_URL, params=query, timeout=DEFAULT_TIMEOUT, headers=hf_headers()
            )
            if response.status_code >= HTTPStatus.BAD_REQUEST:
                log.warning("HuggingFace API returned HTTP %d", response.status_code)
                return _EMPTY_HF_PAGE
            payload = response.json()
        except (httpx.HTTPError, ValueError) as exc:
            self._log_fetch_failure(exc)
            return _EMPTY_HF_PAGE

        has_more = "next" in response.links

        models: list[CatalogModel] = []
        for raw in payload:
            # Entries without an id cannot be turned into a catalog row.
            if not raw.get("id"):
                continue
            info = ModelInfo(**raw)
            card_desc = info.card_data.get("description", "") if info.card_data else ""
            meta = HfGgufMeta(**(info.gguf or {}))
            siblings = info.siblings or []
            # Prefer the size reported in the GGUF metadata; otherwise fall
            # back to an estimate from the sibling file list.
            size_gb = (
                round(meta.total / _BYTES_PER_GB, 1)
                if meta.total > 0
                else _estimate_size_from_siblings(siblings)
            )
            task = pipeline_to_task(info.pipeline_tag or "")
            models.append(
                CatalogModel(
                    hf_repo=info.id,
                    gguf_filename=_resolve_sibling_gguf(siblings),
                    size_gb=size_gb,
                    min_ram_gb=round(max(2.0, size_gb * 1.5), 1),
                    description=(card_desc or "")[:120],
                    featured=False,
                    downloads=info.downloads or 0,
                    task=task,
                    architecture=meta.architecture,
                    compat=classify(meta.architecture),
                )
            )
            self.cache_arch(info.id, meta.architecture)

        page = HfPage(models=models, has_more=has_more)
        with self._cache_lock:
            self._cache[cache_key] = (now, page)
            if len(self._cache) > self.CACHE_MAX_ENTRIES:
                # Over capacity: evict the entry with the oldest timestamp.
                oldest_key, _ = min(self._cache.items(), key=lambda pair: pair[1][0])
                del self._cache[oldest_key]
        return page

    def _log_fetch_failure(self, exc: Exception) -> None:
        """Report an HF fetch failure, throttled so offline use does not spam the log.

        The first failure in each ``FETCH_FAILURE_WARN_INTERVAL_S`` window is
        logged at WARNING and repeats inside the window at DEBUG. The window
        is measured from the last WARNING, so a flapping network yields one
        line per interval instead of one per UI tick.
        """
        now = time.monotonic()
        elapsed = now - self._last_fetch_failure_warn
        if elapsed < self.FETCH_FAILURE_WARN_INTERVAL_S:
            log.debug("Suppressed repeat HF fetch failure: %s", exc)
            return
        log.warning("Failed to fetch models from HuggingFace: %s", exc)
        self._last_fetch_failure_warn = now