Coverage for src/lilbee/crawler/runner.py: 100% (153 statements)


1"""Crawl orchestration: build specs from ``cfg``, drive a :class:`WebFetcher`. 

2 

3By default a recursive crawl is scoped to the exact starting host so a 

4Wikipedia article does not wander into other language editions. Callers 

5opt into subdomain scope via ``include_subdomains=True``. 

6""" 

from __future__ import annotations

import asyncio
import logging
import threading
import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from lilbee.app.services import get_services
from lilbee.core.config import cfg
from lilbee.crawler import bootstrap, save, sitemap
from lilbee.crawler.bootstrap import CrawlerBrowserError
from lilbee.crawler.crawl4ai_fetcher import Crawl4aiFetcher
from lilbee.crawler.discovery import build_concurrency_spec, build_filter_spec
from lilbee.crawler.events import (
    _drain_page_stream,
    _fetched_to_result,
    _handle_crawl_teardown_error,
    _pages_cap,
)
from lilbee.crawler.models import CrawlResult
from lilbee.crawler.save import METADATA_FLUSH_INTERVAL, CrawlMeta
from lilbee.crawler.url_filter import validate_crawl_url
from lilbee.runtime.progress import (
    CrawlDoneEvent,
    CrawlPageEvent,
    CrawlStartEvent,
    DetailedProgressCallback,
    EventType,
)

log = logging.getLogger(__name__)


def _get_crawl_semaphore() -> asyncio.Semaphore | None:
    """Return the process-wide crawl semaphore, or None when unlimited."""
    return get_services().crawler_semaphore


def _resolve_limit(value: int | None, cfg_ceiling: int | None) -> int | None:
    """Resolve a caller-provided crawl limit to the number the fetcher consumes.

    None -> cfg_ceiling (itself may be None; ``None`` means unbounded)
    n > 0 -> n (explicit caller intent; cfg is not a ceiling here)
    n <= 0 -> ValueError (use None for unbounded, not 0)
    """
    effective = value if value is not None else cfg_ceiling
    if effective is None:
        return None
    if effective <= 0:
        raise ValueError("crawl limit must be a positive int or None")
    return effective
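
# Illustrative mapping for ``_resolve_limit`` (not part of the module; the
# values follow directly from the docstring and the branches above):
#
#   _resolve_limit(None, 10)    -> 10          # fall back to the cfg ceiling
#   _resolve_limit(None, None)  -> None        # unbounded
#   _resolve_limit(5, 10)       -> 5           # explicit caller intent wins
#   _resolve_limit(0, 10)       -> ValueError  # 0 is rejected; use None instead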

async def crawl_single(url: str, *, quiet: bool = False) -> CrawlResult:
    """Fetch a single URL.

    Raises :class:`CrawlerBackendError` if the crawler extra isn't installed.
    """
    validate_crawl_url(url)
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            page = await fetcher.fetch_single(url, timeout=cfg.crawl_timeout)
            return _fetched_to_result(page)
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        log.warning("Failed to crawl %s: %s", url, exc)
        return CrawlResult(url=url, success=False, error=str(exc))
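
# Usage sketch for ``crawl_single`` (illustrative only; the URL is a
# placeholder and the caller is assumed to be inside a running event loop):
#
#   result = await crawl_single("https://example.com/docs")
#   if not result.success:
#       print(result.error)  # fetch failures are returned, not raised
#                            # (missing-extra and browser errors still raise)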

async def crawl_recursive(
    url: str,
    max_depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    *,
    quiet: bool = False,
    include_subdomains: bool = False,
    on_result: Callable[[CrawlResult], Any] | None = None,
) -> list[CrawlResult]:
    """Crawl a URL recursively using BFS, streaming per-page progress.

    None values for ``max_depth`` / ``max_pages`` mean unbounded (constrained
    only by whatever ceiling the user has set in ``cfg.crawl_max_{depth,pages}``,
    if any). Positive ints are explicit caps. ``CRAWL_PAGE`` events fire as
    each page completes; total is ``CRAWL_TOTAL_UNKNOWN`` by default and
    promoted to the sitemap count when available.

    Pass ``include_subdomains=True`` to broaden scope from the exact host to the
    host plus any subdomains. If ``on_result`` is provided, it's called for each
    streamed ``CrawlResult`` the moment it arrives so callers can flush pages to
    disk incrementally and keep partial output across cancellation.
    """
    validate_crawl_url(url)
    depth = _resolve_limit(max_depth, cfg.crawl_max_depth)
    pages = _resolve_limit(max_pages, cfg.crawl_max_pages)

    # Fail fast when the ``crawler`` extra wasn't installed so SSE
    # callers see ``event: error`` instead of a silent zero-results run.
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    # Fail fast before pulling in backend submodules so callers get a clean
    # CrawlerBrowserError instead of a Playwright install banner.
    if not bootstrap.chromium_installed():
        raise CrawlerBrowserError(
            "Playwright Chromium browser not installed. "
            "Run 'uv run playwright install chromium' to enable /crawl."
        )

    # Best-effort sitemap lookup so the TUI / CLI can render a real page-count
    # denominator instead of [n/-1]. Falls back to CRAWL_TOTAL_UNKNOWN on any
    # failure; off the hot path so a slow/missing sitemap never blocks the crawl.
    sitemap_total = await asyncio.to_thread(
        sitemap._count_sitemap_urls, url, include_subdomains=include_subdomains
    )

    concurrency = build_concurrency_spec()
    filters = build_filter_spec(include_subdomains=include_subdomains)

    results: list[CrawlResult] = []
    try:
        async with Crawl4aiFetcher(quiet=quiet) as fetcher:
            # Hold an explicit reference to the generator so we can aclose
            # it deterministically on break. Without this, the generator's
            # finally block (which also short-circuits the BFS strategy) only
            # runs at gc time, which is too late for callers that expect the
            # strategy to stop the moment we hit ``max_pages``.
            page_stream = fetcher.fetch_recursive(
                url,
                depth=depth,
                max_pages=pages,
                timeout=cfg.crawl_timeout,
                concurrency=concurrency,
                filters=filters,
                cancel=cancel,
            )
            try:
                results = await _drain_page_stream(
                    page_stream,
                    on_progress=on_progress,
                    on_result=on_result,
                    sitemap_total=sitemap_total,
                    pages_cap=_pages_cap(pages),
                    cancel=cancel,
                )
            finally:
                await page_stream.aclose()
    except CrawlerBrowserError:
        raise
    except Exception as exc:
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=results)

    return results
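
# Usage sketch for ``crawl_recursive`` (illustrative; the URL and caps are
# placeholders, and ``on_result`` here just collects results in memory; a real
# caller would flush each page to disk, as ``crawl_and_save`` does below):
#
#   seen: list[CrawlResult] = []
#
#   async def remember(result: CrawlResult) -> None:
#       seen.append(result)
#
#   results = await crawl_recursive(
#       "https://docs.example.com/",
#       max_depth=2,
#       max_pages=50,
#       include_subdomains=False,
#       on_result=remember,
#   )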

async def _maybe_periodic_sync(tasks: set[asyncio.Task[None]]) -> None:
    """Fire off a background sync if the ``crawl_sync_interval`` has elapsed.

    Skips when periodic sync is disabled (``interval=0``) or another sync
    is already running. The spawned task is added to ``tasks`` so the
    caller can drain it before returning.
    """
    interval = cfg.crawl_sync_interval
    sync_state = get_services().crawler_sync_state
    if interval <= 0 or not sync_state.lock.acquire(blocking=False):
        return

    now = time.monotonic()
    if now - sync_state.last_run < interval:
        sync_state.lock.release()
        return

    sync_state.last_run = now

    async def _run_sync() -> None:
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception as exc:
            log.warning("Periodic sync during crawl failed: %s", exc)
        finally:
            sync_state.lock.release()

    task = asyncio.create_task(_run_sync())
    tasks.add(task)
    task.add_done_callback(tasks.discard)
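
# Caller contract for ``_maybe_periodic_sync``, sketched as comments
# (illustrative; this mirrors how ``crawl_and_save`` below uses it):
#
#   tasks: set[asyncio.Task[None]] = set()
#   ...
#   await _maybe_periodic_sync(tasks)   # no-op unless the interval has elapsed
#   ...
#   if tasks:                           # drain before the event loop closes
#       await asyncio.gather(*tasks, return_exceptions=True)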

def _make_flush_page(
    meta: dict[str, CrawlMeta],
    written_paths: list[Path],
    counter: dict[str, int],
) -> Callable[[CrawlResult], Any]:
    """Build a per-result flush closure that batches metadata writes via ``to_thread``."""

    def _sync_flush(result: CrawlResult) -> Path | None:
        outcome = save._save_single_result(result, meta)
        if outcome is None:
            return None
        save._update_single_metadata(meta, result.url, outcome, datetime.now(UTC).isoformat())
        counter["pending"] += 1
        if counter["pending"] >= METADATA_FLUSH_INTERVAL:
            save.save_crawl_metadata(meta)
            counter["pending"] = 0
        return outcome.path

    async def flush_page(result: CrawlResult) -> Path | None:
        path = await asyncio.to_thread(_sync_flush, result)
        if path is not None:
            written_paths.append(path)
        return path

    return flush_page
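
# Wiring sketch for the flush closure (illustrative; this is how
# ``crawl_and_save`` below assembles it):
#
#   meta = save.load_crawl_metadata()
#   written_paths: list[Path] = []
#   counter = {"pending": 0}
#   flush_page = _make_flush_page(meta, written_paths, counter)
#   # each streamed CrawlResult is then passed to ``await flush_page(result)``;
#   # unchanged pages are skipped, and metadata is persisted every
#   # METADATA_FLUSH_INTERVAL saved pages.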

async def _ensure_crawler_ready(
    on_progress: DetailedProgressCallback | None,
) -> None:
    """Reject early when the extra is missing; bootstrap Chromium on first use.

    The availability check runs before the Chromium bootstrap so a user without
    [crawler] doesn't pay the ~160 MB download just to hit the same error
    afterward. The bootstrap short-circuits when Chromium is already installed;
    any progress is forwarded through ``on_progress`` so downstream UIs surface
    a 'setup' stage.
    """
    from lilbee.crawler import crawler_available

    if not crawler_available():
        raise bootstrap.CrawlerBackendError(
            "Web crawling is not available. Run 'uv sync --extra crawler' to enable it."
        )

    if not bootstrap.chromium_installed():
        await bootstrap.bootstrap_chromium(on_progress=on_progress)

async def _run_crawl(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback | None,
    cancel: threading.Event | None,
    quiet: bool,
    include_subdomains: bool,
    flush_page: Callable[[Any], Awaitable[Path | None]],
) -> int:
    """Run the single-URL or recursive crawl. Returns ``pages_seen``."""
    if depth == 0:
        result = await crawl_single(url, quiet=quiet)
        try:
            await flush_page(result)
        except OSError:
            log.exception("Flush failed for %s", result.url)
        if on_progress:
            on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(url=url, current=1, total=1))
        return 1
    results = await crawl_recursive(
        url,
        max_depth=depth,
        max_pages=max_pages,
        on_progress=on_progress,
        cancel=cancel,
        quiet=quiet,
        include_subdomains=include_subdomains,
        on_result=flush_page,
    )
    return len(results)

async def crawl_and_save(
    url: str,
    *,
    depth: int | None = None,
    max_pages: int | None = None,
    on_progress: DetailedProgressCallback | None = None,
    cancel: threading.Event | None = None,
    quiet: bool = False,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl URL(s), save as markdown, update metadata. Returns paths written.

    ``depth``: ``None`` = whole-site unbounded recursion (default). ``0`` =
    single URL, no recursion. ``N > 0`` = max link-follow depth. ``max_pages``:
    ``None`` = no limit, positive int = cap. ``cfg.crawl_max_{depth,pages}`` act
    as ceilings applied only when ``depth``/``max_pages`` are ``None``.

    Hash-based change detection: always fetches but only saves changed or new
    files. Pages flush to disk as they stream so a cancelled crawl preserves
    the pages already fetched.
    """
    await _ensure_crawler_ready(on_progress)

    sem = _get_crawl_semaphore()
    if sem is not None:
        await sem.acquire()
    tasks: set[asyncio.Task[None]] = set()
    try:
        if on_progress:
            start_depth = depth if depth is not None else 0
            on_progress(EventType.CRAWL_START, CrawlStartEvent(url=url, depth=start_depth))

        meta = save.load_crawl_metadata()
        written_paths: list[Path] = []
        counter = {"pending": 0}
        flush_page = _make_flush_page(meta, written_paths, counter)

        pages_seen = await _run_crawl(
            url,
            depth=depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel,
            quiet=quiet,
            include_subdomains=include_subdomains,
            flush_page=flush_page,
        )

        if counter["pending"] > 0:
            try:
                save.save_crawl_metadata(meta)
            except OSError:
                log.exception("Final metadata flush failed")

        cancelled = cancel is not None and cancel.is_set()
        if not cancelled:
            await _maybe_periodic_sync(tasks)

        if on_progress:
            on_progress(
                EventType.CRAWL_DONE,
                CrawlDoneEvent(pages_crawled=pages_seen, files_written=len(written_paths)),
            )

        return written_paths
    finally:
        # Drain this call's periodic-sync tasks before returning so
        # asyncio.run() doesn't close the loop with a pending sync.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        if sem is not None:
            sem.release()
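
# End-to-end usage sketch for ``crawl_and_save`` (illustrative; the URLs are
# placeholders and the depth semantics follow the docstring above):
#
#   # Single page, no recursion:
#   paths = await crawl_and_save("https://example.com/page", depth=0)
#
#   # Whole-site crawl, host-only scope, capped at 100 pages:
#   paths = await crawl_and_save(
#       "https://example.com/",
#       depth=None,
#       max_pages=100,
#   )
#   # ``paths`` lists only the markdown files actually written (new or changed).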