Coverage for src / lilbee / server / handlers / ingest.py: 100%
117 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Sync and add-files handlers (SSE-streamed)."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import logging
8from collections.abc import AsyncGenerator
9from pathlib import Path
10from typing import TYPE_CHECKING, Any
12from lilbee.app.ingest import copy_files
13from lilbee.app.services import get_services
14from lilbee.core.config import cfg
15from lilbee.core.security import validate_path_within
16from lilbee.runtime.ingest_lock import IngestLockRegistry
17from lilbee.runtime.progress import SseEvent
18from lilbee.server.handlers.sse import SseStream, sse_done, sse_error, sse_event
19from lilbee.server.models import AddSummary, SyncSummary
21if TYPE_CHECKING:
22 from lilbee.app.dataset import ImportSummary
23 from lilbee.data.ingest import SyncResult
# Module-level logger for the ingest handlers.
log: logging.Logger = logging.getLogger(__name__)

# Maximum number of entries accepted in a single add-files request's "paths".
MAX_ADD_FILES: int = 100
async def _run_sync_with_sentinel(
    sse: SseStream,
    enable_ocr: bool | None,
    force_rebuild: bool = False,
    retry_skipped: bool = False,
) -> SyncResult:
    """Await ``ingest.sync()`` under the requested OCR override.

    Progress is reported through ``sse.callback`` and cancellation is driven by
    ``sse.cancel``. Whether the sync succeeds or raises, ``None`` (the drain
    sentinel) is pushed onto ``sse.queue`` afterwards.
    """
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        with temporary_ocr_config(enable_ocr):
            sync_kwargs = {
                "quiet": True,
                "on_progress": sse.callback,
                "cancel": sse.cancel,
                "force_rebuild": force_rebuild,
                "retry_skipped": retry_skipped,
            }
            result = await sync(**sync_kwargs)
        return result
    finally:
        # Always wake the drain loop, even on error or cancellation.
        sse.queue.put_nowait(None)
async def sync_stream(
    *, enable_ocr: bool | None = None, force_rebuild: bool = False, retry_skipped: bool = False
) -> AsyncGenerator[str, None]:
    """Trigger sync, yield SSE progress events, then done event.

    When ``force_rebuild`` is true, the underlying sync drops every table and
    re-ingests from ``cfg.documents_dir`` (the REST equivalent of ``lilbee rebuild``).
    When ``retry_skipped`` is true, it clears the failed-file markers so files
    that were skipped on a previous sync get another attempt, without dropping
    the store.

    After the progress events, an error event is yielded if the sync raised.
    Otherwise a done event carrying the sync result is yielded. Nothing further
    is yielded once ``sse.cancel`` is set.

    If the consumer stops iterating early (e.g. the client disconnects), the
    background sync task is cancelled and awaited. This keeps it from running
    on unobserved, the same cleanup ``add_files_stream`` performs.
    """
    sse = SseStream()
    task = asyncio.create_task(
        _run_sync_with_sentinel(sse, enable_ocr, force_rebuild, retry_skipped)
    )
    try:
        async for event in sse.drain(task, "Sync stream"):
            yield event
        if not sse.cancel.is_set() and task.done() and not task.cancelled():
            exc = task.exception()
            if exc is not None:
                yield sse_error(str(exc))
                return
            yield sse_done(task.result().model_dump())
    finally:
        # Reached with the task still pending only when the generator was
        # closed mid-stream; stop the work and retrieve its outcome so no
        # "exception was never retrieved" warning is emitted.
        if not task.done():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await task
async def _run_add(
    paths: list[str],
    force: bool,
    enable_ocr: bool | None,
    ocr_timeout: float | None,
    sse: SseStream,
) -> AddSummary:
    """Copy the given files, then sync, returning the summary for the done event.

    Paths that do not exist are not copied; their original strings are
    reported in ``errors``. If ``sse.cancel`` is set once copying finishes,
    the sync step is skipped and the summary has no ``sync`` section. The drain
    sentinel (``None``) is always pushed onto ``sse.queue`` on exit.
    """
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        missing: list[str] = []
        present: list[Path] = []
        for raw in paths:
            candidate = Path(raw)
            if candidate.exists():
                present.append(candidate)
                continue
            missing.append(raw)

        copy_outcome = copy_files(present, force=force)

        if sse.cancel.is_set():
            # Cancelled during the copy: report what was copied, skip syncing.
            return AddSummary(
                copied=copy_outcome.copied,
                skipped=copy_outcome.skipped,
                errors=missing,
            )

        with temporary_ocr_config(enable_ocr, ocr_timeout):
            synced = await sync(quiet=True, on_progress=sse.callback, cancel=sse.cancel)

        return AddSummary(
            copied=copy_outcome.copied,
            skipped=copy_outcome.skipped,
            errors=missing,
            sync=SyncSummary(**synced.model_dump()),
        )
    finally:
        # Always wake the drain loop, even on error or cancellation.
        sse.queue.put_nowait(None)
def validate_add_paths(
    data: dict[str, Any],
) -> tuple[list[str], bool, bool | None, float | None]:
    """Validate add-files input. Raises ValueError on bad input.

    Args:
        data: Parsed request body. ``paths`` is required. ``force``,
            ``enable_ocr`` and ``ocr_timeout`` are optional.

    Returns:
        ``(paths, force, enable_ocr, ocr_timeout)``. ``force`` is coerced with
        ``bool()``. The OCR values come from ``_parse_ocr_params`` and are
        ``None`` when absent.

    Raises:
        ValueError: If ``paths`` is missing, empty, not a list, contains a
            non-string entry, or has more than ``MAX_ADD_FILES`` entries.
            Also raised by ``_parse_ocr_params`` for an unparsable
            ``ocr_timeout``. ``validate_path_within`` may raise for a name
            that escapes ``cfg.documents_dir`` (exception type defined there;
            presumably ValueError — confirm).
    """
    paths = data.get("paths")
    if not isinstance(paths, list) or not paths:
        raise ValueError("'paths' must be a non-empty list of strings")
    if len(paths) > MAX_ADD_FILES:
        raise ValueError(f"Too many files: {len(paths)} exceeds limit of {MAX_ADD_FILES}")
    # Non-string entries would make Path() raise TypeError below, which escapes
    # callers that only handle ValueError; reject them with the same message.
    if not all(isinstance(p_str, str) for p_str in paths):
        raise ValueError("'paths' must be a non-empty list of strings")

    for p_str in paths:
        # Check that the basename joined onto documents_dir stays inside it
        # (e.g. a bare ".." would escape). Presumably mirrors where the copy
        # lands — confirm against copy_files.
        validate_path_within(cfg.documents_dir / Path(p_str).name, cfg.documents_dir)

    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)
    return paths, force, enable_ocr, ocr_timeout
135def _parse_ocr_params(data: dict[str, Any]) -> tuple[bool | None, float | None]:
136 """Extract and coerce OCR parameters from a request dict."""
137 enable_ocr = data.get("enable_ocr")
138 ocr_timeout = data.get("ocr_timeout")
139 if enable_ocr is not None:
140 enable_ocr = bool(enable_ocr)
141 if ocr_timeout is not None:
142 ocr_timeout = float(ocr_timeout)
143 return enable_ocr, ocr_timeout
async def add_files_stream(data: dict[str, Any]) -> AsyncGenerator[str, None]:
    """Copy the requested files, sync, and stream SSE progress events.

    Each source the ingest lock registry reports as busy yields an
    ``already_ingesting`` event. If no source could be locked, the stream ends
    right there without a ``done`` event, telling the client to wait rather
    than retry. Otherwise the copy+sync task's progress is streamed, followed
    by an error or done event. Locks acquired here are released when the
    stream finishes.
    """
    requested = data.get("paths", [])
    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)

    lock_registry = get_services().ingest_lock_registry
    acquired, busy = await lock_registry.acquire(requested)
    try:
        for source in busy:
            log.info("Rejecting /api/add for %s: already ingesting", source)
            yield sse_event(SseEvent.ALREADY_INGESTING, {"source": source})

        if not acquired:
            return

        # NOTE(review): when only some sources were locked, the full
        # ``requested`` list (busy entries included) still goes to _run_add.
        # Confirm acquire() is all-or-nothing, or filter to the acquired ones.
        stream = SseStream()
        worker = asyncio.create_task(
            _run_add(requested, force, enable_ocr, ocr_timeout, stream)
        )
        try:
            async for chunk in stream.drain(worker, "Add files stream"):
                yield chunk
            finished_cleanly = (
                not stream.cancel.is_set() and worker.done() and not worker.cancelled()
            )
            if finished_cleanly:
                failure = worker.exception()
                if failure is None:
                    yield sse_done(worker.result().model_dump())
                else:
                    yield sse_error(str(failure))
        finally:
            # Still pending only if the consumer closed the stream early.
            if not worker.done():
                worker.cancel()
                with contextlib.suppress(asyncio.CancelledError, Exception):
                    await worker
    finally:
        IngestLockRegistry.release(acquired)
async def _run_import_with_sentinel(sse: SseStream, data: bytes, fmt: str) -> ImportSummary:
    """Import a dataset from raw bytes; always enqueue the drain sentinel afterwards.

    Progress is forwarded through ``sse.callback``. ``None`` is pushed onto
    ``sse.queue`` whether the import succeeds or raises.
    """
    from lilbee.app.dataset import import_from_bytes

    try:
        summary = await import_from_bytes(data, fmt, on_progress=sse.callback)
    finally:
        # Always wake the drain loop, even on error or cancellation.
        sse.queue.put_nowait(None)
    return summary
async def import_stream(data: bytes, fmt: str) -> AsyncGenerator[str, None]:
    """Import a dataset, yield SSE embed-progress events, then a done event.

    Args:
        data: Raw dataset bytes, passed through to ``import_from_bytes``.
        fmt: Format identifier, passed through to ``import_from_bytes``.

    After the progress events, an error event is yielded if the import raised.
    Otherwise a done event carrying the import summary is yielded. Nothing
    further is yielded once ``sse.cancel`` is set.

    If the consumer stops iterating early (e.g. the client disconnects), the
    background import task is cancelled and awaited. This keeps it from running
    on unobserved, the same cleanup ``add_files_stream`` performs.
    """
    sse = SseStream()
    task = asyncio.create_task(_run_import_with_sentinel(sse, data, fmt))
    try:
        async for event in sse.drain(task, "Import stream"):
            yield event
        if not sse.cancel.is_set() and task.done() and not task.cancelled():
            exc = task.exception()
            if exc is not None:
                yield sse_error(str(exc))
                return
            yield sse_done(task.result().model_dump())
    finally:
        # Reached with the task still pending only when the generator was
        # closed mid-stream; stop the work and retrieve its outcome so no
        # "exception was never retrieved" warning is emitted.
        if not task.done():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await task