Coverage for src / lilbee / data / ingest / pipeline.py: 100%

211 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Top-level sync orchestration: discovery, dispatch, batching, post-sync hooks.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import logging 

8import threading 

9from pathlib import Path 

10from typing import Any, cast 

11 

12from rich.progress import ( 

13 BarColumn, 

14 MofNCompleteColumn, 

15 Progress, 

16 SpinnerColumn, 

17 TextColumn, 

18 TimeElapsedColumn, 

19) 

20 

21from lilbee.app.services import get_services 

22from lilbee.core.config import cfg 

23from lilbee.data.ingest.code import ingest_code_sync 

24from lilbee.data.ingest.discovery import classify_file, discover_files, file_hash 

25from lilbee.data.ingest.extract import ingest_document, ingest_markdown 

26from lilbee.data.ingest.skip_marker import ( 

27 clear_skip_markers, 

28 load_skip_markers, 

29 write_skip_markers, 

30) 

31from lilbee.data.ingest.types import ChunkRecord, FileToProcess, SyncResult, _IngestResult 

32from lilbee.runtime.asyncio_loop import is_executor_shutdown 

33from lilbee.runtime.cancellation import TaskCancelledError 

34from lilbee.runtime.cpu import cpu_quota 

35from lilbee.runtime.progress import ( 

36 BatchProgressEvent, 

37 BatchStatus, 

38 DetailedProgressCallback, 

39 EventType, 

40 FileDoneEvent, 

41 FileStartEvent, 

42 SyncDoneEvent, 

43 noop_callback, 

44 shared_progress, 

45) 

46 

# Module-level logger, named after this module per project convention.
log = logging.getLogger(__name__)

# Limit concurrent ingestion. Sourced from cpu_quota() so worker storms
# can't starve the TUI's asyncio main thread on macOS.
_MAX_CONCURRENT = cpu_quota()

52 

53 

async def _rebuild_concept_clusters() -> None:
    """Recompute Leiden concept clusters after a sync.

    Silently no-ops when the concept-graph feature is disabled, its optional
    dependencies are unavailable, or no graph has been built yet. Failures
    are logged, never raised — clustering is best-effort.
    """
    if not cfg.concept_graph:
        return
    # Imported lazily so the optional concept stack only loads on demand.
    from lilbee.retrieval.concepts import concepts_available

    if not concepts_available():
        return
    try:
        graph_service = get_services().concepts
        if graph_service.get_graph():
            # Clustering is CPU-bound; keep it off the event loop.
            await asyncio.to_thread(graph_service.rebuild_clusters)
    except Exception:
        log.warning("Concept cluster rebuild failed", exc_info=True)

69 

70 

async def _index_concepts(records: list[ChunkRecord], source_name: str) -> None:
    """Extract concepts from freshly ingested chunks and add them to the graph.

    No-op when the concept-graph feature is off, *records* is empty, or the
    optional dependencies are missing. Errors are logged and swallowed so a
    concept failure never fails the ingest itself.
    """
    if not (cfg.concept_graph and records):
        return
    from lilbee.retrieval.concepts import concepts_available

    if not concepts_available():
        return
    try:
        graph_service = get_services().concepts
        chunk_texts = [record["chunk"] for record in records]
        # Both extraction and graph building are CPU-bound → thread offload.
        extracted = await asyncio.to_thread(graph_service.extract_concepts_batch, chunk_texts)
        chunk_keys = [(source_name, record["chunk_index"]) for record in records]
        await asyncio.to_thread(graph_service.build_from_chunks, chunk_keys, extracted)
    except Exception:
        log.warning("Concept indexing failed for %s", source_name, exc_info=True)

87 

88 

async def _ingest_file(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> int:
    """Ingest a single file end-to-end and return the chunk count.

    Dispatches on content type (code / markdown / other document), writes the
    resulting chunk records to the store, then runs best-effort concept
    indexing over them.
    """
    records: list[ChunkRecord]
    if content_type == "code":
        # Code chunking is synchronous CPU work; run it in a worker thread.
        records = await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
    elif path.suffix.lower() == ".md":
        records = await ingest_markdown(path, source_name, on_progress)
    else:
        records = await ingest_document(
            path, source_name, content_type, quiet=quiet, on_progress=on_progress
        )

    store = get_services().store
    written = await asyncio.to_thread(store.add_chunks, cast(list[dict], records))
    await _index_concepts(records, source_name)
    return written

116 

117 

def _plan_file_changes(
    disk_files: dict[str, Path],
    existing_sources: dict[str, str],
    cancel: threading.Event | None,
    skip_markers: dict[str, str] | None = None,
) -> tuple[list[FileToProcess], list[str], list[str], int]:
    """Diff disk against the store. Returns (to_process, added, updated, unchanged_count).

    A file whose current hash matches a marker in ``skip_markers`` (set by a
    prior failed attempt) is treated as unchanged so we don't retry every
    sync. Edit the file or run ``/sync --force-rebuild`` to clear the marker
    and try again.
    """
    markers = skip_markers or {}
    to_process: list[FileToProcess] = []
    added: list[str] = []
    updated: list[str] = []
    unchanged = 0
    for name in sorted(disk_files):
        if cancel is not None and cancel.is_set():
            break
        path = disk_files[name]
        content_type = classify_file(path)
        if content_type is None:
            raise ValueError(f"Unsupported file slipped through discovery: {name}")
        previous_hash = existing_sources.get(name)
        current_hash = file_hash(path)
        # Either already in sync with the store, or a prior failed attempt
        # left a skip marker at this exact hash — don't retry until it edits.
        if previous_hash == current_hash or markers.get(name) == current_hash:
            unchanged += 1
            continue
        # needs_cleanup=True unconditionally: delete_by_source is idempotent,
        # and this closes the race where a prior ingest wrote chunks but died
        # before upsert_source, leaving orphaned chunks that would duplicate.
        to_process.append(
            FileToProcess(name, path, content_type, current_hash, needs_cleanup=True)
        )
        (updated if previous_hash is not None else added).append(name)
    return to_process, added, updated, unchanged

162 

163 

def detect_pending() -> int:
    """Count files in documents/ that are out of sync with the store.

    Cheap operation: filesystem walk + SHA-256 hashing + a single
    sources-table read. No embedding, no writes. Returns the total of
    added + updated + removed, which is what the TaskBar hint surfaces.
    Reuses ``_plan_file_changes`` so the diff logic stays single-sourced.
    Honors skip markers: a file that failed last time at this hash does
    not show up as pending.
    """
    if not cfg.documents_dir.exists():
        return 0
    disk_files = discover_files()
    store_hashes = {s["filename"]: s["file_hash"] for s in get_services().store.get_sources()}
    removed_count = sum(1 for name in store_hashes if name not in disk_files)
    pending, _added, _updated, _unchanged = _plan_file_changes(
        disk_files,
        store_hashes,
        cancel=None,
        skip_markers=load_skip_markers(cfg.data_root),
    )
    return len(pending) + removed_count

184 

185 

def _load_pruned_skip_markers(disk_files: dict[str, Path], *, clear_first: bool) -> dict[str, str]:
    """Load the skip-marker file, optionally wiping it first, and prune it.

    Entries for files no longer present on disk are dropped so the marker
    set always tracks the current corpus.
    """
    if clear_first:
        # Clearing the markers makes the diff re-include the skipped files.
        clear_skip_markers(cfg.data_root)
    markers = load_skip_markers(cfg.data_root)
    if markers:
        markers = {name: fhash for name, fhash in markers.items() if name in disk_files}
    return markers

196 

197 

def _persist_skip_markers(
    markers: dict[str, str],
    pending_hashes: dict[str, str],
    *,
    succeeded: list[str],
    failed: list[str],
) -> None:
    """Update and write back the skip-marker file after a sync.

    Files that ingested cleanly have their markers cleared; files that
    failed are marked at the hash they failed at (from *pending_hashes*)
    so the next sync skips them until the file changes.
    """
    for clean_name in succeeded:
        markers.pop(clean_name, None)
    for bad_name in failed:
        failed_hash = pending_hashes.get(bad_name)
        # Only record a marker when we actually know the hash it failed at.
        if failed_hash:
            markers[bad_name] = failed_hash
    write_skip_markers(cfg.data_root, markers)

214 

215 

async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
    retry_skipped: bool = False,
) -> SyncResult:
    """Sync documents/ with the vector store.

    Orchestration order: optional drop-all, discover files on disk, prune
    skip markers, delete store entries for files gone from disk, diff disk
    vs. store, ingest the changed files, persist skip markers, then run the
    post-sync hooks (FTS index, concept clusters, wiki incremental update).

    Returns a SyncResult with the added/updated/removed/unchanged/failed/skipped lists.
    When *quiet* is True, the Rich progress bar is suppressed (for JSON output).
    When *cancel* is set, processing stops between files without data loss.
    When *retry_skipped* (or *force_rebuild*) is set, the failed-file skip
    markers are cleared so this sync attempts every file.
    """
    _store = get_services().store

    if force_rebuild:
        # Drop chunks and sources; everything on disk re-ingests below.
        _store.drop_all()

    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    existing_sources = {s["filename"]: s["file_hash"] for s in _store.get_sources()}
    skip_markers = _load_pruned_skip_markers(disk_files, clear_first=force_rebuild or retry_skipped)

    removed: list[str] = []
    failed: list[str] = []
    skipped: list[str] = []

    # Find files to remove (in DB but not on disk): drop both the chunks
    # and the sources-table row.
    for name in existing_sources:
        if name not in disk_files:
            _store.delete_by_source(name)
            _store.delete_source(name)
            removed.append(name)

    files_to_process, added, updated, unchanged = _plan_file_changes(
        disk_files, existing_sources, cancel, skip_markers=skip_markers
    )
    # Track skip markers for files processed this run, keyed by name → hash.
    pending_hashes = {entry.name: entry.file_hash for entry in files_to_process}

    # Ingest files (with optional progress bar). validate_model() runs first
    # so a bad embedder setup fails fast before any chunks are written.
    if files_to_process:
        get_services().embedder.validate_model()
        await ingest_batch(
            files_to_process,
            added,
            updated,
            failed,
            skipped,
            quiet=quiet,
            on_progress=on_progress,
            cancel=cancel,
        )

    # Persist markers even after a cancel: failures/skips recorded so far
    # are still marked, and clean successes clear any stale marker.
    _persist_skip_markers(
        skip_markers, pending_hashes, succeeded=added + updated, failed=failed + skipped
    )

    if files_to_process or removed:
        _store.ensure_fts_index()
        await _rebuild_concept_clusters()
        # circular: lilbee.wiki imports lilbee.data.ingest.file_hash, so the
        # post-ingest hook stays function-local at this boundary.
        from lilbee.wiki.ingest import incremental_update

        await incremental_update(set(added) | set(updated) | set(removed))

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
        skipped=skipped,
    )
    on_progress(
        EventType.DONE,
        SyncDoneEvent(
            added=len(result.added),
            updated=len(result.updated),
            removed=len(result.removed),
            failed=len(result.failed),
            skipped=len(result.skipped),
        ),
    )
    return result

305 

306 

async def ingest_batch(
    files_to_process: list[FileToProcess],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> None:
    """Ingest a batch of files, optionally showing a Rich progress bar.

    Concurrency is capped at ``_MAX_CONCURRENT`` via a semaphore; the
    *added*/*updated*/*failed*/*skipped* lists are mutated in place by
    ``_collect_results``.

    When *needs_cleanup* is True, old chunks are deleted immediately before
    ingesting new ones so the two operations are atomic per file.
    When *cancel* is set, pending files raise CancelledError before starting.
    """
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str,
        path: Path,
        content_type: str,
        fhash: str,
        needs_cleanup: bool,
        file_index: int,
    ) -> _IngestResult:
        # One worker per file; every exit path either returns an
        # _IngestResult or raises asyncio.CancelledError — nothing else.
        async with semaphore:
            if cancel and cancel.is_set():
                raise asyncio.CancelledError

            try:
                on_progress(
                    EventType.FILE_START,
                    FileStartEvent(file=name, total_files=total_files, current_file=file_index),
                )
            except TaskCancelledError as exc:
                # FILE_START itself can raise the cooperative cancel signal;
                # normalize so _collect_results can drain siblings cleanly.
                raise asyncio.CancelledError from exc
            try:
                if needs_cleanup:
                    get_services().store.delete_by_source(name)
                chunk_count = await _ingest_file(
                    path,
                    name,
                    content_type,
                    quiet=quiet,
                    on_progress=on_progress,
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count),
                )
                return _IngestResult(name, path, chunk_count, error=None, file_hash=fhash)
            except (asyncio.CancelledError, TaskCancelledError) as exc:
                # TaskCancelledError is the TUI's cooperative cancel signal raised
                # by reporter.check_cancelled() inside on_progress; treat it as
                # asyncio cancellation so _collect_results can drain siblings
                # cleanly instead of orphaning their pending exceptions.
                raise asyncio.CancelledError from exc
            except Exception as exc:
                # During shutdown, worker pools raise RuntimeError from
                # submit(). Prefer to treat these as cancellation rather than
                # as ingest failures. Detect via the cancel flag (source of
                # truth) or the executor's well-known shutdown message as a
                # fallback when cancel was set after the submit race.
                if (cancel and cancel.is_set()) or is_executor_shutdown(exc):
                    raise asyncio.CancelledError from exc
                # Suppress TaskCancelledError on the FILE_DONE notice: the user
                # already cancelled, and re-raising here would leak past
                # _process_one and strand sibling tasks awaiting in
                # _collect_results.
                with contextlib.suppress(TaskCancelledError):
                    on_progress(
                        EventType.FILE_DONE,
                        FileDoneEvent(file=name, status="error", chunks=0),
                    )
                return _IngestResult(name, path, 0, error=exc)

    if quiet:
        # No progress bar: fan out tasks directly and collect.
        tasks = [
            asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
            for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
        ]
        await _collect_results(tasks, added, updated, failed, skipped, on_progress=on_progress)
    else:
        with Progress(
            SpinnerColumn(),
            TextColumn("{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            transient=True,
        ) as progress:
            ptask = progress.add_task("Ingesting documents...", total=total_files)
            # Publish the bar via the shared_progress ContextVar so nested
            # ingest code can render into the same display.
            token = shared_progress.set((progress, ptask))
            try:
                tasks = [
                    asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
                    for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
                ]
                await _collect_results(
                    tasks,
                    added,
                    updated,
                    failed,
                    skipped,
                    on_progress=on_progress,
                    progress=progress,
                    ptask=ptask,
                )
            finally:
                # Always restore the ContextVar, even on cancellation.
                shared_progress.reset(token)

421 

422 

async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    progress: Progress | None = None,
    ptask: Any = None,
) -> None:
    """Collect task results, optionally updating a Rich progress bar.

    Results are applied in completion order (``asyncio.as_completed``), so
    the *added*/*updated*/*failed*/*skipped* lists are mutated as each file
    finishes, and a BATCH_PROGRESS event fires per completed file.

    On exception (typically asyncio.CancelledError from a user cancel),
    cancel every sibling task and await them with ``return_exceptions=True``
    so their pending CancelledErrors don't surface as
    "Task exception was never retrieved" warnings.
    """
    try:
        for completed_count, fut in enumerate(asyncio.as_completed(tasks), 1):
            result = await fut
            _apply_result(result, added, updated, failed, skipped)
            if progress is not None and ptask is not None:
                desc = (
                    f"Ingested {result.name}" if result.error is None else f"Failed {result.name}"
                )
                progress.update(ptask, description=desc)
                progress.advance(ptask)
            # Map the per-file outcome onto the batch status enum:
            # error → FAILED, zero chunks → SKIPPED, otherwise INGESTED.
            if result.error is not None:
                progress_status = BatchStatus.FAILED
            elif result.chunk_count == 0:
                progress_status = BatchStatus.SKIPPED
            else:
                progress_status = BatchStatus.INGESTED
            # A cooperative cancel raised by the callback here is swallowed;
            # the finally block below still drains the remaining tasks.
            with contextlib.suppress(TaskCancelledError):
                on_progress(
                    EventType.BATCH_PROGRESS,
                    BatchProgressEvent(
                        file=result.name,
                        status=progress_status,
                        current=completed_count,
                        total=len(tasks),
                    ),
                )
    finally:
        # Drain: cancel whatever is still running and retrieve every
        # exception so none is reported as "never retrieved".
        pending = [t for t in tasks if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

473 

474 

475def _discard_from_list(lst: list[str], value: str) -> None: 

476 """Remove *value* from *lst* if present.""" 

477 with contextlib.suppress(ValueError): 

478 lst.remove(value) 

479 

480 

def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
) -> None:
    """Record one ingestion outcome: upsert the source on success, otherwise
    move the file into the failed or skipped bucket."""
    name = result.name
    if result.error is not None:
        # Warning-level message only, no traceback: failures already reach
        # callers through SyncResult.failed, and a raw log.exception trace
        # would bleed into the TUI chat pane via the stderr bridge. Full
        # stacks remain reachable by lowering LILBEE_LOG_LEVEL to DEBUG.
        log.warning("Failed to ingest %s: %s", name, result.error)
        log.debug("Traceback for failed ingest of %s", name, exc_info=result.error)
        for bucket in (added, updated):
            _discard_from_list(bucket, name)
        failed.append(name)
        return
    if result.chunk_count == 0:
        # Zero chunks produced (e.g. scanned PDF without vision model, or
        # vision OCR returned no text). Leave the source unrecorded so the
        # next sync retries it, and surface it as skipped so the user knows
        # the file did not actually land in the store.
        for bucket in (added, updated):
            _discard_from_list(bucket, name)
        skipped.append(name)
        return

    source_hash = result.file_hash or file_hash(result.path)
    get_services().store.upsert_source(name, source_hash, result.chunk_count)