Coverage for src / lilbee / crawler / crawl4ai_fetcher.py: 100%
159 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""crawl4ai-backed implementation of :class:`lilbee.crawler.fetcher.WebFetcher`."""
3from __future__ import annotations
5import contextlib
6import functools
7import inspect
8import io
9import logging
10import math
11from collections.abc import AsyncGenerator, AsyncIterator
12from typing import TYPE_CHECKING, Any
13from urllib.parse import urlparse
15from lilbee.core.config import cfg
16from lilbee.core.config.enums import CrawlRenderMode
17from lilbee.crawler import bootstrap
18from lilbee.crawler.bootstrap import CrawlerBrowserError
19from lilbee.crawler.models import (
20 CancelToken,
21 ConcurrencySpec,
22 FetchedPage,
23 FilterSpec,
24)
25from lilbee.crawler.url_filter import validate_crawl_url
27if TYPE_CHECKING:
28 from lilbee.crawler.fetcher import WebFetcher
30log = logging.getLogger(__name__)
def _build_inner_crawler(*, verbose: bool, render_mode: CrawlRenderMode) -> Any:
    """Create the crawl4ai ``AsyncWebCrawler`` that matches ``render_mode``.

    ``CrawlRenderMode.HTTP`` builds the crawler around crawl4ai's
    ``AsyncHTTPCrawlerStrategy`` (no browser configuration is created). Every
    other mode builds a ``BrowserConfig`` with the light, text and
    memory-saving switches enabled; the page-recycle threshold and the extra
    launch arguments are read from ``cfg``.

    ``verbose`` is forwarded to the crawler and, in browser mode, to the
    browser configuration as well.
    """
    from crawl4ai import AsyncWebCrawler

    if render_mode is not CrawlRenderMode.HTTP:
        from crawl4ai import BrowserConfig

        browser_config = BrowserConfig(
            light_mode=True,
            text_mode=True,
            memory_saving_mode=True,
            max_pages_before_recycle=cfg.crawl_browser_recycle_pages,
            # Copy so the crawler never shares the config object's sequence.
            extra_args=list(cfg.crawl_browser_extra_args),
            verbose=verbose,
        )
        return AsyncWebCrawler(config=browser_config, verbose=verbose)

    from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy

    http_strategy = AsyncHTTPCrawlerStrategy()
    return AsyncWebCrawler(crawler_strategy=http_strategy, verbose=verbose)
def _build_rate_limited_dispatcher(
    concurrency: ConcurrencySpec, render_mode: CrawlRenderMode
) -> Any:
    """Return a rate-limited crawl4ai dispatcher for ``concurrency``, or ``None``.

    ``None`` is returned when ``concurrency.retry_on_rate_limit`` is falsy.
    Otherwise a ``RateLimiter`` is configured from the spec's retry fields and
    handed to one of two dispatchers:

    * ``MemoryAdaptiveDispatcher`` for ``CrawlRenderMode.BROWSER``
    * ``SemaphoreDispatcher`` for every other render mode

    Both are capped at ``concurrency.semaphore_count`` concurrent sessions.
    The dispatcher is meant to be threaded through the crawler wrapper,
    because the BFS deep-crawl strategy calls ``arun_many()`` without a
    dispatcher argument of its own.
    """
    if not concurrency.retry_on_rate_limit:
        return None

    from crawl4ai.async_dispatcher import RateLimiter

    delay_window = (concurrency.retry_base_delay_min, concurrency.retry_base_delay_max)
    limiter = RateLimiter(
        base_delay=delay_window,
        max_delay=concurrency.retry_max_backoff,
        max_retries=concurrency.retry_max_attempts,
    )

    if render_mode is not CrawlRenderMode.BROWSER:
        from crawl4ai.async_dispatcher import SemaphoreDispatcher

        return SemaphoreDispatcher(
            semaphore_count=concurrency.semaphore_count,
            rate_limiter=limiter,
        )

    from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher

    return MemoryAdaptiveDispatcher(
        max_session_permit=concurrency.semaphore_count,
        rate_limiter=limiter,
    )
class _LilbeeAsyncCrawler:
    """Proxy around an ``AsyncWebCrawler`` that defaults the ``arun_many`` dispatcher.

    The BFS deep-crawl strategy invokes ``arun_many`` without a ``dispatcher``
    argument; this proxy fills in the one given at construction so rate
    limiting and retries take effect. A caller that passes its own
    ``dispatcher=`` keeps it.
    """

    def __init__(self, inner: Any, *, dispatcher: Any) -> None:
        # ``inner`` is the wrapped crawler; ``dispatcher`` is the fallback
        # used whenever ``arun_many`` is called without one.
        self._inner = inner
        self._dispatcher = dispatcher

    async def __aenter__(self) -> _LilbeeAsyncCrawler:
        """Enter the wrapped crawler's context and return the proxy itself."""
        await self._inner.__aenter__()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
        """Delegate context exit to the wrapped crawler and relay its result."""
        exit_result = await self._inner.__aexit__(exc_type, exc, tb)
        return exit_result

    async def arun(self, *args: Any, **kwargs: Any) -> Any:
        """Forward ``arun`` to the wrapped crawler unchanged."""
        outcome = await self._inner.arun(*args, **kwargs)
        return outcome

    async def arun_many(
        self, urls: Any, config: Any = None, dispatcher: Any = None, **kwargs: Any
    ) -> Any:
        """Forward ``arun_many``, substituting the default dispatcher when none is given."""
        effective_dispatcher = self._dispatcher if dispatcher is None else dispatcher
        return await self._inner.arun_many(
            urls,
            config=config,
            dispatcher=effective_dispatcher,
            **kwargs,
        )
@contextlib.asynccontextmanager
async def _open_crawler(
    *, quiet: bool = False, render_mode: CrawlRenderMode, dispatcher: Any = None
) -> AsyncIterator[Any]:
    """Yield an entered crawler for ``render_mode``, optionally dispatcher-wrapped.

    For ``CrawlRenderMode.BROWSER`` the Chromium install is checked first and
    :class:`CrawlerBrowserError` is raised when it is missing, before any
    crawler is built. Other modes skip that check.

    When ``dispatcher`` is given, the crawler is wrapped in
    :class:`_LilbeeAsyncCrawler` so the dispatcher becomes the ``arun_many``
    default. When ``quiet`` is true, stdout and stderr are redirected to
    throwaway in-memory buffers for as long as the crawler is open, and the
    crawler is built non-verbose.
    """
    browser_requested = render_mode is CrawlRenderMode.BROWSER
    if browser_requested and not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable browser-mode crawling."
        )

    inner = _build_inner_crawler(verbose=not quiet, render_mode=render_mode)
    target = inner if dispatcher is None else _LilbeeAsyncCrawler(inner, dispatcher=dispatcher)

    with contextlib.ExitStack() as silencer:
        if quiet:
            # Entered stdout-then-stderr, so they unwind stderr-then-stdout.
            silencer.enter_context(contextlib.redirect_stdout(io.StringIO()))
            silencer.enter_context(contextlib.redirect_stderr(io.StringIO()))
        async with target as crawler:
            yield crawler
def _safe_strategy_cancel(strategy: Any) -> None:
    """Invoke ``strategy.cancel()`` when the strategy exposes a callable ``cancel``.

    Objects without a callable ``cancel`` attribute are ignored. Only
    ``AttributeError`` and ``RuntimeError`` raised by the call are swallowed
    (and logged at debug level); every other exception reaches the caller.
    """
    canceller = getattr(strategy, "cancel", None)
    if not callable(canceller):
        return
    try:
        canceller()
    except (AttributeError, RuntimeError) as exc:
        log.debug("strategy.cancel() raised: %s", exc)
async def _safe_aclose(stream: Any) -> None:
    """Close ``stream`` when it is an async generator; otherwise do nothing.

    ``None``, lists and single-result objects are not async generators and
    are left untouched. Closing is best-effort: an ``Exception`` raised by
    ``aclose()`` never reaches the caller, but it is logged at debug level so
    teardown failures stay diagnosable, matching how
    ``_safe_strategy_cancel`` reports the errors it swallows.
    """
    if not inspect.isasyncgen(stream):
        return
    try:
        await stream.aclose()
    except Exception as exc:
        # Deliberate best-effort teardown: record the failure, do not raise.
        log.debug("stream.aclose() raised: %s", exc)
async def _iter_crawl_stream(stream: Any) -> AsyncIterator[Any]:
    """Iterate whatever ``arun()`` returned as a uniform async stream.

    * an async generator is drained item by item,
    * a ``list`` (the batch-mode shape) yields each element,
    * anything else is treated as one result and yielded once.
    """
    if inspect.isasyncgen(stream):
        async for result in stream:
            yield result
    elif isinstance(stream, list):
        for result in stream:
            yield result
    else:
        yield stream
def _link_passes_ssrf(url: str) -> bool:
    """Report whether ``url`` is accepted by ``validate_crawl_url``.

    Returns ``False`` when the validator raises ``ValueError`` and ``True``
    when it returns normally. Used to re-check every followed link so a
    discovered link that the validator rejects is dropped before it is
    fetched.
    """
    try:
        validate_crawl_url(url)
    except ValueError:
        return False
    else:
        return True
def _host_scope_filter(start_url: str, *, include_subdomains: bool) -> Any:
    """Build a crawl4ai ``URLFilter`` that keeps a crawl on the seed URL's host.

    With ``include_subdomains=False`` only links whose host equals the host
    of *start_url* are in scope; with ``include_subdomains=True`` any
    subdomain of that host is accepted too. Host comparison is
    case-insensitive. An in-scope link must additionally pass
    ``_link_passes_ssrf``, because a same-host link can still be one the URL
    validator rejects.

    A discovered link that ``urlparse`` cannot parse (it raises ``ValueError``
    for, e.g., unmatched IPv6 brackets) is rejected rather than letting the
    exception escape the filter.

    Returns ``None`` when *start_url* has no hostname, in which case no host
    scoping is applied by this helper.
    """
    from crawl4ai.deep_crawling.filters import URLFilter

    host = (urlparse(start_url).hostname or "").lower()
    if not host:
        return None

    def _in_scope(link_host: str) -> bool:
        if link_host == host:
            return True
        return include_subdomains and link_host.endswith(f".{host}")

    class _ScopedSsrfFilter(URLFilter):  # type: ignore[misc]
        def apply(self, url: str) -> bool:
            try:
                link_host = (urlparse(url).hostname or "").lower()
            except ValueError:
                # Malformed link from page content: fail closed and count it
                # as rejected instead of raising out of the filter.
                self._update_stats(False)
                return False
            ok = _in_scope(link_host) and _link_passes_ssrf(url)
            self._update_stats(ok)
            return ok

    return _ScopedSsrfFilter()
class Crawl4aiFetcher:
    """crawl4ai-backed :class:`WebFetcher` that opens a new crawler per fetch call."""

    def __init__(self, *, quiet: bool = False, render_mode: CrawlRenderMode) -> None:
        self._quiet = quiet
        self._render_mode = render_mode

    async def __aenter__(self) -> Crawl4aiFetcher:
        # No shared crawler is created here: each operation opens its own
        # ``AsyncWebCrawler`` because ``fetch_recursive`` builds a dispatcher
        # from the :class:`ConcurrencySpec` passed to that particular call.
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        # Nothing was acquired in ``__aenter__``; exceptions are not suppressed.
        return None

    async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage:
        """Fetch one URL with crawl4ai's ``arun`` and wrap the outcome.

        ``timeout`` is in seconds and is converted to whole milliseconds for
        ``page_timeout``. A page counts as successful only when its stripped
        markdown is non-empty; otherwise the crawler's error message (or
        ``"No content extracted"``) is reported.
        """
        from crawl4ai import CrawlerRunConfig

        run_config = CrawlerRunConfig(page_timeout=int(timeout * 1000))
        async with _open_crawler(quiet=self._quiet, render_mode=self._render_mode) as crawler:
            result = await crawler.arun(url=url, config=run_config)
            text = (result.markdown or "").strip()
            if not text:
                return FetchedPage(
                    url=url,
                    success=False,
                    error=result.error_message or "No content extracted",
                )
            return FetchedPage(url=url, markdown=text, success=True)

    async def fetch_recursive(
        self,
        seed_url: str,
        *,
        depth: int | None,
        max_pages: int | None,
        timeout: float,
        concurrency: ConcurrencySpec,
        filters: FilterSpec,
        cancel: CancelToken | None = None,
    ) -> AsyncGenerator[FetchedPage, None]:
        """Yield pages as crawl4ai's BFS deep crawl discovers them.

        A ``None`` ``depth`` or ``max_pages`` means "no limit" and is passed
        to ``BFSDeepCrawlStrategy`` as ``math.inf``. ``timeout`` is in seconds.
        When ``cancel`` is set, the crawl stops before the next page is
        yielded.
        """
        from crawl4ai import CrawlerRunConfig
        from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
        from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter

        def _should_cancel() -> bool:
            if cancel is None:
                return False
            return cancel.is_set()

        # Host scoping first, then the caller's exclude patterns.
        chain_members: list[Any] = []
        scope_filter = _host_scope_filter(seed_url, include_subdomains=filters.include_subdomains)
        if scope_filter is not None:
            chain_members.append(scope_filter)
        if filters.exclude_patterns:
            exclusion = URLPatternFilter(filters.exclude_patterns, use_glob=False, reverse=True)
            chain_members.append(exclusion)
        filter_chain = FilterChain(chain_members) if chain_members else FilterChain()

        depth_limit = math.inf if depth is None else depth
        page_limit = math.inf if max_pages is None else max_pages
        strategy = BFSDeepCrawlStrategy(
            max_depth=depth_limit,
            max_pages=page_limit,
            should_cancel=_should_cancel,
            filter_chain=filter_chain,
        )
        run_config = CrawlerRunConfig(
            deep_crawl_strategy=strategy,
            page_timeout=int(timeout * 1000),
            mean_delay=concurrency.mean_delay,
            max_range=concurrency.max_delay_range,
            semaphore_count=concurrency.semaphore_count,
            stream=True,
        )

        dispatcher = _build_rate_limited_dispatcher(concurrency, self._render_mode)
        strategy_cancelled = False
        # Errors are not handled here; they surface to the orchestration
        # layer, which chooses between debug-level teardown noise and a real
        # failure. This adapter only guarantees ordered cleanup: cancel the
        # BFS strategy, then close the stream, then leave the crawler context.
        async with _open_crawler(
            quiet=self._quiet, render_mode=self._render_mode, dispatcher=dispatcher
        ) as crawler:
            stream: Any = await crawler.arun(url=seed_url, config=run_config)
            try:
                async for crawled in _iter_crawl_stream(stream):
                    if _should_cancel():
                        _safe_strategy_cancel(strategy)
                        strategy_cancelled = True
                        break
                    if not crawled.success:
                        yield FetchedPage(
                            url=crawled.url,
                            success=False,
                            error=crawled.error_message or "Unknown error",
                        )
                        continue
                    yield FetchedPage(url=crawled.url, markdown=crawled.markdown or "")
            finally:
                # The consumer may stop iterating without a cancel having
                # been observed; cancel the strategy anyway so an in-flight
                # ``arun_many`` batch stops dispatching.
                if not strategy_cancelled:
                    _safe_strategy_cancel(strategy)
                # The stream must be closed while the crawler context is
                # still open so in-flight URLs are torn down in order;
                # omitting this caused "BrowserContext.new_page: Connection
                # closed" spam on cancel.
                await _safe_aclose(stream)
# Static protocol-conformance check: the annotated assignment below lets a
# type checker verify that Crawl4aiFetcher is structurally a WebFetcher.
# It sits under ``TYPE_CHECKING``, so nothing is instantiated at import time.
if TYPE_CHECKING:
    _: WebFetcher = Crawl4aiFetcher(render_mode=CrawlRenderMode.HTTP)
@functools.cache
def crawler_available() -> bool:
    """Report whether the ``crawl4ai`` package can be located (extra installed).

    The lookup uses ``importlib.util.find_spec`` instead of ``import
    crawl4ai``: locating the package is enough to answer the question and
    avoids executing a heavy import on the UI thread, where the Settings
    screen's feature gate calls this synchronously. The real import happens
    later in the crawler bootstrap. The answer is cached by
    ``functools.cache``, so the lookup runs at most once per process.
    """
    import importlib.util

    spec = importlib.util.find_spec("crawl4ai")
    return spec is not None