Coverage for src / lilbee / server / handlers / ingest.py: 100%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Sync and add-files handlers (SSE-streamed).""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import logging 

8from collections.abc import AsyncGenerator 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12from lilbee.app.ingest import copy_files 

13from lilbee.app.services import get_services 

14from lilbee.core.config import cfg 

15from lilbee.core.security import validate_path_within 

16from lilbee.runtime.ingest_lock import IngestLockRegistry 

17from lilbee.runtime.progress import SseEvent 

18from lilbee.server.handlers.sse import SseStream, sse_done, sse_error, sse_event 

19from lilbee.server.models import AddSummary, SyncSummary 

20 

21if TYPE_CHECKING: 

22 from lilbee.app.dataset import ImportSummary 

23 from lilbee.data.ingest import SyncResult 

24 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Largest number of entries ``validate_add_paths`` accepts in ``paths`` before
# rejecting the request with a ValueError.
MAX_ADD_FILES = 100

28 

29 

async def _run_sync_with_sentinel(
    sse: SseStream,
    enable_ocr: bool | None,
    force_rebuild: bool = False,
    retry_skipped: bool = False,
) -> SyncResult:
    """Run the ingest sync on behalf of an SSE stream and always enqueue the end marker.

    Progress is reported through ``sse.callback`` and cancellation is observed
    through ``sse.cancel``. The sync runs inside ``temporary_ocr_config(enable_ocr)``.
    ``None`` is put on ``sse.queue`` once the sync is over, whether it returned
    or raised.
    """
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        with temporary_ocr_config(enable_ocr):
            outcome = await sync(
                quiet=True,
                on_progress=sse.callback,
                cancel=sse.cancel,
                force_rebuild=force_rebuild,
                retry_skipped=retry_skipped,
            )
        return outcome
    finally:
        # The sentinel must go out on every exit path, including failures.
        sse.queue.put_nowait(None)

51 

52 

async def sync_stream(
    *, enable_ocr: bool | None = None, force_rebuild: bool = False, retry_skipped: bool = False
) -> AsyncGenerator[str, None]:
    """Start a sync and stream its SSE events, ending with ``done`` or ``error``.

    With ``force_rebuild`` set, the underlying sync drops every table and
    re-ingests from ``cfg.documents_dir`` (the REST equivalent of ``lilbee rebuild``).
    With ``retry_skipped`` set, the failed-file markers are cleared so files
    skipped by an earlier sync are attempted again, without dropping the store.

    No terminal event is emitted when the stream was cancelled or the
    background task did not run to completion.
    """
    stream = SseStream()
    worker = asyncio.create_task(
        _run_sync_with_sentinel(stream, enable_ocr, force_rebuild, retry_skipped)
    )

    async for chunk in stream.drain(worker, "Sync stream"):
        yield chunk

    # Only a task that finished on its own, uncancelled, gets a terminal event.
    if stream.cancel.is_set() or not worker.done() or worker.cancelled():
        return

    failure = worker.exception()
    if failure is not None:
        yield sse_error(str(failure))
        return
    yield sse_done(worker.result().model_dump())

76 

77 

async def _run_add(
    paths: list[str],
    force: bool,
    enable_ocr: bool | None,
    ocr_timeout: float | None,
    sse: SseStream,
) -> AddSummary:
    """Copy the input files that exist, run a sync, and build the final summary.

    Paths that do not exist are listed in the summary's ``errors`` instead of
    being copied. If ``sse.cancel`` is already set once copying is done, the
    sync is skipped and the summary is returned without a ``sync`` value.
    ``None`` is always put on ``sse.queue`` on the way out.
    """
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        missing: list[str] = []
        present: list[Path] = []
        for raw in paths:
            candidate = Path(raw)
            if candidate.exists():
                present.append(candidate)
            else:
                missing.append(raw)

        copied = copy_files(present, force=force)

        # Cancelled while copying: report what was copied and skip the sync.
        if sse.cancel.is_set():
            return AddSummary(copied=copied.copied, skipped=copied.skipped, errors=missing)

        with temporary_ocr_config(enable_ocr, ocr_timeout):
            synced = await sync(quiet=True, on_progress=sse.callback, cancel=sse.cancel)

        sync_summary = SyncSummary(**synced.model_dump())
        return AddSummary(
            copied=copied.copied,
            skipped=copied.skipped,
            errors=missing,
            sync=sync_summary,
        )
    finally:
        # The sentinel must go out on every exit path, including failures.
        sse.queue.put_nowait(None)

115 

116 

117def validate_add_paths( 

118 data: dict[str, Any], 

119) -> tuple[list[str], bool, bool | None, float | None]: 

120 """Validate add-files input. Raises ValueError on bad input.""" 

121 paths = data.get("paths") 

122 if not isinstance(paths, list) or not paths: 

123 raise ValueError("'paths' must be a non-empty list of strings") 

124 if len(paths) > MAX_ADD_FILES: 

125 raise ValueError(f"Too many files: {len(paths)} exceeds limit of {MAX_ADD_FILES}") 

126 

127 for p_str in paths: 

128 validate_path_within(cfg.documents_dir / Path(p_str).name, cfg.documents_dir) 

129 

130 force = bool(data.get("force", False)) 

131 enable_ocr, ocr_timeout = _parse_ocr_params(data) 

132 return paths, force, enable_ocr, ocr_timeout 

133 

134 

135def _parse_ocr_params(data: dict[str, Any]) -> tuple[bool | None, float | None]: 

136 """Extract and coerce OCR parameters from a request dict.""" 

137 enable_ocr = data.get("enable_ocr") 

138 ocr_timeout = data.get("ocr_timeout") 

139 if enable_ocr is not None: 

140 enable_ocr = bool(enable_ocr) 

141 if ocr_timeout is not None: 

142 ocr_timeout = float(ocr_timeout) 

143 return enable_ocr, ocr_timeout 

144 

145 

async def add_files_stream(data: dict[str, Any]) -> AsyncGenerator[str, None]:
    """Copy the requested files, sync, and stream SSE progress events.

    An ``already_ingesting`` event is emitted for every source the lock
    registry reports as busy. If no lock was acquired, the stream closes
    without a ``done`` event, signalling the client to wait rather than retry.
    Whatever was acquired is released when the generator finishes.
    """
    paths = data.get("paths", [])
    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)

    lock_registry = get_services().ingest_lock_registry
    acquired, busy = await lock_registry.acquire(paths)
    try:
        for source in busy:
            log.info("Rejecting /api/add for %s: already ingesting", source)
            yield sse_event(SseEvent.ALREADY_INGESTING, {"source": source})

        if not acquired:
            return

        stream = SseStream()
        worker = asyncio.create_task(_run_add(paths, force, enable_ocr, ocr_timeout, stream))
        try:
            async for chunk in stream.drain(worker, "Add files stream"):
                yield chunk

            # Only a task that finished on its own, uncancelled, gets a terminal event.
            if stream.cancel.is_set() or not worker.done() or worker.cancelled():
                return

            failure = worker.exception()
            if failure is not None:
                yield sse_error(str(failure))
                return
            yield sse_done(worker.result().model_dump())
        finally:
            # Wait for the task to end (cancelling it if still running) before
            # the outer ``finally`` releases the locks.
            if not worker.done():
                worker.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await worker
    finally:
        IngestLockRegistry.release(acquired)

185 

186 

async def _run_import_with_sentinel(sse: SseStream, data: bytes, fmt: str) -> ImportSummary:
    """Run the dataset import for an SSE stream and always enqueue the end marker.

    Progress is reported through ``sse.callback``. ``None`` is put on
    ``sse.queue`` once the import is over, whether it returned or raised.
    """
    from lilbee.app.dataset import import_from_bytes

    try:
        summary = await import_from_bytes(data, fmt, on_progress=sse.callback)
        return summary
    finally:
        # The sentinel must go out on every exit path, including failures.
        sse.queue.put_nowait(None)

195 

196 

async def import_stream(data: bytes, fmt: str) -> AsyncGenerator[str, None]:
    """Import a dataset and stream its SSE events, ending with ``done`` or ``error``.

    No terminal event is emitted when the stream was cancelled or the
    background task did not run to completion.
    """
    stream = SseStream()
    worker = asyncio.create_task(_run_import_with_sentinel(stream, data, fmt))

    async for chunk in stream.drain(worker, "Import stream"):
        yield chunk

    # Only a task that finished on its own, uncancelled, gets a terminal event.
    if stream.cancel.is_set() or not worker.done() or worker.cancelled():
        return

    failure = worker.exception()
    if failure is not None:
        yield sse_error(str(failure))
        return
    yield sse_done(worker.result().model_dump())