Coverage for src / lilbee / data / ingest / pipeline.py: 100%
222 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Top-level sync orchestration: discovery, dispatch, batching, post-sync hooks."""
3from __future__ import annotations
5import asyncio
6import contextlib
7import logging
8import threading
9from pathlib import Path
10from typing import Any, cast
12from rich.progress import (
13 BarColumn,
14 MofNCompleteColumn,
15 Progress,
16 SpinnerColumn,
17 TextColumn,
18 TimeElapsedColumn,
19)
21from lilbee.app.services import get_services
22from lilbee.core.config import cfg
23from lilbee.data.ingest.code import ingest_code_sync
24from lilbee.data.ingest.discovery import classify_file, discover_files, file_hash
25from lilbee.data.ingest.extract import ingest_document, ingest_markdown
26from lilbee.data.ingest.skip_marker import (
27 clear_skip_markers,
28 load_skip_markers,
29 write_skip_markers,
30)
31from lilbee.data.ingest.types import ChunkRecord, FileToProcess, SyncResult, _IngestResult
32from lilbee.data.store import PageTextRecord, SourceRecord, SourceType
33from lilbee.runtime.asyncio_loop import is_executor_shutdown
34from lilbee.runtime.cancellation import TaskCancelledError
35from lilbee.runtime.cpu import cpu_quota
36from lilbee.runtime.progress import (
37 BatchProgressEvent,
38 BatchStatus,
39 DetailedProgressCallback,
40 EventType,
41 FileDoneEvent,
42 FileStartEvent,
43 SyncDoneEvent,
44 noop_callback,
45 shared_progress,
46)
# Module-level logger; ingest failures and best-effort hook failures below
# are reported through it.
log = logging.getLogger(__name__)

# Limit concurrent ingestion. Sourced from cpu_quota() so worker storms
# can't starve the TUI's asyncio main thread on macOS.
# Used as the asyncio.Semaphore size in ingest_batch, i.e. the maximum
# number of files ingested at the same time within one batch.
_MAX_CONCURRENT = cpu_quota()
async def _rebuild_concept_clusters() -> None:
    """Re-run concept clustering (Leiden) after a sync has changed the store.

    Does nothing when ``cfg.concept_graph`` is off, when
    ``concepts_available()`` reports False, or when the concept service's
    graph is empty. The rebuild itself runs on a worker thread; any exception
    from the service is logged with its traceback and swallowed, so a
    clustering problem never fails the sync.
    """
    if cfg.concept_graph:
        from lilbee.retrieval.concepts import concepts_available

        if concepts_available():
            try:
                concept_service = get_services().concepts
                if concept_service.get_graph():
                    await asyncio.to_thread(concept_service.rebuild_clusters)
            except Exception:
                log.warning("Concept cluster rebuild failed", exc_info=True)
async def _index_concepts(records: list[ChunkRecord], source_name: str) -> None:
    """Add concepts for freshly ingested chunks to the concept graph.

    Skipped entirely when *records* is empty, when ``cfg.concept_graph`` is
    off, or when ``concepts_available()`` reports False. Concepts are
    extracted from each record's ``"chunk"`` text and linked to the key
    ``(source_name, record["chunk_index"])``; both service calls run on worker
    threads. Failures are logged (with traceback) and swallowed, so concept
    indexing is best-effort and never aborts ingestion of the file.
    """
    if not (cfg.concept_graph and records):
        return
    from lilbee.retrieval.concepts import concepts_available

    if not concepts_available():
        return
    try:
        concept_service = get_services().concepts
        chunk_texts = [record["chunk"] for record in records]
        extracted = await asyncio.to_thread(concept_service.extract_concepts_batch, chunk_texts)
        chunk_keys = [(source_name, record["chunk_index"]) for record in records]
        await asyncio.to_thread(concept_service.build_from_chunks, chunk_keys, extracted)
    except Exception:
        log.warning("Concept indexing failed for %s", source_name, exc_info=True)
async def _ingest_file(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> int:
    """Extract, store, and concept-index one file; return its chunk count.

    Dispatch order:

    * ``content_type == "code"`` -> ``ingest_code_sync`` on a worker thread;
    * a ``.md`` suffix (any letter case) -> ``ingest_markdown``;
    * anything else -> ``ingest_document`` (the only extractor given *quiet*).

    The markdown and document extractors may append page-text records to a
    shared list, which is written with ``store.add_page_texts`` right after
    the chunks. Both store writes run via ``asyncio.to_thread``. The value
    returned is whatever ``store.add_chunks`` reports.
    """
    page_texts: list[PageTextRecord] = []
    records: list[ChunkRecord]
    if content_type == "code":
        records = await asyncio.to_thread(ingest_code_sync, path, source_name, on_progress)
    elif path.suffix.lower() != ".md":
        records = await ingest_document(
            path,
            source_name,
            content_type,
            quiet=quiet,
            on_progress=on_progress,
            page_texts_out=page_texts,
        )
    else:
        records = await ingest_markdown(path, source_name, on_progress, page_texts_out=page_texts)

    store = get_services().store
    stored = await asyncio.to_thread(store.add_chunks, cast(list[dict], records))
    await asyncio.to_thread(store.add_page_texts, cast(list[dict], page_texts))
    # Best-effort: _index_concepts logs and swallows its own failures.
    await _index_concepts(records, source_name)
    return stored
def _plan_file_changes(
    disk_files: dict[str, Path],
    existing_sources: dict[str, str],
    cancel: threading.Event | None,
    skip_markers: dict[str, str] | None = None,
) -> tuple[list[FileToProcess], list[str], list[str], int]:
    """Diff the files on disk against the hashes recorded in the store.

    Returns ``(to_process, added, updated, unchanged_count)``. Files are
    visited in sorted-name order. A file whose hash differs from the stored
    one goes into *to_process* and into *updated*. A file with no stored
    entry goes into *to_process* and into *added*.

    A file whose current hash equals its entry in *skip_markers* (left by an
    earlier attempt that failed or produced no chunks) counts as unchanged,
    so it isn't retried on every sync. Editing the file, or running
    ``/sync --force-rebuild`` / a retry-skipped sync to clear the markers,
    makes it eligible again.

    If *cancel* is set, the scan stops early and returns what was planned so
    far. Raises ValueError when ``classify_file`` returns None for a file
    that discovery let through.
    """
    markers = skip_markers or {}
    plan: list[FileToProcess] = []
    new_names: list[str] = []
    changed_names: list[str] = []
    unchanged_count = 0
    for name in sorted(disk_files):
        if cancel and cancel.is_set():
            break
        path = disk_files[name]
        kind = classify_file(path)
        if kind is None:
            raise ValueError(f"Unsupported file slipped through discovery: {name}")
        previous = existing_sources.get(name)
        digest = file_hash(path)
        # Same hash as the store, or same hash as a recorded failure: nothing to do.
        if previous == digest or markers.get(name) == digest:
            unchanged_count += 1
            continue
        # needs_cleanup=True unconditionally: delete_by_source is idempotent,
        # and this closes the race where a prior ingest wrote chunks but died
        # before upsert_source, leaving orphaned chunks that would duplicate.
        plan.append(FileToProcess(name, path, kind, digest, needs_cleanup=True))
        bucket = new_names if previous is None else changed_names
        bucket.append(name)
    return plan, new_names, changed_names, unchanged_count
def _removable_sources(sources: list[SourceRecord], disk_files: dict[str, Path]) -> list[str]:
    """Return filenames of stored sources that should be dropped from the store.

    A source qualifies when its filename is not a key of *disk_files*.
    Sources whose ``source_type`` is ``SourceType.IMPORTED`` are never
    returned: imported material is detached and has no file under
    documents/, so a missing disk file says nothing about whether to keep it.
    """
    gone: list[str] = []
    for record in sources:
        filename = record["filename"]
        if filename in disk_files:
            continue
        if record["source_type"] == SourceType.IMPORTED:
            continue
        gone.append(filename)
    return gone
def detect_pending() -> int:
    """Return how many documents/ files a sync would currently act on.

    The count is added + updated files (from ``_plan_file_changes``) plus
    removed document sources (from ``_removable_sources``), which is the
    number the TaskBar hint surfaces. It only discovers files, hashes them,
    reads the store's sources, and loads the skip markers; nothing is
    embedded or written. A file skipped by a marker matching its current
    hash is not counted. Returns 0 when the documents directory is missing.
    """
    if not cfg.documents_dir.exists():
        return 0
    on_disk = discover_files()
    stored = get_services().store.get_sources()
    stored_hashes = {record["filename"]: record["file_hash"] for record in stored}
    removed_count = len(_removable_sources(stored, on_disk))
    markers = load_skip_markers(cfg.data_root)
    plan, *_ = _plan_file_changes(on_disk, stored_hashes, cancel=None, skip_markers=markers)
    return len(plan) + removed_count
def _load_pruned_skip_markers(disk_files: dict[str, Path], *, clear_first: bool) -> dict[str, str]:
    """Load the failed-file skip markers, keeping only files still on disk.

    When *clear_first* is true the stored markers are cleared before
    loading, so every previously skipped file is planned again. Entries whose
    filename is not a key of *disk_files* are dropped, so the marker set
    tracks the current corpus.
    """
    if clear_first:
        clear_skip_markers(cfg.data_root)
    loaded = load_skip_markers(cfg.data_root)
    if loaded:
        return {name: digest for name, digest in loaded.items() if name in disk_files}
    return loaded
def _persist_skip_markers(
    markers: dict[str, str],
    pending_hashes: dict[str, str],
    *,
    succeeded: list[str],
    failed: list[str],
) -> None:
    """Update *markers* in place from this run's outcomes, then save them.

    Names in *succeeded* lose any marker they had. Names in *failed* are
    marked with their hash from *pending_hashes*; names with no truthy hash
    there are left untouched. A marked file is skipped by later syncs until
    its hash changes or the markers are cleared. The resulting mapping is
    written with ``write_skip_markers``.
    """
    for cleared in succeeded:
        if cleared in markers:
            del markers[cleared]
    for name in failed:
        if fhash := pending_hashes.get(name):
            markers[name] = fhash
    write_skip_markers(cfg.data_root, markers)
async def sync(
    force_rebuild: bool = False,
    quiet: bool = False,
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
    retry_skipped: bool = False,
) -> SyncResult:
    """Sync documents/ with the vector store.

    Steps, in order: optional full rebuild, discovery, removal of document
    sources whose file is gone, planning (hash diff + skip markers),
    ingestion, skip-marker persistence, and, when anything was planned or
    removed, the post-sync hooks (FTS/vector indexes, concept clusters, wiki
    incremental update). An ``EventType.DONE`` event is emitted last.

    Returns a SyncResult with the added/updated/removed/failed/skipped lists,
    the unchanged count, and how many chunks the embedder truncated during
    this call.
    When *quiet* is True, the Rich progress bar is suppressed (for JSON output).
    When *cancel* is set, planning stops early and files that have not
    started ingesting are not started.
    When *retry_skipped* (or *force_rebuild*) is set, the failed-file skip
    markers are cleared so this sync attempts every file.

    NOTE(review): a cancel observed during ingestion surfaces as
    ``asyncio.CancelledError`` raised out of this coroutine (ingest_batch does
    not catch it), so in that case skip markers are not written, the
    post-sync hooks do not run, and no DONE event is emitted.
    """
    _store = get_services().store

    if force_rebuild:
        _store.drop_all()
        # drop_all preserves the memories table, so refresh its vectors under the
        # (possibly changed) embedding model. No-op when empty or no embedder.
        _embedder = get_services().embedder
        if _embedder.embedding_available():
            _store.rebuild_memory_embeddings(lambda texts: _embedder.embed_batch(texts))

    # Make sure documents/ exists before discovery walks it.
    cfg.documents_dir.mkdir(parents=True, exist_ok=True)

    disk_files = discover_files()
    sources = _store.get_sources()
    existing_sources = {s["filename"]: s["file_hash"] for s in sources}
    # Markers for files no longer on disk are dropped; all markers are
    # cleared first when force_rebuild or retry_skipped is set.
    skip_markers = _load_pruned_skip_markers(disk_files, clear_first=force_rebuild or retry_skipped)

    removed: list[str] = []
    failed: list[str] = []
    skipped: list[str] = []

    # Find files to remove (document sources whose file is gone; imports are kept)
    to_remove = _removable_sources(sources, disk_files)
    if to_remove:
        _store.remove_documents(to_remove)
        removed.extend(to_remove)

    files_to_process, added, updated, unchanged = _plan_file_changes(
        disk_files, existing_sources, cancel, skip_markers=skip_markers
    )
    # Track skip markers for files processed this run, keyed by name → hash.
    pending_hashes = {entry.name: entry.file_hash for entry in files_to_process}

    # Snapshot the cumulative truncation counter so the delta over this sync can
    # surface "N chunks truncated" instead of being lost in per-chunk debug logs.
    truncated_before = get_services().embedder.truncated_total

    # Ingest files (with optional progress bar)
    if files_to_process:
        get_services().embedder.validate_model()
        # ingest_batch mutates added/updated/failed/skipped in place: names that
        # fail or produce zero chunks are moved out of added/updated.
        await ingest_batch(
            files_to_process,
            added,
            updated,
            failed,
            skipped,
            quiet=quiet,
            on_progress=on_progress,
            cancel=cancel,
        )

    # Failed and zero-chunk files both get markers, so neither is retried
    # until its content hash changes or the markers are cleared.
    _persist_skip_markers(
        skip_markers, pending_hashes, succeeded=added + updated, failed=failed + skipped
    )

    if files_to_process or removed:
        _store.ensure_fts_index()
        _store.ensure_vector_index()
        await _rebuild_concept_clusters()
        # circular: lilbee.wiki imports lilbee.data.ingest.file_hash, so the
        # post-ingest hook stays function-local at this boundary.
        from lilbee.wiki.ingest import incremental_update

        await incremental_update(set(added) | set(updated) | set(removed))

    result = SyncResult(
        added=added,
        updated=updated,
        removed=removed,
        unchanged=unchanged,
        failed=failed,
        skipped=skipped,
        truncated=get_services().embedder.truncated_total - truncated_before,
    )
    # The DONE event carries counts only (no unchanged/truncated fields).
    on_progress(
        EventType.DONE,
        SyncDoneEvent(
            added=len(result.added),
            updated=len(result.updated),
            removed=len(result.removed),
            failed=len(result.failed),
            skipped=len(result.skipped),
        ),
    )
    return result
async def ingest_batch(
    files_to_process: list[FileToProcess],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    cancel: threading.Event | None = None,
) -> None:
    """Ingest a batch of files concurrently, optionally showing a Rich progress bar.

    At most ``_MAX_CONCURRENT`` files are processed at once. Outcomes are
    folded into the caller-owned *added*/*updated*/*failed*/*skipped* lists
    by ``_collect_results``. When *quiet* is True no Rich progress bar is shown.

    For every entry whose ``needs_cleanup`` flag is set, the file's old chunks
    are deleted immediately before its new ones are ingested, back-to-back
    within the same task.

    When *cancel* is set, files that have not started yet raise
    ``asyncio.CancelledError`` instead of starting, and that error propagates
    to the caller. The TUI's ``TaskCancelledError`` and executor-shutdown
    errors are normalized to ``asyncio.CancelledError`` as well, so
    ``_collect_results`` can cancel and drain the remaining siblings.
    """
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
    total_files = len(files_to_process)

    async def _process_one(
        name: str,
        path: Path,
        content_type: str,
        fhash: str,
        needs_cleanup: bool,
        file_index: int,
    ) -> _IngestResult:
        async with semaphore:
            # Checked after acquiring the semaphore, so files queued behind the
            # concurrency limit never start once cancel has been set.
            if cancel and cancel.is_set():
                raise asyncio.CancelledError

            try:
                on_progress(
                    EventType.FILE_START,
                    FileStartEvent(file=name, total_files=total_files, current_file=file_index),
                )
            except TaskCancelledError as exc:
                # FILE_START itself can raise the cooperative cancel signal;
                # normalize so _collect_results can drain siblings cleanly.
                raise asyncio.CancelledError from exc
            try:
                if needs_cleanup:
                    # Offloaded like the store writes in _ingest_file, so a slow
                    # delete doesn't block the event loop (and with it sibling
                    # tasks and the TUI) while it runs.
                    await asyncio.to_thread(get_services().store.delete_by_source, name)
                chunk_count = await _ingest_file(
                    path,
                    name,
                    content_type,
                    quiet=quiet,
                    on_progress=on_progress,
                )
                on_progress(
                    EventType.FILE_DONE,
                    FileDoneEvent(file=name, status="ok", chunks=chunk_count),
                )
                return _IngestResult(name, path, chunk_count, error=None, file_hash=fhash)
            except (asyncio.CancelledError, TaskCancelledError) as exc:
                # TaskCancelledError is the TUI's cooperative cancel signal raised
                # by reporter.check_cancelled() inside on_progress; treat it as
                # asyncio cancellation so _collect_results can drain siblings
                # cleanly instead of orphaning their pending exceptions.
                raise asyncio.CancelledError from exc
            except Exception as exc:
                # During shutdown, worker pools raise RuntimeError from
                # submit(). Prefer to treat these as cancellation rather than
                # as ingest failures. Detect via the cancel flag (source of
                # truth) or the executor's well-known shutdown message as a
                # fallback when cancel was set after the submit race.
                if (cancel and cancel.is_set()) or is_executor_shutdown(exc):
                    raise asyncio.CancelledError from exc
                # Suppress TaskCancelledError on the FILE_DONE notice: the user
                # already cancelled, and re-raising here would leak past
                # _process_one and strand sibling tasks awaiting in
                # _collect_results.
                with contextlib.suppress(TaskCancelledError):
                    on_progress(
                        EventType.FILE_DONE,
                        FileDoneEvent(file=name, status="error", chunks=0),
                    )
                return _IngestResult(name, path, 0, error=exc)

    def _spawn_tasks() -> list[asyncio.Task[_IngestResult]]:
        # Call only after shared_progress has been set: each task captures a
        # copy of the current contextvars context when it is created, and
        # shared_progress is set/reset with a token (presumably a ContextVar).
        return [
            asyncio.ensure_future(_process_one(name, path, ct, fh, cleanup, idx))
            for idx, (name, path, ct, fh, cleanup) in enumerate(files_to_process, 1)
        ]

    if quiet:
        await _collect_results(
            _spawn_tasks(), added, updated, failed, skipped, on_progress=on_progress
        )
        return

    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        transient=True,
    ) as progress:
        ptask = progress.add_task("Ingesting documents...", total=total_files)
        token = shared_progress.set((progress, ptask))
        try:
            await _collect_results(
                _spawn_tasks(),
                added,
                updated,
                failed,
                skipped,
                on_progress=on_progress,
                progress=progress,
                ptask=ptask,
            )
        finally:
            shared_progress.reset(token)
async def _collect_results(
    tasks: list[asyncio.Task[_IngestResult]],
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
    *,
    on_progress: DetailedProgressCallback = noop_callback,
    progress: Progress | None = None,
    ptask: Any = None,
) -> None:
    """Collect task results as they finish, optionally updating a Rich progress bar.

    Each result is applied via ``_apply_result``. Its status is classified as
    FAILED (error set), SKIPPED (zero chunks), or INGESTED. That status drives
    both the Rich bar label and the ``BATCH_PROGRESS`` event, so the two
    always agree. A ``TaskCancelledError`` raised by the progress callback
    is suppressed.

    On exception (typically asyncio.CancelledError from a user cancel),
    cancel every sibling task and await them with ``return_exceptions=True``
    so their pending CancelledErrors don't surface as
    "Task exception was never retrieved" warnings. The exception itself
    still propagates to the caller.
    """
    total = len(tasks)
    try:
        for completed_count, fut in enumerate(asyncio.as_completed(tasks), 1):
            result = await fut
            _apply_result(result, added, updated, failed, skipped)
            if result.error is not None:
                progress_status = BatchStatus.FAILED
                desc = f"Failed {result.name}"
            elif result.chunk_count == 0:
                # Zero-chunk files are recorded as skipped by _apply_result;
                # label them that way instead of "Ingested".
                progress_status = BatchStatus.SKIPPED
                desc = f"Skipped {result.name}"
            else:
                progress_status = BatchStatus.INGESTED
                desc = f"Ingested {result.name}"
            if progress is not None and ptask is not None:
                progress.update(ptask, description=desc)
                progress.advance(ptask)
            with contextlib.suppress(TaskCancelledError):
                on_progress(
                    EventType.BATCH_PROGRESS,
                    BatchProgressEvent(
                        file=result.name,
                        status=progress_status,
                        current=completed_count,
                        total=total,
                    ),
                )
    finally:
        pending = [t for t in tasks if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
504def _discard_from_list(lst: list[str], value: str) -> None:
505 """Remove *value* from *lst* if present."""
506 with contextlib.suppress(ValueError):
507 lst.remove(value)
def _apply_result(
    result: _IngestResult,
    added: list[str],
    updated: list[str],
    failed: list[str],
    skipped: list[str],
) -> None:
    """Fold one ingestion outcome into the sync bookkeeping.

    * error set: log a one-line WARNING (traceback only at DEBUG), drop the
      name from *added*/*updated*, append it to *failed*.
    * zero chunks (e.g. a scanned PDF without a vision model, or vision OCR
      returning no text): drop the name from *added*/*updated*, append it to
      *skipped*, and write no source row. ``sync`` then records a skip marker
      for it, so it is only retried once the file's hash changes or the
      markers are cleared.
    * otherwise: upsert the source row with the file hash (the result's own,
      or recomputed from the path if missing) and the chunk count.
    """
    if result.error is not None:
        # Log the error message without the traceback: ingest failures are
        # already surfaced to callers via SyncResult.failed, and the raw
        # traceback from log.exception bleeds into the TUI chat pane via the
        # stderr bridge. Full stack traces stay reachable by
        # lowering LILBEE_LOG_LEVEL to DEBUG.
        log.warning("Failed to ingest %s: %s", result.name, result.error)
        log.debug("Traceback for failed ingest of %s", result.name, exc_info=result.error)
        outcome = failed
    elif result.chunk_count == 0:
        outcome = skipped
    else:
        digest = result.file_hash or file_hash(result.path)
        get_services().store.upsert_source(result.name, digest, result.chunk_count)
        return
    _discard_from_list(added, result.name)
    _discard_from_list(updated, result.name)
    outcome.append(result.name)