Coverage for src/lilbee/cli/commands/ingest_sync.py: 100%
222 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Sync, rebuild, add, chunks, and remove commands."""
3from __future__ import annotations
5import asyncio
6import threading
7from pathlib import Path
8from typing import TYPE_CHECKING
10import typer
12if TYPE_CHECKING:
13 from collections.abc import Awaitable, Callable
15 from lilbee.runtime.progress import DetailedProgressCallback
17from lilbee.app.ingest import CopyResult, copy_files
18from lilbee.app.search import clean_result
19from lilbee.app.services import get_services
20from lilbee.cli import theme
21from lilbee.cli.app import (
22 apply_overrides,
23 console,
24 data_dir_option,
25 global_option,
26)
27from lilbee.cli.commands._shared import CHUNK_PREVIEW_LEN
28from lilbee.cli.helpers import (
29 add_paths,
30 json_output,
31 sync_result_to_json,
32)
33from lilbee.core.config import cfg
34from lilbee.crawler import is_url
# Tri-state flag: None (flag omitted) keeps the configured OCR setting;
# see _apply_ocr_overrides.
_ocr_option = typer.Option(
    None,
    "--ocr/--no-ocr",
    help="Force vision OCR on/off for scanned PDFs.",
)

# Opt-in; forwarded to sync(retry_skipped=...) by the sync command.
_retry_skipped_option = typer.Option(
    False,
    "--retry-skipped",
    help="Retry files that were skipped on a previous sync (clears the failed-file markers).",
)

# None (flag omitted) keeps the configured per-page OCR timeout.
_ocr_timeout_option = typer.Option(
    None,
    "--ocr-timeout",
    help="Per-page timeout in seconds for vision OCR (default: 120, 0 = no limit).",
)
def _apply_ocr_overrides(ocr: bool | None, ocr_timeout: float | None) -> None:
    """Copy the --ocr/--no-ocr and --ocr-timeout CLI flags onto ``cfg``.

    A ``None`` value means the flag was not given on the command line, so
    the corresponding setting is left exactly as configured.
    """
    overrides = (("enable_ocr", ocr), ("ocr_timeout", ocr_timeout))
    for field_name, value in overrides:
        if value is None:
            continue
        setattr(cfg, field_name, value)
# Required positional inputs for `add`; each is later classified as a local
# path or a URL by _partition_inputs.
_paths_argument = typer.Argument(
    ...,
    help="Files, directories, or URLs to add to the knowledge base.",
)

_force_option = typer.Option(
    False,
    "--force",
    "-f",
    help="Overwrite existing files.",
)

# The options below are only forwarded to the URL-crawling step of `add`.
_crawl_option = typer.Option(
    False,
    "--crawl",
    help="Recursively crawl URLs (whole site by default; see --depth and --max-pages).",
)

_depth_option = typer.Option(
    None,
    "--depth",
    help="Cap link-follow depth for --crawl. Unset = unbounded; 0 = single URL only.",
)

_max_pages_option = typer.Option(
    None,
    "--max-pages",
    help="Cap pages for --crawl. Unset = protective default; 0 = unlimited; N = hard cap.",
)

_include_subdomains_option = typer.Option(
    False,
    "--include-subdomains",
    help=(
        "Allow --crawl to follow links into sibling subdomains of the start "
        "host (e.g. en.wikipedia.org plus af.wikipedia.org). Default scopes "
        "the crawl to the exact start host only."
    ),
)
def _partition_inputs(inputs: list[str]) -> tuple[list[Path], list[str]]:
    """Sort raw CLI inputs into local paths and URLs.

    Each input is classified once with ``is_url``; relative order is kept
    within each group.

    Returns:
        ``(paths, urls)``, where every non-URL input is wrapped in ``Path``.
    """
    local_paths: list[Path] = []
    remote_urls: list[str] = []
    for raw in inputs:
        if not is_url(raw):
            local_paths.append(Path(raw))
            continue
        remote_urls.append(raw)
    return local_paths, remote_urls
def _crawl_urls_blocking(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl each URL synchronously for the CLI and return every path written.

    Limits handed to the crawler:
      * without ``--crawl`` every URL is fetched as a single page (depth 0,
        ``max_pages=None``);
      * with ``--crawl`` the caller's ``depth`` / ``max_pages`` are passed
        through unchanged, so explicit flags always win. An unset ``depth``
        means no depth bound; an unset ``max_pages`` leaves the page cap to
        the crawler's protective default (hence the hint printed below when
        a crawl fills that default).

    Ctrl-C: each URL is crawled via ``_run_crawl_with_signal_cancel``, which
    routes SIGINT into a shared ``threading.Event`` that the crawler polls
    between pages, so an interrupt ends the crawl cleanly instead of raising
    KeyboardInterrupt mid browser teardown. Once that event is set, the
    remaining URLs are skipped.
    """
    from rich.console import Console as RichConsole
    from rich.progress import Progress, SpinnerColumn, TaskID, TextColumn

    from lilbee.crawler import crawl_and_save
    from lilbee.runtime.progress import (
        CrawlDoneEvent,
        CrawlPageEvent,
        EventType,
        ProgressEvent,
    )

    # Single-page fetch unless --crawl was requested.
    effective_depth, effective_pages = (depth, max_pages) if crawl else (0, None)

    stop_requested = threading.Event()
    status_console = RichConsole(stderr=True)  # progress and hints go to stderr
    written: list[Path] = []

    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        transient=True,
        console=status_console,
        disable=cfg.json_mode,
    ) as spinner:
        for start_url in urls:
            if stop_requested.is_set():
                break
            task_id = spinner.add_task(f"Crawling {start_url}...", total=None)
            summary: dict[str, int] = {}

            # Default arguments pin this iteration's task id and summary dict,
            # avoiding late binding of loop variables inside the closure.
            def _listener_for(
                _task: TaskID = task_id, _summary: dict[str, int] = summary
            ) -> DetailedProgressCallback:
                def _on_event(event_type: EventType, data: ProgressEvent) -> None:
                    if event_type == EventType.CRAWL_PAGE:
                        if not isinstance(data, CrawlPageEvent):
                            raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
                        shown_total = str(data.total) if data.total > 0 else "?"
                        spinner.update(
                            _task,
                            description=f"Crawled {data.current}/{shown_total}: {data.url}",
                        )
                        return
                    if event_type == EventType.CRAWL_DONE and isinstance(data, CrawlDoneEvent):
                        _summary["n"] = data.pages_crawled

                return _on_event

            saved = _run_crawl_with_signal_cancel(
                start_url,
                depth=effective_depth,
                max_pages=effective_pages,
                on_progress=_listener_for(),
                cancel_event=stop_requested,
                crawl_and_save=crawl_and_save,
                include_subdomains=include_subdomains,
            )
            written.extend(saved)
            spinner.update(task_id, description=f"Done: {start_url} ({len(saved)} pages)")

            # No explicit cap was given and the crawl filled the protective
            # default: explain how to lift it without editing settings.
            default_cap = cfg.crawl_max_pages or cfg.crawl_safety_max_pages
            hit_default_cap = crawl and max_pages is None and summary.get("n", 0) >= default_cap
            if hit_default_cap:
                status_console.print(
                    f"Stopped at the default {default_cap}-page limit; "
                    f"pass --max-pages 0 to crawl unlimited (or --max-pages N for a higher cap)."
                )
    return written
def _finish_crawl_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel leftover tasks and finalize async generators on ``loop``.

    This is the task / async-generator part of the teardown ``asyncio.run``
    performs, which the crawl path bypasses by owning its loop. Without it,
    tasks still pending when the crawl returns are destroyed while pending,
    and unfinished async generators get finalized later against a closed loop.
    """
    leftovers = asyncio.all_tasks(loop)
    for task in leftovers:
        task.cancel()
    if leftovers:
        # Best-effort teardown: the crawl result is already decided, so the
        # outcomes of these cancelled leftovers are intentionally discarded.
        loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))
    loop.run_until_complete(loop.shutdown_asyncgens())


def _run_crawl_with_signal_cancel(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback,
    cancel_event: threading.Event,
    crawl_and_save: Callable[..., Awaitable[list[Path]]],
    include_subdomains: bool = False,
) -> list[Path]:
    """Run crawl_and_save on a dedicated event loop with a SIGINT->cancel hook.

    asyncio.run() installs its own SIGINT handler that raises
    KeyboardInterrupt, which tears the crawl down ungracefully. Registering a
    plain signal.signal handler on the main thread AND running the crawl on a
    loop we own (instead of asyncio.run) lets Ctrl-C set our threading.Event,
    which crawl_recursive polls between pages so it can close the stream and
    stop dispatch cleanly.

    The previous SIGINT handler is restored as soon as the crawl finishes
    (before loop teardown, so teardown remains interruptible), and the loop
    is always closed and cleared as the thread's current loop.

    Returns:
        The paths returned by ``crawl_and_save``.
    """
    import signal

    previous_handler = signal.getsignal(signal.SIGINT)

    def _on_sigint(_signum: int, _frame: object) -> None:
        # Set the cancel event that crawl_recursive polls between pages, so
        # a Ctrl-C flows through as a clean cancel instead of asyncio.run's
        # default KeyboardInterrupt-raising dance.
        cancel_event.set()

    # Manage the event loop explicitly. In the CLI this runs once per process,
    # but under pytest-xdist the same worker thread runs many tests; leaving a
    # closed loop set as the "current" loop for the thread poisons every later
    # asyncio.get_event_loop() call and hangs macOS 3.12/3.13 unit-test CI.
    # Always clear the thread-current loop in finally.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        # Installed only once the loop exists, so a failure above can never
        # leave our handler behind; the inner finally restores it.
        signal.signal(signal.SIGINT, _on_sigint)
        try:
            coro = crawl_and_save(
                url,
                depth=depth,
                max_pages=max_pages,
                on_progress=on_progress,
                cancel=cancel_event,
                quiet=cfg.json_mode,
                include_subdomains=include_subdomains,
            )
            result: list[Path] = loop.run_until_complete(coro)
        finally:
            signal.signal(signal.SIGINT, previous_handler)
        return result
    finally:
        try:
            _finish_crawl_loop(loop)
        finally:
            loop.close()
            asyncio.set_event_loop(None)
def sync_cmd(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    retry_skipped: bool = _retry_skipped_option,
) -> None:
    """Manually trigger document sync."""
    # Runs sync() once. In JSON mode the result is emitted via
    # sync_result_to_json; otherwise it is printed to the console. A
    # RuntimeError from sync() is reported (as JSON or a styled message) and
    # converted into exit status 1.
    from rich.markup import escape

    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        result = asyncio.run(sync(quiet=cfg.json_mode, retry_skipped=retry_skipped))
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        # Escape the message so brackets in it (paths, names) are shown
        # literally rather than parsed as Rich markup.
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {escape(str(exc))}")
        raise SystemExit(1) from None
    if cfg.json_mode:
        json_output(sync_result_to_json(result))
        return
    console.print(result)
def rebuild(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
) -> None:
    """Nuke the DB and re-ingest everything from documents/."""
    # Runs sync(force_rebuild=True) and reports how many documents were added,
    # as JSON or a console line. A RuntimeError is reported and converted
    # into exit status 1.
    from rich.markup import escape

    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        result = asyncio.run(sync(force_rebuild=True, quiet=cfg.json_mode))
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        # Escape so bracketed text in the message is not parsed as Rich markup.
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {escape(str(exc))}")
        raise SystemExit(1) from None
    if cfg.json_mode:
        json_output({"command": "rebuild", "ingested": len(result.added)})
        return
    console.print(f"Rebuilt: {len(result.added)} documents ingested")
def index(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
) -> None:
    """Build the search indexes now (vector ANN + full-text).

    Useful before publishing a large index so downloaders get fast search
    without waiting for it to build on first query. Forces the vector index
    even below the auto-build threshold.
    """
    # The full-text index is always (re)built first; the vector index result
    # decides which outcome is reported.
    apply_overrides(data_dir=data_dir, use_global=use_global)
    store = get_services().store
    store.ensure_fts_index()
    vector_built = store.ensure_vector_index(force=True)

    if not cfg.json_mode:
        message = (
            "Search indexes built (vector ANN + full-text)."
            if vector_built
            else "Full-text index built; vector index needs more chunks."
        )
        console.print(message)
        return
    json_output({"command": "index", "vector_index": vector_built})
def _validate_file_paths(file_paths: list[Path]) -> None:
    """Exit with status 1 at the first path that does not exist.

    In JSON mode the error is emitted as ``{"error": "Path not found: ..."}``;
    otherwise a styled error line is printed to the console.

    Raises:
        SystemExit: with code 1 for the first missing path.
    """
    from rich.markup import escape

    for fp in file_paths:
        if fp.exists():
            continue
        if cfg.json_mode:
            json_output({"error": f"Path not found: {fp}"})
            raise SystemExit(1)
        # Escape the path so names like "notes[draft].md" are shown literally
        # instead of having "[draft]" swallowed as a Rich style tag.
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] Path not found: {escape(str(fp))}")
        raise SystemExit(1)
def _crawl_urls_step(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool,
) -> list[Path]:
    """Crawl ``urls`` and return the paths of the saved pages.

    Returns ``[]`` immediately when there are no URLs. Exits with status 1
    when the optional crawler extra is not installed. Like
    ``_validate_file_paths``, that error is emitted as ``{"error": ...}`` in
    JSON mode so stdout stays machine-readable. Outside JSON mode, a one-line
    crawl summary is printed after crawling.

    Raises:
        SystemExit: with code 1 when the crawler extra is unavailable.
    """
    if not urls:
        return []
    from rich.markup import escape

    from lilbee.crawler import crawler_available

    if not crawler_available():
        message = "Web crawling requires: pip install 'lilbee[crawler]'"
        if cfg.json_mode:
            json_output({"error": message})
            raise SystemExit(1)
        # Escape so Rich does not treat "[crawler]" as a style tag and drop it
        # from the install command shown to the user.
        console.print(f"[{theme.ERROR}]{escape(message)}[/{theme.ERROR}]")
        raise SystemExit(1)
    crawled_paths = _crawl_urls_blocking(
        urls,
        crawl=crawl,
        depth=depth,
        max_pages=max_pages,
        include_subdomains=include_subdomains,
    )
    if not cfg.json_mode:
        console.print(
            f"[{theme.MUTED}]Crawled {len(crawled_paths)} page(s)"
            f" from {len(urls)} URL(s)[/{theme.MUTED}]"
        )
    return crawled_paths
def _add_json_mode(file_paths: list[Path], crawled_paths: list[Path], *, force: bool) -> None:
    """Finish ``add`` in JSON mode by emitting one structured result.

    Local files, if any, are copied first (an empty ``CopyResult`` is used
    otherwise). A quiet sync then runs, and a single object reports the
    copied/skipped files, the number of crawled pages, and the sync result.
    """
    from lilbee.data.ingest import sync

    copy_result = copy_files(file_paths, force=force) if file_paths else CopyResult()
    sync_result = asyncio.run(sync(quiet=True))
    payload = {
        "command": "add",
        "copied": copy_result.copied,
        "skipped": copy_result.skipped,
        "crawled": len(crawled_paths),
        "sync": sync_result_to_json(sync_result),
    }
    json_output(payload)
def add(
    paths: list[str] = _paths_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    force: bool = _force_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    crawl: bool = _crawl_option,
    depth: int | None = _depth_option,
    max_pages: int | None = _max_pages_option,
    include_subdomains: bool = _include_subdomains_option,
) -> None:
    """Copy files or crawl URLs into the knowledge base and ingest them."""
    # Flow: validate local paths (exit 1 on the first missing one), crawl any
    # URLs, then either finish in JSON mode or copy/sync for the console.
    # A RuntimeError from any step is reported and converted into exit 1.
    from rich.markup import escape

    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)

    file_paths, urls = _partition_inputs(paths)
    _validate_file_paths(file_paths)

    try:
        crawled_paths = _crawl_urls_step(
            urls,
            crawl=crawl,
            depth=depth,
            max_pages=max_pages,
            include_subdomains=include_subdomains,
        )

        if cfg.json_mode:
            _add_json_mode(file_paths, crawled_paths, force=force)
            return

        if file_paths:
            # NOTE(review): assumes add_paths also syncs, which would pick up
            # pages crawled above — confirm against lilbee.cli.helpers.
            add_paths(file_paths, console, force=force)
        elif urls:
            # URLs already saved; just trigger sync
            from lilbee.data.ingest import sync

            result = asyncio.run(sync())
            console.print(result)
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        # Escape so bracketed text in the message is not parsed as Rich markup.
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {escape(str(exc))}")
        raise SystemExit(1) from None
# Required positional argument of `chunks`: the source name to inspect.
_chunks_source_argument = typer.Argument(
    ...,
    help="Source name to inspect chunks for.",
)
def chunks(
    source: str = _chunks_source_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
) -> None:
    """Show chunks a document was split into (useful for debugging retrieval)."""
    # Exits with status 1 if ``source`` is not among the store's sources.
    # Chunks are cleaned and ordered by chunk_index (missing index sorts as 0);
    # JSON mode emits them all, otherwise a count line plus one truncated
    # preview per chunk is printed.
    from rich.markup import escape

    apply_overrides(data_dir=data_dir, use_global=use_global)

    store = get_services().store
    known = {s["filename"] for s in store.get_sources()}
    if source not in known:
        if cfg.json_mode:
            json_output({"error": f"Source not found: {source}"})
            raise SystemExit(1)
        console.print(f"[{theme.ERROR}]Source not found:[/{theme.ERROR}] {escape(source)}")
        raise SystemExit(1)

    raw_chunks = store.get_chunks_by_source(source)
    cleaned = sorted(
        [clean_result(c) for c in raw_chunks],
        key=lambda c: c.get("chunk_index", 0),
    )

    if cfg.json_mode:
        json_output({"command": "chunks", "source": source, "chunks": cleaned})
        return

    console.print(
        f"[{theme.LABEL}]{len(cleaned)}[/{theme.LABEL}]"
        f" chunks from [{theme.ACCENT}]{escape(source)}[/{theme.ACCENT}]\n"
    )
    for c in cleaned:
        idx = c.get("chunk_index", "?")
        text = c.get("chunk", "")
        preview = text[:CHUNK_PREVIEW_LEN]
        if len(text) > CHUNK_PREVIEW_LEN:
            preview += "..."
        # Chunk text is arbitrary document content: escape it so brackets such
        # as "[/]" cannot raise a Rich MarkupError or vanish as style tags.
        console.print(f" [{idx}] {escape(preview)}")
# Required positional argument of `remove`: one or more source names.
_remove_names_argument = typer.Argument(
    ...,
    help="Source name(s) to remove from the knowledge base.",
)

# Forwarded to remove_documents(delete_files=...); off unless --delete is given.
_delete_file_option = typer.Option(
    False,
    "--delete",
    help="Also delete the file from the documents directory.",
)
def remove(
    names: list[str] = _remove_names_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    delete_file: bool = _delete_file_option,
) -> None:
    """Remove documents from the knowledge base by source name."""
    # Reports removed and unknown names, as JSON or console lines. Exits with
    # status 1 only when nothing was removed and at least one name was unknown.
    from rich.markup import escape

    apply_overrides(data_dir=data_dir, use_global=use_global)

    result = get_services().store.remove_documents(
        names, delete_files=delete_file, documents_dir=cfg.documents_dir
    )

    if cfg.json_mode:
        payload: dict[str, object] = {"command": "remove", "removed": result.removed}
        if result.not_found:
            payload["not_found"] = result.not_found
        json_output(payload)
        if not result.removed and result.not_found:
            raise SystemExit(1)
        return

    # Names come from the user / file system: escape them so brackets render
    # literally instead of being parsed as Rich markup.
    for name in result.removed:
        console.print(f"Removed [{theme.ACCENT}]{escape(str(name))}[/{theme.ACCENT}]")
    for name in result.not_found:
        console.print(f"[{theme.ERROR}]Not found:[/{theme.ERROR}] {escape(str(name))}")
    if not result.removed and result.not_found:
        raise SystemExit(1)