Coverage for src / lilbee / crawler / runner.py: 100%
188 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Crawl orchestration: build specs from ``cfg``, drive a :class:`WebFetcher`.
3By default a recursive crawl is scoped to the exact starting host so a
4Wikipedia article does not wander into other language editions. Callers
5opt into subdomain scope via ``include_subdomains=True``.
6"""
8from __future__ import annotations
10import asyncio
11import logging
12import threading
13import time
14from collections.abc import Awaitable, Callable
15from datetime import UTC, datetime
16from pathlib import Path
17from typing import Any
19from lilbee.app.services import get_services
20from lilbee.core.config import cfg
21from lilbee.core.config.enums import CrawlRenderMode
22from lilbee.crawler import bootstrap, save, sitemap
23from lilbee.crawler.bootstrap import CrawlerBrowserError
24from lilbee.crawler.crawl4ai_fetcher import Crawl4aiFetcher
25from lilbee.crawler.discovery import build_concurrency_spec, build_filter_spec
26from lilbee.crawler.events import (
27 _drain_page_stream,
28 _fetched_to_result,
29 _handle_crawl_teardown_error,
30 _pages_cap,
31)
32from lilbee.crawler.models import CRAWL_PAGES_UNLIMITED, CrawlResult
33from lilbee.crawler.save import METADATA_FLUSH_INTERVAL, CrawlMeta
34from lilbee.crawler.url_filter import validate_crawl_url
35from lilbee.runtime.progress import (
36 CrawlDoneEvent,
37 CrawlPageEvent,
38 CrawlStartEvent,
39 DetailedProgressCallback,
40 EventType,
41 SetupDoneEvent,
42 SetupStartEvent,
43)
# Component name for the browser-warmup setup phase (distinct from the
# Chromium download, whose component is "chromium"). The crawl emits a
# start/done bracket around opening the crawler so the Task Center shows a
# "preparing crawler" stage instead of a silent stall on first use.
_BROWSER_SETUP_COMPONENT = "browser"

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
def _get_crawl_semaphore() -> asyncio.Semaphore | None:
    """Look up the shared crawl semaphore on the service container.

    Returns whatever ``get_services().crawler_semaphore`` holds; ``None``
    means crawls are not throttled.
    """
    services = get_services()
    return services.crawler_semaphore
59def _resolve_depth(value: int | None, cfg_ceiling: int | None) -> int | None:
60 """Resolve a crawl depth to the value the dispatcher consumes.
62 Depth has its own contract, distinct from the page-count limit: ``0`` is a
63 valid "seed only / single page" depth, not "unbounded". (Page counts use
64 :func:`_resolve_page_limit`, where ``0`` means "no limit".)
66 None -> cfg_ceiling (itself may be None; ``None`` means unbounded)
67 n >= 0 -> n (0 = seed only; explicit caller intent overrides cfg)
68 n < 0 -> ValueError (use None for unbounded)
69 """
70 effective = value if value is not None else cfg_ceiling
71 if effective is None:
72 return None
73 if effective < 0:
74 raise ValueError("crawl depth must be 0 (seed only) or a positive int")
75 return effective
def _resolve_page_limit(max_pages: int | None) -> int | None:
    """Turn a caller-supplied page budget into the bound the fetcher uses.

    A return value of ``None`` means "unbounded".

    * ``CRAWL_PAGES_UNLIMITED`` (0) is an explicit "no limit" -> ``None``.
    * Any other non-``None`` value is returned unchanged, even when it is
      larger than the protective default.
    * ``None`` means the caller did not specify a budget: use
      ``cfg.crawl_max_pages`` when it is set, otherwise the protective
      ``cfg.crawl_safety_max_pages`` so a hostile site can't exhaust the disk
      on a crawl nobody bounded.
    """
    if max_pages is None:
        configured = cfg.crawl_max_pages
        return cfg.crawl_safety_max_pages if configured is None else configured
    return None if max_pages == CRAWL_PAGES_UNLIMITED else max_pages
96def _looks_like_missing_chromium(exc: BaseException) -> bool:
97 """Heuristic for the Playwright "Executable doesn't exist" launch failure."""
98 return "Executable doesn't exist" in str(exc)
async def crawl_single(
    url: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback | None = None,
    render_mode: CrawlRenderMode = CrawlRenderMode.BROWSER,
) -> CrawlResult:
    """Fetch exactly one URL and wrap the outcome in a :class:`CrawlResult`.

    Direct callers get ``render_mode=BROWSER`` unless they say otherwise; the
    public entry point :func:`crawl_and_save` resolves the mode from
    ``cfg.crawl_render_mode`` and hands the resolved value down.

    In browser mode ``on_progress`` receives a ``SETUP_START`` event before
    the fetcher is opened and a ``SETUP_DONE`` event once it is open, so the
    browser warmup of a first crawl is visible instead of a silent stall.
    HTTP mode emits neither event.

    When the first attempt fails with a "Chromium executable missing" style
    error, the Chromium bootstrap is re-run once and the fetch retried --
    ``chromium_installed()`` can return True when the wrong revision lives in
    the cache root, in which case the first launch fails. Every other
    failure, and a failed retry, is logged and reported as
    ``CrawlResult(success=False, error=...)`` rather than raised.

    Raises:
        CrawlerBackendError: the crawler extra isn't installed.
        CrawlerBrowserError: re-raised unchanged from the first attempt.
    """
    validate_crawl_url(url)
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # The setup bracket only makes sense for the Chromium warmup; HTTP mode
    # opens a browserless client, so no "browser" stage is announced there.
    notify = on_progress if render_mode is CrawlRenderMode.BROWSER else None
    if notify is not None:
        notify(EventType.SETUP_START, SetupStartEvent(component=_BROWSER_SETUP_COMPONENT))
    # NOTE(review): if opening the fetcher raises, no SETUP_DONE follows the
    # SETUP_START above -- confirm consumers tolerate an unclosed bracket.
    try:
        async with Crawl4aiFetcher(quiet=quiet, render_mode=render_mode) as fetcher:
            if notify is not None:
                notify(
                    EventType.SETUP_DONE,
                    SetupDoneEvent(component=_BROWSER_SETUP_COMPONENT, success=True),
                )
            fetched = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
            return _fetched_to_result(fetched)
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        if not _looks_like_missing_chromium(exc):
            log.warning("Failed to crawl %s: %s", url, exc)
            return CrawlResult(url=url, success=False, error=str(exc))

        # One bootstrap-and-retry round; the retry emits no setup events.
        log.warning("Chromium missing for %s; bootstrapping then retrying", url)
        await bootstrap.bootstrap_chromium(on_progress=None)
        # NOTE(review): unlike the first attempt, any exception raised by the
        # retry (presumably including CrawlerBrowserError) becomes a failed
        # result instead of propagating -- confirm this is intended.
        try:
            async with Crawl4aiFetcher(quiet=quiet, render_mode=render_mode) as fetcher:
                fetched = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
                return _fetched_to_result(fetched)
        except Exception as retry_exc:
            log.warning("Crawl retry failed for %s: %s", url, retry_exc)
            return CrawlResult(url=url, success=False, error=str(retry_exc))
async def crawl_recursive(
    url: str,
    max_depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    *,
    quiet: bool = False,
    include_subdomains: bool = False,
    on_result: Callable[[CrawlResult], Any] | None = None,
    render_mode: CrawlRenderMode = CrawlRenderMode.BROWSER,
) -> list[CrawlResult]:
    """Recursively crawl from ``url`` (BFS), streaming per-page progress.

    ``render_mode`` is ``BROWSER`` for direct callers unless overridden; the
    public entry point :func:`crawl_and_save` resolves it from
    ``cfg.crawl_render_mode`` and passes the resolved value down.

    ``max_depth`` is forwarded to the fetcher untouched: ``None`` means
    unbounded depth, a positive int is the cap. ``max_pages`` goes through
    :func:`_resolve_page_limit`: ``CRAWL_PAGES_UNLIMITED`` (0) means no page
    limit, a positive int is that cap, and ``None`` (unspecified) falls back
    to ``cfg.crawl_max_pages`` when set, else ``cfg.crawl_safety_max_pages``,
    so a hostile site can't exhaust the disk on a crawl nobody bounded.

    ``CRAWL_PAGE`` events fire as each page completes; the total is
    ``CRAWL_TOTAL_UNKNOWN`` by default and promoted to the sitemap count when
    one is available.

    ``include_subdomains=True`` widens the scope from the exact host to the
    host plus its subdomains. When ``on_result`` is given it is invoked for
    every streamed ``CrawlResult`` as it arrives, so callers can flush pages
    to disk incrementally and keep partial output across cancellation.

    Raises:
        CrawlerBackendError: the crawler extra isn't installed.
        CrawlerBrowserError: browser mode was requested but Chromium is not
            installed, or the error was raised while crawling.
    """
    validate_crawl_url(url)
    # ``_run_crawl`` has already resolved the depth ceiling (and sent a
    # seed-only 0 down the single-page path), so ``max_depth`` is used as
    # given; only the page budget is resolved here.
    page_limit = _resolve_page_limit(max_pages)

    # Fail fast when the ``crawler`` extra is missing so SSE callers see
    # ``event: error`` instead of a silent zero-results run.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Likewise fail fast before touching backend submodules, so callers get a
    # clean CrawlerBrowserError rather than a Playwright install banner. HTTP
    # mode needs no browser and skips this guard.
    browser_mode = render_mode is CrawlRenderMode.BROWSER
    if browser_mode and not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable browser-mode crawling."
        )

    # Best-effort sitemap count so the TUI / CLI can show a real denominator
    # instead of [n/-1]; run in a worker thread so a slow or missing sitemap
    # does not block the event loop.
    known_total = await asyncio.to_thread(
        sitemap._count_sitemap_urls, url, include_subdomains=include_subdomains
    )

    concurrency_spec = build_concurrency_spec()
    filter_spec = build_filter_spec(include_subdomains=include_subdomains)

    collected: list[CrawlResult] = []
    # Chromium's one-time warmup can take many seconds, so browser mode
    # brackets it with setup events; HTTP mode has no warmup and stays quiet.
    notify = on_progress if browser_mode else None
    if notify is not None:
        notify(EventType.SETUP_START, SetupStartEvent(component=_BROWSER_SETUP_COMPONENT))
    try:
        async with Crawl4aiFetcher(quiet=quiet, render_mode=render_mode) as fetcher:
            if notify is not None:
                notify(
                    EventType.SETUP_DONE,
                    SetupDoneEvent(component=_BROWSER_SETUP_COMPONENT, success=True),
                )
            # Keep a handle on the generator so it can be closed
            # deterministically. Otherwise its ``finally`` block (which also
            # short-circuits the BFS strategy) would only run at gc time --
            # too late for callers expecting the strategy to stop the moment
            # ``max_pages`` is reached.
            stream = fetcher.fetch_recursive(
                url,
                depth=max_depth,
                max_pages=page_limit,
                timeout=cfg.crawl_timeout,
                concurrency=concurrency_spec,
                filters=filter_spec,
                cancel=cancel,
            )
            try:
                collected = await _drain_page_stream(
                    stream,
                    on_progress=on_progress,
                    on_result=on_result,
                    sitemap_total=known_total,
                    pages_cap=_pages_cap(page_limit),
                    cancel=cancel,
                )
            finally:
                await stream.aclose()
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=collected)

    return collected
async def _maybe_periodic_sync(tasks: set[asyncio.Task[None]]) -> None:
    """Start a background sync when ``cfg.crawl_sync_interval`` has elapsed.

    Nothing happens when periodic sync is disabled (interval ``<= 0``), when
    the sync lock is already held (another sync is running), or when less
    than one interval has passed since the last run. Otherwise a task is
    spawned and registered in ``tasks`` so the caller can drain it before
    returning; the task removes itself from ``tasks`` when it finishes.
    """
    period = cfg.crawl_sync_interval
    state = get_services().crawler_sync_state
    if period <= 0:
        return
    # Non-blocking acquire: a held lock means a sync is already in flight.
    if not state.lock.acquire(blocking=False):
        return

    started = time.monotonic()
    if started - state.last_run < period:
        state.lock.release()
        return

    state.last_run = started

    async def _sync_then_unlock() -> None:
        # The lock acquired above is released here, whatever the outcome.
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception as exc:
            log.warning("Periodic sync during crawl failed: %s", exc)
        finally:
            state.lock.release()

    spawned = asyncio.create_task(_sync_then_unlock())
    tasks.add(spawned)
    spawned.add_done_callback(tasks.discard)
def _make_flush_page(
    meta: dict[str, CrawlMeta],
    written_paths: list[Path],
    counter: dict[str, int],
) -> Callable[[CrawlResult], Any]:
    """Create the per-result flush coroutine used while a crawl streams.

    The returned coroutine saves one result in a worker thread (via
    ``asyncio.to_thread``), records its metadata in ``meta``, and appends the
    written path to ``written_paths``. Metadata is persisted in batches:
    ``counter["pending"]`` counts unsaved updates and the metadata file is
    written once it reaches ``METADATA_FLUSH_INTERVAL``.
    """

    def _persist(result: CrawlResult) -> Path | None:
        saved = save._save_single_result(result, meta)
        if saved is None:
            # Nothing was written for this result, so nothing to record.
            return None
        save._update_single_metadata(meta, result.url, saved, datetime.now(UTC).isoformat())
        counter["pending"] += 1
        if counter["pending"] < METADATA_FLUSH_INTERVAL:
            return saved.path
        # Batch is full: write the metadata out and start a new batch.
        save.save_crawl_metadata(meta)
        counter["pending"] = 0
        return saved.path

    async def flush_page(result: CrawlResult) -> Path | None:
        written = await asyncio.to_thread(_persist, result)
        if written is not None:
            written_paths.append(written)
        return written

    return flush_page
async def _ensure_crawler_ready(
    on_progress: DetailedProgressCallback | None,
    render_mode: CrawlRenderMode,
) -> None:
    """Check the crawler extra, then bootstrap Chromium if browser mode needs it.

    The availability check comes first so a user without [crawler] doesn't
    pay the ~160 MB Chromium download only to hit the same error afterwards.
    HTTP mode never bootstraps. In browser mode the bootstrap is skipped when
    Chromium is already installed; otherwise its progress is forwarded
    through ``on_progress`` so downstream UIs can surface a 'setup' stage.

    Raises:
        CrawlerBackendError: the crawler extra isn't installed.
    """
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    if render_mode is not CrawlRenderMode.BROWSER:
        return
    if bootstrap.chromium_installed():
        return
    await bootstrap.bootstrap_chromium(on_progress=on_progress)
async def _run_crawl(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback | None,
    cancel: threading.Event | None,
    quiet: bool,
    include_subdomains: bool,
    flush_page: Callable[[Any], Awaitable[Path | None]],
    render_mode: CrawlRenderMode,
) -> int:
    """Dispatch to the single-URL or the recursive crawl; return ``pages_seen``.

    The depth ceiling is resolved here (the seed-only ``0`` is allowed) so a
    ``cfg.crawl_max_depth`` of 0 lands on the single-page path instead of
    blowing up inside the recursive resolver.

    A resolved page limit of 1 is treated as a single-page crawl as well:
    crawl4ai's BFS under-counts tiny ``max_pages`` (``max_pages=1`` yields 0
    pages), so "at most one page" goes to the reliable single-URL fetch.

    On the single-page path the result is flushed (an ``OSError`` from the
    flush is logged, not raised), one ``CRAWL_PAGE`` event is emitted, and 1
    is returned. On the recursive path the number of results is returned.
    """
    resolved_depth = _resolve_depth(depth, cfg.crawl_max_depth)
    page_limit = _resolve_page_limit(max_pages)
    single_page = resolved_depth == 0 or page_limit == 1

    if not single_page:
        # The raw ``max_pages`` is passed on; crawl_recursive resolves it.
        crawled = await crawl_recursive(
            url,
            max_depth=resolved_depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel,
            quiet=quiet,
            include_subdomains=include_subdomains,
            on_result=flush_page,
            render_mode=render_mode,
        )
        return len(crawled)

    outcome = await crawl_single(
        url, quiet=quiet, on_progress=on_progress, render_mode=render_mode
    )
    try:
        await flush_page(outcome)
    except OSError:
        log.exception("Flush failed for %s", outcome.url)
    if on_progress:
        on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(url=url, current=1, total=1))
    return 1
async def crawl_and_save(
    url: str,
    *,
    depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    quiet: bool = False,
    include_subdomains: bool = False,
    render_mode: CrawlRenderMode | None = None,
) -> list[Path]:
    """Crawl URL(s), save as markdown, update metadata. Returns paths written.

    ``depth``: ``None`` (default) defers to ``cfg.crawl_max_depth`` and is
    unbounded whole-site recursion when that is also ``None``. ``0`` = single
    URL, no recursion. ``N > 0`` = max link-follow depth. A negative value
    raises ``ValueError``.

    ``max_pages``: ``CRAWL_PAGES_UNLIMITED`` (0) = explicitly no page limit.
    A positive int = that cap. ``None`` (default) = unspecified: it falls
    back to ``cfg.crawl_max_pages`` when set, otherwise to the protective
    ``cfg.crawl_safety_max_pages``, so an unbounded crawl must be asked for
    explicitly. A resolved limit of 1 is served by the single-URL path.

    ``render_mode``: ``None`` resolves to ``cfg.crawl_render_mode`` (the single
    write-boundary for the default). ``http`` fetches without a browser;
    ``browser`` runs a tuned Chromium with JavaScript enabled.

    Hash-based change detection: always fetches but only saves changed or new
    files. Pages flush to disk as they stream so a cancelled crawl preserves
    the pages already fetched.

    ``on_progress`` receives ``CRAWL_START`` before the crawl and
    ``CRAWL_DONE`` (pages crawled, files written) after it, in addition to
    the events emitted by the crawl itself. When ``cancel`` is set by the
    time the crawl returns, the periodic sync is skipped.

    Raises:
        CrawlerBackendError: the crawler extra isn't installed.
        ValueError: the resolved crawl depth is negative.
    """
    mode = render_mode if render_mode is not None else cfg.crawl_render_mode
    await _ensure_crawler_ready(on_progress, mode)

    # Hold the shared crawl semaphore (when there is one) for the whole crawl.
    sem = _get_crawl_semaphore()
    if sem is not None:
        await sem.acquire()
    tasks: set[asyncio.Task[None]] = set()
    try:
        if on_progress:
            # An unspecified depth is reported as 0 in the start event.
            # NOTE(review): this does not reflect cfg.crawl_max_depth or an
            # unbounded crawl -- confirm consumers expect 0 here.
            start_depth = depth if depth is not None else 0
            on_progress(EventType.CRAWL_START, CrawlStartEvent(url=url, depth=start_depth))

        meta = save.load_crawl_metadata()
        written_paths: list[Path] = []
        counter = {"pending": 0}
        flush_page = _make_flush_page(meta, written_paths, counter)

        pages_seen = await _run_crawl(
            url,
            depth=depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel,
            quiet=quiet,
            include_subdomains=include_subdomains,
            flush_page=flush_page,
            render_mode=mode,
        )

        # Persist metadata updates still waiting for a full batch.
        if counter["pending"] > 0:
            try:
                save.save_crawl_metadata(meta)
            except OSError:
                log.exception("Final metadata flush failed")

        cancelled = cancel is not None and cancel.is_set()
        if not cancelled:
            await _maybe_periodic_sync(tasks)

        if on_progress:
            on_progress(
                EventType.CRAWL_DONE,
                CrawlDoneEvent(pages_crawled=pages_seen, files_written=len(written_paths)),
            )

        return written_paths
    finally:
        # Drain this call's periodic-sync tasks before returning so
        # asyncio.run() doesn't close the loop with a pending sync.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        if sem is not None:
            sem.release()