Coverage for src/lilbee/server/handlers/ingest.py: 100%
101 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Sync and add-files handlers (SSE-streamed)."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import logging
8from collections.abc import AsyncGenerator
9from pathlib import Path
10from typing import TYPE_CHECKING, Any
12from lilbee.app.ingest import copy_files
13from lilbee.app.services import get_services
14from lilbee.core.config import cfg
15from lilbee.core.security import validate_path_within
16from lilbee.runtime.ingest_lock import IngestLockRegistry
17from lilbee.runtime.progress import SseEvent
18from lilbee.server.handlers.sse import SseStream, sse_done, sse_error, sse_event
19from lilbee.server.models import AddSummary, SyncSummary
21if TYPE_CHECKING:
22 from lilbee.data.ingest import SyncResult
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Upper bound on the number of files accepted by a single add-files request;
# enforced by validate_add_paths().
MAX_ADD_FILES = 100
async def _run_sync_with_sentinel(
    sse: SseStream,
    enable_ocr: bool | None,
    force_rebuild: bool = False,
    retry_skipped: bool = False,
) -> SyncResult:
    """Run ingest.sync() and guarantee the drain sentinel is enqueued."""
    # Deferred imports, matching this module's pattern of only pulling in the
    # ingest machinery when a sync is actually requested.
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        with temporary_ocr_config(enable_ocr):
            result = await sync(
                quiet=True,
                on_progress=sse.callback,
                cancel=sse.cancel,
                force_rebuild=force_rebuild,
                retry_skipped=retry_skipped,
            )
        return result
    finally:
        # Enqueue the drain sentinel on every exit path — success, error, or
        # cancellation — so the SSE consumer never blocks forever.
        sse.queue.put_nowait(None)
async def sync_stream(
    *, enable_ocr: bool | None = None, force_rebuild: bool = False, retry_skipped: bool = False
) -> AsyncGenerator[str, None]:
    """Trigger sync, yield SSE progress events, then done event.

    When ``force_rebuild`` is true, the underlying sync drops every table and
    re-ingests from ``cfg.documents_dir`` (the REST equivalent of ``lilbee rebuild``).
    When ``retry_skipped`` is true, it clears the failed-file markers so files
    that were skipped on a previous sync get another attempt, without dropping
    the store.
    """
    sse = SseStream()
    task = asyncio.create_task(
        _run_sync_with_sentinel(sse, enable_ocr, force_rebuild, retry_skipped)
    )
    try:
        async for event in sse.drain(task, "Sync stream"):
            yield event
        if not sse.cancel.is_set() and task.done() and not task.cancelled():
            exc = task.exception()
            if exc is not None:
                yield sse_error(str(exc))
                return
            yield sse_done(task.result().model_dump())
    finally:
        # If the client disconnects mid-stream the generator is closed at a
        # yield point and control lands here; cancel the background sync so
        # it does not keep running unobserved (same cleanup pattern as
        # add_files_stream). CancelledError is a BaseException, so it must
        # be suppressed explicitly alongside Exception.
        if not task.done():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await task
async def _run_add(
    paths: list[str],
    force: bool,
    enable_ocr: bool | None,
    ocr_timeout: float | None,
    sse: SseStream,
) -> AddSummary:
    """Copy files and sync, returning the summary for the final done event."""
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        # Partition requests into files we can copy and files that do not
        # exist on disk; the latter are reported back as errors.
        missing: list[str] = []
        present: list[Path] = []
        for raw in paths:
            candidate = Path(raw)
            if candidate.exists():
                present.append(candidate)
            else:
                missing.append(raw)

        copy_result = copy_files(present, force=force)

        # If cancellation was requested while copying, skip the sync and
        # report just the copy results.
        if sse.cancel.is_set():
            return AddSummary(
                copied=copy_result.copied, skipped=copy_result.skipped, errors=missing
            )

        with temporary_ocr_config(enable_ocr, ocr_timeout):
            sync_result = await sync(quiet=True, on_progress=sse.callback, cancel=sse.cancel)

        return AddSummary(
            copied=copy_result.copied,
            skipped=copy_result.skipped,
            errors=missing,
            sync=SyncSummary(**sync_result.model_dump()),
        )
    finally:
        # Sentinel for the SSE drain loop — must be enqueued on every exit path.
        sse.queue.put_nowait(None)
def validate_add_paths(
    data: dict[str, Any],
) -> tuple[list[str], bool, bool | None, float | None]:
    """Validate add-files input. Raises ValueError on bad input.

    Returns ``(paths, force, enable_ocr, ocr_timeout)``.
    """
    paths = data.get("paths")
    if not isinstance(paths, list) or not paths:
        raise ValueError("'paths' must be a non-empty list of strings")
    # Reject non-string entries up front so callers get the promised
    # ValueError instead of a TypeError from Path() below.
    for p_str in paths:
        if not isinstance(p_str, str):
            raise ValueError("'paths' must be a non-empty list of strings")
    if len(paths) > MAX_ADD_FILES:
        raise ValueError(f"Too many files: {len(paths)} exceeds limit of {MAX_ADD_FILES}")

    # Only the basename is used as the destination under documents_dir;
    # validate_path_within guards the resulting path (see lilbee.core.security).
    for p_str in paths:
        validate_path_within(cfg.documents_dir / Path(p_str).name, cfg.documents_dir)

    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)
    return paths, force, enable_ocr, ocr_timeout
134def _parse_ocr_params(data: dict[str, Any]) -> tuple[bool | None, float | None]:
135 """Extract and coerce OCR parameters from a request dict."""
136 enable_ocr = data.get("enable_ocr")
137 ocr_timeout = data.get("ocr_timeout")
138 if enable_ocr is not None:
139 enable_ocr = bool(enable_ocr)
140 if ocr_timeout is not None:
141 ocr_timeout = float(ocr_timeout)
142 return enable_ocr, ocr_timeout
async def add_files_stream(data: dict[str, Any]) -> AsyncGenerator[str, None]:
    """Copy files, sync, and yield SSE progress events.

    Contended sources emit ``already_ingesting`` and the stream closes
    without a ``done`` event, signalling the client to wait rather than retry.

    NOTE(review): inputs are read straight from ``data`` here — presumably
    the HTTP handler ran validate_add_paths() first; confirm at the call site.
    """
    paths = data.get("paths", [])
    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)

    # acquire() splits the request into sources we now hold locks for and
    # sources some other in-flight ingest already owns.
    registry = get_services().ingest_lock_registry
    acquired, busy = await registry.acquire(paths)
    try:
        for name in busy:
            log.info("Rejecting /api/add for %s: already ingesting", name)
            yield sse_event(SseEvent.ALREADY_INGESTING, {"source": name})

        # Nothing acquired means every requested source is contended; close
        # the stream without a done event so the client waits instead of retrying.
        if not acquired:
            return

        sse = SseStream()
        task = asyncio.create_task(_run_add(paths, force, enable_ocr, ocr_timeout, sse))
        try:
            # Relay progress events until the worker enqueues its drain sentinel.
            async for event in sse.drain(task, "Add files stream"):
                yield event
            if not sse.cancel.is_set() and task.done() and not task.cancelled():
                exc = task.exception()
                if exc is not None:
                    yield sse_error(str(exc))
                    return
                summary = task.result()
                yield sse_done(summary.model_dump())
        finally:
            # If the client disconnected, the generator is closed at a yield
            # point and lands here; cancel the worker so ingestion does not
            # continue unobserved. CancelledError is a BaseException, so it
            # must be suppressed explicitly alongside Exception.
            if not task.done():
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError, Exception):
                    await task
    finally:
        # Release only the locks actually acquired, on every exit path.
        IngestLockRegistry.release(acquired)