Coverage for src/lilbee/server/handlers/ingest.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Sync and add-files handlers (SSE-streamed).""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import logging 

8from collections.abc import AsyncGenerator 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12from lilbee.app.ingest import copy_files 

13from lilbee.app.services import get_services 

14from lilbee.core.config import cfg 

15from lilbee.core.security import validate_path_within 

16from lilbee.runtime.ingest_lock import IngestLockRegistry 

17from lilbee.runtime.progress import SseEvent 

18from lilbee.server.handlers.sse import SseStream, sse_done, sse_error, sse_event 

19from lilbee.server.models import AddSummary, SyncSummary 

20 

21if TYPE_CHECKING: 

22 from lilbee.data.ingest import SyncResult 

23 

24log = logging.getLogger(__name__) 

25 

26MAX_ADD_FILES = 100 

27 

28 

async def _run_sync_with_sentinel(
    sse: SseStream,
    enable_ocr: bool | None,
    force_rebuild: bool = False,
    retry_skipped: bool = False,
) -> SyncResult:
    """Execute the ingest sync and always enqueue the ``None`` drain sentinel.

    The sentinel tells the SSE drain loop that no further progress events
    will arrive, whether ``sync`` returned, raised, or was cancelled.
    """
    # Lazy imports keep heavy ingest machinery off the module import path.
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    sync_kwargs = {
        "quiet": True,
        "on_progress": sse.callback,
        "cancel": sse.cancel,
        "force_rebuild": force_rebuild,
        "retry_skipped": retry_skipped,
    }
    try:
        with temporary_ocr_config(enable_ocr):
            result = await sync(**sync_kwargs)
        return result
    finally:
        # Unblock the consumer no matter how we exit.
        sse.queue.put_nowait(None)

50 

51 

async def sync_stream(
    *, enable_ocr: bool | None = None, force_rebuild: bool = False, retry_skipped: bool = False
) -> AsyncGenerator[str, None]:
    """Trigger sync, yield SSE progress events, then done event.

    When ``force_rebuild`` is true, the underlying sync drops every table and
    re-ingests from ``cfg.documents_dir`` (the REST equivalent of ``lilbee rebuild``).
    When ``retry_skipped`` is true, it clears the failed-file markers so files
    that were skipped on a previous sync get another attempt, without dropping
    the store.
    """
    sse = SseStream()
    worker = asyncio.create_task(
        _run_sync_with_sentinel(sse, enable_ocr, force_rebuild, retry_skipped)
    )
    async for event in sse.drain(worker, "Sync stream"):
        yield event

    # Emit a terminal event only when the worker ran to completion
    # without client-side cancellation.
    if sse.cancel.is_set() or not worker.done() or worker.cancelled():
        return
    exc = worker.exception()
    if exc is None:
        yield sse_done(worker.result().model_dump())
    else:
        yield sse_error(str(exc))

75 

76 

async def _run_add(
    paths: list[str],
    force: bool,
    enable_ocr: bool | None,
    ocr_timeout: float | None,
    sse: SseStream,
) -> AddSummary:
    """Copy the requested files into the store, then sync, returning the summary.

    Paths that do not exist on disk are reported in the summary's ``errors``
    list rather than aborting the whole request. The drain sentinel is always
    enqueued so the SSE consumer terminates.
    """
    # Lazy imports keep heavy ingest machinery off the module import path.
    from lilbee.app.ingest import temporary_ocr_config
    from lilbee.data.ingest import sync

    try:
        missing: list[str] = []
        present: list[Path] = []
        for raw in paths:
            candidate = Path(raw)
            if candidate.exists():
                present.append(candidate)
            else:
                missing.append(raw)

        copied = copy_files(present, force=force)

        if sse.cancel.is_set():
            # Cancelled before the sync started: report the copy outcome only.
            return AddSummary(copied=copied.copied, skipped=copied.skipped, errors=missing)

        with temporary_ocr_config(enable_ocr, ocr_timeout):
            result = await sync(quiet=True, on_progress=sse.callback, cancel=sse.cancel)

        return AddSummary(
            copied=copied.copied,
            skipped=copied.skipped,
            errors=missing,
            sync=SyncSummary(**result.model_dump()),
        )
    finally:
        # Guarantee the drain sentinel even on error or cancellation.
        sse.queue.put_nowait(None)

114 

115 

def validate_add_paths(
    data: dict[str, Any],
) -> tuple[list[str], bool, bool | None, float | None]:
    """Validate add-files input. Raises ValueError on bad input.

    Returns ``(paths, force, enable_ocr, ocr_timeout)`` ready to pass to
    the add-files worker.
    """
    paths = data.get("paths")
    if not isinstance(paths, list) or not paths:
        raise ValueError("'paths' must be a non-empty list of strings")
    # The error message promises strings; enforce it so a non-string entry
    # surfaces here as the documented ValueError instead of a TypeError
    # deeper inside Path() / validate_path_within().
    if not all(isinstance(p, str) for p in paths):
        raise ValueError("'paths' must be a non-empty list of strings")
    if len(paths) > MAX_ADD_FILES:
        raise ValueError(f"Too many files: {len(paths)} exceeds limit of {MAX_ADD_FILES}")

    # Only the basename lands inside documents_dir; reject traversal attempts.
    for p_str in paths:
        validate_path_within(cfg.documents_dir / Path(p_str).name, cfg.documents_dir)

    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)
    return paths, force, enable_ocr, ocr_timeout

132 

133 

134def _parse_ocr_params(data: dict[str, Any]) -> tuple[bool | None, float | None]: 

135 """Extract and coerce OCR parameters from a request dict.""" 

136 enable_ocr = data.get("enable_ocr") 

137 ocr_timeout = data.get("ocr_timeout") 

138 if enable_ocr is not None: 

139 enable_ocr = bool(enable_ocr) 

140 if ocr_timeout is not None: 

141 ocr_timeout = float(ocr_timeout) 

142 return enable_ocr, ocr_timeout 

143 

144 

async def add_files_stream(data: dict[str, Any]) -> AsyncGenerator[str, None]:
    """Copy files, sync, and yield SSE progress events.

    Contended sources emit ``already_ingesting`` and the stream closes
    without a ``done`` event, signalling the client to wait rather than retry.
    """
    # NOTE(review): inputs are read directly rather than via validate_add_paths;
    # presumably the route layer has already validated — confirm against callers.
    paths = data.get("paths", [])
    force = bool(data.get("force", False))
    enable_ocr, ocr_timeout = _parse_ocr_params(data)

    registry = get_services().ingest_lock_registry
    # acquired: what we now hold and must release; busy: sources already
    # being ingested elsewhere (each is rejected individually below).
    acquired, busy = await registry.acquire(paths)
    try:
        for name in busy:
            log.info("Rejecting /api/add for %s: already ingesting", name)
            yield sse_event(SseEvent.ALREADY_INGESTING, {"source": name})

        if not acquired:
            # Everything requested was contended: close with no done event.
            return

        sse = SseStream()
        task = asyncio.create_task(_run_add(paths, force, enable_ocr, ocr_timeout, sse))
        try:
            # Forward progress events until _run_add enqueues the drain sentinel.
            async for event in sse.drain(task, "Add files stream"):
                yield event
            # Terminal event only when the worker finished and the client
            # did not cancel mid-stream.
            if not sse.cancel.is_set() and task.done() and not task.cancelled():
                exc = task.exception()
                if exc is not None:
                    yield sse_error(str(exc))
                    return
                summary = task.result()
                yield sse_done(summary.model_dump())
        finally:
            # A client disconnect aborts this generator here; make sure the
            # worker task does not outlive the stream. Any error it raises
            # during teardown is deliberately swallowed (it was already
            # surfaced via sse_error when possible).
            if not task.done():
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError, Exception):
                    await task
    finally:
        IngestLockRegistry.release(acquired)