Coverage for src/lilbee/crawler/events.py: 100%

45 statements  

coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Per-page event emission, result translation, and cancel-teardown classification.""" 

2 

3from __future__ import annotations 

4 

5import inspect 

6import logging 

7import math 

8import threading 

9from collections.abc import Callable 

10from typing import Any 

11 

12from lilbee.crawler.models import CrawlResult, FetchedPage 

13from lilbee.runtime.progress import ( 

14 CrawlPageEvent, 

15 DetailedProgressCallback, 

16 EventType, 

17) 

18 

19log = logging.getLogger(__name__) 

20 

21 

def _fetched_to_result(page: FetchedPage) -> CrawlResult:
    """Translate the fetcher's value type to the public ``CrawlResult`` shape."""
    return CrawlResult(
        url=page.url,
        markdown=page.markdown,
        success=page.success,
        error=page.error,
    )



def _pages_cap(pages: int | None) -> float:
    """Return the per-result counter ceiling for visible progress.

    ``None`` (unbounded) maps to ``math.inf`` so the streaming loop's hard
    cap check is a pure numeric compare with no branching.
    """
    return math.inf if pages is None else pages



async def _drain_page_stream(
    page_stream: Any,
    *,
    on_progress: DetailedProgressCallback | None,
    on_result: Callable[[CrawlResult], Any] | None,
    sitemap_total: int,
    pages_cap: float,
    cancel: threading.Event | None,
) -> list[CrawlResult]:
    """Consume a fetcher's page stream, emitting events and flushing per page.

    Returns the accumulated ``CrawlResult`` list. The stream is closed
    deterministically by the caller; this helper only iterates.
    """
    results: list[CrawlResult] = []
    counter = 0

    def _should_cancel() -> bool:
        return cancel is not None and cancel.is_set()

    async for page in page_stream:
        if _should_cancel():
            break
        counter += 1
        if on_progress:
            on_progress(
                EventType.CRAWL_PAGE,
                CrawlPageEvent(url=page.url, current=counter, total=sitemap_total),
            )
        new_result = _fetched_to_result(page)
        results.append(new_result)
        if on_result is not None:
            try:
                rv = on_result(new_result)
                if inspect.isawaitable(rv):
                    await rv
            except OSError:
                # A disk-side flush failure must not masquerade as a crawl
                # failure. Log and keep streaming; the caller still sees the
                # result in its returned list.
                log.exception("Flush callback failed for %s", new_result.url)
        # Hard cap on visible progress. The BFS may emit failed / redirected
        # pages that push the per-result counter past the cap even after the
        # strategy has stopped dispatching. Break explicitly so the
        # user-visible count never exceeds the number the caller asked for.
        if counter >= pages_cap:
            break
    return results


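A minimal driving sketch for the helper above (illustrative only, not part of events.py; ``_FakePage``, ``_fake_stream``, and the callback names are hypothetical stand-ins, and it assumes the module's helpers are importable). It shows that ``on_result`` may be sync or async, since its return value is awaited only when ``inspect.isawaitable`` says so, and that the cap bounds the visible count:

from __future__ import annotations

import asyncio
import threading
from dataclasses import dataclass

@dataclass
class _FakePage:            # stand-in exposing the four fields _fetched_to_result reads
    url: str
    markdown: str = ""
    success: bool = True
    error: str | None = None

async def _fake_stream():   # hypothetical fetcher output
    for i in range(3):
        yield _FakePage(url=f"https://example.com/{i}")

async def main() -> None:
    flushed: list[str] = []

    async def flush(result):           # async flush callback; a plain function works too
        flushed.append(result.url)

    results = await _drain_page_stream(
        _fake_stream(),
        on_progress=None,
        on_result=flush,
        sitemap_total=3,
        pages_cap=_pages_cap(2),       # cap at 2: the third page is never counted
        cancel=threading.Event(),
    )
    assert [r.url for r in results] == flushed == [
        "https://example.com/0",
        "https://example.com/1",
    ]

asyncio.run(main())
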

def _handle_crawl_teardown_error(
    url: str,
    exc: Exception,
    *,
    cancel: threading.Event | None,
    results: list[CrawlResult],
) -> None:
    """Classify a recursive-crawl exception: cancel-teardown vs real failure.

    After cancel, crawl4ai may raise BrowserContext teardown errors as
    in-flight URLs bail. That's expected noise, not a failure worth
    surfacing. Otherwise, log and append a synthetic error result (only
    when nothing was produced so callers always see at least one entry).
    """
    cancelled = cancel is not None and cancel.is_set()
    if cancelled:
        log.debug("Recursive crawl of %s ended during cancel teardown: %s", url, exc)
        return
    log.warning("Recursive crawl of %s failed: %s", url, exc)
    if not results:
        results.append(CrawlResult(url=url, success=False, error=str(exc)))
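A short exercise of both branches (illustrative only, not part of events.py): a failure before any cancel request appends a synthetic error entry, while the same kind of exception after cancel is demoted to debug noise and leaves the results untouched:

import threading

cancel = threading.Event()
results: list[CrawlResult] = []

# Real failure: nothing was produced yet, so a synthetic error entry is appended.
_handle_crawl_teardown_error(
    "https://example.com", RuntimeError("boom"), cancel=cancel, results=results
)
assert results and not results[0].success

# Cancel teardown: the exception is logged at debug level and nothing is appended.
cancel.set()
_handle_crawl_teardown_error(
    "https://example.com", RuntimeError("teardown"), cancel=cancel, results=results
)
assert len(results) == 1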