Coverage for src / lilbee / crawler / crawl4ai_fetcher.py: 100%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""crawl4ai-backed implementation of :class:`lilbee.crawler.fetcher.WebFetcher`.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6import functools 

7import inspect 

8import io 

9import logging 

10import math 

11from collections.abc import AsyncGenerator, AsyncIterator 

12from typing import TYPE_CHECKING, Any 

13from urllib.parse import urlparse 

14 

15from lilbee.core.config import cfg 

16from lilbee.core.config.enums import CrawlRenderMode 

17from lilbee.crawler import bootstrap 

18from lilbee.crawler.bootstrap import CrawlerBrowserError 

19from lilbee.crawler.models import ( 

20 CancelToken, 

21 ConcurrencySpec, 

22 FetchedPage, 

23 FilterSpec, 

24) 

25from lilbee.crawler.url_filter import validate_crawl_url 

26 

27if TYPE_CHECKING: 

28 from lilbee.crawler.fetcher import WebFetcher 

29 

# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)

31 

32 

def _build_inner_crawler(*, verbose: bool, render_mode: CrawlRenderMode) -> Any:
    """Create the underlying crawl4ai ``AsyncWebCrawler`` for *render_mode*.

    ``CrawlRenderMode.HTTP`` gets crawl4ai's browserless
    ``AsyncHTTPCrawlerStrategy``. Every other mode gets a Chromium-backed
    crawler whose ``BrowserConfig`` enables light, text and memory-saving
    modes. The page-recycle threshold and the extra launch arguments come from
    ``cfg``.
    """
    from crawl4ai import AsyncWebCrawler

    if render_mode is not CrawlRenderMode.HTTP:
        from crawl4ai import BrowserConfig

        browser_config = BrowserConfig(
            light_mode=True,
            text_mode=True,
            memory_saving_mode=True,
            max_pages_before_recycle=cfg.crawl_browser_recycle_pages,
            # Hand crawl4ai a fresh list rather than the config's own sequence.
            extra_args=list(cfg.crawl_browser_extra_args),
            verbose=verbose,
        )
        return AsyncWebCrawler(config=browser_config, verbose=verbose)

    from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy

    http_strategy = AsyncHTTPCrawlerStrategy()
    return AsyncWebCrawler(crawler_strategy=http_strategy, verbose=verbose)

58 

59 

60def _build_rate_limited_dispatcher( 

61 concurrency: ConcurrencySpec, render_mode: CrawlRenderMode 

62) -> Any: 

63 """Build the recursive-crawl dispatcher from a ConcurrencySpec, or None. 

64 

65 BFSDeepCrawlStrategy calls ``crawler.arun_many()`` without a dispatcher 

66 kwarg, so per-domain rate limiting is only reachable by threading a 

67 dispatcher through AsyncWebCrawler itself. Browser mode uses a 

68 MemoryAdaptiveDispatcher so a crawl backs off when system memory is tight 

69 rather than steamrolling the machine; HTTP mode is light enough to stay on 

70 the plain semaphore path. 

71 """ 

72 if not concurrency.retry_on_rate_limit: 

73 return None 

74 from crawl4ai.async_dispatcher import RateLimiter 

75 

76 rate_limiter = RateLimiter( 

77 base_delay=(concurrency.retry_base_delay_min, concurrency.retry_base_delay_max), 

78 max_delay=concurrency.retry_max_backoff, 

79 max_retries=concurrency.retry_max_attempts, 

80 ) 

81 if render_mode is CrawlRenderMode.BROWSER: 

82 from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher 

83 

84 return MemoryAdaptiveDispatcher( 

85 max_session_permit=concurrency.semaphore_count, 

86 rate_limiter=rate_limiter, 

87 ) 

88 from crawl4ai.async_dispatcher import SemaphoreDispatcher 

89 

90 return SemaphoreDispatcher( 

91 semaphore_count=concurrency.semaphore_count, 

92 rate_limiter=rate_limiter, 

93 ) 

94 

95 

96class _LilbeeAsyncCrawler: 

97 """AsyncWebCrawler wrapper that injects a default dispatcher on ``arun_many``. 

98 

99 BFSDeepCrawlStrategy calls ``arun_many`` without a dispatcher kwarg, so the 

100 wrapper supplies one to make rate limiting and 429/503 retries reachable. 

101 An explicit ``dispatcher=`` on the call still wins. 

102 """ 

103 

104 def __init__(self, inner: Any, *, dispatcher: Any) -> None: 

105 self._inner = inner 

106 self._dispatcher = dispatcher 

107 

108 async def __aenter__(self) -> _LilbeeAsyncCrawler: 

109 await self._inner.__aenter__() 

110 return self 

111 

112 async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any: 

113 return await self._inner.__aexit__(exc_type, exc, tb) 

114 

115 async def arun(self, *args: Any, **kwargs: Any) -> Any: 

116 return await self._inner.arun(*args, **kwargs) 

117 

118 async def arun_many( 

119 self, urls: Any, config: Any = None, dispatcher: Any = None, **kwargs: Any 

120 ) -> Any: 

121 return await self._inner.arun_many( 

122 urls, 

123 config=config, 

124 dispatcher=dispatcher if dispatcher is not None else self._dispatcher, 

125 **kwargs, 

126 ) 

127 

128 

@contextlib.asynccontextmanager
async def _open_crawler(
    *, quiet: bool = False, render_mode: CrawlRenderMode, dispatcher: Any = None
) -> AsyncIterator[Any]:
    """Yield an entered crawl4ai crawler for *render_mode*.

    Browser mode first checks that the Playwright Chromium binary is present.
    If it is missing, :class:`CrawlerBrowserError` is raised, so Playwright's
    ASCII install banner never reaches the TUI. HTTP mode needs no browser.

    When *dispatcher* is supplied, the crawler is wrapped in
    ``_LilbeeAsyncCrawler`` so ``arun_many`` calls default to it.

    With *quiet* set, crawl4ai is built non-verbose, and stdout/stderr are
    redirected into throwaway buffers for as long as the context is open.
    """
    if render_mode is CrawlRenderMode.BROWSER:
        if not bootstrap.chromium_installed():
            raise CrawlerBrowserError(
                "Playwright Chromium browser not installed. "
                "Run 'uv run playwright install chromium' to enable browser-mode crawling."
            )

    inner = _build_inner_crawler(verbose=not quiet, render_mode=render_mode)

    with contextlib.ExitStack() as output_guard:
        if quiet:
            # redirect_stdout/redirect_stderr swap the process-wide sys.stdout
            # and sys.stderr. Anything printed while the crawler is open is
            # discarded.
            output_guard.enter_context(contextlib.redirect_stdout(io.StringIO()))
            output_guard.enter_context(contextlib.redirect_stderr(io.StringIO()))
        session = (
            inner if dispatcher is None else _LilbeeAsyncCrawler(inner, dispatcher=dispatcher)
        )
        async with session as crawler:
            yield crawler

156 

157 

158def _safe_strategy_cancel(strategy: Any) -> None: 

159 """Call ``strategy.cancel()`` if available; swallow only the known SDK shapes. 

160 

161 Narrow catch: ``AttributeError`` covers a missing nested attribute mid-call; 

162 ``RuntimeError`` covers cancel-on-closed-strategy. Anything else propagates. 

163 """ 

164 cancel_method = getattr(strategy, "cancel", None) 

165 if callable(cancel_method): 

166 try: 

167 cancel_method() 

168 except (AttributeError, RuntimeError) as exc: 

169 log.debug("strategy.cancel() raised: %s", exc) 

170 

171 

172async def _safe_aclose(stream: Any) -> None: 

173 """Close an async generator stream; no-op for list / single-result shapes.""" 

174 if stream is None: 

175 return 

176 if inspect.isasyncgen(stream): 

177 with contextlib.suppress(Exception): 

178 await stream.aclose() 

179 

180 

181async def _iter_crawl_stream(stream: Any) -> AsyncIterator[Any]: 

182 """Normalize crawl4ai's ``arun()`` return (async generator, list, or single result).""" 

183 if inspect.isasyncgen(stream): 

184 async for item in stream: 

185 yield item 

186 return 

187 # A list is the batch-mode shape; iterate and yield each item. 

188 if isinstance(stream, list): 

189 for item in stream: 

190 yield item 

191 return 

192 yield stream 

193 

194 

def _link_passes_ssrf(url: str) -> bool:
    """Report whether *url* survives ``validate_crawl_url``.

    Every followed link is re-checked against the crawl URL validator. A
    discovered (or DNS-rebound) link pointing at a private or metadata host is
    thereby dropped before it is fetched.

    A ``ValueError`` from the validator means "reject". Any other exception
    propagates.
    """
    passed = True
    try:
        validate_crawl_url(url)
    except ValueError:
        passed = False
    return passed

206 

207 

def _host_scope_filter(start_url: str, *, include_subdomains: bool) -> Any:
    """Return a crawl4ai URL filter that confines link-following to *start_url*'s host.

    Scope rules:
    - A link is in scope when its lower-cased hostname equals the seed's.
    - With ``include_subdomains=True``, a hostname ending in ``.<seed host>``
      is also in scope.
    - An in-scope link must additionally pass ``_link_passes_ssrf``. The host
      check alone would let a same-host link that resolves to a private IP
      through.

    Each verdict is recorded through the filter's ``_update_stats``. Returns
    ``None`` when *start_url* has no hostname.
    """
    from crawl4ai.deep_crawling.filters import URLFilter

    seed_host = (urlparse(start_url).hostname or "").lower()
    if not seed_host:
        return None
    subdomain_suffix = f".{seed_host}"

    class _ScopedSsrfFilter(URLFilter):  # type: ignore[misc]
        def apply(self, url: str) -> bool:
            candidate = (urlparse(url).hostname or "").lower()
            if candidate == seed_host:
                in_scope = True
            else:
                in_scope = include_subdomains and candidate.endswith(subdomain_suffix)
            # Short-circuit: the SSRF check only runs for in-scope links.
            passed = in_scope and _link_passes_ssrf(url)
            self._update_stats(passed)
            return passed

    return _ScopedSsrfFilter()

236 

237 

class Crawl4aiFetcher:
    """:class:`WebFetcher` implementation backed by crawl4ai.

    Beyond its ``quiet`` flag and render mode, the fetcher holds no state.
    Each fetch opens, and closes, its own crawler through ``_open_crawler``.
    """

    def __init__(self, *, quiet: bool = False, render_mode: CrawlRenderMode) -> None:
        """Store the settings shared by every fetch.

        Args:
            quiet: Forwarded to ``_open_crawler``. When true, crawl4ai is built
                non-verbose and its stdout/stderr output is redirected away.
            render_mode: Selects how pages are rendered, e.g.
                ``CrawlRenderMode.HTTP`` or ``CrawlRenderMode.BROWSER``.
        """
        self._quiet = quiet
        self._render_mode = render_mode

    async def __aenter__(self) -> Crawl4aiFetcher:
        # Crawl4ai opens a fresh ``AsyncWebCrawler`` per operation because
        # ``fetch_recursive`` needs a per-call dispatcher (which depends on
        # the :class:`ConcurrencySpec` for that call). Nothing to set up here.
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        # Nothing to release: every operation closes its own crawler.
        return None

    async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage:
        """Fetch a single URL via crawl4ai's ``arun``.

        Args:
            url: Page to fetch.
            timeout: Page timeout in seconds, converted to whole milliseconds
                for crawl4ai.

        Returns:
            On success, a :class:`FetchedPage` carrying the whitespace-stripped
            markdown. Otherwise, when no markdown came back, a failed page
            carrying crawl4ai's error message (or ``"No content extracted"``).
        """
        from crawl4ai import CrawlerRunConfig

        config = CrawlerRunConfig(page_timeout=int(timeout * 1000))
        async with _open_crawler(quiet=self._quiet, render_mode=self._render_mode) as crawler:
            result = await crawler.arun(url=url, config=config)
        markdown = (result.markdown or "").strip()
        if markdown:
            return FetchedPage(url=url, markdown=markdown, success=True)
        return FetchedPage(
            url=url,
            success=False,
            error=result.error_message or "No content extracted",
        )

    async def fetch_recursive(
        self,
        seed_url: str,
        *,
        depth: int | None,
        max_pages: int | None,
        timeout: float,
        concurrency: ConcurrencySpec,
        filters: FilterSpec,
        cancel: CancelToken | None = None,
    ) -> AsyncGenerator[FetchedPage, None]:
        """Stream pages discovered by crawl4ai's native BFS.

        ``depth`` / ``max_pages`` of ``None`` mean unbounded; the adapter
        translates to ``math.inf`` for crawl4ai's BFSDeepCrawlStrategy, which
        is the sentinel it understands.

        Args:
            seed_url: Starting URL. When it has a hostname, link-following is
                scoped to that host, plus subdomains when
                ``filters.include_subdomains`` is set.
            depth: Maximum BFS depth, or ``None`` for unbounded.
            max_pages: Maximum number of pages, or ``None`` for unbounded.
            timeout: Per-page timeout in seconds.
            concurrency: Delay, parallelism and rate-limit retry settings.
            filters: Subdomain scoping and URL exclude patterns (matched with
                ``use_glob=False``).
            cancel: Optional token. Once it is set, the BFS strategy is
                cancelled and the stream stops.

        Yields:
            One :class:`FetchedPage` per crawl result. Failed pages are
            included, with ``success=False`` and crawl4ai's error message.

        Exceptions from crawl4ai propagate to the caller.
        """

        def _should_cancel() -> bool:
            return cancel is not None and cancel.is_set()

        from crawl4ai import CrawlerRunConfig
        from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
        from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter

        filter_chain_items: list[Any] = []
        host_filter = _host_scope_filter(seed_url, include_subdomains=filters.include_subdomains)
        if host_filter is not None:
            filter_chain_items.append(host_filter)
        if filters.exclude_patterns:
            # reverse=True makes a pattern match reject the URL (exclusion).
            filter_chain_items.append(
                URLPatternFilter(filters.exclude_patterns, use_glob=False, reverse=True)
            )
        filter_chain = FilterChain(filter_chain_items) if filter_chain_items else FilterChain()

        strategy = BFSDeepCrawlStrategy(
            max_depth=math.inf if depth is None else depth,
            max_pages=math.inf if max_pages is None else max_pages,
            should_cancel=_should_cancel,
            filter_chain=filter_chain,
        )
        config = CrawlerRunConfig(
            deep_crawl_strategy=strategy,
            page_timeout=int(timeout * 1000),
            mean_delay=concurrency.mean_delay,
            max_range=concurrency.max_delay_range,
            semaphore_count=concurrency.semaphore_count,
            stream=True,
        )

        dispatcher = _build_rate_limited_dispatcher(concurrency, self._render_mode)
        stream: Any = None
        strategy_cancelled = False
        # Exceptions propagate to the orchestration layer, which decides
        # whether to log cancel-teardown noise at debug vs surface a real
        # failure. The adapter's only housekeeping is stream close + BFS
        # strategy cancel so Playwright tears down in order.
        async with _open_crawler(
            quiet=self._quiet, render_mode=self._render_mode, dispatcher=dispatcher
        ) as crawler:
            stream = await crawler.arun(url=seed_url, config=config)
            # Keep a handle on the normalizing wrapper so it can be closed
            # explicitly. ``async for`` does not close its iterator on
            # ``break`` or early exit.
            results = _iter_crawl_stream(stream)
            try:
                async for cr in results:
                    if _should_cancel():
                        _safe_strategy_cancel(strategy)
                        strategy_cancelled = True
                        break
                    if cr.success:
                        yield FetchedPage(url=cr.url, markdown=cr.markdown or "")
                    else:
                        yield FetchedPage(
                            url=cr.url,
                            success=False,
                            error=cr.error_message or "Unknown error",
                        )
            finally:
                # If the consumer breaks out before a cancel was seen, still
                # short-circuit the BFS strategy so any in-flight arun_many
                # batch stops dispatching.
                if not strategy_cancelled:
                    _safe_strategy_cancel(strategy)
                # Close the wrapper first. Otherwise it would stay suspended
                # until garbage collection instead of finishing before the
                # crawler context exits.
                await _safe_aclose(results)
                # Close the async generator (if it is one) before the crawler
                # context exits, so Playwright tears down in-flight URLs in
                # order. Skipping this produced the
                # "BrowserContext.new_page: Connection closed" spam on cancel.
                await _safe_aclose(stream)

364 

365 

# Static-only conformance check: type checkers verify that Crawl4aiFetcher
# structurally satisfies WebFetcher. The assignment sits under TYPE_CHECKING,
# so nothing is constructed when the module is imported at runtime.
if TYPE_CHECKING:
    _fetcher_protocol_check: WebFetcher = Crawl4aiFetcher(render_mode=CrawlRenderMode.HTTP)

370 

371 

372@functools.cache 

373def crawler_available() -> bool: 

374 """Check if the crawl4ai backend is importable (i.e. the extra is installed). 

375 

376 Uses ``importlib.util.find_spec`` rather than ``import crawl4ai`` so the 

377 check stays fast on the UI thread. ``crawl4ai`` is in AGENTS.md's 

378 known-heavy-imports list; executing it on Windows with Defender 

379 real-time scanning takes seconds, and the Settings screen's feature- 

380 gate call (``_FEATURE_GATED_GROUPS``) hits it synchronously during 

381 ``compose``. ``find_spec`` just walks ``sys.path`` to locate the 

382 package; the actual import runs later from the crawler bootstrap 

383 where the cost is expected. 

384 """ 

385 import importlib.util 

386 

387 return importlib.util.find_spec("crawl4ai") is not None