Coverage for src / lilbee / crawler / task.py: 100%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Background crawl task management: start, track, and query crawl operations.""" 

2 

3import asyncio 

4import logging 

5import uuid 

6from dataclasses import dataclass, field 

7from datetime import UTC, datetime 

8from enum import StrEnum 

9 

10from lilbee.core.config.enums import CrawlRenderMode 

11from lilbee.crawler import crawl_and_save 

12from lilbee.runtime.progress import ( 

13 CrawlPageEvent, 

14 DetailedProgressCallback, 

15 EventType, 

16 ProgressEvent, 

17) 

18 

log: logging.Logger = logging.getLogger(__name__)

# Upper bound on finished (DONE/FAILED) tasks kept in memory; once exceeded,
# the tasks that finished earliest are evicted first.
_MAX_COMPLETED_TASKS: int = 100

23 

24 

25class TaskStatus(StrEnum): 

26 """Lifecycle states for a crawl task.""" 

27 

28 PENDING = "pending" 

29 RUNNING = "running" 

30 DONE = "done" 

31 FAILED = "failed" 

32 

33 

@dataclass
class CrawlTask:
    """Mutable status record for one crawl, stored in the task registry.

    ``depth`` and ``max_pages`` use the same three-state convention as
    ``crawl_and_save``: ``None`` means unbounded, ``0`` (``depth`` only)
    means just the given URL, and a positive int is an explicit limit.
    """

    # Identity and crawl parameters, supplied at construction.
    task_id: str
    url: str
    depth: int | None
    max_pages: int | None
    render_mode: CrawlRenderMode | None = None  # None defers to the configured default

    # Progress and outcome, filled in while the crawl runs.
    status: TaskStatus = TaskStatus.PENDING
    pages_crawled: int = 0
    pages_total: int | None = None  # updated from crawl progress events
    error: str | None = None  # populated only when the crawl does not succeed
    started_at: str = ""  # ISO 8601 UTC timestamp; empty until the crawl starts
    finished_at: str = ""  # ISO 8601 UTC timestamp; empty until the crawl ends

    # Handle of the background asyncio task; excluded from __init__ and repr.
    _async_task: asyncio.Task[None] | None = field(init=False, repr=False, default=None)

54 

55 

class TaskRegistry:
    """Per-process store of crawl tasks keyed by task id.

    Holds both in-flight and finished tasks. The module keeps exactly one
    instance, since what it tracks (including live ``asyncio.Task`` handles)
    only makes sense within a single process.
    """

    def __init__(self) -> None:
        self.tasks: dict[str, CrawlTask] = dict()

    def clear(self) -> None:
        """Forget every tracked task, emptying the existing dict in place."""
        tracked = self.tasks
        tracked.clear()

68 

69 

# Process-wide singleton shared by the task-tracking functions in this module.
_registry: TaskRegistry = TaskRegistry()

71 

72 

73def now_iso() -> str: 

74 """Current UTC time as ISO 8601 string.""" 

75 return datetime.now(UTC).isoformat() 

76 

77 

def make_progress_updater(task: CrawlTask) -> DetailedProgressCallback:
    """Build a progress callback that mirrors crawl-page events onto ``task``.

    Events other than ``EventType.CRAWL_PAGE`` are ignored. A CRAWL_PAGE event
    whose payload is not a ``CrawlPageEvent`` raises ``TypeError``.
    """

    def _on_progress(event_type: EventType, data: ProgressEvent) -> None:
        if event_type != EventType.CRAWL_PAGE:
            return
        if not isinstance(data, CrawlPageEvent):
            raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
        task.pages_crawled, task.pages_total = data.current, data.total

    return _on_progress

89 

90 

async def run_crawl(task: CrawlTask) -> None:
    """Execute the crawl described by ``task``, record the outcome, then sync.

    Moves ``task`` through RUNNING to DONE or FAILED, stamping ``started_at``
    and ``finished_at``. Progress events from ``crawl_and_save`` update the
    page counters while the crawl runs. After a successful crawl,
    ``lilbee.data.ingest.sync`` is awaited. A sync failure is only logged
    and does not change the DONE status.

    Crawl errors are recorded on the task rather than raised. Cancellation
    is recorded as FAILED and then re-raised, so the enclosing asyncio task
    still ends up cancelled.
    """
    task.status = TaskStatus.RUNNING
    task.started_at = now_iso()
    progress = make_progress_updater(task)

    try:
        paths = await crawl_and_save(
            task.url,
            depth=task.depth,
            max_pages=task.max_pages,
            on_progress=progress,
            render_mode=task.render_mode,
        )
    except asyncio.CancelledError:
        # CancelledError derives from BaseException, so ``except Exception``
        # would miss it. The task would then stay RUNNING forever and never
        # become eligible for eviction. Record a terminal state, then propagate.
        task.status = TaskStatus.FAILED
        task.error = "Crawl cancelled"
        task.finished_at = now_iso()
        log.warning("Crawl cancelled: %s", task.url)
        raise
    except Exception as exc:
        task.status = TaskStatus.FAILED
        # Some exceptions stringify to "" (e.g. a bare TimeoutError()); fall
        # back to the class name so pollers always see a reason.
        task.error = str(exc) or type(exc).__name__
        task.finished_at = now_iso()
        log.warning("Crawl failed: %s: %s", task.url, exc)
    else:
        # Reached only when the crawl succeeded. Errors raised here are not
        # routed to the crawl-failure handlers above, so a completed crawl
        # is never relabelled FAILED.
        task.status = TaskStatus.DONE
        # Prefer the count reported by progress events; otherwise use the
        # number of paths returned by crawl_and_save.
        task.pages_crawled = task.pages_crawled or len(paths)
        task.finished_at = now_iso()
        log.info("Crawl complete: %s → %d files", task.url, len(paths))
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception:
            # Best-effort: the crawl itself already finished successfully.
            log.warning("Post-crawl sync failed for %s", task.url, exc_info=True)
    finally:
        # Drop the asyncio.Task handle that start_crawl stored on the record.
        task._async_task = None

122 

123 

def _evict_completed() -> None:
    """Trim finished tasks so at most ``_MAX_COMPLETED_TASKS`` of them remain.

    A task counts as finished when its status is DONE or FAILED; pending and
    running tasks are never removed. Victims are chosen by ascending
    ``finished_at`` (ISO 8601 strings compared as text), not by registration
    order, because a short crawl started late can finish before a long
    crawl started early.
    """
    registry_tasks = _registry.tasks
    terminal = (TaskStatus.DONE, TaskStatus.FAILED)
    finished = [
        (key, entry)
        for key, entry in registry_tasks.items()
        if entry.status in terminal
    ]
    overflow = len(finished) - _MAX_COMPLETED_TASKS
    if overflow > 0:
        oldest_first = sorted(finished, key=lambda item: item[1].finished_at)
        for key, _entry in oldest_first[:overflow]:
            registry_tasks.pop(key)

140 

141 

def start_crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
    render_mode: CrawlRenderMode | None = None,
) -> str:
    """Create a crawl task and launch it as an asyncio background task.

    Defaults to whole-site unbounded recursion. Pass depth=0 for single URL.
    ``render_mode`` of ``None`` defers to ``cfg.crawl_render_mode``.

    Must be called while an event loop is running. ``asyncio.create_task``
    raises ``RuntimeError`` otherwise, and in that case nothing is added to
    the registry.

    Returns the task_id for status polling.
    """
    _evict_completed()
    task_id = uuid.uuid4().hex[:12]
    task = CrawlTask(
        task_id=task_id,
        url=url,
        depth=depth,
        max_pages=max_pages,
        render_mode=render_mode,
    )
    coro = run_crawl(task)
    try:
        async_task = asyncio.create_task(coro)
    except BaseException:
        # Scheduling failed (e.g. no running loop). Close the unscheduled
        # coroutine to avoid a "never awaited" warning, and leave the
        # registry untouched so no phantom PENDING task lingers.
        coro.close()
        raise
    # Registering after create_task is safe: the new task cannot start until
    # this synchronous function hands control back to the event loop.
    _registry.tasks[task_id] = task
    # Keep a strong reference; the event loop holds only weak references to tasks.
    task._async_task = async_task
    return task_id

166 

167 

def get_task(task_id: str) -> CrawlTask | None:
    """Return the tracked task for ``task_id``, or ``None`` if unknown or evicted."""
    tracked = _registry.tasks
    return tracked.get(task_id)

171 

172 

def list_tasks() -> list[CrawlTask]:
    """Snapshot every tracked task, in-flight and finished, as a new list."""
    return [*_registry.tasks.values()]

176 

177 

def clear_tasks() -> None:
    """Drop every tracked task, in-flight or finished (intended for tests).

    Only registry entries are removed; this call does not cancel any
    asyncio task that may still be running.
    """
    registry = _registry
    registry.clear()