Coverage for src/lilbee/crawler/task.py: 100%

88 statements

coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

"""Background crawl task management: start, track, and query crawl operations."""

import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum

from lilbee.crawler import crawl_and_save
from lilbee.runtime.progress import (
    CrawlPageEvent,
    DetailedProgressCallback,
    EventType,
    ProgressEvent,
)

log = logging.getLogger(__name__)

# Maximum completed tasks to retain in memory before evicting the oldest.
_MAX_COMPLETED_TASKS = 100


class TaskStatus(StrEnum):
    """Lifecycle states for a crawl task."""

    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"


@dataclass
class CrawlTask:
    """Tracks a single crawl operation.

    ``depth`` / ``max_pages`` follow the crawl_and_save three-state
    convention: ``None`` = unbounded, ``0`` (depth only) = single URL,
    positive int = explicit cap.
    """

    task_id: str
    url: str
    depth: int | None
    max_pages: int | None
    status: TaskStatus = TaskStatus.PENDING
    pages_crawled: int = 0
    pages_total: int | None = None
    error: str | None = None
    started_at: str = ""
    finished_at: str = ""
    _async_task: asyncio.Task[None] | None = field(default=None, repr=False, init=False)
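

# Illustrative reading of the three-state convention above (a sketch; the
# URL and limits are made up):
#
#     CrawlTask("t1", "https://example.com", depth=None, max_pages=None)  # whole site, unbounded
#     CrawlTask("t2", "https://example.com", depth=0, max_pages=None)     # just the one URL
#     CrawlTask("t3", "https://example.com", depth=2, max_pages=50)       # explicit caps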


class TaskRegistry:
    """In-memory registry of active and completed crawl tasks.

    A single module-level instance (``_registry``) is used because task
    tracking is inherently per-process state (asyncio.Task references, etc.).
    """

    def __init__(self) -> None:
        self.tasks: dict[str, CrawlTask] = {}

    def clear(self) -> None:
        """Remove all tasks from the registry."""
        self.tasks.clear()


_registry = TaskRegistry()


def now_iso() -> str:
    """Current UTC time as an ISO 8601 string."""
    return datetime.now(UTC).isoformat()


def make_progress_updater(task: CrawlTask) -> DetailedProgressCallback:
    """Return a progress callback that updates task fields from crawl events."""

    def _on_progress(event_type: EventType, data: ProgressEvent) -> None:
        if event_type == EventType.CRAWL_PAGE:
            if not isinstance(data, CrawlPageEvent):
                raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
            task.pages_crawled = data.current
            task.pages_total = data.total

    return _on_progress
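

# Hypothetical wiring sketch: the returned closure captures ``task`` and
# mutates it in place on each crawl-page event. The CrawlPageEvent keyword
# arguments below are an assumption inferred from the attributes read above.
#
#     task = CrawlTask("abc123def456", "https://example.com", depth=None, max_pages=None)
#     on_progress = make_progress_updater(task)
#     on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(current=3, total=10))
#     # now task.pages_crawled == 3 and task.pages_total == 10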


async def run_crawl(task: CrawlTask) -> None:
    """Execute the crawl, save results, and trigger a post-crawl sync."""
    task.status = TaskStatus.RUNNING
    task.started_at = now_iso()
    progress = make_progress_updater(task)

    try:
        paths = await crawl_and_save(
            task.url,
            depth=task.depth,
            max_pages=task.max_pages,
            on_progress=progress,
        )
        task.status = TaskStatus.DONE
        task.pages_crawled = task.pages_crawled or len(paths)
        task.finished_at = now_iso()
        log.info("Crawl complete: %s → %d files", task.url, len(paths))
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception:
            log.warning("Post-crawl sync failed for %s", task.url, exc_info=True)
    except Exception as exc:
        task.status = TaskStatus.FAILED
        task.error = str(exc)
        task.finished_at = now_iso()
        log.warning("Crawl failed: %s: %s", task.url, exc)
    finally:
        task._async_task = None


def _evict_completed() -> None:
    """Remove completed tasks with the earliest ``finished_at`` when over the cap.

    Finish order diverges from start order whenever short tasks complete
    while a long one is still running, so sort on ``finished_at`` rather
    than dict insertion order.
    """
    done_statuses = (TaskStatus.DONE, TaskStatus.FAILED)
    tasks = _registry.tasks
    completed = [(tid, t) for tid, t in tasks.items() if t.status in done_statuses]
    excess = len(completed) - _MAX_COMPLETED_TASKS
    if excess <= 0:
        return
    completed.sort(key=lambda pair: pair[1].finished_at)
    for tid, _ in completed[:excess]:
        del tasks[tid]
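

# Worked example of the eviction order (hypothetical, with a cap of 2 rather
# than 100): if tasks A, B, and C are all completed and finished in the order
# B, A, C, then B is evicted first, even though A was started first; finish
# time, not dict insertion order, decides.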


def start_crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> str:
    """Create a crawl task and launch it as an asyncio background task.

    Defaults to whole-site unbounded recursion. Pass ``depth=0`` for a
    single URL. Returns the task_id for status polling.
    """
    _evict_completed()
    task_id = uuid.uuid4().hex[:12]
    task = CrawlTask(
        task_id=task_id,
        url=url,
        depth=depth,
        max_pages=max_pages,
    )
    _registry.tasks[task_id] = task
    task._async_task = asyncio.create_task(run_crawl(task))
    return task_id
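

# Usage sketch (illustrative; the URL and limits are made up). Because
# start_crawl calls asyncio.create_task, it must run while an event loop is
# active, e.g. inside an async request handler:
#
#     async def handler() -> None:
#         task_id = start_crawl("https://example.com", depth=2, max_pages=50)
#         while (t := get_task(task_id)) and t.status in (
#             TaskStatus.PENDING,
#             TaskStatus.RUNNING,
#         ):
#             await asyncio.sleep(0.5)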


def get_task(task_id: str) -> CrawlTask | None:
    """Look up a crawl task by ID."""
    return _registry.tasks.get(task_id)


def list_tasks() -> list[CrawlTask]:
    """Return all tracked crawl tasks (active and completed)."""
    return list(_registry.tasks.values())


def clear_tasks() -> None:
    """Remove all tasks from the registry (for testing)."""
    _registry.clear()