Coverage for src / lilbee / cli / commands / ingest_sync.py: 100%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Sync, rebuild, index, add, chunks, and remove commands.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import threading 

7from pathlib import Path 

8from typing import TYPE_CHECKING 

9 

10import typer 

11 

12if TYPE_CHECKING: 

13 from collections.abc import Awaitable, Callable 

14 

15 from lilbee.runtime.progress import DetailedProgressCallback 

16 

17from lilbee.app.ingest import CopyResult, copy_files 

18from lilbee.app.search import clean_result 

19from lilbee.app.services import get_services 

20from lilbee.cli import theme 

21from lilbee.cli.app import ( 

22 apply_overrides, 

23 console, 

24 data_dir_option, 

25 global_option, 

26) 

27from lilbee.cli.commands._shared import CHUNK_PREVIEW_LEN 

28from lilbee.cli.helpers import ( 

29 add_paths, 

30 json_output, 

31 sync_result_to_json, 

32) 

33from lilbee.core.config import cfg 

34from lilbee.crawler import is_url 

35 

# Tri-state flag: the None default means "no override" (see _apply_ocr_overrides).
_ocr_option = typer.Option(
    None,
    "--ocr/--no-ocr",
    help="Force vision OCR on/off for scanned PDFs.",
)
# Opt-in switch consumed by the sync command only.
_retry_skipped_option = typer.Option(
    False,
    "--retry-skipped",
    help=(
        "Retry files that were skipped on a previous sync "
        "(clears the failed-file markers)."
    ),
)
# The None default means "no override" (see _apply_ocr_overrides).
_ocr_timeout_option = typer.Option(
    None,
    "--ocr-timeout",
    help=(
        "Per-page timeout in seconds for vision OCR "
        "(default: 120, 0 = no limit)."
    ),
)

47 

48 

def _apply_ocr_overrides(ocr: bool | None, ocr_timeout: float | None) -> None:
    """Copy explicitly supplied OCR flags onto the global config.

    A value of ``None`` means the corresponding CLI flag was not given, so
    that config attribute is left untouched.
    """
    overrides = {"enable_ocr": ocr, "ocr_timeout": ocr_timeout}
    for attr_name, value in overrides.items():
        if value is not None:
            setattr(cfg, attr_name, value)

55 

56 

# Required positional inputs; _partition_inputs later separates URLs from paths.
_paths_argument = typer.Argument(
    ...,
    help="Files, directories, or URLs to add to the knowledge base.",
)

_force_option = typer.Option(
    False,
    "--force",
    "-f",
    help="Overwrite existing files.",
)
# Without --crawl each URL is fetched as a single page.
_crawl_option = typer.Option(
    False,
    "--crawl",
    help=(
        "Recursively crawl URLs "
        "(whole site by default; see --depth and --max-pages)."
    ),
)
_depth_option = typer.Option(
    None,
    "--depth",
    help=(
        "Cap link-follow depth for --crawl. "
        "Unset = unbounded; 0 = single URL only."
    ),
)
_max_pages_option = typer.Option(
    None,
    "--max-pages",
    help=(
        "Cap pages for --crawl. "
        "Unset = protective default; 0 = unlimited; N = hard cap."
    ),
)
_include_subdomains_option = typer.Option(
    False,
    "--include-subdomains",
    help=(
        "Allow --crawl to follow links into sibling subdomains of the start "
        "host (e.g. en.wikipedia.org plus af.wikipedia.org). Default scopes "
        "the crawl to the exact start host only."
    ),
)

87 

88 

def _partition_inputs(inputs: list[str]) -> tuple[list[Path], list[str]]:
    """Separate raw CLI inputs into local paths and URLs.

    Returns a ``(paths, urls)`` pair; each list keeps the order in which its
    items appeared in *inputs*.
    """
    local_paths: list[Path] = []
    remote_urls: list[str] = []
    for raw in inputs:
        if not is_url(raw):
            local_paths.append(Path(raw))
            continue
        remote_urls.append(raw)
    return local_paths, remote_urls

99 

100 

def _crawl_urls_blocking(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl each URL in turn from synchronous CLI code and collect saved paths.

    Without --crawl every URL is fetched as a single page (depth=0, no page
    cap passed). With --crawl the caller's --depth / --max-pages values are
    forwarded unchanged, so leaving both unset means a whole-site crawl.

    Each URL runs through _run_crawl_with_signal_cancel, which turns Ctrl-C
    into a set ``threading.Event`` handed to crawl_and_save. Once that event
    is set, the remaining URLs are skipped.

    Progress is drawn as a transient spinner on stderr and is disabled
    entirely in JSON mode.
    """
    from rich.console import Console as RichConsole
    from rich.progress import Progress, SpinnerColumn, TaskID, TextColumn

    from lilbee.crawler import crawl_and_save
    from lilbee.runtime.progress import (
        CrawlDoneEvent,
        CrawlPageEvent,
        EventType,
        ProgressEvent,
    )

    effective_depth, effective_pages = (depth, max_pages) if crawl else (0, None)

    cancel_event = threading.Event()
    err_console = RichConsole(stderr=True)
    saved: list[Path] = []

    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        transient=True,
        console=err_console,
        disable=cfg.json_mode,
    ) as progress:

        def _progress_callback(
            task_id: TaskID, stats: dict[str, int]
        ) -> DetailedProgressCallback:
            """Build a callback bound to one URL's progress task and stats dict."""

            def on_progress(event_type: EventType, data: ProgressEvent) -> None:
                if event_type == EventType.CRAWL_PAGE:
                    if not isinstance(data, CrawlPageEvent):
                        raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
                    shown_total = str(data.total) if data.total > 0 else "?"
                    progress.update(
                        task_id,
                        description=f"Crawled {data.current}/{shown_total}: {data.url}",
                    )
                    return
                if event_type == EventType.CRAWL_DONE and isinstance(data, CrawlDoneEvent):
                    # Remember the final page count for the cap hint below.
                    stats["n"] = data.pages_crawled

            return on_progress

        for url in urls:
            if cancel_event.is_set():
                break
            task_id = progress.add_task(f"Crawling {url}...", total=None)
            stats: dict[str, int] = {}

            written = _run_crawl_with_signal_cancel(
                url,
                depth=effective_depth,
                max_pages=effective_pages,
                on_progress=_progress_callback(task_id, stats),
                cancel_event=cancel_event,
                crawl_and_save=crawl_and_save,
                include_subdomains=include_subdomains,
            )
            saved.extend(written)
            progress.update(task_id, description=f"Done: {url} ({len(written)} pages)")

            # No explicit cap given and the crawl filled the protective default:
            # tell the user how to go unlimited without editing settings.
            default_cap = cfg.crawl_max_pages or cfg.crawl_safety_max_pages
            hit_default_cap = (
                crawl and max_pages is None and stats.get("n", 0) >= default_cap
            )
            if hit_default_cap:
                err_console.print(
                    f"Stopped at the default {default_cap}-page limit; "
                    f"pass --max-pages 0 to crawl unlimited (or --max-pages N for a higher cap)."
                )
    return saved

194 

195 

def _run_crawl_with_signal_cancel(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback,
    cancel_event: threading.Event,
    crawl_and_save: Callable[..., Awaitable[list[Path]]],
    include_subdomains: bool = False,
) -> list[Path]:
    """Run crawl_and_save on a dedicated event loop with a SIGINT->cancel hook.

    asyncio.run() installs its own SIGINT handler that raises
    KeyboardInterrupt, which tears the crawl down ungracefully. Registering a
    plain signal.signal handler on the main thread AND running the crawl on a
    loop we own (instead of asyncio.run) lets Ctrl-C set our threading.Event,
    which crawl_recursive polls between pages so it can close the stream and
    stop dispatch cleanly.

    Args:
        url: Start URL handed to ``crawl_and_save``.
        depth: Link-follow depth cap, forwarded unchanged.
        max_pages: Page cap, forwarded unchanged.
        on_progress: Progress callback, forwarded unchanged.
        cancel_event: Event set by the temporary SIGINT handler and passed to
            ``crawl_and_save`` as ``cancel``.
        crawl_and_save: Coroutine function that performs the crawl.
        include_subdomains: Forwarded unchanged.

    Returns:
        The list of paths that ``crawl_and_save`` resolves to.

    The previous SIGINT handler is restored on every exit path, including
    failures while creating or closing the event loop.
    """
    import signal

    previous_handler = signal.getsignal(signal.SIGINT)
    # getsignal() returns None when the active handler was not installed from
    # Python; signal.signal() rejects None, so fall back to the default
    # handler rather than raising TypeError during cleanup.
    restore_handler = signal.SIG_DFL if previous_handler is None else previous_handler

    def _on_sigint(_signum: int, _frame: object) -> None:
        # Set the cancel event that crawl_recursive polls between pages, so
        # a Ctrl-C flows through as a clean cancel instead of asyncio.run's
        # default KeyboardInterrupt-raising dance.
        cancel_event.set()

    signal.signal(signal.SIGINT, _on_sigint)
    try:
        # Manage the event loop explicitly. In the CLI this runs once per
        # process, but under pytest-xdist the same worker thread runs many
        # tests; leaving a closed loop set as the "current" loop for the
        # thread poisons every later asyncio.get_event_loop() call and hangs
        # macOS 3.12/3.13 unit-test CI. Always clear the thread-current loop
        # in finally.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            coro = crawl_and_save(
                url,
                depth=depth,
                max_pages=max_pages,
                on_progress=on_progress,
                cancel=cancel_event,
                quiet=cfg.json_mode,
                include_subdomains=include_subdomains,
            )
            result: list[Path] = loop.run_until_complete(coro)
            return result
        finally:
            loop.close()
            asyncio.set_event_loop(None)
    finally:
        # Outer finally: the handler is restored even if loop creation or
        # loop teardown raised.
        signal.signal(signal.SIGINT, restore_handler)

249 

250 

def sync_cmd(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    retry_skipped: bool = _retry_skipped_option,
) -> None:
    """Manually trigger document sync."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        outcome = asyncio.run(sync(quiet=cfg.json_mode, retry_skipped=retry_skipped))
    except RuntimeError as exc:
        # Report in the active output format, then exit non-zero either way.
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None

    if not cfg.json_mode:
        console.print(outcome)
        return
    json_output(sync_result_to_json(outcome))

275 

276 

def rebuild(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
) -> None:
    """Nuke the DB and re-ingest everything from documents/."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        outcome = asyncio.run(sync(force_rebuild=True, quiet=cfg.json_mode))
    except RuntimeError as exc:
        # Report in the active output format, then exit non-zero either way.
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None

    if not cfg.json_mode:
        console.print(f"Rebuilt: {len(outcome.added)} documents ingested")
        return
    json_output({"command": "rebuild", "ingested": len(outcome.added)})

300 

301 

def index(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
) -> None:
    """Build the search indexes now (vector ANN + full-text).

    Useful before publishing a large index so downloaders get fast search
    without waiting for it to build on first query. Forces the vector index
    even below the auto-build threshold.
    """
    apply_overrides(data_dir=data_dir, use_global=use_global)
    store = get_services().store
    store.ensure_fts_index()
    vector_built = store.ensure_vector_index(force=True)

    if cfg.json_mode:
        json_output({"command": "index", "vector_index": vector_built})
        return

    # A falsy result is reported as the vector index needing more chunks.
    message = (
        "Search indexes built (vector ANN + full-text)."
        if vector_built
        else "Full-text index built; vector index needs more chunks."
    )
    console.print(message)

323 

324 

def _validate_file_paths(file_paths: list[Path]) -> None:
    """Abort with exit status 1 at the first path that does not exist.

    The error is emitted as a JSON object when ``cfg.json_mode`` is set and
    as a styled console line otherwise. Returns normally when every path
    exists.
    """
    missing = next((candidate for candidate in file_paths if not candidate.exists()), None)
    if missing is None:
        return
    if cfg.json_mode:
        json_output({"error": f"Path not found: {missing}"})
    else:
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] Path not found: {missing}")
    raise SystemExit(1)

335 

336 

def _crawl_urls_step(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool,
) -> list[Path]:
    """Crawl URLs (or fail fast when crawler extra is missing). Returns saved paths.

    Args:
        urls: URLs to fetch; an empty list short-circuits to ``[]`` without
            importing or checking the crawler.
        crawl: Forwarded to ``_crawl_urls_blocking``.
        depth: Forwarded to ``_crawl_urls_blocking``.
        max_pages: Forwarded to ``_crawl_urls_blocking``.
        include_subdomains: Forwarded to ``_crawl_urls_blocking``.

    Raises:
        SystemExit: With status 1 when ``crawler_available()`` is false. The
            install hint is emitted as a JSON error object in JSON mode and
            as a styled console line otherwise.
    """
    if not urls:
        return []
    from lilbee.crawler import crawler_available

    if not crawler_available():
        if cfg.json_mode:
            # Same {"error": ...} shape as the other error paths in this
            # module, so JSON consumers can parse the failure.
            json_output({"error": "Web crawling requires: pip install 'lilbee[crawler]'"})
        else:
            # Backslash-escape the extras bracket so Rich prints it literally
            # instead of parsing "[crawler]" as a markup tag.
            console.print(
                f"[{theme.ERROR}]Web crawling requires: "
                f"pip install 'lilbee\\[crawler]'[/{theme.ERROR}]"
            )
        raise SystemExit(1)

    crawled_paths = _crawl_urls_blocking(
        urls,
        crawl=crawl,
        depth=depth,
        max_pages=max_pages,
        include_subdomains=include_subdomains,
    )
    if not cfg.json_mode:
        console.print(
            f"[{theme.MUTED}]Crawled {len(crawled_paths)} page(s)"
            f" from {len(urls)} URL(s)[/{theme.MUTED}]"
        )
    return crawled_paths

368 

369 

def _add_json_mode(file_paths: list[Path], crawled_paths: list[Path], *, force: bool) -> None:
    """Finish an ``add`` in JSON mode and emit a single structured result.

    Copies *file_paths* (when any were given), runs a quiet sync, then emits
    one JSON object with the copy outcome, the number of crawled pages, and
    the sync summary.
    """
    from lilbee.data.ingest import sync

    copy_outcome = copy_files(file_paths, force=force) if file_paths else CopyResult()
    sync_outcome = asyncio.run(sync(quiet=True))

    payload = {
        "command": "add",
        "copied": copy_outcome.copied,
        "skipped": copy_outcome.skipped,
        "crawled": len(crawled_paths),
        "sync": sync_result_to_json(sync_outcome),
    }
    json_output(payload)

387 

388 

def add(
    paths: list[str] = _paths_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    force: bool = _force_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    crawl: bool = _crawl_option,
    depth: int | None = _depth_option,
    max_pages: int | None = _max_pages_option,
    include_subdomains: bool = _include_subdomains_option,
) -> None:
    """Copy files or crawl URLs into the knowledge base and ingest them."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)

    # Validate local paths up front so a typo fails before any crawling starts.
    file_paths, urls = _partition_inputs(paths)
    _validate_file_paths(file_paths)

    try:
        crawled_paths = _crawl_urls_step(
            urls,
            crawl=crawl,
            depth=depth,
            max_pages=max_pages,
            include_subdomains=include_subdomains,
        )

        if cfg.json_mode:
            _add_json_mode(file_paths, crawled_paths, force=force)
        elif file_paths:
            add_paths(file_paths, console, force=force)
        elif urls:
            # URLs already saved; just trigger sync
            from lilbee.data.ingest import sync

            console.print(asyncio.run(sync()))
    except RuntimeError as exc:
        # Report in the active output format, then exit non-zero either way.
        if cfg.json_mode:
            json_output({"error": str(exc)})
        else:
            console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None

435 

436 

_chunks_source_argument = typer.Argument(
    ...,
    help="Source name to inspect chunks for.",
)


def chunks(
    source: str = _chunks_source_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
) -> None:
    """Show chunks a document was split into (useful for debugging retrieval)."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    store = get_services().store
    source_names = {entry["filename"] for entry in store.get_sources()}
    if source not in source_names:
        if cfg.json_mode:
            json_output({"error": f"Source not found: {source}"})
        else:
            console.print(f"[{theme.ERROR}]Source not found:[/{theme.ERROR}] {source}")
        raise SystemExit(1)

    # Order by chunk_index; entries without one sort as index 0.
    cleaned = [clean_result(raw) for raw in store.get_chunks_by_source(source)]
    cleaned.sort(key=lambda item: item.get("chunk_index", 0))

    if cfg.json_mode:
        json_output({"command": "chunks", "source": source, "chunks": cleaned})
        return

    console.print(
        f"[{theme.LABEL}]{len(cleaned)}[/{theme.LABEL}]"
        f" chunks from [{theme.ACCENT}]{source}[/{theme.ACCENT}]\n"
    )
    for item in cleaned:
        idx = item.get("chunk_index", "?")
        text = item.get("chunk", "")
        # Truncate long chunks and mark the cut with an ellipsis.
        ellipsis = "..." if len(text) > CHUNK_PREVIEW_LEN else ""
        preview = text[:CHUNK_PREVIEW_LEN] + ellipsis
        console.print(f" [{idx}] {preview}")

477 

478 

_remove_names_argument = typer.Argument(
    ...,
    help="Source name(s) to remove from the knowledge base.",
)

_delete_file_option = typer.Option(
    False,
    "--delete",
    help="Also delete the file from the documents directory.",
)


def remove(
    names: list[str] = _remove_names_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    delete_file: bool = _delete_file_option,
) -> None:
    """Remove documents from the knowledge base by source name."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    outcome = get_services().store.remove_documents(
        names,
        delete_files=delete_file,
        documents_dir=cfg.documents_dir,
    )

    if cfg.json_mode:
        payload: dict = {"command": "remove", "removed": outcome.removed}
        # The "not_found" key is only present when something was missing.
        if outcome.not_found:
            payload["not_found"] = outcome.not_found
        json_output(payload)
    else:
        for removed_name in outcome.removed:
            console.print(f"Removed [{theme.ACCENT}]{removed_name}[/{theme.ACCENT}]")
        for missing_name in outcome.not_found:
            console.print(f"[{theme.ERROR}]Not found:[/{theme.ERROR}] {missing_name}")

    # Exit non-zero only when nothing was removed and at least one name was unknown.
    if outcome.not_found and not outcome.removed:
        raise SystemExit(1)