Coverage for src/lilbee/data/ingest/pipeline.py: 100%
211 statements
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Top-level sync orchestration: discovery, dispatch, batching, post-sync hooks."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import logging
8import threading
9from pathlib import Path
10from typing import Any, cast
12from rich.progress import (
13 BarColumn,
14 MofNCompleteColumn,
15 Progress,
16 SpinnerColumn,
17 TextColumn,
18 TimeElapsedColumn,
19)
21from lilbee.app.services import get_services
22from lilbee.core.config import cfg
23from lilbee.data.ingest.code import ingest_code_sync
24from lilbee.data.ingest.discovery import classify_file, discover_files, file_hash
25from lilbee.data.ingest.extract import ingest_document, ingest_markdown
26from lilbee.data.ingest.skip_marker import (
27 clear_skip_markers,
28 load_skip_markers,
29 write_skip_markers,
30)
31from lilbee.data.ingest.types import ChunkRecord, FileToProcess, SyncResult, _IngestResult
32from lilbee.runtime.asyncio_loop import is_executor_shutdown
33from lilbee.runtime.cancellation import TaskCancelledError
34from lilbee.runtime.cpu import cpu_quota
35from lilbee.runtime.progress import (
36 BatchProgressEvent,
37 BatchStatus,
38 DetailedProgressCallback,
39 EventType,
40 FileDoneEvent,
41 FileStartEvent,
42 SyncDoneEvent,
43 noop_callback,
44 shared_progress,
45)
47log = logging.getLogger(__name__)
49# Limit concurrent ingestion. Sourced from cpu_quota() so worker storms
50# can't starve the TUI's asyncio main thread on macOS.
51_MAX_CONCURRENT = cpu_quota()
54async def _rebuild_concept_clusters() -> None:
55 """Re-run Leiden clustering after sync. No-op if disabled."""
56 if not cfg.concept_graph:
57 return
58 from lilbee.retrieval.concepts import concepts_available
60 if not concepts_available():
61 return
62 try:
63 cg = get_services().concepts
64 if not cg.get_graph():
65 return
66 await asyncio.to_thread(cg.rebuild_clusters)
67 except Exception:
68 log.warning("Concept cluster rebuild failed", exc_info=True)
71async def _index_concepts(records: list[ChunkRecord], source_name: str) -> None:
72 """Extract and index concepts for ingested chunks. No-op if disabled."""
73 if not cfg.concept_graph or not records:
74 return
75 from lilbee.retrieval.concepts import concepts_available
77 if not concepts_available():
78 return
79 try:
80 cg = get_services().concepts
81 texts = [r["chunk"] for r in records]
82 concept_lists = await asyncio.to_thread(cg.extract_concepts_batch, texts)
83 chunk_ids = [(source_name, r["chunk_index"]) for r in records]
84 await asyncio.to_thread(cg.build_from_chunks, chunk_ids, concept_lists)
85 except Exception:
86 log.warning("Concept indexing failed for %s", source_name, exc_info=True)
89async def _ingest_file(
90 path: Path,
91 source_name: str,
92 content_type: str,
93 *,
94 quiet: bool = False,
95 on_progress: DetailedProgressCallback = noop_callback,
96) -> int:
97 """Ingest a single file. Returns chunk count."""
98 records: list[ChunkRecord]
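    # Dispatch on content type: ingest_code_sync is synchronous, so it runs in a
    # worker thread; the markdown and generic document extractors are already async.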
    if content_type == "code":
        records = await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
    elif path.suffix.lower() == ".md":
        records = await ingest_markdown(path, source_name, on_progress)
    else:
        records = await ingest_document(
            path,
            source_name,
            content_type,
            quiet=quiet,
            on_progress=on_progress,
        )

    store = get_services().store
    chunk_count = await asyncio.to_thread(store.add_chunks, cast(list[dict], records))
    await _index_concepts(records, source_name)
    return chunk_count


def _plan_file_changes(
    disk_files: dict[str, Path],
    existing_sources: dict[str, str],
    cancel: threading.Event | None,
    skip_markers: dict[str, str] | None = None,
) -> tuple[list[FileToProcess], list[str], list[str], int]:
    """Diff disk against the store. Returns (to_process, added, updated, unchanged_count).

    A file whose current hash matches a marker in ``skip_markers`` (set by a
    prior failed attempt) is treated as unchanged so we don't retry every
    sync. Edit the file or run ``/sync --force-rebuild`` to clear the marker
    and try again.
    """
    skip_markers = skip_markers or {}
    files_to_process: list[FileToProcess] = []
    added: list[str] = []
    updated: list[str] = []
    unchanged = 0
    for name, path in sorted(disk_files.items()):
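        # Cooperative cancellation: stop diffing mid-walk and return the partial plan.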
        if cancel and cancel.is_set():
            break
        content_type = classify_file(path)
        if content_type is None:
            raise ValueError(f"Unsupported file slipped through discovery: {name}")
        old_hash = existing_sources.get(name)
        current_hash = file_hash(path)
        if old_hash == current_hash:
            unchanged += 1
            continue
        if skip_markers.get(name) == current_hash:
            # Failed last sync at this exact hash; skip the retry.
            unchanged += 1
            continue
        # needs_cleanup=True unconditionally: delete_by_source is idempotent, and
        # this closes the race where a prior ingest wrote chunks but died before
        # upsert_source, leaving orphaned chunks that a re-ingest would duplicate.
        files_to_process.append(
            FileToProcess(name, path, content_type, current_hash, needs_cleanup=True)
        )
        if old_hash is not None:
            updated.append(name)
        else:
            added.append(name)
    return files_to_process, added, updated, unchanged


def detect_pending() -> int:
    """Count files in documents/ that are out of sync with the store.

    Cheap operation: filesystem walk + SHA-256 hashing + a single
    sources-table read. No embedding, no writes. Returns the total of
    added + updated + removed, which is what the TaskBar hint surfaces.
    Reuses ``_plan_file_changes`` so the diff logic stays single-sourced.
    Honors skip markers: a file that failed last time at this hash does
    not show up as pending.
    """
    if not cfg.documents_dir.exists():
        return 0
    disk_files = discover_files()
    existing_sources = {s["filename"]: s["file_hash"] for s in get_services().store.get_sources()}
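    # Sources recorded in the store but no longer on disk count as pending removals.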
    removed = sum(1 for name in existing_sources if name not in disk_files)
    skip_markers = load_skip_markers(cfg.data_root)
    files_to_process, _, _, _ = _plan_file_changes(
        disk_files, existing_sources, cancel=None, skip_markers=skip_markers
    )
    return len(files_to_process) + removed


def _load_pruned_skip_markers(disk_files: dict[str, Path], *, clear_first: bool) -> dict[str, str]:
    """Read the skip-marker file (optionally clearing it first) and drop entries
    for files no longer on disk, so the marker set tracks the current corpus."""
    if clear_first:
        # Clearing the markers makes the diff re-include the skipped files.
        clear_skip_markers(cfg.data_root)
    markers = load_skip_markers(cfg.data_root)
    if not markers:
        return markers
    return {name: fhash for name, fhash in markers.items() if name in disk_files}


def _persist_skip_markers(
    markers: dict[str, str],
    pending_hashes: dict[str, str],
    *,
    succeeded: list[str],
    failed: list[str],
) -> None:
205 """Mark files that produced no chunks so the next sync skips them, clear the
206 markers for files that ingested cleanly, then write the file back."""
    for name in succeeded:
        markers.pop(name, None)
    for name in failed:
        fhash = pending_hashes.get(name)
        if fhash:
            markers[name] = fhash
    write_skip_markers(cfg.data_root, markers)


async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
    retry_skipped: bool = False,
) -> SyncResult:
224 """Sync documents/ with the vector store.
225 Returns a SyncResult with the added/updated/removed/unchanged/failed/skipped lists.
226 When *quiet* is True, the Rich progress bar is suppressed (for JSON output).
227 When *cancel* is set, processing stops between files without data loss.
228 When *retry_skipped* (or *force_rebuild*) is set, the failed-file skip
229 markers are cleared so this sync attempts every file.
230 """
    _store = get_services().store

    if force_rebuild:
        _store.drop_all()

    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    existing_sources = {s["filename"]: s["file_hash"] for s in _store.get_sources()}
    skip_markers = _load_pruned_skip_markers(disk_files, clear_first=force_rebuild or retry_skipped)

    removed: list[str] = []
    failed: list[str] = []
    skipped: list[str] = []

    # Find files to remove (in DB but not on disk)
    for name in existing_sources:
        if name not in disk_files:
            _store.delete_by_source(name)
            _store.delete_source(name)
            removed.append(name)

    files_to_process, added, updated, unchanged = _plan_file_changes(
        disk_files, existing_sources, cancel, skip_markers=skip_markers
    )
    # Track skip markers for files processed this run, keyed by name → hash.
    pending_hashes = {entry.name: entry.file_hash for entry in files_to_process}

    # Ingest files (with optional progress bar)
    if files_to_process:
        get_services().embedder.validate_model()
        await ingest_batch(
            files_to_process,
            added,
            updated,
            failed,
            skipped,
            quiet=quiet,
            on_progress=on_progress,
            cancel=cancel,
        )

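    # Both hard failures and zero-chunk skips get markers, so neither is retried
    # until the file changes on disk or the markers are cleared.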
    _persist_skip_markers(
        skip_markers, pending_hashes, succeeded=added + updated, failed=failed + skipped
    )

    if files_to_process or removed:
        _store.ensure_fts_index()
        await _rebuild_concept_clusters()
        # Circular import: lilbee.wiki imports lilbee.data.ingest.file_hash, so
        # this post-ingest hook import stays function-local at this boundary.
        from lilbee.wiki.ingest import incremental_update

        await incremental_update(set(added) | set(updated) | set(removed))

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
        skipped=skipped,
    )
    on_progress(
        EventType.DONE,
        SyncDoneEvent(
            added=len(result.added),
            updated=len(result.updated),
            removed=len(result.removed),
            failed=len(result.failed),
            skipped=len(result.skipped),
        ),
    )
    return result


async def ingest_batch(
    files_to_process: list[FileToProcess],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> None:
318 """Ingest a batch of files, optionally showing a Rich progress bar.
319 When *needs_cleanup* is True, old chunks are deleted immediately before
320 ingesting new ones so the two operations are atomic per file.
321 When *cancel* is set, pending files raise CancelledError before starting.
322 """
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str,
        path: Path,
        content_type: str,
        fhash: str,
        needs_cleanup: bool,
        file_index: int,
    ) -> _IngestResult:
        async with semaphore:
            if cancel and cancel.is_set():
                raise asyncio.CancelledError

            try:
                on_progress(
                    EventType.FILE_START,
                    FileStartEvent(file=name, total_files=total_files, current_file=file_index),
                )
            except TaskCancelledError as exc:
                # FILE_START itself can raise the cooperative cancel signal;
                # normalize so _collect_results can drain siblings cleanly.
                raise asyncio.CancelledError from exc
            try:
                if needs_cleanup:
                    get_services().store.delete_by_source(name)
                chunk_count = await _ingest_file(
                    path,
                    name,
                    content_type,
                    quiet=quiet,
                    on_progress=on_progress,
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count),
                )
                return _IngestResult(name, path, chunk_count, error=None, file_hash=fhash)
            except (asyncio.CancelledError, TaskCancelledError) as exc:
                # TaskCancelledError is the TUI's cooperative cancel signal raised
                # by reporter.check_cancelled() inside on_progress; treat it as
                # asyncio cancellation so _collect_results can drain siblings
                # cleanly instead of orphaning their pending exceptions.
                raise asyncio.CancelledError from exc
            except Exception as exc:
                # During shutdown, worker pools raise RuntimeError from
                # submit(). Prefer to treat these as cancellation rather than
                # as ingest failures. Detect via the cancel flag (source of
                # truth) or the executor's well-known shutdown message as a
                # fallback when cancel was set after the submit race.
                if (cancel and cancel.is_set()) or is_executor_shutdown(exc):
                    raise asyncio.CancelledError from exc
                # Suppress TaskCancelledError on the FILE_DONE notice: the user
                # already cancelled, and re-raising here would leak past
                # _process_one and strand sibling tasks awaiting in
                # _collect_results.
                with contextlib.suppress(TaskCancelledError):
                    on_progress(
                        EventType.FILE_DONE,
                        FileDoneEvent(file=name, status="error", chunks=0),
                    )
                return _IngestResult(name, path, 0, error=exc)

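    # FileToProcess unpacks positionally as (name, path, content_type, file_hash,
    # needs_cleanup), matching the order it is constructed with in _plan_file_changes.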
    if quiet:
        tasks = [
            asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
            for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
        ]
        await _collect_results(tasks, added, updated, failed, skipped, on_progress=on_progress)
    else:
        with Progress(
            SpinnerColumn(),
            TextColumn("{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            transient=True,
        ) as progress:
            ptask = progress.add_task("Ingesting documents...", total=total_files)
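            # Publish the live bar through the shared_progress ContextVar for the
            # duration of the batch; the finally/reset below clears it even on cancel.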
            token = shared_progress.set((progress, ptask))
            try:
                tasks = [
                    asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
                    for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
                ]
                await _collect_results(
                    tasks,
                    added,
                    updated,
                    failed,
                    skipped,
                    on_progress=on_progress,
                    progress=progress,
                    ptask=ptask,
                )
            finally:
                shared_progress.reset(token)


async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    progress: Progress | None = None,
    ptask: Any = None,
) -> None:
    """Collect task results, optionally updating a Rich progress bar.

    On exception (typically asyncio.CancelledError from a user cancel),
    cancel every sibling task and await them with ``return_exceptions=True``
    so their pending CancelledErrors don't surface as
    "Task exception was never retrieved" warnings.
    """
    try:
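        # as_completed yields futures in completion order, so completed_count tracks
        # how many files have finished rather than their submission order.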
        for completed_count, fut in enumerate(asyncio.as_completed(tasks), 1):
            result = await fut
            _apply_result(result, added, updated, failed, skipped)
            if progress is not None and ptask is not None:
                desc = (
                    f"Ingested {result.name}" if result.error is None else f"Failed {result.name}"
                )
                progress.update(ptask, description=desc)
                progress.advance(ptask)
            if result.error is not None:
                progress_status = BatchStatus.FAILED
            elif result.chunk_count == 0:
                progress_status = BatchStatus.SKIPPED
            else:
                progress_status = BatchStatus.INGESTED
            with contextlib.suppress(TaskCancelledError):
                on_progress(
                    EventType.BATCH_PROGRESS,
                    BatchProgressEvent(
                        file=result.name,
                        status=progress_status,
                        current=completed_count,
                        total=len(tasks),
                    ),
                )
    finally:
        pending = [t for t in tasks if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)


def _discard_from_list(lst: list[str], value: str) -> None:
    """Remove *value* from *lst* if present."""
    with contextlib.suppress(ValueError):
        lst.remove(value)


def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
) -> None:
    """Record an ingestion result: update store on success, track failure."""
    if result.error is not None:
        # Log the error message without the traceback: ingest failures are
        # already surfaced to callers via SyncResult.failed, and the raw
        # traceback from log.exception bleeds into the TUI chat pane via the
        # stderr bridge. Full stack traces stay reachable by
        # lowering LILBEE_LOG_LEVEL to DEBUG.
        log.warning("Failed to ingest %s: %s", result.name, result.error)
        log.debug("Traceback for failed ingest of %s", result.name, exc_info=result.error)
        _discard_from_list(added, result.name)
        _discard_from_list(updated, result.name)
        failed.append(result.name)
        return
    if result.chunk_count == 0:
        # No chunks produced (e.g. scanned PDF without vision model, or vision
        # OCR returned no text). Don't record the file as a source, and surface
        # it as skipped so the user knows it did not actually land in the store;
        # the skip marker written by sync() keeps it from being retried until
        # the file changes or the markers are cleared.
        _discard_from_list(added, result.name)
        _discard_from_list(updated, result.name)
        skipped.append(result.name)
        return

    fhash = result.file_hash or file_hash(result.path)
    get_services().store.upsert_source(result.name, fhash, result.chunk_count)