Coverage for src/lilbee/crawler/task.py: 100% (88 statements)
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Background crawl task management: start, track, and query crawl operations."""
3import asyncio
4import logging
5import uuid
6from dataclasses import dataclass, field
7from datetime import UTC, datetime
8from enum import StrEnum
10from lilbee.crawler import crawl_and_save
11from lilbee.runtime.progress import (
12 CrawlPageEvent,
13 DetailedProgressCallback,
14 EventType,
15 ProgressEvent,
16)
18log = logging.getLogger(__name__)
20# Maximum completed tasks to retain in memory before evicting oldest.
21_MAX_COMPLETED_TASKS = 100
24class TaskStatus(StrEnum):
25 """Lifecycle states for a crawl task."""
27 PENDING = "pending"
28 RUNNING = "running"
29 DONE = "done"
30 FAILED = "failed"
33@dataclass
34class CrawlTask:
35 """Tracks a single crawl operation.
37 depth / max_pages follow the crawl_and_save three-state convention: None =
38 unbounded, 0 (depth only) = single URL, positive int = explicit cap.
39 """
41 task_id: str
42 url: str
43 depth: int | None
44 max_pages: int | None
45 status: TaskStatus = TaskStatus.PENDING
46 pages_crawled: int = 0
47 pages_total: int | None = None
48 error: str | None = None
49 started_at: str = ""
50 finished_at: str = ""
51 _async_task: asyncio.Task[None] | None = field(default=None, repr=False, init=False)
54class TaskRegistry:
55 """In-memory registry of active and completed crawl tasks.
56 A single module-level instance (_registry) is used because task tracking
57 is inherently per-process state (asyncio.Task references, etc.).
58 """
60 def __init__(self) -> None:
61 self.tasks: dict[str, CrawlTask] = {}
63 def clear(self) -> None:
64 """Remove all tasks from the registry."""
65 self.tasks.clear()
68_registry = TaskRegistry()
71def now_iso() -> str:
72 """Current UTC time as ISO 8601 string."""
73 return datetime.now(UTC).isoformat()
76def make_progress_updater(task: CrawlTask) -> DetailedProgressCallback:
77 """Return a progress callback that updates task fields from crawl events."""
79 def _on_progress(event_type: EventType, data: ProgressEvent) -> None:
80 if event_type == EventType.CRAWL_PAGE:
81 if not isinstance(data, CrawlPageEvent):
82 raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
83 task.pages_crawled = data.current
84 task.pages_total = data.total
86 return _on_progress
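

# Illustrative sketch only, not part of the module: how the callback returned by
# make_progress_updater mutates its task when the crawler reports a page. It
# assumes CrawlPageEvent accepts ``current`` / ``total`` keyword arguments, which
# may not match the real constructor.
#
#     task = CrawlTask(task_id="demo", url="https://example.com", depth=None, max_pages=None)
#     on_progress = make_progress_updater(task)
#     on_progress(EventType.CRAWL_PAGE, CrawlPageEvent(current=3, total=10))
#     # task.pages_crawled == 3, task.pages_total == 10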


async def run_crawl(task: CrawlTask) -> None:
    """Execute crawl, save results, and trigger sync."""
    task.status = TaskStatus.RUNNING
    task.started_at = now_iso()
    progress = make_progress_updater(task)

    try:
        paths = await crawl_and_save(
            task.url,
            depth=task.depth,
            max_pages=task.max_pages,
            on_progress=progress,
        )
        task.status = TaskStatus.DONE
        task.pages_crawled = task.pages_crawled or len(paths)
        task.finished_at = now_iso()
        log.info("Crawl complete: %s → %d files", task.url, len(paths))
        try:
            from lilbee.data.ingest import sync

            await sync(quiet=True)
        except Exception:
            log.warning("Post-crawl sync failed for %s", task.url, exc_info=True)
    except Exception as exc:
        task.status = TaskStatus.FAILED
        task.error = str(exc)
        task.finished_at = now_iso()
        log.warning("Crawl failed: %s: %s", task.url, exc)
    finally:
        task._async_task = None


def _evict_completed() -> None:
    """Remove completed tasks with the earliest ``finished_at`` when over cap.

    Finish order diverges from start order whenever short tasks complete
    while a long one is still running, so sort on ``finished_at`` rather
    than dict insertion order.
    """
    done_statuses = (TaskStatus.DONE, TaskStatus.FAILED)
    tasks = _registry.tasks
    completed = [(tid, t) for tid, t in tasks.items() if t.status in done_statuses]
    excess = len(completed) - _MAX_COMPLETED_TASKS
    if excess <= 0:
        return
    completed.sort(key=lambda pair: pair[1].finished_at)
    for tid, _ in completed[:excess]:
        del tasks[tid]


def start_crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> str:
    """Create a crawl task and launch it as an asyncio background task.

    Defaults to whole-site unbounded recursion. Pass depth=0 for a single URL.
    Returns the task_id for status polling.
    """
    _evict_completed()
    task_id = uuid.uuid4().hex[:12]
    task = CrawlTask(
        task_id=task_id,
        url=url,
        depth=depth,
        max_pages=max_pages,
    )
    _registry.tasks[task_id] = task
    task._async_task = asyncio.create_task(run_crawl(task))
    return task_id
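

# Illustrative calls only, not part of the module: how the three-state
# depth/max_pages convention maps onto start_crawl. The URL is a placeholder,
# and each call must be made from code running inside an asyncio event loop
# (start_crawl uses asyncio.create_task).
#
#     start_crawl("https://example.com")                          # None: unbounded, whole site
#     start_crawl("https://example.com", depth=0)                 # 0: single URL only
#     start_crawl("https://example.com", depth=2, max_pages=50)   # positive ints: explicit caps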


def get_task(task_id: str) -> CrawlTask | None:
    """Look up a crawl task by ID."""
    return _registry.tasks.get(task_id)


def list_tasks() -> list[CrawlTask]:
    """Return all tracked crawl tasks (active and completed)."""
    return list(_registry.tasks.values())


def clear_tasks() -> None:
    """Remove all tasks from the registry (for testing)."""
    _registry.clear()
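

# Usage sketch for status polling (assumed caller-side code, not part of this
# module): start a crawl, then poll the returned task_id until the task leaves
# the PENDING/RUNNING states. example.com is a placeholder and the sleep
# interval is arbitrary.
#
#     async def wait_for_crawl() -> CrawlTask | None:
#         task_id = start_crawl("https://example.com", max_pages=10)
#         while (task := get_task(task_id)) and task.status in (
#             TaskStatus.PENDING,
#             TaskStatus.RUNNING,
#         ):
#             await asyncio.sleep(0.5)
#         return task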