Coverage for src/lilbee/crawler/runner.py: 100%
153 statements
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Crawl orchestration: build specs from ``cfg``, drive a :class:`WebFetcher`.
3By default a recursive crawl is scoped to the exact starting host so a
4Wikipedia article does not wander into other language editions. Callers
5opt into subdomain scope via ``include_subdomains=True``.
6"""

from __future__ import annotations

import asyncio
import logging
import threading
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from lilbee.app.services import get_services
from lilbee.core.config import cfg
from lilbee.crawler import bootstrap, save, sitemap
from lilbee.crawler.bootstrap import CrawlerBrowserError
from lilbee.crawler.crawl4ai_fetcher import Crawl4aiFetcher
from lilbee.crawler.discovery import build_concurrency_spec, build_filter_spec
from lilbee.crawler.events import (
    _drain_page_stream,
    _fetched_to_result,
    _handle_crawl_teardown_error,
    _pages_cap,
)
from lilbee.crawler.models import CrawlResult
from lilbee.crawler.save import METADATA_FLUSH_INTERVAL, CrawlMeta
from lilbee.crawler.url_filter import validate_crawl_url
from lilbee.runtime.progress import (
    CrawlDoneEvent,
    CrawlPageEvent,
    CrawlStartEvent,
    DetailedProgressCallback,
    EventType,
)

log = logging.getLogger(__name__)


def _get_crawl_semaphore() -> asyncio.Semaphore | None:
    """Return the process-wide crawl semaphore, or None when unlimited."""
    return get_services().crawler_semaphore


def _resolve_limit(value: int | None, cfg_ceiling: int | None) -> int | None:
    """Resolve a caller-provided crawl limit to the number the fetcher consumes.

    None  -> cfg_ceiling (itself may be None; ``None`` means unbounded)
    n > 0 -> n (explicit caller intent; cfg is not a ceiling here)
    n <= 0 -> ValueError (use None for unbounded, not 0)
    """
    effective = value if value is not None else cfg_ceiling
    if effective is None:
        return None
    if effective <= 0:
        raise ValueError("crawl limit must be a positive int or None")
    return effective


async def crawl_single(url: str, *, quiet: bool = False) -> CrawlResult:
    """Fetch a single URL.

    Raises :class:`CrawlerBackendError` if the crawler extra isn't installed.
    """
    validate_crawl_url(url)
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            page = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
            return _fetched_to_result(page)
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        log.warning("Failed to crawl %s: %s", url, exc)
        return CrawlResult(url=url, success=False, error=str(exc))


async def crawl_recursive(
    url: str,
    max_depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    *,
    quiet: bool = False,
    include_subdomains: bool = False,
    on_result: Callable[[CrawlResult], Any] | None = None,
) -> list[CrawlResult]:
    """Crawl a URL recursively using BFS, streaming per-page progress.

    None values for ``max_depth`` / ``max_pages`` mean unbounded (constrained
    only by whatever ceiling the user has set in ``cfg.crawl_max_{depth,pages}``,
    if any). Positive ints are explicit caps. ``CRAWL_PAGE`` events fire as
    each page completes; total is ``CRAWL_TOTAL_UNKNOWN`` by default and
    promoted to the sitemap count when available.

    Pass ``include_subdomains=True`` to broaden scope from the exact host to the
    host plus any subdomains. If ``on_result`` is provided, it's called for each
    streamed ``CrawlResult`` the moment it arrives so callers can flush pages to
    disk incrementally and keep partial output across cancellation.
    """
    validate_crawl_url(url)
    depth = _resolve_limit(max_depth, cfg.crawl_max_depth)
    pages = _resolve_limit(max_pages, cfg.crawl_max_pages)

    # Fail fast when the ``crawler`` extra wasn't installed so SSE
    # callers see ``event: error`` instead of a silent zero-results run.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Fail fast before pulling in backend submodules so callers get a clean
    # CrawlerBrowserError instead of a Playwright install banner.
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    # Best-effort sitemap lookup so the TUI / CLI can render a real page-count
    # denominator instead of [n/-1]. Falls back to CRAWL_TOTAL_UNKNOWN on any
    # failure; off the hot path so a slow/missing sitemap never blocks the crawl.
    sitemap_total = await asyncio.to_thread(
        sitemap._count_sitemap_urls, url, include_subdomains=include_subdomains
    )

    concurrency = build_concurrency_spec()
    filters = build_filter_spec(include_subdomains=include_subdomains)

    results: list[CrawlResult] = []
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            # Hold an explicit reference to the generator so we can aclose
            # it deterministically on break. Without this, the generator's
            # finally block (which also short-circuits the BFS strategy) only
            # runs at gc time, which is too late for callers that expect the
            # strategy to stop the moment we hit ``max_pages``.
            page_stream = fetcher.fetch_recursive(
                url,
                depth=depth,
                max_pages=pages,
                timeout=cfg.crawl_timeout,
                concurrency=concurrency,
                filters=filters,
                cancel=cancel,
            )
            try:
                results = await _drain_page_stream(
                    page_stream,
                    on_progress=on_progress,
                    on_result=on_result,
                    sitemap_total=sitemap_total,
                    pages_cap=_pages_cap(pages),
                    cancel=cancel,
                )
            finally:
                await page_stream.aclose()
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=results)

    return results


async def _maybe_periodic_sync(tasks: set[asyncio.Task[None]]) -> None:
    """Fire off a background sync if the ``crawl_sync_interval`` has elapsed.

    Skips when periodic sync is disabled (``interval <= 0``) or another sync
    is already running. The spawned task is added to ``tasks`` so the
    caller can drain it before returning.
    """
    interval = cfg.crawl_sync_interval
    sync_state = get_services().crawler_sync_state
    if interval <= 0 or not sync_state.lock.acquire(blocking=False):
        return

    now = time.monotonic()
    if now - sync_state.last_run < interval:
        sync_state.lock.release()
        return

    sync_state.last_run = now

    async def _run_sync() -> None:
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception as exc:
            log.warning("Periodic sync during crawl failed: %s", exc)
        finally:
            sync_state.lock.release()

    task = asyncio.create_task(_run_sync())
    tasks.add(task)
    task.add_done_callback(tasks.discard)


def _make_flush_page(
    meta: dict[str, CrawlMeta],
    written_paths: list[Path],
    counter: dict[str, int],
) -> Callable[[CrawlResult], Any]:
    """Build a per-result flush closure that batches metadata writes via ``to_thread``."""
    def _sync_flush(result: CrawlResult) -> Path | None:
        outcome = save._save_single_result(result, meta)
        if outcome is None:
            return None
        save._update_single_metadata(meta, result.url, outcome, datetime.now(UTC).isoformat())
        counter["pending"] += 1
        if counter["pending"] >= METADATA_FLUSH_INTERVAL:
            save.save_crawl_metadata(meta)
            counter["pending"] = 0
        return outcome.path

    async def flush_page(result: CrawlResult) -> Path | None:
        path = await asyncio.to_thread(_sync_flush, result)
        if path is not None:
            written_paths.append(path)
        return path

    return flush_page


async def _ensure_crawler_ready(
    on_progress: DetailedProgressCallback | None,
) -> None:
    """Reject early when the extra is missing; bootstrap Chromium on first use.

    The extra check runs before the Chromium bootstrap so a user without
    [crawler] doesn't pay the ~160 MB download just to hit the same error
    afterward. The bootstrap short-circuits when Chromium is already
    installed; any progress is forwarded through ``on_progress`` so
    downstream UIs surface a 'setup' stage.
    """
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    if not bootstrap.chromium_installed():
        await bootstrap.bootstrap_chromium(on_progress=on_progress)


async def _run_crawl(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback | None,
    cancel: threading.Event | None,
    quiet: bool,
    include_subdomains: bool,
    flush_page: Callable[[Any], Awaitable[Path | None]],
) -> int:
    """Run the single-URL or recursive crawl. Returns ``pages_seen``."""
    if depth == 0:
        result = await crawl_single(url, quiet=quiet)
        try:
            await flush_page(result)
        except OSError:
            log.exception("Flush failed for %s", result.url)
        if on_progress:
            on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(url=url, current=1, total=1))
        return 1
    results = await crawl_recursive(
        url,
        max_depth=depth,
        max_pages=max_pages,
        on_progress=on_progress,
        cancel=cancel,
        quiet=quiet,
        include_subdomains=include_subdomains,
        on_result=flush_page,
    )
    return len(results)


async def crawl_and_save(
    url: str,
    *,
    depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    quiet: bool = False,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl URL(s), save as markdown, update metadata. Returns paths written.

    ``depth``: ``None`` = whole-site unbounded recursion (default). ``0`` =
    single URL, no recursion. ``N > 0`` = max link-follow depth. ``max_pages``:
    ``None`` = no limit, positive int = cap. ``cfg.crawl_max_{depth,pages}`` act
    as ceilings applied only when ``depth``/``max_pages`` are ``None``.

    Hash-based change detection: always fetches but only saves changed or new
    files. Pages flush to disk as they stream so a cancelled crawl preserves
    the pages already fetched.
    """
    await _ensure_crawler_ready(on_progress)

    sem = _get_crawl_semaphore()
    if sem is not None:
        await sem.acquire()
    tasks: set[asyncio.Task[None]] = set()
    try:
        if on_progress:
            start_depth = depth if depth is not None else 0
            on_progress(EventType.CRAWL_START, CrawlStartEvent(url=url, depth=start_depth))

        meta = save.load_crawl_metadata()
        written_paths: list[Path] = []
        counter = {"pending": 0}
        flush_page = _make_flush_page(meta, written_paths, counter)

        pages_seen = await _run_crawl(
            url,
            depth=depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel,
            quiet=quiet,
            include_subdomains=include_subdomains,
            flush_page=flush_page,
        )

        if counter["pending"] > 0:
            try:
                save.save_crawl_metadata(meta)
            except OSError:
                log.exception("Final metadata flush failed")

        cancelled = cancel is not None and cancel.is_set()
        if not cancelled:
            await _maybe_periodic_sync(tasks)

        if on_progress:
            on_progress(
                EventType.CRAWL_DONE,
                CrawlDoneEvent(pages_crawled=pages_seen, files_written=len(written_paths)),
            )

        return written_paths
    finally:
        # Drain this call's periodic-sync tasks before returning so
        # asyncio.run() doesn't close the loop with a pending sync.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        if sem is not None:
            sem.release()