Coverage for src/lilbee/crawler/crawl4ai_fetcher.py: 100% (137 statements)
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""crawl4ai-backed implementation of :class:`lilbee.crawler.fetcher.WebFetcher`."""
3from __future__ import annotations
5import contextlib
6import functools
7import inspect
8import io
9import logging
10import math
11from collections.abc import AsyncGenerator, AsyncIterator
12from typing import TYPE_CHECKING, Any
13from urllib.parse import urlparse
15from lilbee.crawler import bootstrap
16from lilbee.crawler.bootstrap import CrawlerBrowserError
17from lilbee.crawler.models import (
18 CancelToken,
19 ConcurrencySpec,
20 FetchedPage,
21 FilterSpec,
22)
24if TYPE_CHECKING:
25 from lilbee.crawler.fetcher import WebFetcher
27log = logging.getLogger(__name__)
30def _build_rate_limited_dispatcher(concurrency: ConcurrencySpec) -> Any:
31 """Build a SemaphoreDispatcher + RateLimiter from a ConcurrencySpec, or None.
33 BFSDeepCrawlStrategy calls ``crawler.arun_many()`` without a dispatcher
34 kwarg, so per-domain rate limiting is only reachable by threading a
35 dispatcher through AsyncWebCrawler itself. This helper centralizes the
36 spec read so the TUI / CLI / server all get identical behavior.
37 """
38 if not concurrency.retry_on_rate_limit:
39 return None
40 from crawl4ai.async_dispatcher import RateLimiter, SemaphoreDispatcher
42 rate_limiter = RateLimiter(
43 base_delay=(concurrency.retry_base_delay_min, concurrency.retry_base_delay_max),
44 max_delay=concurrency.retry_max_backoff,
45 max_retries=concurrency.retry_max_attempts,
46 )
47 return SemaphoreDispatcher(
48 semaphore_count=concurrency.semaphore_count,
49 rate_limiter=rate_limiter,
50 )
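
# Illustrative spec-to-dispatcher mapping (comment-only sketch, not executed).
# Only the attribute names come from the helper above; the ConcurrencySpec
# keyword construction is an assumption about its constructor.
#
#     spec = ConcurrencySpec(
#         retry_on_rate_limit=True,
#         retry_base_delay_min=1.0,
#         retry_base_delay_max=3.0,
#         retry_max_backoff=60.0,
#         retry_max_attempts=5,
#         semaphore_count=4,
#     )
#     dispatcher = _build_rate_limited_dispatcher(spec)  # SemaphoreDispatcher
#     # With retry_on_rate_limit=False the helper returns None and crawl4ai's
#     # default dispatching is used instead.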


class _LilbeeAsyncCrawler:
    """AsyncWebCrawler wrapper that injects a default dispatcher on ``arun_many``.

    BFSDeepCrawlStrategy calls ``arun_many`` without a dispatcher kwarg, so the
    wrapper supplies one to make rate limiting and 429/503 retries reachable.
    An explicit ``dispatcher=`` on the call still wins.
    """

    def __init__(self, *, verbose: bool, dispatcher: Any) -> None:
        from crawl4ai import AsyncWebCrawler

        self._inner = AsyncWebCrawler(verbose=verbose)
        self._dispatcher = dispatcher

    async def __aenter__(self) -> _LilbeeAsyncCrawler:
        await self._inner.__aenter__()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
        return await self._inner.__aexit__(exc_type, exc, tb)

    async def arun(self, *args: Any, **kwargs: Any) -> Any:
        return await self._inner.arun(*args, **kwargs)

    async def arun_many(
        self, urls: Any, config: Any = None, dispatcher: Any = None, **kwargs: Any
    ) -> Any:
        return await self._inner.arun_many(
            urls,
            config=config,
            dispatcher=dispatcher if dispatcher is not None else self._dispatcher,
            **kwargs,
        )


@contextlib.asynccontextmanager
async def _open_crawler(*, quiet: bool = False, dispatcher: Any = None) -> AsyncIterator[Any]:
    """Open an AsyncWebCrawler, wrapping with the dispatcher when provided.

    Raises :class:`CrawlerBrowserError` if the Chromium binary is missing so
    Playwright's ASCII install banner does not leak into the TUI.
    """
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    from crawl4ai import AsyncWebCrawler

    stdout_ctx = contextlib.redirect_stdout(io.StringIO()) if quiet else contextlib.nullcontext()
    stderr_ctx = contextlib.redirect_stderr(io.StringIO()) if quiet else contextlib.nullcontext()
    with stdout_ctx, stderr_ctx:
        if dispatcher is not None:
            async with _LilbeeAsyncCrawler(verbose=not quiet, dispatcher=dispatcher) as crawler:
                yield crawler
        else:
            async with AsyncWebCrawler(verbose=not quiet) as crawler:
                yield crawler
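
# Intended call shape (comment-only sketch; the URL and config are
# placeholders, not values used by this module):
#
#     try:
#         async with _open_crawler(quiet=True, dispatcher=dispatcher) as crawler:
#             result = await crawler.arun(url="https://example.com", config=config)
#     except CrawlerBrowserError:
#         ...  # surface the install hint instead of Playwright's banner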


def _safe_strategy_cancel(strategy: Any) -> None:
    """Call ``strategy.cancel()`` if available; swallow only the known SDK shapes.

    Narrow catch: ``AttributeError`` covers a missing nested attribute mid-call;
    ``RuntimeError`` covers cancel-on-closed-strategy. Anything else propagates.
    """
    cancel_method = getattr(strategy, "cancel", None)
    if callable(cancel_method):
        try:
            cancel_method()
        except (AttributeError, RuntimeError) as exc:
            log.debug("strategy.cancel() raised: %s", exc)


async def _safe_aclose(stream: Any) -> None:
    """Close an async generator stream; no-op for list / single-result shapes."""
    if stream is None:
        return
    if inspect.isasyncgen(stream):
        with contextlib.suppress(Exception):
            await stream.aclose()


async def _iter_crawl_stream(stream: Any) -> AsyncIterator[Any]:
    """Normalize crawl4ai's ``arun()`` return (async generator, list, or single result)."""
    if inspect.isasyncgen(stream):
        async for item in stream:
            yield item
        return
    # A list is the batch-mode shape; iterate and yield each item.
    if isinstance(stream, list):
        for item in stream:
            yield item
        return
    yield stream
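
# The normalization lets callers write a single loop over whatever shape
# crawl4ai returned (comment-only sketch):
#
#     stream = await crawler.arun(url=seed_url, config=config)
#     async for cr in _iter_crawl_stream(stream):
#         ...  # cr is one CrawlResult whether arun streamed, returned a
#         # batch list, or returned a single result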


def _host_scope_filter(start_url: str, *, include_subdomains: bool) -> Any:
    """Build a URLFilter that scopes a crawl to the starting URL's host.

    Default behavior (``include_subdomains=False``) restricts link-following to
    the exact host of *start_url*. For ``https://en.wikipedia.org/...`` this
    excludes ``af.wikipedia.org`` and every other language subdomain.

    When ``include_subdomains=True``, crawl4ai's DomainFilter matches the host
    plus any of its subdomains (``foo.example.com`` matches ``example.com``),
    which is the loose "whole registrable domain" behavior users may want.
    """
    from crawl4ai.deep_crawling.filters import DomainFilter, URLFilter

    host = (urlparse(start_url).hostname or "").lower()
    if include_subdomains:
        return DomainFilter(allowed_domains=host) if host else None

    class _ExactHostFilter(URLFilter):  # type: ignore[misc]
        def __init__(self, allowed_host: str) -> None:
            super().__init__()
            self._host = allowed_host

        def apply(self, url: str) -> bool:
            link_host = (urlparse(url).hostname or "").lower()
            ok = link_host == self._host
            self._update_stats(ok)
            return ok

    return _ExactHostFilter(host) if host else None
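
# Scoping behavior, per the docstring above (comment-only sketch):
#
#     f = _host_scope_filter("https://en.wikipedia.org/wiki/Bee", include_subdomains=False)
#     f.apply("https://en.wikipedia.org/wiki/Honey")    # True: exact host
#     f.apply("https://af.wikipedia.org/wiki/Heuning")  # False: sibling subdomain
#
#     g = _host_scope_filter("https://example.com/", include_subdomains=True)
#     g.apply("https://foo.example.com/")  # True: DomainFilter accepts subdomains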


class Crawl4aiFetcher:
    """:class:`WebFetcher` implementation backed by crawl4ai."""

    def __init__(self, *, quiet: bool = False) -> None:
        self._quiet = quiet

    async def __aenter__(self) -> Crawl4aiFetcher:
        # The fetcher opens a fresh ``AsyncWebCrawler`` per operation because
        # ``fetch_recursive`` needs a per-call dispatcher (which depends on
        # the :class:`ConcurrencySpec` for that call). Nothing to set up here.
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        return None

    async def fetch_single(self, url: str, *, timeout: float) -> FetchedPage:
        """Fetch a single URL via crawl4ai's ``arun``."""
        from crawl4ai import CrawlerRunConfig

        config = CrawlerRunConfig(page_timeout=int(timeout * 1000))
        async with _open_crawler(quiet=self._quiet) as crawler:
            result = await crawler.arun(url=url, config=config)
        markdown = (result.markdown or "").strip()
        if markdown:
            return FetchedPage(url=url, markdown=markdown, success=True)
        return FetchedPage(
            url=url,
            success=False,
            error=result.error_message or "No content extracted",
        )
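
    # Single-page call shape (comment-only sketch, not executed here):
    #
    #     async with Crawl4aiFetcher(quiet=True) as fetcher:
    #         page = await fetcher.fetch_single("https://example.com", timeout=30.0)
    #     if not page.success:
    #         log.warning("fetch failed: %s", page.error)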

    async def fetch_recursive(
        self,
        seed_url: str,
        *,
        depth: int | None,
        max_pages: int | None,
        timeout: float,
        concurrency: ConcurrencySpec,
        filters: FilterSpec,
        cancel: CancelToken | None = None,
    ) -> AsyncGenerator[FetchedPage, None]:
        """Stream pages discovered by crawl4ai's native BFS.

        ``depth`` / ``max_pages`` of ``None`` mean unbounded; the adapter
        translates to ``math.inf`` for crawl4ai's BFSDeepCrawlStrategy, which
        is the sentinel it understands.
        """

        def _should_cancel() -> bool:
            return cancel is not None and cancel.is_set()

        from crawl4ai import CrawlerRunConfig
        from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
        from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter

        filter_chain_items: list[Any] = []
        host_filter = _host_scope_filter(seed_url, include_subdomains=filters.include_subdomains)
        if host_filter is not None:
            filter_chain_items.append(host_filter)
        if filters.exclude_patterns:
            filter_chain_items.append(
                URLPatternFilter(filters.exclude_patterns, use_glob=False, reverse=True)
            )
        filter_chain = FilterChain(filter_chain_items) if filter_chain_items else FilterChain()

        strategy = BFSDeepCrawlStrategy(
            max_depth=math.inf if depth is None else depth,
            max_pages=math.inf if max_pages is None else max_pages,
            should_cancel=_should_cancel,
            filter_chain=filter_chain,
        )
        config = CrawlerRunConfig(
            deep_crawl_strategy=strategy,
            page_timeout=int(timeout * 1000),
            mean_delay=concurrency.mean_delay,
            max_range=concurrency.max_delay_range,
            semaphore_count=concurrency.semaphore_count,
            stream=True,
        )

        dispatcher = _build_rate_limited_dispatcher(concurrency)
        stream: Any = None
        strategy_cancelled = False
        # Exceptions propagate to the orchestration layer, which decides
        # whether to log cancel-teardown noise at debug vs surface a real
        # failure. The adapter's only housekeeping is stream close + BFS
        # strategy cancel so Playwright tears down in order.
        async with _open_crawler(quiet=self._quiet, dispatcher=dispatcher) as crawler:
            stream = await crawler.arun(url=seed_url, config=config)
            try:
                async for cr in _iter_crawl_stream(stream):
                    if _should_cancel():
                        _safe_strategy_cancel(strategy)
                        strategy_cancelled = True
                        break
                    if cr.success:
                        yield FetchedPage(url=cr.url, markdown=cr.markdown or "")
                    else:
                        yield FetchedPage(
                            url=cr.url,
                            success=False,
                            error=cr.error_message or "Unknown error",
                        )
            finally:
                # If the consumer breaks out before we saw a cancel, still
                # short-circuit the BFS strategy so any in-flight arun_many
                # batch stops dispatching. Mirrors the orchestrator's
                # previous "hard cap on visible counter" behavior now that
                # the strategy object lives inside the adapter.
                if not strategy_cancelled:
                    _safe_strategy_cancel(strategy)
                # Close the async generator (if it is one) before the
                # crawler context exits, so Playwright tears down
                # in-flight URLs in order. Skipping this is what produced
                # the "BrowserContext.new_page: Connection closed" spam
                # on cancel.
                await _safe_aclose(stream)
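

# Consumption sketch for fetch_recursive (comment-only; the CancelToken and
# FilterSpec constructor calls are assumptions, since the adapter only reads
# ``cancel.is_set()``, ``filters.include_subdomains``, and
# ``filters.exclude_patterns``):
#
#     cancel = CancelToken()
#     async with Crawl4aiFetcher(quiet=True) as fetcher:
#         async for page in fetcher.fetch_recursive(
#             "https://example.com",
#             depth=2,
#             max_pages=50,
#             timeout=30.0,
#             concurrency=spec,
#             filters=FilterSpec(include_subdomains=False, exclude_patterns=[]),
#         ):
#             if page.success:
#                 ...  # persist page.markdown; set the token to stop early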


# Protocol conformance check: Crawl4aiFetcher is structurally a WebFetcher.
# We don't instantiate at import time so the check stays purely structural.
if TYPE_CHECKING:
    _: WebFetcher = Crawl4aiFetcher()


@functools.cache
def crawler_available() -> bool:
    """Check if the crawl4ai backend is importable (i.e. the extra is installed).

    Uses ``importlib.util.find_spec`` rather than ``import crawl4ai`` so the
    check stays fast on the UI thread. ``crawl4ai`` is in AGENTS.md's
    known-heavy-imports list; executing it on Windows with Defender
    real-time scanning takes seconds, and the Settings screen's
    feature-gate call (``_FEATURE_GATED_GROUPS``) hits it synchronously
    during ``compose``. ``find_spec`` just walks ``sys.path`` to locate
    the package; the actual import runs later from the crawler bootstrap
    where the cost is expected.
    """
    import importlib.util

    return importlib.util.find_spec("crawl4ai") is not None
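

# Gating sketch (comment-only): check availability before constructing the
# fetcher so the heavy crawl4ai import never runs on the UI thread.
#
#     if crawler_available():
#         fetcher = Crawl4aiFetcher(quiet=True)
#     else:
#         ...  # hide /crawl and point users at the crawl4ai extra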