Coverage for src / lilbee / data / ingest / pipeline.py: 100%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Top-level sync orchestration: discovery, dispatch, batching, post-sync hooks.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import logging 

8import threading 

9from pathlib import Path 

10from typing import Any, cast 

11 

12from rich.progress import ( 

13 BarColumn, 

14 MofNCompleteColumn, 

15 Progress, 

16 SpinnerColumn, 

17 TextColumn, 

18 TimeElapsedColumn, 

19) 

20 

21from lilbee.app.services import get_services 

22from lilbee.core.config import cfg 

23from lilbee.data.ingest.code import ingest_code_sync 

24from lilbee.data.ingest.discovery import classify_file, discover_files, file_hash 

25from lilbee.data.ingest.extract import ingest_document, ingest_markdown 

26from lilbee.data.ingest.skip_marker import ( 

27 clear_skip_markers, 

28 load_skip_markers, 

29 write_skip_markers, 

30) 

31from lilbee.data.ingest.types import ChunkRecord, FileToProcess, SyncResult, _IngestResult 

32from lilbee.data.store import PageTextRecord, SourceRecord, SourceType 

33from lilbee.runtime.asyncio_loop import is_executor_shutdown 

34from lilbee.runtime.cancellation import TaskCancelledError 

35from lilbee.runtime.cpu import cpu_quota 

36from lilbee.runtime.progress import ( 

37 BatchProgressEvent, 

38 BatchStatus, 

39 DetailedProgressCallback, 

40 EventType, 

41 FileDoneEvent, 

42 FileStartEvent, 

43 SyncDoneEvent, 

44 noop_callback, 

45 shared_progress, 

46) 

47 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Limit concurrent ingestion. Sourced from cpu_quota() so worker storms
# can't starve the TUI's asyncio main thread on macOS.
# Evaluated once at import time; ingest_batch uses it as its semaphore size.
_MAX_CONCURRENT = cpu_quota()

53 

54 

async def _rebuild_concept_clusters() -> None:
    """Re-run concept (Leiden) clustering once a sync has finished.

    Returns without doing anything when ``cfg.concept_graph`` is falsy, when
    ``concepts_available()`` reports False, or when the concepts service
    hands back a falsy graph. The rebuild itself runs in a worker thread.
    Any exception raised along the way is logged as a warning and swallowed,
    so a clustering problem never propagates to the caller.
    """
    if not cfg.concept_graph:
        return
    # Deferred import: only reached once the feature flag is known to be on.
    from lilbee.retrieval.concepts import concepts_available

    if not concepts_available():
        return
    try:
        concepts = get_services().concepts
        if concepts.get_graph():
            await asyncio.to_thread(concepts.rebuild_clusters)
    except Exception:
        log.warning("Concept cluster rebuild failed", exc_info=True)

70 

71 

async def _index_concepts(records: list[ChunkRecord], source_name: str) -> None:
    """Extract concepts from freshly ingested chunks and add them to the graph.

    Returns immediately when ``cfg.concept_graph`` is falsy, when *records*
    is empty, or when ``concepts_available()`` reports False. Extraction and
    graph building both run in worker threads. Failures are logged as a
    warning (with traceback) and swallowed.
    """
    if not (cfg.concept_graph and records):
        return
    # Deferred import: only reached when there is something to index.
    from lilbee.retrieval.concepts import concepts_available

    if not concepts_available():
        return
    try:
        concepts = get_services().concepts
        chunk_texts = [rec["chunk"] for rec in records]
        extracted = await asyncio.to_thread(concepts.extract_concepts_batch, chunk_texts)
        # Each chunk is identified by (source name, chunk index).
        chunk_keys = [(source_name, rec["chunk_index"]) for rec in records]
        await asyncio.to_thread(concepts.build_from_chunks, chunk_keys, extracted)
    except Exception:
        log.warning("Concept indexing failed for %s", source_name, exc_info=True)

88 

89 

async def _ingest_file(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> int:
    """Ingest one file and persist its chunks.

    The extractor is chosen from *content_type* and the file suffix:
    ``"code"`` goes through ``ingest_code_sync`` in a worker thread, ``.md``
    files through ``ingest_markdown``, and everything else through
    ``ingest_document``. The resulting chunks and any collected page texts
    are written to the store, then concept indexing is attempted.

    Returns whatever ``store.add_chunks`` reports for the written chunks.
    """
    # Filled in by the markdown/document extractors; stays empty for code.
    pages: list[PageTextRecord] = []
    chunks: list[ChunkRecord]

    if content_type == "code":
        chunks = await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
    elif path.suffix.lower() == ".md":
        chunks = await ingest_markdown(path, source_name, on_progress, page_texts_out=pages)
    else:
        chunks = await ingest_document(
            path,
            source_name,
            content_type,
            quiet=quiet,
            on_progress=on_progress,
            page_texts_out=pages,
        )

    store = get_services().store
    stored = await asyncio.to_thread(store.add_chunks, cast(list[dict], chunks))
    await asyncio.to_thread(store.add_page_texts, cast(list[dict], pages))
    await _index_concepts(chunks, source_name)
    return stored

120 

121 

122def _plan_file_changes( 

123 disk_files: dict[str, Path], 

124 existing_sources: dict[str, str], 

125 cancel: threading.Event | None, 

126 skip_markers: dict[str, str] | None = None, 

127) -> tuple[list[FileToProcess], list[str], list[str], int]: 

128 """Diff disk against the store. Returns (to_process, added, updated, unchanged_count). 

129 

130 A file whose current hash matches a marker in ``skip_markers`` (set by a 

131 prior failed attempt) is treated as unchanged so we don't retry every 

132 sync. Edit the file or run ``/sync --force-rebuild`` to clear the marker 

133 and try again. 

134 """ 

135 skip_markers = skip_markers or {} 

136 files_to_process: list[FileToProcess] = [] 

137 added: list[str] = [] 

138 updated: list[str] = [] 

139 unchanged = 0 

140 for name, path in sorted(disk_files.items()): 

141 if cancel and cancel.is_set(): 

142 break 

143 content_type = classify_file(path) 

144 if content_type is None: 

145 raise ValueError(f"Unsupported file slipped through discovery: {name}") 

146 old_hash = existing_sources.get(name) 

147 current_hash = file_hash(path) 

148 if old_hash == current_hash: 

149 unchanged += 1 

150 continue 

151 if skip_markers.get(name) == current_hash: 

152 # Failed last sync at this exact hash; skip the retry. 

153 unchanged += 1 

154 continue 

155 # needs_cleanup=True unconditionally: delete_by_source is idempotent, 

156 # and this closes the race where a prior ingest wrote chunks but died 

157 # before upsert_source, leaving orphaned chunks that would duplicate. 

158 files_to_process.append( 

159 FileToProcess(name, path, content_type, current_hash, needs_cleanup=True) 

160 ) 

161 if old_hash is not None: 

162 updated.append(name) 

163 else: 

164 added.append(name) 

165 return files_to_process, added, updated, unchanged 

166 

167 

168def _removable_sources(sources: list[SourceRecord], disk_files: dict[str, Path]) -> list[str]: 

169 """Document sources whose backing file is gone. 

170 

171 Imported sources are detached (no file under documents/), so a missing 

172 disk file must not mark them for removal. 

173 """ 

174 return [ 

175 s["filename"] 

176 for s in sources 

177 if s["filename"] not in disk_files and s["source_type"] != SourceType.IMPORTED 

178 ] 

179 

180 

def detect_pending() -> int:
    """Count how many files a sync would currently touch.

    Returns 0 when ``cfg.documents_dir`` does not exist. Otherwise it
    discovers the files on disk, reads the store's sources once, loads the
    skip markers and runs ``_plan_file_changes`` so the diff logic is shared
    with ``sync``. Nothing is ingested here.

    The result is the number of files planned for processing (added plus
    updated) plus the number of removable sources. Because skip markers are
    passed to the planner, a file marked at its current hash is not counted.
    """
    if not cfg.documents_dir.exists():
        return 0

    on_disk = discover_files()
    sources = get_services().store.get_sources()
    hashes_by_name = {src["filename"]: src["file_hash"] for src in sources}
    removed_count = len(_removable_sources(sources, on_disk))
    markers = load_skip_markers(cfg.data_root)
    # Only the first element (the files to process) matters for the count.
    planned = _plan_file_changes(on_disk, hashes_by_name, cancel=None, skip_markers=markers)[0]
    return len(planned) + removed_count

202 

203 

def _load_pruned_skip_markers(disk_files: dict[str, Path], *, clear_first: bool) -> dict[str, str]:
    """Load the skip markers, keeping only entries for files still on disk.

    With ``clear_first`` the marker file is cleared before it is read, so
    the diff that follows includes the previously skipped files again.
    A falsy load result is returned as-is.
    """
    if clear_first:
        clear_skip_markers(cfg.data_root)

    loaded = load_skip_markers(cfg.data_root)
    if loaded:
        # Drop markers whose file has disappeared from the corpus.
        return {name: marked for name, marked in loaded.items() if name in disk_files}
    return loaded

214 

215 

def _persist_skip_markers(
    markers: dict[str, str],
    pending_hashes: dict[str, str],
    *,
    succeeded: list[str],
    failed: list[str],
) -> None:
    """Update *markers* in place from this run's outcome and write them out.

    Every name in *succeeded* loses its marker. Every name in *failed* gets
    a marker set to its hash from *pending_hashes*, provided a non-empty
    hash is recorded there. The marker file is always written afterwards.
    """
    for done in succeeded:
        markers.pop(done, None)

    for name in failed:
        if recorded := pending_hashes.get(name):
            markers[name] = recorded

    write_skip_markers(cfg.data_root, markers)

232 

233 

async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
    retry_skipped: bool = False,
) -> SyncResult:
    """Bring the vector store in line with the contents of documents/.

    Steps, in order: optionally drop the store (``force_rebuild``), make
    sure the documents directory exists, diff disk against the stored
    sources, remove sources whose file is gone, ingest new or changed files,
    persist skip markers, refresh indexes and concept clusters, and run the
    wiki's incremental update.

    Args:
        force_rebuild: Drop all store data first and clear the skip markers.
        quiet: Suppress the Rich progress bar (for JSON output).
        on_progress: Receives progress events, ending with ``EventType.DONE``.
        cancel: Forwarded to planning and to ``ingest_batch``, which check it
            between files.
        retry_skipped: Clear the skip markers so this run attempts every file.

    Returns:
        A ``SyncResult`` with the added/updated/removed/failed/skipped name
        lists, the unchanged count, and the number of chunks truncated by
        the embedder during this run.
    """
    store = get_services().store

    if force_rebuild:
        store.drop_all()
        # drop_all preserves the memories table, so refresh its vectors under
        # the (possibly changed) embedding model. No-op when empty or when no
        # embedder is available.
        embedder = get_services().embedder
        if embedder.embedding_available():
            store.rebuild_memory_embeddings(lambda texts: embedder.embed_batch(texts))

    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    sources = store.get_sources()
    known_hashes = {src["filename"]: src["file_hash"] for src in sources}
    skip_markers = _load_pruned_skip_markers(
        disk_files,
        clear_first=force_rebuild or retry_skipped,
    )

    # Drop document sources whose file is gone; imported sources are kept.
    stale = _removable_sources(sources, disk_files)
    if stale:
        store.remove_documents(stale)
    removed: list[str] = list(stale)
    failed: list[str] = []
    skipped: list[str] = []

    planned, added, updated, unchanged = _plan_file_changes(
        disk_files, known_hashes, cancel, skip_markers=skip_markers
    )
    # name -> hash for everything attempted this run, used for skip markers.
    pending_hashes = {item.name: item.file_hash for item in planned}

    # Snapshot the cumulative truncation counter so the delta over this sync
    # can be reported instead of being lost in per-chunk debug logs.
    truncated_before = get_services().embedder.truncated_total

    if planned:
        get_services().embedder.validate_model()
        await ingest_batch(
            planned,
            added,
            updated,
            failed,
            skipped,
            quiet=quiet,
            on_progress=on_progress,
            cancel=cancel,
        )

    _persist_skip_markers(
        skip_markers,
        pending_hashes,
        succeeded=added + updated,
        failed=failed + skipped,
    )

    if planned or removed:
        store.ensure_fts_index()
        store.ensure_vector_index()
        await _rebuild_concept_clusters()
        # Circular import: lilbee.wiki imports lilbee.data.ingest.file_hash,
        # so the post-ingest hook is imported function-locally here.
        from lilbee.wiki.ingest import incremental_update

        await incremental_update({*added, *updated, *removed})

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
        skipped=skipped,
        truncated=get_services().embedder.truncated_total - truncated_before,
    )
    summary = SyncDoneEvent(
        added=len(result.added),
        updated=len(result.updated),
        removed=len(result.removed),
        failed=len(result.failed),
        skipped=len(result.skipped),
    )
    on_progress(EventType.DONE, summary)
    return result

334 

335 

async def ingest_batch(
    files_to_process: list[FileToProcess],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> None:
    """Ingest a batch of files, optionally showing a Rich progress bar.

    One task is scheduled per entry in *files_to_process*; a semaphore sized
    by ``_MAX_CONCURRENT`` caps how many run their ingest body at once. The
    tasks are handed to ``_collect_results`` together with the
    *added*/*updated*/*failed*/*skipped* lists.

    When an entry's *needs_cleanup* is True, its old chunks are deleted
    immediately before the new ones are ingested, inside the same task.
    When *cancel* is set, a task that has not started its file yet raises
    ``asyncio.CancelledError`` instead of starting.
    When *quiet* is True, no Rich ``Progress`` is created.
    """
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str,
        path: Path,
        content_type: str,
        fhash: str,
        needs_cleanup: bool,
        file_index: int,
    ) -> _IngestResult:
        """Ingest a single file under the semaphore.

        Returns an ``_IngestResult`` carrying either the chunk count or the
        exception that made the ingest fail. Cancellation in any of its
        forms is re-raised as ``asyncio.CancelledError``.
        """
        async with semaphore:
            # Checked after acquiring the semaphore, so queued files notice a
            # cancel request before doing any work.
            if cancel and cancel.is_set():
                raise asyncio.CancelledError

            try:
                on_progress(
                    EventType.FILE_START,
                    FileStartEvent(file=name, total_files=total_files, current_file=file_index),
                )
            except TaskCancelledError as exc:
                # FILE_START itself can raise the cooperative cancel signal;
                # normalize so _collect_results can drain siblings cleanly.
                raise asyncio.CancelledError from exc
            try:
                if needs_cleanup:
                    get_services().store.delete_by_source(name)
                chunk_count = await _ingest_file(
                    path,
                    name,
                    content_type,
                    quiet=quiet,
                    on_progress=on_progress,
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count),
                )
                return _IngestResult(name, path, chunk_count, error=None, file_hash=fhash)
            except (asyncio.CancelledError, TaskCancelledError) as exc:
                # TaskCancelledError is the TUI's cooperative cancel signal raised
                # by reporter.check_cancelled() inside on_progress; treat it as
                # asyncio cancellation so _collect_results can drain siblings
                # cleanly instead of orphaning their pending exceptions.
                raise asyncio.CancelledError from exc
            except Exception as exc:
                # During shutdown, worker pools raise RuntimeError from
                # submit(). Prefer to treat these as cancellation rather than
                # as ingest failures. Detect via the cancel flag (source of
                # truth) or the executor's well-known shutdown message as a
                # fallback when cancel was set after the submit race.
                if (cancel and cancel.is_set()) or is_executor_shutdown(exc):
                    raise asyncio.CancelledError from exc
                # Suppress TaskCancelledError on the FILE_DONE notice: the user
                # already cancelled, and re-raising here would leak past
                # _process_one and strand sibling tasks awaiting in
                # _collect_results.
                with contextlib.suppress(TaskCancelledError):
                    on_progress(
                        EventType.FILE_DONE,
                        FileDoneEvent(file=name, status="error", chunks=0),
                    )
                # A genuine ingest failure is returned, not raised, so one bad
                # file does not abort the rest of the batch.
                return _IngestResult(name, path, 0, error=exc)

    if quiet:
        # No progress bar: schedule every file (1-based index) and collect.
        tasks = [
            asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
            for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
        ]
        await _collect_results(tasks, added, updated, failed, skipped, on_progress=on_progress)
    else:
        with Progress(
            SpinnerColumn(),
            TextColumn("{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            transient=True,
        ) as progress:
            ptask = progress.add_task("Ingesting documents...", total=total_files)
            # Publish the bar before the tasks are created, and restore the
            # previous value in the finally below.
            # NOTE(review): set()/reset(token) looks like a ContextVar; if so,
            # tasks created after set() copy a context that already holds the
            # bar, which makes this ordering significant - confirm.
            token = shared_progress.set((progress, ptask))
            try:
                tasks = [
                    asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
                    for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
                ]
                await _collect_results(
                    tasks,
                    added,
                    updated,
                    failed,
                    skipped,
                    on_progress=on_progress,
                    progress=progress,
                    ptask=ptask,
                )
            finally:
                shared_progress.reset(token)

450 

451 

async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    progress: Progress | None = None,
    ptask: Any = None,
) -> None:
    """Consume ingest tasks as they finish and record each outcome.

    Every result is passed to ``_apply_result``, reflected on the Rich
    progress bar when both *progress* and *ptask* are given, and reported
    through *on_progress* as a ``BATCH_PROGRESS`` event. A
    ``TaskCancelledError`` raised by that callback is suppressed.

    On the way out, including when an exception such as
    ``asyncio.CancelledError`` is propagating, every task that is not done
    is cancelled and awaited with ``return_exceptions=True``, so pending
    CancelledErrors don't surface as "Task exception was never retrieved"
    warnings.
    """
    total = len(tasks)
    finished = 0
    try:
        for next_done in asyncio.as_completed(tasks):
            outcome = await next_done
            finished += 1
            _apply_result(outcome, added, updated, failed, skipped)

            if progress is not None and ptask is not None:
                if outcome.error is None:
                    label = f"Ingested {outcome.name}"
                else:
                    label = f"Failed {outcome.name}"
                progress.update(ptask, description=label)
                progress.advance(ptask)

            if outcome.error is not None:
                status = BatchStatus.FAILED
            else:
                # Success with zero chunks is reported as skipped.
                status = (
                    BatchStatus.SKIPPED if outcome.chunk_count == 0 else BatchStatus.INGESTED
                )

            try:
                on_progress(
                    EventType.BATCH_PROGRESS,
                    BatchProgressEvent(
                        file=outcome.name,
                        status=status,
                        current=finished,
                        total=total,
                    ),
                )
            except TaskCancelledError:
                pass
    finally:
        unfinished = [task for task in tasks if not task.done()]
        for task in unfinished:
            task.cancel()
        if unfinished:
            await asyncio.gather(*unfinished, return_exceptions=True)

502 

503 

504def _discard_from_list(lst: list[str], value: str) -> None: 

505 """Remove *value* from *lst* if present.""" 

506 with contextlib.suppress(ValueError): 

507 lst.remove(value) 

508 

509 

def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
) -> None:
    """Fold one ingest outcome into the sync bookkeeping lists.

    * Error: the name is logged, dropped from *added*/*updated* and appended
      to *failed*.
    * Zero chunks (e.g. scanned PDF without vision model, or vision OCR
      returned no text): the name is dropped from *added*/*updated* and
      appended to *skipped*. No source row is written, so the user can see
      the file did not actually land in the store.
    * Otherwise: the source is upserted with its hash and chunk count.
    """
    name = result.name

    if result.error is not None:
        # Log the error message without the traceback: ingest failures are
        # already surfaced to callers via SyncResult.failed, and a raw
        # traceback would bleed into the TUI chat pane via the stderr
        # bridge. Full stack traces stay reachable by lowering
        # LILBEE_LOG_LEVEL to DEBUG.
        log.warning("Failed to ingest %s: %s", name, result.error)
        log.debug("Traceback for failed ingest of %s", name, exc_info=result.error)
        bucket = failed
    elif result.chunk_count == 0:
        bucket = skipped
    else:
        # Fall back to hashing the file when the result carries no hash.
        fhash = result.file_hash or file_hash(result.path)
        get_services().store.upsert_source(name, fhash, result.chunk_count)
        return

    # Failed and skipped files must not be reported as added or updated.
    _discard_from_list(added, name)
    _discard_from_list(updated, name)
    bucket.append(name)