Coverage for src / lilbee / crawler / runner.py: 100%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Crawl orchestration: build specs from ``cfg``, drive a :class:`WebFetcher`. 

2 

3By default a recursive crawl is scoped to the exact starting host so a 

4Wikipedia article does not wander into other language editions. Callers 

5opt into subdomain scope via ``include_subdomains=True``. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import logging 

12import threading 

13import time 

14from collections.abc import Awaitable, Callable 

15from datetime import UTC, datetime 

16from pathlib import Path 

17from typing import Any 

18 

19from lilbee.app.services import get_services 

20from lilbee.core.config import cfg 

21from lilbee.core.config.enums import CrawlRenderMode 

22from lilbee.crawler import bootstrap, save, sitemap 

23from lilbee.crawler.bootstrap import CrawlerBrowserError 

24from lilbee.crawler.crawl4ai_fetcher import Crawl4aiFetcher 

25from lilbee.crawler.discovery import build_concurrency_spec, build_filter_spec 

26from lilbee.crawler.events import ( 

27 _drain_page_stream, 

28 _fetched_to_result, 

29 _handle_crawl_teardown_error, 

30 _pages_cap, 

31) 

32from lilbee.crawler.models import CRAWL_PAGES_UNLIMITED, CrawlResult 

33from lilbee.crawler.save import METADATA_FLUSH_INTERVAL, CrawlMeta 

34from lilbee.crawler.url_filter import validate_crawl_url 

35from lilbee.runtime.progress import ( 

36 CrawlDoneEvent, 

37 CrawlPageEvent, 

38 CrawlStartEvent, 

39 DetailedProgressCallback, 

40 EventType, 

41 SetupDoneEvent, 

42 SetupStartEvent, 

43) 

44 

log = logging.getLogger(__name__)

# Setup-phase component reported while the crawler's browser warms up. It is
# intentionally different from the Chromium download's "chromium" component:
# crawls wrap opening the crawler in a setup start/done pair so the Task
# Center shows a "preparing crawler" stage instead of an unexplained pause the
# first time a browser crawl runs.
_BROWSER_SETUP_COMPONENT = "browser"

52 

53 

def _get_crawl_semaphore() -> asyncio.Semaphore | None:
    """Look up the shared, process-wide crawl semaphore.

    Returns ``None`` when crawl concurrency is not limited.
    """
    services = get_services()
    return services.crawler_semaphore

57 

58 

59def _resolve_depth(value: int | None, cfg_ceiling: int | None) -> int | None: 

60 """Resolve a crawl depth to the value the dispatcher consumes. 

61 

62 Depth has its own contract, distinct from the page-count limit: ``0`` is a 

63 valid "seed only / single page" depth, not "unbounded". (Page counts use 

64 :func:`_resolve_page_limit`, where ``0`` means "no limit".) 

65 

66 None -> cfg_ceiling (itself may be None; ``None`` means unbounded) 

67 n >= 0 -> n (0 = seed only; explicit caller intent overrides cfg) 

68 n < 0 -> ValueError (use None for unbounded) 

69 """ 

70 effective = value if value is not None else cfg_ceiling 

71 if effective is None: 

72 return None 

73 if effective < 0: 

74 raise ValueError("crawl depth must be 0 (seed only) or a positive int") 

75 return effective 

76 

77 

def _resolve_page_limit(max_pages: int | None) -> int | None:
    """Turn a requested page count into the bound the fetcher consumes.

    The result is ``None`` for "unbounded", otherwise a page count.

    * ``CRAWL_PAGES_UNLIMITED`` (0) is an explicit "no limit" and yields
      ``None``.
    * ``None`` means "unspecified". It uses ``cfg.crawl_max_pages`` when that
      is set, otherwise the protective ``cfg.crawl_safety_max_pages``, so a
      hostile site cannot exhaust the disk on a crawl nobody bounded.
    * Any other int is returned unchanged, even when it exceeds the default.
    """
    if max_pages is None:
        # Unspecified: prefer the configured cap, else the safety default.
        configured = cfg.crawl_max_pages
        return cfg.crawl_safety_max_pages if configured is None else configured
    # The sentinel means "no limit"; every other explicit value is honored.
    return None if max_pages == CRAWL_PAGES_UNLIMITED else max_pages

94 

95 

def _looks_like_missing_chromium(exc: BaseException) -> bool:
    """Guess whether ``exc`` is Playwright's "Executable doesn't exist" launch error.

    This is a heuristic that matches on the exception's string form.
    """
    marker = "Executable doesn't exist"
    return str(exc).find(marker) != -1

99 

100 

async def crawl_single(
    url: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback | None = None,
    render_mode: CrawlRenderMode = CrawlRenderMode.BROWSER,
) -> CrawlResult:
    """Fetch a single URL.

    ``render_mode`` defaults to ``BROWSER`` for direct callers; the public
    entry point :func:`crawl_and_save` resolves it from ``cfg.crawl_render_mode``
    and passes the canonical value down.

    Raises :class:`CrawlerBackendError` if the crawler extra isn't installed.
    :class:`CrawlerBrowserError` is re-raised unchanged. Any other failure is
    logged and returned as an unsuccessful :class:`CrawlResult`.

    On a "Chromium executable missing" launch failure, re-runs the bootstrap
    once and retries -- ``chromium_installed()`` can return True when the
    wrong revision lives in the cache root, in which case the launch fails the
    first attempt. Errors raised by that re-bootstrap itself propagate.

    In browser mode, ``on_progress`` receives a setup_start/setup_done bracket
    around opening the crawler, so the first crawl's browser warmup is visible
    rather than a silent stall. The bracket is always closed. If opening the
    crawler fails (or the task is cancelled first), a ``success=False``
    setup_done is emitted so UIs never show a setup stage that never ends.
    """
    validate_crawl_url(url)
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # True while a SETUP_START has been emitted without its SETUP_DONE.
    setup_open = False

    def _finish_setup(*, success: bool) -> None:
        # Emit SETUP_DONE at most once, and only if SETUP_START was sent.
        nonlocal setup_open
        if setup_open and on_progress is not None:
            setup_open = False
            on_progress(
                EventType.SETUP_DONE,
                SetupDoneEvent(component=_BROWSER_SETUP_COMPONENT, success=success),
            )

    # The setup bracket exists to surface the Chromium warmup, which only
    # happens in browser mode; HTTP mode opens a browserless client with no
    # warmup, so emitting a "browser" setup stage there would be misleading.
    if on_progress is not None and render_mode is CrawlRenderMode.BROWSER:
        on_progress(EventType.SETUP_START, SetupStartEvent(component=_BROWSER_SETUP_COMPONENT))
        setup_open = True
    try:
        async with Crawl4aiFetcher(quiet=quiet, render_mode=render_mode) as fetcher:
            _finish_setup(success=True)
            page = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
            return _fetched_to_result(page)
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        # Report a failed warmup before any retry. This is a no-op when the
        # crawler opened fine and only the fetch itself failed.
        _finish_setup(success=False)
        if _looks_like_missing_chromium(exc):
            log.warning("Chromium missing for %s; bootstrapping then retrying", url)
            await bootstrap.bootstrap_chromium(on_progress=None)
            try:
                async with Crawl4aiFetcher(quiet=quiet, render_mode=render_mode) as fetcher:
                    page = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
                    return _fetched_to_result(page)
            except Exception as retry_exc:
                log.warning("Crawl retry failed for %s: %s", url, retry_exc)
                return CrawlResult(url=url, success=False, error=str(retry_exc))
        log.warning("Failed to crawl %s: %s", url, exc)
        return CrawlResult(url=url, success=False, error=str(exc))
    finally:
        # Covers CrawlerBrowserError and cancellation while the crawler opens.
        _finish_setup(success=False)

161 

162 

async def crawl_recursive(
    url: str,
    max_depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    *,
    quiet: bool = False,
    include_subdomains: bool = False,
    on_result: Callable[[CrawlResult], Any] | None = None,
    render_mode: CrawlRenderMode = CrawlRenderMode.BROWSER,
) -> list[CrawlResult]:
    """Crawl a URL recursively using BFS, streaming per-page progress.

    ``render_mode`` defaults to ``BROWSER`` for direct callers; the public
    entry point :func:`crawl_and_save` resolves it from ``cfg.crawl_render_mode``
    and passes the canonical value down.

    ``max_depth`` is used as given: None means unbounded depth, a positive int
    is the cap. (:func:`_run_crawl` resolves the configured ceiling before
    calling here.)

    ``max_pages``:

    * ``CRAWL_PAGES_UNLIMITED`` (0) means no page limit.
    * A positive int is that cap.
    * None is unspecified. It falls back to ``cfg.crawl_max_pages`` when set,
      else ``cfg.crawl_safety_max_pages``, so a hostile site can't exhaust the
      disk on a crawl nobody bounded.

    ``CRAWL_PAGE`` events fire as each page completes. Their total is
    ``CRAWL_TOTAL_UNKNOWN`` by default and is promoted to the sitemap count
    when one is available.

    Pass ``include_subdomains=True`` to broaden scope from the exact host to the
    host plus any subdomains. If ``on_result`` is provided, it's called for each
    streamed ``CrawlResult`` the moment it arrives, so callers can flush pages
    to disk incrementally and keep partial output across cancellation.

    In browser mode, ``on_progress`` receives a setup_start/setup_done bracket
    around opening the crawler. The bracket is always closed (``success=False``
    when opening fails or is cancelled).

    Raises :class:`CrawlerBackendError` when the crawler extra is missing.
    Raises :class:`CrawlerBrowserError` when browser mode is requested but
    Chromium is not installed. ``validate_crawl_url`` runs first and may reject
    the URL. Other errors are handed to ``_handle_crawl_teardown_error``.
    """
    validate_crawl_url(url)
    # ``_run_crawl`` already resolved the depth ceiling (and routed a seed-only
    # 0 to the single-page path), so the recursive path takes ``max_depth`` as
    # given: None = unbounded, a positive int = the cap.
    depth = max_depth
    pages = _resolve_page_limit(max_pages)

    # Fail fast when the ``crawler`` extra wasn't installed so SSE
    # callers see ``event: error`` instead of a silent zero-results run.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Fail fast before pulling in backend submodules so callers get a clean
    # CrawlerBrowserError instead of a Playwright install banner. HTTP mode
    # needs no browser, so the guard only applies to browser-mode crawls.
    if render_mode is CrawlRenderMode.BROWSER and not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable browser-mode crawling."
        )

    # Best-effort sitemap lookup so the TUI / CLI can render a real page-count
    # denominator instead of [n/-1]. Falls back to CRAWL_TOTAL_UNKNOWN on any
    # failure; off the hot path so a slow/missing sitemap never blocks the crawl.
    sitemap_total = await asyncio.to_thread(
        sitemap._count_sitemap_urls, url, include_subdomains=include_subdomains
    )

    concurrency = build_concurrency_spec()
    filters = build_filter_spec(include_subdomains=include_subdomains)

    # NOTE: ``results`` is only filled once the page stream drains. If draining
    # raises, pages already passed to ``on_result`` are not reflected here.
    results: list[CrawlResult] = []

    # True while a SETUP_START has been emitted without its SETUP_DONE.
    setup_open = False

    def _finish_setup(*, success: bool) -> None:
        # Emit SETUP_DONE at most once, and only if SETUP_START was sent.
        nonlocal setup_open
        if setup_open and on_progress is not None:
            setup_open = False
            on_progress(
                EventType.SETUP_DONE,
                SetupDoneEvent(component=_BROWSER_SETUP_COMPONENT, success=success),
            )

    # Browser mode launches Chromium, whose one-time warmup can take many
    # seconds; bracket it with setup events so the Task Center shows a
    # "preparing crawler" stage instead of a silent stall. HTTP mode has no
    # browser warmup, so the bracket is skipped to avoid a misleading stage.
    if on_progress is not None and render_mode is CrawlRenderMode.BROWSER:
        on_progress(EventType.SETUP_START, SetupStartEvent(component=_BROWSER_SETUP_COMPONENT))
        setup_open = True
    try:
        async with Crawl4aiFetcher(quiet=quiet, render_mode=render_mode) as fetcher:
            _finish_setup(success=True)
            # Hold an explicit reference to the generator so we can aclose
            # it deterministically on break. Without this, the generator's
            # finally block (which also short-circuits the BFS strategy) only
            # runs at gc time, which is too late for callers that expect the
            # strategy to stop the moment we hit ``max_pages``.
            page_stream = fetcher.fetch_recursive(
                url,
                depth=depth,
                max_pages=pages,
                timeout=cfg.crawl_timeout,
                concurrency=concurrency,
                filters=filters,
                cancel=cancel,
            )
            try:
                results = await _drain_page_stream(
                    page_stream,
                    on_progress=on_progress,
                    on_result=on_result,
                    sitemap_total=sitemap_total,
                    pages_cap=_pages_cap(pages),
                    cancel=cancel,
                )
            finally:
                await page_stream.aclose()
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        # Close a still-open setup stage before the error handler runs (it
        # may re-raise). No-op if the crawler had already opened.
        _finish_setup(success=False)
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=results)
    finally:
        # Covers CrawlerBrowserError and cancellation while the crawler opens.
        _finish_setup(success=False)

    return results

275 

276 

async def _maybe_periodic_sync(tasks: set[asyncio.Task[None]]) -> None:
    """Fire off a background sync if the ``crawl_sync_interval`` has elapsed.

    Skips when:

    * periodic sync is disabled (``interval <= 0``);
    * another sync already holds the shared sync lock;
    * the previous sync started less than ``interval`` seconds ago.

    The spawned task is added to ``tasks`` so the caller can drain it before
    returning.

    The shared lock is released by a done-callback on the task rather than in
    a ``finally`` inside the coroutine. A task cancelled before its first step
    never executes its body, so such a ``finally`` would never run. The lock
    would then stay held and silently disable periodic sync for the rest of
    the process.
    """
    interval = cfg.crawl_sync_interval
    sync_state = get_services().crawler_sync_state
    if interval <= 0 or not sync_state.lock.acquire(blocking=False):
        return

    now = time.monotonic()
    if now - sync_state.last_run < interval:
        sync_state.lock.release()
        return

    sync_state.last_run = now

    async def _run_sync() -> None:
        # Best-effort: a failed sync is logged, never propagated to the crawl.
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception as exc:
            log.warning("Periodic sync during crawl failed: %s", exc)

    try:
        task = asyncio.create_task(_run_sync())
    except BaseException:
        # The sync could not be scheduled at all; don't strand the lock.
        sync_state.lock.release()
        raise
    tasks.add(task)
    task.add_done_callback(tasks.discard)
    # Fires exactly once on completion, failure, or cancellation (even when
    # the task was cancelled before it ever started running).
    task.add_done_callback(lambda _task: sync_state.lock.release())

309 

310 

def _make_flush_page(
    meta: dict[str, CrawlMeta],
    written_paths: list[Path],
    counter: dict[str, int],
) -> Callable[[CrawlResult], Any]:
    """Build the async per-result callback that persists each crawled page.

    Each call runs the save and the metadata update in a worker thread via
    ``asyncio.to_thread``. Results for which ``save._save_single_result``
    returns ``None`` are skipped (presumably unchanged or unsaved pages).

    Metadata writes are batched: ``save.save_crawl_metadata`` runs only once
    ``counter["pending"]`` reaches ``METADATA_FLUSH_INTERVAL``. Whatever is
    still pending is left for the caller's final flush.

    Paths returned by the save are appended to ``written_paths``.
    """

    def _persist(result: CrawlResult) -> Path | None:
        saved = save._save_single_result(result, meta)
        if saved is not None:
            stamp = datetime.now(UTC).isoformat()
            save._update_single_metadata(meta, result.url, saved, stamp)
            counter["pending"] += 1
            if counter["pending"] >= METADATA_FLUSH_INTERVAL:
                save.save_crawl_metadata(meta)
                counter["pending"] = 0
            return saved.path
        return None

    async def flush_page(result: CrawlResult) -> Path | None:
        written = await asyncio.to_thread(_persist, result)
        if written is not None:
            written_paths.append(written)
        return written

    return flush_page

336 

337 

async def _ensure_crawler_ready(
    on_progress: DetailedProgressCallback | None,
    render_mode: CrawlRenderMode,
) -> None:
    """Reject early when the crawler extra is missing; bootstrap Chromium if needed.

    The extra check comes first so a user without ``[crawler]`` doesn't pay
    the ~160 MB Chromium download only to hit the same error afterwards.

    Only browser mode needs Chromium; HTTP mode never bootstraps. When Chromium
    is already installed nothing is done. Otherwise the bootstrap runs with
    ``on_progress`` forwarded, so downstream UIs can surface a 'setup' stage.

    Raises:
        CrawlerBackendError: If the crawler extra is not installed.
    """
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    needs_chromium = render_mode is CrawlRenderMode.BROWSER
    if not needs_chromium or bootstrap.chromium_installed():
        return
    await bootstrap.bootstrap_chromium(on_progress=on_progress)

359 

360 

async def _run_crawl(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback | None,
    cancel: threading.Event | None,
    quiet: bool,
    include_subdomains: bool,
    flush_page: Callable[[Any], Awaitable[Path | None]],
    render_mode: CrawlRenderMode,
) -> int:
    """Dispatch to the single-URL or recursive crawl; return ``pages_seen``.

    The depth ceiling is resolved here (allowing the seed-only ``0``). A
    ``cfg.crawl_max_depth`` of 0 therefore routes to the single-page path
    instead of reaching the recursive crawl.

    A resolved page limit of 1 is treated as a single-page crawl too.
    crawl4ai's BFS under-counts tiny ``max_pages`` (``max_pages=1`` yields 0
    pages), so an "at most one page" request goes to the reliable single-URL
    fetch.

    On the single-page path, an ``OSError`` from ``flush_page`` is logged
    rather than raised, and exactly one page is reported.
    """
    resolved_depth = _resolve_depth(depth, cfg.crawl_max_depth)
    resolved_pages = _resolve_page_limit(max_pages)
    single_page = resolved_depth == 0 or resolved_pages == 1

    if not single_page:
        # Forward the caller's raw ``max_pages``: crawl_recursive resolves it
        # again itself. Passing the resolved value would turn an explicit
        # "unlimited" (resolved to None) back into "unspecified".
        crawled = await crawl_recursive(
            url,
            max_depth=resolved_depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel,
            quiet=quiet,
            include_subdomains=include_subdomains,
            on_result=flush_page,
            render_mode=render_mode,
        )
        return len(crawled)

    outcome = await crawl_single(
        url, quiet=quiet, on_progress=on_progress, render_mode=render_mode
    )
    try:
        await flush_page(outcome)
    except OSError:
        log.exception("Flush failed for %s", outcome.url)
    if on_progress:
        on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(url=url, current=1, total=1))
    return 1

408 

409 

async def crawl_and_save(
    url: str,
    *,
    depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    quiet: bool = False,
    include_subdomains: bool = False,
    render_mode: CrawlRenderMode | None = None,
) -> list[Path]:
    """Crawl URL(s), save as markdown, update metadata. Returns paths written.

    ``depth``:

    * ``None`` (the default) uses ``cfg.crawl_max_depth``; when that is also
      ``None``, recursion over the whole site is unbounded.
    * ``0`` crawls the single URL with no recursion.
    * ``N > 0`` is the max link-follow depth.
    * Negative values raise ``ValueError``.

    ``max_pages``:

    * ``None`` (the default) uses ``cfg.crawl_max_pages`` when set, else the
      protective ``cfg.crawl_safety_max_pages``.
    * ``CRAWL_PAGES_UNLIMITED`` (0) means no page limit.
    * A positive int is the cap. A resolved limit of 1 is served by the
      single-page fetch.

    ``render_mode``: ``None`` resolves to ``cfg.crawl_render_mode`` (the single
    write-boundary for the default). ``http`` fetches without a browser;
    ``browser`` runs a tuned Chromium with JavaScript enabled.

    Hash-based change detection: always fetches but only saves changed or new
    files. Pages flush to disk as they stream, so a cancelled crawl preserves
    the pages already fetched.

    When a process-wide crawl semaphore is configured, this waits for a slot
    before crawling. Raises ``CrawlerBackendError`` when the crawler extra is
    missing; crawl-level errors from the underlying crawl helpers propagate.
    """
    mode = render_mode if render_mode is not None else cfg.crawl_render_mode
    # Checked (and Chromium bootstrapped) before taking a semaphore slot.
    await _ensure_crawler_ready(on_progress, mode)

    sem = _get_crawl_semaphore()
    if sem is not None:
        await sem.acquire()
    tasks: set[asyncio.Task[None]] = set()
    try:
        if on_progress:
            # NOTE(review): an unspecified (None) depth is reported as 0 here
            # even though it may mean unbounded recursion -- confirm consumers
            # of CrawlStartEvent expect this.
            start_depth = depth if depth is not None else 0
            on_progress(EventType.CRAWL_START, CrawlStartEvent(url=url, depth=start_depth))

        meta = save.load_crawl_metadata()
        written_paths: list[Path] = []
        # Metadata updates not yet persisted; mutated by flush_page.
        counter = {"pending": 0}
        flush_page = _make_flush_page(meta, written_paths, counter)

        pages_seen = await _run_crawl(
            url,
            depth=depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel,
            quiet=quiet,
            include_subdomains=include_subdomains,
            flush_page=flush_page,
            render_mode=mode,
        )

        # Persist any metadata updates the batched flush didn't cover yet.
        if counter["pending"] > 0:
            try:
                save.save_crawl_metadata(meta)
            except OSError:
                log.exception("Final metadata flush failed")

        # A cancelled crawl skips the follow-up background sync.
        cancelled = cancel is not None and cancel.is_set()
        if not cancelled:
            await _maybe_periodic_sync(tasks)

        if on_progress:
            on_progress(
                EventType.CRAWL_DONE,
                CrawlDoneEvent(pages_crawled=pages_seen, files_written=len(written_paths)),
            )

        return written_paths
    finally:
        # Drain this call's periodic-sync tasks before returning so
        # asyncio.run() doesn't close the loop with a pending sync.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        if sem is not None:
            sem.release()