Coverage for src / lilbee / crawler / task.py: 100%
90 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Background crawl task management: start, track, and query crawl operations."""
3import asyncio
4import logging
5import uuid
6from dataclasses import dataclass, field
7from datetime import UTC, datetime
8from enum import StrEnum
10from lilbee.core.config.enums import CrawlRenderMode
11from lilbee.crawler import crawl_and_save
12from lilbee.runtime.progress import (
13 CrawlPageEvent,
14 DetailedProgressCallback,
15 EventType,
16 ProgressEvent,
17)
log = logging.getLogger(__name__)

# Maximum number of DONE/FAILED tasks retained in the registry. The check runs
# in _evict_completed when a new crawl is started, which drops the tasks with
# the earliest finished_at until the count is back at this cap.
_MAX_COMPLETED_TASKS = 100
class TaskStatus(StrEnum):
    """Lifecycle states for a crawl task.

    run_crawl moves a task PENDING -> RUNNING -> DONE or FAILED. DONE and
    FAILED are terminal and are the only states _evict_completed may remove.
    """

    PENDING = "pending"  # created by start_crawl, run_crawl has not begun yet
    RUNNING = "running"  # run_crawl has started the crawl
    DONE = "done"  # crawl_and_save returned; a failed post-crawl sync does not change this
    FAILED = "failed"  # the crawl raised; the message is stored in CrawlTask.error
@dataclass
class CrawlTask:
    """Tracks a single crawl operation.

    depth / max_pages follow the crawl_and_save three-state convention: None =
    unbounded, 0 (depth only) = single URL, positive int = explicit cap.

    Instances are mutated in place: run_crawl writes status, timestamps and
    error, and the callback built by make_progress_updater writes page counts.
    """

    task_id: str  # 12 hex characters taken from uuid4 by start_crawl
    url: str  # starting URL handed to crawl_and_save
    depth: int | None
    max_pages: int | None
    # None defers to the configured default render mode (per start_crawl's docstring).
    render_mode: CrawlRenderMode | None = None
    status: TaskStatus = TaskStatus.PENDING
    # Latest CrawlPageEvent.current; on success, replaced by the number of saved
    # paths if it is still 0.
    pages_crawled: int = 0
    pages_total: int | None = None  # latest CrawlPageEvent.total
    error: str | None = None  # exception message when the crawl failed
    started_at: str = ""  # ISO 8601 UTC from now_iso; "" until run_crawl starts
    # ISO 8601 UTC from now_iso; "" until DONE/FAILED. _evict_completed sorts on it.
    finished_at: str = ""
    # Handle returned by asyncio.create_task in start_crawl; run_crawl resets it
    # to None when it exits. Excluded from __init__ and from repr.
    _async_task: asyncio.Task[None] | None = field(default=None, repr=False, init=False)
class TaskRegistry:
    """Per-process store that maps crawl task IDs to their CrawlTask records.

    The module keeps exactly one instance (_registry). What it tracks,
    including live asyncio.Task handles, only has meaning inside a single
    process, so there is no reason for more than one.
    """

    def __init__(self) -> None:
        registered: dict[str, CrawlTask] = {}
        self.tasks = registered

    def clear(self) -> None:
        """Empty the registry in place; the same dict object is kept."""
        registered = self.tasks
        registered.clear()
# Process-wide registry read and written by start_crawl, get_task, list_tasks,
# clear_tasks and _evict_completed.
_registry = TaskRegistry()
73def now_iso() -> str:
74 """Current UTC time as ISO 8601 string."""
75 return datetime.now(UTC).isoformat()
def make_progress_updater(task: CrawlTask) -> DetailedProgressCallback:
    """Build a callback that copies crawl-page progress onto *task*.

    Only ``EventType.CRAWL_PAGE`` events are acted on; every other event type
    is ignored. A CRAWL_PAGE event whose payload is not a ``CrawlPageEvent``
    raises ``TypeError``.
    """

    def _apply_page_event(event_type: EventType, data: ProgressEvent) -> None:
        if event_type != EventType.CRAWL_PAGE:
            return
        if not isinstance(data, CrawlPageEvent):
            raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
        # Mirror the latest counters reported by the crawler.
        task.pages_crawled = data.current
        task.pages_total = data.total

    return _apply_page_event
async def run_crawl(task: CrawlTask) -> None:
    """Execute the crawl for *task*, record its outcome, then sync.

    Updates written to *task*:

    - On entry, ``status`` becomes RUNNING and ``started_at`` is stamped.
    - On success, ``status`` becomes DONE and ``finished_at`` is stamped.
      ``pages_crawled`` falls back to the number of saved paths if no
      progress event reported a non-zero count. A best-effort
      ``sync(quiet=True)`` follows; its failure is logged and does not change
      the status.
    - If the crawl raises an ``Exception``, ``status`` becomes FAILED,
      ``finished_at`` is stamped, and ``error`` holds the exception message
      (or the exception class name when the message is empty).
    - If cancelled while still RUNNING, the task is marked FAILED with
      ``error="cancelled"``. ``asyncio.CancelledError`` is re-raised so
      cancellation semantics are preserved.

    ``_async_task`` is always reset to None on exit.
    """
    task.status = TaskStatus.RUNNING
    task.started_at = now_iso()
    progress = make_progress_updater(task)

    try:
        paths = await crawl_and_save(
            task.url,
            depth=task.depth,
            max_pages=task.max_pages,
            on_progress=progress,
            render_mode=task.render_mode,
        )
        task.status = TaskStatus.DONE
        task.pages_crawled = task.pages_crawled or len(paths)
        task.finished_at = now_iso()
        log.info("Crawl complete: %s → %d files", task.url, len(paths))
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception:
            log.warning("Post-crawl sync failed for %s", task.url, exc_info=True)
    except asyncio.CancelledError:
        # CancelledError derives from BaseException, so the Exception handler
        # below never sees it. Without this branch a cancelled task would stay
        # RUNNING forever with no finished_at, and so could never be evicted.
        # A cancel during the post-crawl sync leaves an already-DONE task alone.
        if task.status == TaskStatus.RUNNING:
            task.status = TaskStatus.FAILED
            task.error = "cancelled"
            task.finished_at = now_iso()
            log.warning("Crawl cancelled: %s", task.url)
        raise
    except Exception as exc:
        task.status = TaskStatus.FAILED
        # Some exceptions (e.g. a bare TimeoutError()) stringify to "", which
        # would leave error empty and falsy; fall back to the class name.
        task.error = str(exc) or type(exc).__name__
        task.finished_at = now_iso()
        log.warning("Crawl failed: %s: %s", task.url, exc)
    finally:
        task._async_task = None
def _evict_completed() -> None:
    """Drop the oldest finished tasks so at most ``_MAX_COMPLETED_TASKS`` remain.

    "Oldest" means earliest ``finished_at``, not earliest insertion. Short
    crawls can finish while a long crawl that started earlier is still
    running, so dict order does not reflect completion order.
    """
    registered = _registry.tasks
    finished = [
        (task_id, crawl)
        for task_id, crawl in registered.items()
        if crawl.status in (TaskStatus.DONE, TaskStatus.FAILED)
    ]
    overflow = len(finished) - _MAX_COMPLETED_TASKS
    if overflow > 0:
        oldest_first = sorted(finished, key=lambda entry: entry[1].finished_at)
        for task_id, _crawl in oldest_first[:overflow]:
            del registered[task_id]
def start_crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
    render_mode: CrawlRenderMode | None = None,
) -> str:
    """Create a crawl task and launch it as an asyncio background task.

    Defaults to whole-site unbounded recursion. Pass depth=0 for single URL.
    ``render_mode`` of ``None`` defers to ``cfg.crawl_render_mode``.
    Over-cap completed tasks are evicted first.

    Must be called while an event loop is running. Otherwise
    ``asyncio.create_task`` raises ``RuntimeError``, which propagates, and
    nothing is registered.

    Returns the task_id for status polling.
    """
    _evict_completed()
    task_id = uuid.uuid4().hex[:12]
    task = CrawlTask(
        task_id=task_id,
        url=url,
        depth=depth,
        max_pages=max_pages,
        render_mode=render_mode,
    )
    coro = run_crawl(task)
    try:
        task._async_task = asyncio.create_task(coro)
    except BaseException:
        # Scheduling failed (typically: no running event loop). Close the
        # coroutine to avoid a "never awaited" warning. Do not register the
        # task: nothing would ever move it past PENDING, and PENDING tasks are
        # never evicted.
        coro.close()
        raise
    # Registering after create_task is safe: the coroutine does not run until
    # the caller yields to the loop, and run_crawl never reads the registry.
    _registry.tasks[task_id] = task
    return task_id
def get_task(task_id: str) -> CrawlTask | None:
    """Fetch the tracked crawl task for *task_id*, or ``None`` if it is unknown."""
    try:
        return _registry.tasks[task_id]
    except KeyError:
        return None
def list_tasks() -> list[CrawlTask]:
    """Snapshot every tracked crawl task, running or finished, as a new list."""
    return [*_registry.tasks.values()]
def clear_tasks() -> None:
    """Forget every tracked crawl task; intended for test isolation."""
    registry = _registry
    registry.clear()