Coverage for src/lilbee/crawler/crawl4ai_fetcher.py: 100%

137 statements

coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""crawl4ai-backed implementation of :class:`lilbee.crawler.fetcher.WebFetcher`.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6import functools 

7import inspect 

8import io 

9import logging 

10import math 

11from collections.abc import AsyncGenerator, AsyncIterator 

12from typing import TYPE_CHECKING, Any 

13from urllib.parse import urlparse 

14 

15from lilbee.crawler import bootstrap 

16from lilbee.crawler.bootstrap import CrawlerBrowserError 

17from lilbee.crawler.models import ( 

18 CancelToken, 

19 ConcurrencySpec, 

20 FetchedPage, 

21 FilterSpec, 

22) 

23 

24if TYPE_CHECKING: 

25 from lilbee.crawler.fetcher import WebFetcher 

26 

27log = logging.getLogger(__name__) 

28 

29 

30def _build_rate_limited_dispatcher(concurrency: ConcurrencySpec) -> Any: 

31 """Build a SemaphoreDispatcher + RateLimiter from a ConcurrencySpec, or None. 

32 

33 BFSDeepCrawlStrategy calls ``crawler.arun_many()`` without a dispatcher 

34 kwarg, so per-domain rate limiting is only reachable by threading a 

35 dispatcher through AsyncWebCrawler itself. This helper centralizes the 

36 spec read so the TUI / CLI / server all get identical behavior. 

37 """ 

38 if not concurrency.retry_on_rate_limit: 

39 return None 

40 from crawl4ai.async_dispatcher import RateLimiter, SemaphoreDispatcher 

41 

42 rate_limiter = RateLimiter( 

43 base_delay=(concurrency.retry_base_delay_min, concurrency.retry_base_delay_max), 

44 max_delay=concurrency.retry_max_backoff, 

45 max_retries=concurrency.retry_max_attempts, 

46 ) 

47 return SemaphoreDispatcher( 

48 semaphore_count=concurrency.semaphore_count, 

49 rate_limiter=rate_limiter, 

50 ) 

51 

52 
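# --- Illustrative usage sketch (not part of this module). It assumes
# ConcurrencySpec is a keyword-constructible model exposing the fields read
# above; the literal values are made-up examples, and the function only works
# when the crawl4ai extra is installed.
def _example_build_dispatcher() -> Any:
    spec = ConcurrencySpec(
        retry_on_rate_limit=True,
        retry_base_delay_min=1.0,
        retry_base_delay_max=3.0,
        retry_max_backoff=60.0,
        retry_max_attempts=3,
        semaphore_count=5,
        mean_delay=0.5,
        max_delay_range=1.0,
    )
    # With retry_on_rate_limit=False the helper returns None and arun_many()
    # falls back to crawl4ai's default dispatching.
    return _build_rate_limited_dispatcher(spec)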

class _LilbeeAsyncCrawler:
    """AsyncWebCrawler wrapper that injects a default dispatcher on ``arun_many``.

    BFSDeepCrawlStrategy calls ``arun_many`` without a dispatcher kwarg, so the
    wrapper supplies one to make rate limiting and 429/503 retries reachable.
    An explicit ``dispatcher=`` on the call still wins.
    """

    def __init__(self, *, verbose: bool, dispatcher: Any) -> None:
        from crawl4ai import AsyncWebCrawler

        self._inner = AsyncWebCrawler(verbose=verbose)
        self._dispatcher = dispatcher

    async def __aenter__(self) -> _LilbeeAsyncCrawler:
        await self._inner.__aenter__()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
        return await self._inner.__aexit__(exc_type, exc, tb)

    async def arun(self, *args: Any, **kwargs: Any) -> Any:
        return await self._inner.arun(*args, **kwargs)

    async def arun_many(
        self, urls: Any, config: Any = None, dispatcher: Any = None, **kwargs: Any
    ) -> Any:
        return await self._inner.arun_many(
            urls,
            config=config,
            dispatcher=dispatcher if dispatcher is not None else self._dispatcher,
            **kwargs,
        )


@contextlib.asynccontextmanager
async def _open_crawler(*, quiet: bool = False, dispatcher: Any = None) -> AsyncIterator[Any]:
    """Open an AsyncWebCrawler, wrapping with the dispatcher when provided.

    Raises :class:`CrawlerBrowserError` if the Chromium binary is missing so
    Playwright's ASCII install banner does not leak into the TUI.
    """
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    from crawl4ai import AsyncWebCrawler

    stdout_ctx = contextlib.redirect_stdout(io.StringIO()) if quiet else contextlib.nullcontext()
    stderr_ctx = contextlib.redirect_stderr(io.StringIO()) if quiet else contextlib.nullcontext()
    with stdout_ctx, stderr_ctx:
        if dispatcher is not None:
            async with _LilbeeAsyncCrawler(verbose=not quiet, dispatcher=dispatcher) as crawler:
                yield crawler
        else:
            async with AsyncWebCrawler(verbose=not quiet) as crawler:
                yield crawler
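# --- Illustrative sketch (not part of this module): a caller can surface the
# missing-Chromium case as a plain message instead of letting Playwright's
# install banner reach the TUI. The helper name is hypothetical.
async def _example_open_or_explain(url: str) -> str:
    try:
        async with _open_crawler(quiet=True) as crawler:
            result = await crawler.arun(url=url)
            return (result.markdown or "").strip()
    except CrawlerBrowserError as exc:
        return str(exc)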

def _safe_strategy_cancel(strategy: Any) -> None:
    """Call ``strategy.cancel()`` if available; swallow only the known SDK shapes.

    Narrow catch: ``AttributeError`` covers a missing nested attribute mid-call;
    ``RuntimeError`` covers cancel-on-closed-strategy. Anything else propagates.
    """
    cancel_method = getattr(strategy, "cancel", None)
    if callable(cancel_method):
        try:
            cancel_method()
        except (AttributeError, RuntimeError) as exc:
            log.debug("strategy.cancel() raised: %s", exc)


async def _safe_aclose(stream: Any) -> None:
    """Close an async generator stream; no-op for list / single-result shapes."""
    if stream is None:
        return
    if inspect.isasyncgen(stream):
        with contextlib.suppress(Exception):
            await stream.aclose()


async def _iter_crawl_stream(stream: Any) -> AsyncIterator[Any]:
    """Normalize crawl4ai's ``arun()`` return (async generator, list, or single result)."""
    if inspect.isasyncgen(stream):
        async for item in stream:
            yield item
        return
    # A list is the batch-mode shape; iterate and yield each item.
    if isinstance(stream, list):
        for item in stream:
            yield item
        return
    yield stream
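# --- Illustrative sketch (not part of this module): because
# _iter_crawl_stream() also accepts the list (batch-mode) and single-result
# shapes, callers can always consume arun() output with ``async for``.
async def _example_iter_normalization() -> list[str]:
    collected: list[str] = []
    async for item in _iter_crawl_stream(["page-a", "page-b"]):  # list shape
        collected.append(item)
    async for item in _iter_crawl_stream("only-page"):  # single-result shape
        collected.append(item)
    return collected  # ["page-a", "page-b", "only-page"]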

def _host_scope_filter(start_url: str, *, include_subdomains: bool) -> Any:
    """Build a URLFilter that scopes a crawl to the starting URL's host.

    Default behavior (``include_subdomains=False``) restricts link-following to
    the exact host of *start_url*. For ``https://en.wikipedia.org/...`` this
    excludes ``af.wikipedia.org`` and every other language subdomain.

    When ``include_subdomains=True``, crawl4ai's DomainFilter matches the host
    plus any of its subdomains (``foo.example.com`` matches ``example.com``),
    which is the loose "whole registrable domain" behavior users may want.
    """
    from crawl4ai.deep_crawling.filters import DomainFilter, URLFilter

    host = (urlparse(start_url).hostname or "").lower()
    if include_subdomains:
        return DomainFilter(allowed_domains=host) if host else None

    class _ExactHostFilter(URLFilter):  # type: ignore[misc]
        def __init__(self, allowed_host: str) -> None:
            super().__init__()
            self._host = allowed_host

        def apply(self, url: str) -> bool:
            link_host = (urlparse(url).hostname or "").lower()
            ok = link_host == self._host
            self._update_stats(ok)
            return ok

    return _ExactHostFilter(host) if host else None
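# --- Illustrative sketch (not part of this module), restating the docstring
# above with the Wikipedia example: the exact-host filter admits only
# ``en.wikipedia.org`` links. Running it requires the crawl4ai extra, since
# the filter subclasses crawl4ai's URLFilter.
def _example_host_scope() -> None:
    exact = _host_scope_filter("https://en.wikipedia.org/wiki/Bee", include_subdomains=False)
    assert exact.apply("https://en.wikipedia.org/wiki/Honey_bee") is True
    assert exact.apply("https://af.wikipedia.org/wiki/Bee") is False
    # include_subdomains=True instead returns a crawl4ai DomainFilter scoped to
    # en.wikipedia.org and its subdomains; its matching rules live in crawl4ai,
    # so they are not asserted here.
    _loose = _host_scope_filter("https://en.wikipedia.org/wiki/Bee", include_subdomains=True)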

class Crawl4aiFetcher:
    """:class:`WebFetcher` implementation backed by crawl4ai."""

    def __init__(self, *, quiet: bool = False) -> None:
        self._quiet = quiet

    async def __aenter__(self) -> Crawl4aiFetcher:
        # Crawl4ai opens a fresh ``AsyncWebCrawler`` per operation because
        # ``fetch_recursive`` needs a per-call dispatcher (which depends on
        # the :class:`ConcurrencySpec` for that call). Nothing to set up here.
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        return None

    async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage:
        """Fetch a single URL via crawl4ai's ``arun``."""
        from crawl4ai import CrawlerRunConfig

        config = CrawlerRunConfig(page_timeout=int(timeout * 1000))
        async with _open_crawler(quiet=self._quiet) as crawler:
            result = await crawler.arun(url=url, config=config)
            markdown = (result.markdown or "").strip()
            if markdown:
                return FetchedPage(url=url, markdown=markdown, success=True)
            return FetchedPage(
                url=url,
                success=False,
                error=result.error_message or "No content extracted",
            )

    async def fetch_recursive(
        self,
        seed_url: str,
        *,
        depth: int | None,
        max_pages: int | None,
        timeout: float,
        concurrency: ConcurrencySpec,
        filters: FilterSpec,
        cancel: CancelToken | None = None,
    ) -> AsyncGenerator[FetchedPage, None]:
        """Stream pages discovered by crawl4ai's native BFS.

        ``depth`` / ``max_pages`` of ``None`` mean unbounded; the adapter
        translates to ``math.inf`` for crawl4ai's BFSDeepCrawlStrategy, which
        is the sentinel it understands.
        """

        def _should_cancel() -> bool:
            return cancel is not None and cancel.is_set()

        from crawl4ai import CrawlerRunConfig
        from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
        from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter

        filter_chain_items: list[Any] = []
        host_filter = _host_scope_filter(seed_url, include_subdomains=filters.include_subdomains)
        if host_filter is not None:
            filter_chain_items.append(host_filter)
        if filters.exclude_patterns:
            filter_chain_items.append(
                URLPatternFilter(filters.exclude_patterns, use_glob=False, reverse=True)
            )
        filter_chain = FilterChain(filter_chain_items) if filter_chain_items else FilterChain()

        strategy = BFSDeepCrawlStrategy(
            max_depth=math.inf if depth is None else depth,
            max_pages=math.inf if max_pages is None else max_pages,
            should_cancel=_should_cancel,
            filter_chain=filter_chain,
        )
        config = CrawlerRunConfig(
            deep_crawl_strategy=strategy,
            page_timeout=int(timeout * 1000),
            mean_delay=concurrency.mean_delay,
            max_range=concurrency.max_delay_range,
            semaphore_count=concurrency.semaphore_count,
            stream=True,
        )

        dispatcher = _build_rate_limited_dispatcher(concurrency)
        stream: Any = None
        strategy_cancelled = False
        # Exceptions propagate to the orchestration layer, which decides
        # whether to log cancel-teardown noise at debug vs surface a real
        # failure. The adapter's only housekeeping is stream close + BFS
        # strategy cancel so Playwright tears down in order.
        async with _open_crawler(quiet=self._quiet, dispatcher=dispatcher) as crawler:
            stream = await crawler.arun(url=seed_url, config=config)
            try:
                async for cr in _iter_crawl_stream(stream):
                    if _should_cancel():
                        _safe_strategy_cancel(strategy)
                        strategy_cancelled = True
                        break
                    if cr.success:
                        yield FetchedPage(url=cr.url, markdown=cr.markdown or "")
                    else:
                        yield FetchedPage(
                            url=cr.url,
                            success=False,
                            error=cr.error_message or "Unknown error",
                        )
            finally:
                # If the consumer breaks out before we saw a cancel, still
                # short-circuit the BFS strategy so any in-flight arun_many
                # batch stops dispatching. Mirrors the orchestrator's
                # previous "hard cap on visible counter" behavior now that
                # the strategy object lives inside the adapter.
                if not strategy_cancelled:
                    _safe_strategy_cancel(strategy)
                # Close the async generator (if it is one) before the
                # crawler context exits, so Playwright tears down
                # in-flight URLs in order. Skipping this is what produced
                # the "BrowserContext.new_page: Connection closed" spam
                # on cancel.
                await _safe_aclose(stream)
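# --- Illustrative consumer sketch (not part of this module): roughly how an
# orchestrator might drive fetch_recursive(). It assumes ConcurrencySpec and
# FilterSpec can be constructed with usable defaults; the seed URL, depth,
# and page cap are arbitrary.
async def _example_recursive_crawl(seed_url: str) -> list[FetchedPage]:
    pages: list[FetchedPage] = []
    async with Crawl4aiFetcher(quiet=True) as fetcher:
        crawl = fetcher.fetch_recursive(
            seed_url,
            depth=2,
            max_pages=25,
            timeout=30.0,
            concurrency=ConcurrencySpec(),
            filters=FilterSpec(),
        )
        # aclosing() guarantees the generator's finally block above (strategy
        # cancel + stream close) runs even if the consumer breaks out early.
        async with contextlib.aclosing(crawl) as stream:
            async for page in stream:
                pages.append(page)
                if len(pages) >= 25:
                    break
    return pages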

# Protocol conformance check: Crawl4aiFetcher is structurally a WebFetcher.
# We don't instantiate at import time so the check stays purely structural.
if TYPE_CHECKING:
    _: WebFetcher = Crawl4aiFetcher()


@functools.cache
def crawler_available() -> bool:
    """Check if the crawl4ai backend is importable (i.e. the extra is installed).

    Uses ``importlib.util.find_spec`` rather than ``import crawl4ai`` so the
    check stays fast on the UI thread. ``crawl4ai`` is in AGENTS.md's
    known-heavy-imports list; executing it on Windows with Defender
    real-time scanning takes seconds, and the Settings screen's feature-
    gate call (``_FEATURE_GATED_GROUPS``) hits it synchronously during
    ``compose``. ``find_spec`` just walks ``sys.path`` to locate the
    package; the actual import runs later from the crawler bootstrap
    where the cost is expected.
    """
    import importlib.util

    return importlib.util.find_spec("crawl4ai") is not None
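# --- Illustrative gating sketch (not part of this module): UI call sites can
# use the cached availability check before constructing the fetcher, so the
# heavy crawl4ai import never runs on the UI thread. The helper name and the
# None-when-unavailable convention are hypothetical.
def _example_fetcher_if_available() -> Crawl4aiFetcher | None:
    if not crawler_available():
        return None
    return Crawl4aiFetcher(quiet=True)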