Coverage for src/lilbee/cli/commands/ingest_sync.py: 100% (205 statements)
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Sync, rebuild, add, chunks, and remove commands."""
3from __future__ import annotations
5import asyncio
6import threading
7from pathlib import Path
8from typing import TYPE_CHECKING
10import typer
12if TYPE_CHECKING:
13 from collections.abc import Awaitable, Callable
15 from lilbee.runtime.progress import DetailedProgressCallback
17from lilbee.app.ingest import CopyResult, copy_files
18from lilbee.app.search import clean_result
19from lilbee.app.services import get_services
20from lilbee.cli import theme
21from lilbee.cli.app import (
22 apply_overrides,
23 console,
24 data_dir_option,
25 global_option,
26)
27from lilbee.cli.commands._shared import CHUNK_PREVIEW_LEN
28from lilbee.cli.helpers import (
29 add_paths,
30 json_output,
31 sync_result_to_json,
32)
33from lilbee.core.config import cfg
34from lilbee.crawler import is_url
36_ocr_option = typer.Option(None, "--ocr/--no-ocr", help="Force vision OCR on/off for scanned PDFs.")
37_retry_skipped_option = typer.Option(
38 False,
39 "--retry-skipped",
40 help="Retry files that were skipped on a previous sync (clears the failed-file markers).",
41)
42_ocr_timeout_option = typer.Option(
43 None,
44 "--ocr-timeout",
45 help="Per-page timeout in seconds for vision OCR (default: 120, 0 = no limit).",
46)
49def _apply_ocr_overrides(ocr: bool | None, ocr_timeout: float | None) -> None:
50 """Apply --ocr/--no-ocr and --ocr-timeout CLI overrides to config."""
51 if ocr is not None:
52 cfg.enable_ocr = ocr
53 if ocr_timeout is not None:
54 cfg.ocr_timeout = ocr_timeout
57_paths_argument = typer.Argument(
58 ...,
59 help="Files, directories, or URLs to add to the knowledge base.",
60)
62_force_option = typer.Option(False, "--force", "-f", help="Overwrite existing files.")
63_crawl_option = typer.Option(
64 False,
65 "--crawl",
66 help="Recursively crawl URLs (whole site by default; see --depth and --max-pages).",
67)
68_depth_option = typer.Option(
69 None,
70 "--depth",
71 help="Cap link-follow depth for --crawl. Unset = unbounded; 0 = single URL only.",
72)
73_max_pages_option = typer.Option(
74 None,
75 "--max-pages",
76 help="Cap total pages for --crawl. Unset = no limit; positive int = hard cap.",
77)
78_include_subdomains_option = typer.Option(
79 False,
80 "--include-subdomains",
81 help=(
82 "Allow --crawl to follow links into sibling subdomains of the start "
83 "host (e.g. en.wikipedia.org plus af.wikipedia.org). Default scopes "
84 "the crawl to the exact start host only."
85 ),
86)
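
# Illustrative invocations of the `add` command defined below, combining the
# options above. Hedged: the installed command name ("lilbee") is an assumption
# of this sketch; the flag semantics follow the help strings above.
#
#   lilbee add report.pdf notes/ --force              # copy local files, overwrite existing
#   lilbee add https://docs.example.com               # no --crawl: fetch that single page (depth=0)
#   lilbee add https://docs.example.com --crawl       # whole-site crawl, unbounded by default
#   lilbee add https://docs.example.com --crawl --depth 2 --max-pages 50
#   lilbee add https://en.wikipedia.org --crawl --include-subdomains
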
def _partition_inputs(inputs: list[str]) -> tuple[list[Path], list[str]]:
    """Split inputs into file paths and URLs."""
    paths: list[Path] = []
    urls: list[str] = []
    for inp in inputs:
        if is_url(inp):
            urls.append(inp)
        else:
            paths.append(Path(inp))
    return paths, urls
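
# A minimal usage sketch of _partition_inputs (illustrative, not an executed
# doctest; assumes is_url() treats http/https strings as URLs):
#
#   _partition_inputs(["notes.md", "https://example.com/page"])
#   -> ([Path("notes.md")], ["https://example.com/page"])
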
def _crawl_urls_blocking(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl URLs synchronously (for CLI), returning paths written.

    Without --crawl, each URL is fetched as a single page (depth=0).
    With --crawl, the default is whole-site unbounded (depth=None, pages=None).
    Explicit --depth / --max-pages override both.

    Ctrl-C is handled by running the crawl through _run_crawl_with_signal_cancel,
    which installs a signal.signal handler that sets a threading.Event passed
    into crawl_and_save. crawl_recursive polls the event between pages so the
    signal flows through as a clean cancel instead of asyncio.run's default
    KeyboardInterrupt-raising (which left browser contexts mid-teardown).
    """
    from rich.progress import Progress, SpinnerColumn, TaskID, TextColumn

    from lilbee.crawler import crawl_and_save
    from lilbee.runtime.progress import (
        CrawlPageEvent,
        EventType,
        ProgressEvent,
    )

    if crawl:
        effective_depth = depth
        effective_pages = max_pages
    else:
        effective_depth = 0
        effective_pages = None

    cancel_event = threading.Event()

    from rich.console import Console as RichConsole

    err_console = RichConsole(stderr=True)
    all_paths: list[Path] = []
    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        transient=True,
        console=err_console,
        disable=cfg.json_mode,
    ) as progress:
        for url in urls:
            if cancel_event.is_set():
                break
            ptask = progress.add_task(f"Crawling {url}...", total=None)

            def _make_callback(_t: TaskID = ptask) -> DetailedProgressCallback:
                def on_progress(event_type: EventType, data: ProgressEvent) -> None:
                    if event_type == EventType.CRAWL_PAGE:
                        if not isinstance(data, CrawlPageEvent):
                            raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
                        total_str = str(data.total) if data.total > 0 else "?"
                        progress.update(
                            _t,
                            description=f"Crawled {data.current}/{total_str}: {data.url}",
                        )

                return on_progress

            paths = _run_crawl_with_signal_cancel(
                url,
                depth=effective_depth,
                max_pages=effective_pages,
                on_progress=_make_callback(),
                cancel_event=cancel_event,
                crawl_and_save=crawl_and_save,
                include_subdomains=include_subdomains,
            )
            all_paths.extend(paths)
            progress.update(ptask, description=f"Done: {url} ({len(paths)} pages)")
    return all_paths


def _run_crawl_with_signal_cancel(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback,
    cancel_event: threading.Event,
    crawl_and_save: Callable[..., Awaitable[list[Path]]],
    include_subdomains: bool = False,
) -> list[Path]:
    """Run crawl_and_save on a dedicated event loop with a SIGINT->cancel hook.

    asyncio.run() installs its own SIGINT handler that raises
    KeyboardInterrupt, which tears the crawl down ungracefully. Registering a
    plain signal.signal handler on the main thread AND running the crawl on a
    loop we own (instead of asyncio.run) lets Ctrl-C set our threading.Event,
    which crawl_recursive polls between pages so it can close the stream and
    stop dispatch cleanly.
    """
    import signal

    previous_handler = signal.getsignal(signal.SIGINT)

    def _on_sigint(_signum: int, _frame: object) -> None:
        # Set the cancel event that crawl_recursive polls between pages, so
        # a Ctrl-C flows through as a clean cancel instead of asyncio.run's
        # default KeyboardInterrupt-raising dance.
        cancel_event.set()

    signal.signal(signal.SIGINT, _on_sigint)
    # Manage the event loop explicitly. In the CLI this runs once per process,
    # but under pytest-xdist the same worker thread runs many tests; leaving a
    # closed loop set as the "current" loop for the thread poisons every later
    # asyncio.get_event_loop() call and hangs macOS 3.12/3.13 unit-test CI.
    # Always clear the thread-current loop in finally.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        coro = crawl_and_save(
            url,
            depth=depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel_event,
            quiet=cfg.json_mode,
            include_subdomains=include_subdomains,
        )
        result: list[Path] = loop.run_until_complete(coro)
        return result
    finally:
        loop.close()
        asyncio.set_event_loop(None)
        signal.signal(signal.SIGINT, previous_handler)
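
# Hedged illustration of the consumer side of the cancel contract described in
# the docstrings above: crawl_recursive (not shown in this file) is expected to
# poll the threading.Event between pages. This sketch is NOT lilbee's crawler,
# just the minimal shape that makes the SIGINT handling above work.
def _example_cancel_aware_crawl(pages: list[str], cancel: threading.Event) -> list[str]:
    """Visit pages until the cancel event is set (illustrative only)."""
    visited: list[str] = []
    for page in pages:
        if cancel.is_set():
            # Clean stop between pages: no KeyboardInterrupt unwinding, so any
            # browser contexts can be torn down in an orderly way afterwards.
            break
        visited.append(page)  # a real crawler would fetch and persist the page here
    return visited
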
def sync_cmd(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    retry_skipped: bool = _retry_skipped_option,
) -> None:
    """Manually trigger document sync."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        result = asyncio.run(sync(quiet=cfg.json_mode, retry_skipped=retry_skipped))
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None
    if cfg.json_mode:
        json_output(sync_result_to_json(result))
        return
    console.print(result)


def rebuild(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
) -> None:
    """Nuke the DB and re-ingest everything from documents/."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        result = asyncio.run(sync(force_rebuild=True, quiet=cfg.json_mode))
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None
    if cfg.json_mode:
        json_output({"command": "rebuild", "ingested": len(result.added)})
        return
    console.print(f"Rebuilt: {len(result.added)} documents ingested")


def _validate_file_paths(file_paths: list[Path]) -> None:
    """Exit on the first missing path; respects ``cfg.json_mode``."""
    for fp in file_paths:
        if fp.exists():
            continue
        if cfg.json_mode:
            json_output({"error": f"Path not found: {fp}"})
            raise SystemExit(1)
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] Path not found: {fp}")
        raise SystemExit(1)


def _crawl_urls_step(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool,
) -> list[Path]:
    """Crawl URLs (or fail fast when crawler extra is missing). Returns saved paths."""
    if not urls:
        return []
    from lilbee.crawler import crawler_available

    if not crawler_available():
        console.print(
            f"[{theme.ERROR}]Web crawling requires: pip install 'lilbee[crawler]'[/{theme.ERROR}]"
        )
        raise SystemExit(1)
    crawled_paths = _crawl_urls_blocking(
        urls,
        crawl=crawl,
        depth=depth,
        max_pages=max_pages,
        include_subdomains=include_subdomains,
    )
    if not cfg.json_mode:
        console.print(
            f"[{theme.MUTED}]Crawled {len(crawled_paths)} page(s)"
            f" from {len(urls)} URL(s)[/{theme.MUTED}]"
        )
    return crawled_paths


def _add_json_mode(file_paths: list[Path], crawled_paths: list[Path], *, force: bool) -> None:
    """Run the JSON-mode finish: copy files, sync, emit one structured result."""
    from lilbee.data.ingest import sync

    copy_result = CopyResult()
    if file_paths:
        copy_result = copy_files(file_paths, force=force)
    result = asyncio.run(sync(quiet=True))
    json_output(
        {
            "command": "add",
            "copied": copy_result.copied,
            "skipped": copy_result.skipped,
            "crawled": len(crawled_paths),
            "sync": sync_result_to_json(result),
        }
    )
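
# Illustrative shape of the single JSON object emitted by _add_json_mode. The
# nested "sync" value comes from sync_result_to_json(result); its exact fields
# (and whether "copied"/"skipped" are counts or lists) are assumptions outside
# this module, so they are left elided here:
#
#   {
#     "command": "add",
#     "copied": ...,
#     "skipped": ...,
#     "crawled": 5,
#     "sync": {...}
#   }
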
def add(
    paths: list[str] = _paths_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    force: bool = _force_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    crawl: bool = _crawl_option,
    depth: int | None = _depth_option,
    max_pages: int | None = _max_pages_option,
    include_subdomains: bool = _include_subdomains_option,
) -> None:
    """Copy files or crawl URLs into the knowledge base and ingest them."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)

    file_paths, urls = _partition_inputs(paths)
    _validate_file_paths(file_paths)

    try:
        crawled_paths = _crawl_urls_step(
            urls,
            crawl=crawl,
            depth=depth,
            max_pages=max_pages,
            include_subdomains=include_subdomains,
        )

        if cfg.json_mode:
            _add_json_mode(file_paths, crawled_paths, force=force)
            return

        if file_paths:
            add_paths(file_paths, console, force=force)
        elif urls:
            # URLs already saved; just trigger sync
            from lilbee.data.ingest import sync

            result = asyncio.run(sync())
            console.print(result)
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None


_chunks_source_argument = typer.Argument(..., help="Source name to inspect chunks for.")


def chunks(
    source: str = _chunks_source_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
) -> None:
    """Show chunks a document was split into (useful for debugging retrieval)."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    store = get_services().store
    known = {s["filename"] for s in store.get_sources()}
    if source not in known:
        if cfg.json_mode:
            json_output({"error": f"Source not found: {source}"})
            raise SystemExit(1)
        console.print(f"[{theme.ERROR}]Source not found:[/{theme.ERROR}] {source}")
        raise SystemExit(1)

    raw_chunks = store.get_chunks_by_source(source)
    cleaned = sorted(
        [clean_result(c) for c in raw_chunks],
        key=lambda c: c.get("chunk_index", 0),
    )

    if cfg.json_mode:
        json_output({"command": "chunks", "source": source, "chunks": cleaned})
        return

    console.print(
        f"[{theme.LABEL}]{len(cleaned)}[/{theme.LABEL}]"
        f" chunks from [{theme.ACCENT}]{source}[/{theme.ACCENT}]\n"
    )
    for c in cleaned:
        idx = c.get("chunk_index", "?")
        preview = c.get("chunk", "")[:CHUNK_PREVIEW_LEN]
        if len(c.get("chunk", "")) > CHUNK_PREVIEW_LEN:
            preview += "..."
        console.print(f" [{idx}] {preview}")


_remove_names_argument = typer.Argument(
    ..., help="Source name(s) to remove from the knowledge base."
)

_delete_file_option = typer.Option(
    False, "--delete", help="Also delete the file from the documents directory."
)


def remove(
    names: list[str] = _remove_names_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    delete_file: bool = _delete_file_option,
) -> None:
    """Remove documents from the knowledge base by source name."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    result = get_services().store.remove_documents(
        names, delete_files=delete_file, documents_dir=cfg.documents_dir
    )

    if cfg.json_mode:
        payload: dict = {"command": "remove", "removed": result.removed}
        if result.not_found:
            payload["not_found"] = result.not_found
        json_output(payload)
        if not result.removed and result.not_found:
            raise SystemExit(1)
        return

    for name in result.removed:
        console.print(f"Removed [{theme.ACCENT}]{name}[/{theme.ACCENT}]")
    for name in result.not_found:
        console.print(f"[{theme.ERROR}]Not found:[/{theme.ERROR}] {name}")
    if not result.removed and result.not_found:
        raise SystemExit(1)
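
# Note on `remove` exit codes, as implemented above: the command exits non-zero
# only when nothing was removed and at least one name was not found; a partial
# success (some removed, some missing) still exits 0. Illustrative run (the
# installed command name is an assumption):
#
#   $ lilbee remove old-notes.md missing.pdf --delete
#   Removed old-notes.md
#   Not found: missing.pdf
#   (exit status 0: one document was removed)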