Coverage for src / lilbee / catalog / hf_client.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""HuggingFace API client with TTL cache.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import os 

7import threading 

8import time 

9from http import HTTPStatus 

10 

11import httpx 

12from huggingface_hub import ModelInfo 

13from huggingface_hub.hf_api import RepoSibling 

14 

15from lilbee.catalog.compat import classify 

16from lilbee.catalog.models import CatalogModel, HfGgufMeta, HfPage 

17from lilbee.catalog.refs import GGUF_GLOB, pick_best_gguf 

18from lilbee.core.config import cfg 

19 

20log = logging.getLogger(__name__) 

21 

# Message fragments to drop from huggingface_hub's request and file-download
# loggers. None of them is actionable in a local TUI: HF prints an
# unauthenticated-requests notice on every public pull, and the file_download
# logger warns again on every retry the library schedules. The catalog
# surfaces the final download failure with a clear message, so the
# per-attempt warnings are noise.
_HF_SUPPRESS_SUBSTRINGS = (
    "unauthenticated requests to the HF Hub",
    "Error while downloading from",
    "Trying to resume download",
)

# Names of the huggingface_hub loggers the substring filter is attached to.
_HF_FILTERED_LOGGER_NAMES = (
    "huggingface_hub.utils._http",
    "huggingface_hub.file_download",
)

38 

39 

class _HfSubstringFilter(logging.Filter):
    """Drop huggingface_hub log records whose message contains a suppressed substring.

    Args:
        needles: Substrings searched for in each record's formatted message.
    """

    def __init__(self, needles: tuple[str, ...]) -> None:
        super().__init__()
        self._needles = needles

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False (drop the record) when any needle occurs in its message."""
        # Format once: getMessage() re-applies the %-args on every call, so
        # calling it per needle repeats the same work.
        message = record.getMessage()
        return not any(needle in message for needle in self._needles)

49 

50 

def install_hf_log_filter() -> None:
    """Attach the substring filter to huggingface_hub's chatty loggers.

    Called automatically when this module is imported (see the module-top
    invocation below) so the filter is in place before any catalog HTTP
    call can emit a warning. Exposed as a function so tests can re-apply.

    Idempotent: a logger that already carries an ``_HfSubstringFilter`` is
    left untouched, so re-applying never stacks duplicate filters.
    """
    hf_filter = _HfSubstringFilter(_HF_SUPPRESS_SUBSTRINGS)
    for name in _HF_FILTERED_LOGGER_NAMES:
        logger = logging.getLogger(name)
        # addFilter() only de-duplicates the very same object, and each call
        # to this function builds a new instance, so check by type instead.
        if any(isinstance(existing, _HfSubstringFilter) for existing in logger.filters):
            continue
        logger.addFilter(hf_filter)

61 

62 

63# Install the filter at module import. All HF HTTP traffic in lilbee 

64# routes through this module, so installing here always beats the first 

65# huggingface_hub warning to the punch. 

66install_hf_log_filter() 

67 

# Endpoint of the HuggingFace model-listing API.
HF_API_URL = "https://huggingface.co/api/models"

# Timeout handed to every HF API request.
DEFAULT_TIMEOUT = 30.0

# Fields requested from the HF listing API via ``?expand=``. Without this
# expand, the default response omits siblings, cardData, and gguf.
_HF_EXPAND_FIELDS: list[str] = [
    "gguf",
    "siblings",
    "downloads",
    "pipeline_tag",
    "cardData",
]

# HF ``?search=`` is a single space-tokenized substring match on the model id.
# Multiple ``search=`` params are silently ignored, so the user's query is
# space-joined onto the GGUF filter into one param value.
_HF_GGUF_SEARCH_TERM = "GGUF"

# Page with no models and no further pages.
_EMPTY_HF_PAGE = HfPage(models=[], has_more=False)

# Number of bytes in one binary gigabyte.
_BYTES_PER_GB = 1024**3

84 

85 

def hf_token() -> str | None:
    """Resolve the HuggingFace token in priority order: env > cfg > hub cache.

    The ``LILBEE_HF_TOKEN`` environment variable wins over ``HF_TOKEN``; an
    empty value counts as unset. Returns ``None`` when no source yields a
    token or the huggingface_hub lookup raises.
    """
    for env_name in ("LILBEE_HF_TOKEN", "HF_TOKEN"):
        env_value = os.environ.get(env_name)
        if env_value:
            return env_value

    configured = cfg.hf_token
    if configured:
        return configured

    # Best effort: any failure while asking huggingface_hub means "no token".
    try:
        from huggingface_hub import get_token

        return get_token()
    except Exception:
        return None

99 

100 

def hf_headers() -> dict[str, str]:
    """Return the HTTP headers to send with HuggingFace API requests.

    Carries a bearer ``Authorization`` header when ``hf_token()`` yields a
    token, and is empty otherwise.
    """
    token = hf_token()
    return {"Authorization": f"Bearer {token}"} if token else {}

107 

108 

def _hf_search_value(search: str) -> str:
    """Compose the HF ``search=`` value from the GGUF term and the user's tokens.

    *search* is split on whitespace and its tokens are appended after the
    GGUF term, all joined by single spaces.
    """
    return " ".join((_HF_GGUF_SEARCH_TERM, *search.split()))

113 

114 

def _resolve_sibling_gguf(siblings: list[RepoSibling]) -> str:
    """Pick the concrete GGUF filename for a repo's siblings, else ``GGUF_GLOB``.

    Delegates to the same quant picker the pull path uses, so the filename a
    catalog row carries always names the file a pull of that row produces.
    """
    candidates = [sib.rfilename for sib in siblings if sib.rfilename.endswith(".gguf")]
    return pick_best_gguf(candidates) if candidates else GGUF_GLOB

125 

126 

def _estimate_size_from_siblings(siblings: list[RepoSibling]) -> float:
    """Estimate model size in GB from the largest GGUF file among *siblings*.

    A missing ``size`` counts as zero. Returns ``0.0`` when no GGUF sibling
    reports a positive size.
    """
    gguf_sizes = (sib.size or 0 for sib in siblings if sib.rfilename.endswith(".gguf"))
    largest = max(gguf_sizes, default=0)
    if largest <= 0:
        return 0.0  # unknown: display as "?" in UI
    return round(largest / _BYTES_PER_GB, 1)

136 

137 

class HfClient:
    """HuggingFace catalog API client with a per-instance TTL cache.

    Holds the per-process cache of catalog pages keyed by query
    parameters. The cache TTL and capacity are class-level so tests can
    override them via subclassing if needed; the cache state itself is
    per-instance so ``reset_services()`` discards a stale instance
    along with its cache.
    """

    CACHE_TTL: float = 300.0
    CACHE_MAX_ENTRIES: int = 50
    # Rate-limit the "Failed to fetch models" warning so an offline user
    # doesn't see one line per UI tick. First failure surfaces immediately;
    # repeats within the window stay at DEBUG.
    FETCH_FAILURE_WARN_INTERVAL_S: float = 300.0

    def __init__(self) -> None:
        # cache key -> (monotonic timestamp taken before the request, page)
        self._cache: dict[str, tuple[float, HfPage]] = {}
        self._cache_lock = threading.Lock()
        self._arch_cache: dict[str, str] = {}
        # -inf, not 0.0: on a freshly booted machine ``time.monotonic()`` can be
        # smaller than the window, which would push the first failure to DEBUG.
        self._last_fetch_failure_warn: float = float("-inf")

    def get_cached_arch(self, ref: str) -> str | None:
        """Return the cached `general.architecture` for *ref*, or None if not cached."""
        return self._arch_cache.get(ref)

    def cache_arch(self, ref: str, architecture: str) -> None:
        """Record *architecture* for *ref* in the per-instance cache."""
        self._arch_cache[ref] = architecture

    def fetch_models(
        self,
        pipeline_tag: str = "text-generation",
        sort: str = "downloads",
        limit: int = 50,
        offset: int = 0,
        library: str | None = None,
        search: str = "",
    ) -> HfPage:
        """Fetch GGUF models from HuggingFace API with TTL cache.

        Returns an ``HfPage`` with a ``has_more`` flag derived from the
        ``Link: <...>; rel="next"`` response header (RFC 5988), the same
        mechanism the ``huggingface_hub`` library uses internally.

        Args:
            pipeline_tag: Value of the ``pipeline_tag`` query parameter.
            sort: Value of the ``sort`` query parameter.
            limit: Value of the ``limit`` query parameter.
            offset: Sent to the API as the ``skip`` query parameter.
            library: Optional ``library`` query parameter; omitted when falsy.
            search: User query; its tokens are appended to the GGUF search term.

        Returns:
            The cached or freshly fetched page. An HTTP error status, a
            transport/decoding failure, or a JSON body that is not a list
            yields the shared empty page, which is never cached.
        """
        # Local import to avoid a cycle: query imports hf_client (this
        # module), and hf_client uses pipeline_to_task from query.
        from lilbee.catalog.query import pipeline_to_task

        search_value = _hf_search_value(search)
        cache_key = f"{pipeline_tag}:{sort}:{limit}:{offset}:{library}:{search_value}"
        now = time.monotonic()
        with self._cache_lock:
            # Purge every expired entry before looking up this key.
            expired = [k for k, (ts, _) in self._cache.items() if now - ts >= self.CACHE_TTL]
            for k in expired:
                del self._cache[k]

            cached = self._cache.get(cache_key)
            if cached and now - cached[0] < self.CACHE_TTL:
                return cached[1]

        params = httpx.QueryParams(
            pipeline_tag=pipeline_tag,
            search=search_value,
            sort=sort,
            limit=limit,
            skip=offset,
            expand=_HF_EXPAND_FIELDS,
        )
        if library:
            params = params.add("library", library)
        try:
            resp = httpx.get(
                HF_API_URL, params=params, timeout=DEFAULT_TIMEOUT, headers=hf_headers()
            )
            if resp.status_code >= HTTPStatus.BAD_REQUEST:
                log.warning("HuggingFace API returned HTTP %d", resp.status_code)
                return _EMPTY_HF_PAGE
            data = resp.json()
        except (httpx.HTTPError, ValueError) as exc:
            self._log_fetch_failure(exc)
            return _EMPTY_HF_PAGE

        # A successful status with a non-list body (e.g. a JSON object) cannot
        # be iterated as model entries; treat it like any other failed fetch.
        if not isinstance(data, list):
            self._log_fetch_failure(
                ValueError(f"unexpected payload type {type(data).__name__}")
            )
            return _EMPTY_HF_PAGE

        has_more = "next" in resp.links

        models: list[CatalogModel] = []
        for raw in data:
            # Skip entries that are not objects or carry no model id.
            if not isinstance(raw, dict) or not raw.get("id"):
                continue
            item = ModelInfo(**raw)
            card_desc = item.card_data.get("description", "") if item.card_data else ""
            # Card data is user-authored; ignore a description that is not text.
            if not isinstance(card_desc, str):
                card_desc = ""
            gguf_meta = HfGgufMeta(**(item.gguf or {}))
            if gguf_meta.total > 0:
                size_gb = round(gguf_meta.total / _BYTES_PER_GB, 1)
            else:
                size_gb = _estimate_size_from_siblings(item.siblings or [])
            task = pipeline_to_task(item.pipeline_tag or "")
            models.append(
                CatalogModel(
                    hf_repo=item.id,
                    gguf_filename=_resolve_sibling_gguf(item.siblings or []),
                    size_gb=size_gb,
                    min_ram_gb=round(max(2.0, size_gb * 1.5), 1),
                    description=card_desc[:120],
                    featured=False,
                    downloads=item.downloads or 0,
                    task=task,
                    architecture=gguf_meta.architecture,
                    compat=classify(gguf_meta.architecture),
                )
            )
            self.cache_arch(item.id, gguf_meta.architecture)
        page = HfPage(models=models, has_more=has_more)
        with self._cache_lock:
            # Stamped with the pre-request time captured above.
            self._cache[cache_key] = (now, page)
            if len(self._cache) > self.CACHE_MAX_ENTRIES:
                # Over capacity: evict the entry with the oldest timestamp.
                oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
                del self._cache[oldest_key]
        return page

    def _log_fetch_failure(self, exc: Exception) -> None:
        """Log an HF fetch failure, rate-limited so offline use doesn't spam.

        First failure of each ``FETCH_FAILURE_WARN_INTERVAL_S`` window logs
        at WARNING; repeats within the window log at DEBUG. The interval
        starts from the last WARNING so a flapping network produces one
        line every five minutes, not one per UI tick.
        """
        now = time.monotonic()
        if now - self._last_fetch_failure_warn >= self.FETCH_FAILURE_WARN_INTERVAL_S:
            log.warning("Failed to fetch models from HuggingFace: %s", exc)
            self._last_fetch_failure_warn = now
        else:
            log.debug("Suppressed repeat HF fetch failure: %s", exc)