Coverage for src/lilbee/crawler/events.py: 100%
45 statements
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Per-page event emission, result translation, and cancel-teardown classification."""
3from __future__ import annotations
5import inspect
6import logging
7import math
8import threading
9from collections.abc import Callable
10from typing import Any
12from lilbee.crawler.models import CrawlResult, FetchedPage
13from lilbee.runtime.progress import (
14 CrawlPageEvent,
15 DetailedProgressCallback,
16 EventType,
17)
19log = logging.getLogger(__name__)


def _fetched_to_result(page: FetchedPage) -> CrawlResult:
    """Translate the fetcher's value type to the public ``CrawlResult`` shape."""
    return CrawlResult(
        url=page.url,
        markdown=page.markdown,
        success=page.success,
        error=page.error,
    )


def _pages_cap(pages: int | None) -> float:
    """Return the per-result counter ceiling for visible progress.

    ``None`` (unbounded) maps to ``math.inf`` so the streaming loop's hard
    cap check is a pure numeric compare with no branching.
    """
    return math.inf if pages is None else pages
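
# For example, _pages_cap(None) returns math.inf, so ``counter >= pages_cap``
# in the streaming loop below never trips for an unbounded crawl, while
# _pages_cap(10) returns 10 and stops visible progress at the tenth result.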


async def _drain_page_stream(
    page_stream: Any,
    *,
    on_progress: DetailedProgressCallback | None,
    on_result: Callable[[CrawlResult], Any] | None,
    sitemap_total: int,
    pages_cap: float,
    cancel: threading.Event | None,
) -> list[CrawlResult]:
    """Consume a fetcher's page stream, emitting events and flushing per page.

    Returns the accumulated ``CrawlResult`` list. The stream is closed
    deterministically by the caller; this helper only iterates.
    """
    results: list[CrawlResult] = []
    counter = 0

    def _should_cancel() -> bool:
        return cancel is not None and cancel.is_set()

    async for page in page_stream:
        if _should_cancel():
            break
        counter += 1
        if on_progress:
            on_progress(
                EventType.CRAWL_PAGE,
                CrawlPageEvent(url=page.url, current=counter, total=sitemap_total),
            )
        new_result = _fetched_to_result(page)
        results.append(new_result)
        if on_result is not None:
            try:
                rv = on_result(new_result)
                if inspect.isawaitable(rv):
                    await rv
            except OSError:
                # A disk-side flush failure must not masquerade as a crawl
                # failure. Log and keep streaming; the caller still sees the
                # result in its returned list.
                log.exception("Flush callback failed for %s", new_result.url)
        # Hard cap on visible progress. The BFS may emit failed / redirected
        # pages that push the per-result counter past the cap even after the
        # strategy has stopped dispatching. Break explicitly so the
        # user-visible count never exceeds the number the caller asked for.
        if counter >= pages_cap:
            break
    return results
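

# --- Illustrative usage sketch (not part of the module) ----------------------
# A minimal driver showing how a caller might feed this helper. The
# ``_fake_pages`` generator and the keyword construction of ``FetchedPage``
# are assumptions for illustration only; the real fetcher supplies the stream
# and the real caller owns closing it.
async def _example_drain() -> list[CrawlResult]:
    async def _fake_pages():
        # Assumed FetchedPage fields, mirrored from _fetched_to_result above.
        yield FetchedPage(
            url="https://example.com/", markdown="# Hi", success=True, error=None
        )

    def _flush(result: CrawlResult) -> None:
        # Sync callback; an awaitable return value would also be awaited.
        print(result.url, result.success)

    return await _drain_page_stream(
        _fake_pages(),
        on_progress=None,
        on_result=_flush,
        sitemap_total=1,
        pages_cap=_pages_cap(None),
        cancel=None,
    )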


def _handle_crawl_teardown_error(
    url: str,
    exc: Exception,
    *,
    cancel: threading.Event | None,
    results: list[CrawlResult],
) -> None:
    """Classify a recursive-crawl exception: cancel-teardown vs real failure.

    After cancel, crawl4ai may raise BrowserContext teardown errors as
    in-flight URLs bail. That's expected noise, not a failure worth
    surfacing. Otherwise, log and append a synthetic error result (only
    when nothing was produced so callers always see at least one entry).
    """
    cancelled = cancel is not None and cancel.is_set()
    if cancelled:
        log.debug("Recursive crawl of %s ended during cancel teardown: %s", url, exc)
        return
    log.warning("Recursive crawl of %s failed: %s", url, exc)
    if not results:
        results.append(CrawlResult(url=url, success=False, error=str(exc)))
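

# --- Illustrative caller-side sketch (not part of the module) ----------------
# How the streaming path and the teardown classifier might fit together. The
# body of the ``try`` block is a hypothetical stand-in for whatever drives the
# crawl4ai strategy; only the try/except routing is the point here. After a
# cancel, the exception is demoted to debug noise; otherwise the synthetic
# error result guarantees callers see at least one entry.
async def _example_crawl(url: str, cancel: threading.Event) -> list[CrawlResult]:
    results: list[CrawlResult] = []
    try:
        # Hypothetical: run the recursive strategy and drain its stream here,
        # e.g. results.extend(await _drain_page_stream(...)).
        ...
    except Exception as exc:  # broad by design; classification happens below
        _handle_crawl_teardown_error(url, exc, cancel=cancel, results=results)
    return results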