Coverage for src/lilbee/cli/commands/ingest_sync.py: 100%

205 statements  

coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

"""Sync, rebuild, add, chunks, and remove commands."""

from __future__ import annotations

import asyncio
import threading
from pathlib import Path
from typing import TYPE_CHECKING

import typer

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable

    from lilbee.runtime.progress import DetailedProgressCallback

from lilbee.app.ingest import CopyResult, copy_files
from lilbee.app.search import clean_result
from lilbee.app.services import get_services
from lilbee.cli import theme
from lilbee.cli.app import (
    apply_overrides,
    console,
    data_dir_option,
    global_option,
)
from lilbee.cli.commands._shared import CHUNK_PREVIEW_LEN
from lilbee.cli.helpers import (
    add_paths,
    json_output,
    sync_result_to_json,
)
from lilbee.core.config import cfg
from lilbee.crawler import is_url

_ocr_option = typer.Option(None, "--ocr/--no-ocr", help="Force vision OCR on/off for scanned PDFs.")
_retry_skipped_option = typer.Option(
    False,
    "--retry-skipped",
    help="Retry files that were skipped on a previous sync (clears the failed-file markers).",
)
_ocr_timeout_option = typer.Option(
    None,
    "--ocr-timeout",
    help="Per-page timeout in seconds for vision OCR (default: 120, 0 = no limit).",
)


def _apply_ocr_overrides(ocr: bool | None, ocr_timeout: float | None) -> None:
    """Apply --ocr/--no-ocr and --ocr-timeout CLI overrides to config."""
    if ocr is not None:
        cfg.enable_ocr = ocr
    if ocr_timeout is not None:
        cfg.ocr_timeout = ocr_timeout


_paths_argument = typer.Argument(
    ...,
    help="Files, directories, or URLs to add to the knowledge base.",
)

_force_option = typer.Option(False, "--force", "-f", help="Overwrite existing files.")
_crawl_option = typer.Option(
    False,
    "--crawl",
    help="Recursively crawl URLs (whole site by default; see --depth and --max-pages).",
)
_depth_option = typer.Option(
    None,
    "--depth",
    help="Cap link-follow depth for --crawl. Unset = unbounded; 0 = single URL only.",
)
_max_pages_option = typer.Option(
    None,
    "--max-pages",
    help="Cap total pages for --crawl. Unset = no limit; positive int = hard cap.",
)
_include_subdomains_option = typer.Option(
    False,
    "--include-subdomains",
    help=(
        "Allow --crawl to follow links into sibling subdomains of the start "
        "host (e.g. en.wikipedia.org plus af.wikipedia.org). Default scopes "
        "the crawl to the exact start host only."
    ),
)


def _partition_inputs(inputs: list[str]) -> tuple[list[Path], list[str]]:
    """Split inputs into file paths and URLs."""
    paths: list[Path] = []
    urls: list[str] = []
    for inp in inputs:
        if is_url(inp):
            urls.append(inp)
        else:
            paths.append(Path(inp))
    return paths, urls
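
# Illustrative example (not part of the measured module): assuming is_url()
# treats only scheme-prefixed strings such as "https://..." as URLs, a mixed
# argument list splits roughly like this (the file name is hypothetical):
#
#     _partition_inputs(["notes/guide.md", "https://example.com/docs"])
#     # -> ([Path("notes/guide.md")], ["https://example.com/docs"])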

def _crawl_urls_blocking(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool = False,
) -> list[Path]:
    """Crawl URLs synchronously (for the CLI), returning the paths written.

    Without --crawl, each URL is fetched as a single page (depth=0).
    With --crawl, the default is whole-site and unbounded (depth=None, pages=None).
    Explicit --depth / --max-pages override both.

    Ctrl-C is handled by running the crawl through _run_crawl_with_signal_cancel,
    which installs a signal.signal handler that sets a threading.Event passed
    into crawl_and_save. crawl_recursive polls the event between pages, so the
    signal flows through as a clean cancel instead of asyncio.run's default
    KeyboardInterrupt behaviour (which left browser contexts mid-teardown).
    """
    from rich.progress import Progress, SpinnerColumn, TaskID, TextColumn

    from lilbee.crawler import crawl_and_save
    from lilbee.runtime.progress import (
        CrawlPageEvent,
        EventType,
        ProgressEvent,
    )

    if crawl:
        effective_depth = depth
        effective_pages = max_pages
    else:
        effective_depth = 0
        effective_pages = None

    cancel_event = threading.Event()

    from rich.console import Console as RichConsole

    err_console = RichConsole(stderr=True)
    all_paths: list[Path] = []
    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        transient=True,
        console=err_console,
        disable=cfg.json_mode,
    ) as progress:
        for url in urls:
            if cancel_event.is_set():
                break
            ptask = progress.add_task(f"Crawling {url}...", total=None)

            def _make_callback(_t: TaskID = ptask) -> DetailedProgressCallback:
                def on_progress(event_type: EventType, data: ProgressEvent) -> None:
                    if event_type == EventType.CRAWL_PAGE:
                        if not isinstance(data, CrawlPageEvent):
                            raise TypeError(f"Expected CrawlPageEvent, got {type(data).__name__}")
                        total_str = str(data.total) if data.total > 0 else "?"
                        progress.update(
                            _t,
                            description=f"Crawled {data.current}/{total_str}: {data.url}",
                        )

                return on_progress

            paths = _run_crawl_with_signal_cancel(
                url,
                depth=effective_depth,
                max_pages=effective_pages,
                on_progress=_make_callback(),
                cancel_event=cancel_event,
                crawl_and_save=crawl_and_save,
                include_subdomains=include_subdomains,
            )
            all_paths.extend(paths)
            progress.update(ptask, description=f"Done: {url} ({len(paths)} pages)")
    return all_paths
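
# For reference (illustration only): how the add/crawl flags map onto the
# effective limits computed at the top of _crawl_urls_blocking above.
#
#     flags                              -> (effective_depth, effective_pages)
#     (no --crawl)                       -> (0, None)      one page per URL
#     --crawl                            -> (None, None)   whole site, unbounded
#     --crawl --depth 2 --max-pages 50   -> (2, 50)        both caps applied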

def _run_crawl_with_signal_cancel(
    url: str,
    *,
    depth: int | None,
    max_pages: int | None,
    on_progress: DetailedProgressCallback,
    cancel_event: threading.Event,
    crawl_and_save: Callable[..., Awaitable[list[Path]]],
    include_subdomains: bool = False,
) -> list[Path]:
    """Run crawl_and_save on a dedicated event loop with a SIGINT->cancel hook.

    asyncio.run() installs its own SIGINT handler that raises
    KeyboardInterrupt, which tears the crawl down ungracefully. Registering a
    plain signal.signal handler on the main thread AND running the crawl on a
    loop we own (instead of asyncio.run) lets Ctrl-C set our threading.Event,
    which crawl_recursive polls between pages so it can close the stream and
    stop dispatch cleanly.
    """
    import signal

    previous_handler = signal.getsignal(signal.SIGINT)

    def _on_sigint(_signum: int, _frame: object) -> None:
        # Set the cancel event that crawl_recursive polls between pages, so
        # a Ctrl-C flows through as a clean cancel instead of asyncio.run's
        # default KeyboardInterrupt-raising dance.
        cancel_event.set()

    signal.signal(signal.SIGINT, _on_sigint)
    # Manage the event loop explicitly. In the CLI this runs once per process,
    # but under pytest-xdist the same worker thread runs many tests; leaving a
    # closed loop set as the "current" loop for the thread poisons every later
    # asyncio.get_event_loop() call and hangs macOS 3.12/3.13 unit-test CI.
    # Always clear the thread-current loop in finally.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        coro = crawl_and_save(
            url,
            depth=depth,
            max_pages=max_pages,
            on_progress=on_progress,
            cancel=cancel_event,
            quiet=cfg.json_mode,
            include_subdomains=include_subdomains,
        )
        result: list[Path] = loop.run_until_complete(coro)
        return result
    finally:
        loop.close()
        asyncio.set_event_loop(None)
        signal.signal(signal.SIGINT, previous_handler)
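
# A minimal, self-contained sketch of the same SIGINT -> Event pattern (added
# for illustration; it uses only the standard library and is not part of this
# module). The long-running work only has to poll the event between units of
# work, exactly as crawl_recursive does between pages:
#
#     import signal
#     import threading
#
#     cancel = threading.Event()
#     previous = signal.getsignal(signal.SIGINT)
#     signal.signal(signal.SIGINT, lambda signum, frame: cancel.set())
#     try:
#         while not cancel.is_set():
#             ...  # one unit of interruptible work
#     finally:
#         signal.signal(signal.SIGINT, previous)  # always restore the old handler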

def sync_cmd(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    retry_skipped: bool = _retry_skipped_option,
) -> None:
    """Manually trigger document sync."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        result = asyncio.run(sync(quiet=cfg.json_mode, retry_skipped=retry_skipped))
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None
    if cfg.json_mode:
        json_output(sync_result_to_json(result))
        return
    console.print(result)


def rebuild(
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
) -> None:
    """Nuke the DB and re-ingest everything from documents/."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)
    from lilbee.data.ingest import sync

    try:
        result = asyncio.run(sync(force_rebuild=True, quiet=cfg.json_mode))
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None
    if cfg.json_mode:
        json_output({"command": "rebuild", "ingested": len(result.added)})
        return
    console.print(f"Rebuilt: {len(result.added)} documents ingested")


def _validate_file_paths(file_paths: list[Path]) -> None:
    """Exit on the first missing path; respects ``cfg.json_mode``."""
    for fp in file_paths:
        if fp.exists():
            continue
        if cfg.json_mode:
            json_output({"error": f"Path not found: {fp}"})
            raise SystemExit(1)
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] Path not found: {fp}")
        raise SystemExit(1)


def _crawl_urls_step(
    urls: list[str],
    *,
    crawl: bool,
    depth: int | None,
    max_pages: int | None,
    include_subdomains: bool,
) -> list[Path]:
    """Crawl URLs (or fail fast when the crawler extra is missing). Returns saved paths."""
    if not urls:
        return []
    from lilbee.crawler import crawler_available

    if not crawler_available():
        console.print(
            f"[{theme.ERROR}]Web crawling requires: pip install 'lilbee[crawler]'[/{theme.ERROR}]"
        )
        raise SystemExit(1)
    crawled_paths = _crawl_urls_blocking(
        urls,
        crawl=crawl,
        depth=depth,
        max_pages=max_pages,
        include_subdomains=include_subdomains,
    )
    if not cfg.json_mode:
        console.print(
            f"[{theme.MUTED}]Crawled {len(crawled_paths)} page(s)"
            f" from {len(urls)} URL(s)[/{theme.MUTED}]"
        )
    return crawled_paths


def _add_json_mode(file_paths: list[Path], crawled_paths: list[Path], *, force: bool) -> None:
    """Run the JSON-mode finish: copy files, sync, emit one structured result."""
    from lilbee.data.ingest import sync

    copy_result = CopyResult()
    if file_paths:
        copy_result = copy_files(file_paths, force=force)
    result = asyncio.run(sync(quiet=True))
    json_output(
        {
            "command": "add",
            "copied": copy_result.copied,
            "skipped": copy_result.skipped,
            "crawled": len(crawled_paths),
            "sync": sync_result_to_json(result),
        }
    )
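
# Shape of the JSON document emitted above (illustrative; the exact values of
# "copied"/"skipped" depend on CopyResult, and "sync" on sync_result_to_json(),
# both defined elsewhere in the codebase):
#
#     {
#         "command": "add",
#         "copied": ...,
#         "skipped": ...,
#         "crawled": 3,
#         "sync": {...}
#     }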

def add(
    paths: list[str] = _paths_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    force: bool = _force_option,
    ocr: bool | None = _ocr_option,
    ocr_timeout: float | None = _ocr_timeout_option,
    crawl: bool = _crawl_option,
    depth: int | None = _depth_option,
    max_pages: int | None = _max_pages_option,
    include_subdomains: bool = _include_subdomains_option,
) -> None:
    """Copy files or crawl URLs into the knowledge base and ingest them."""
    apply_overrides(data_dir=data_dir, use_global=use_global)
    _apply_ocr_overrides(ocr, ocr_timeout)

    file_paths, urls = _partition_inputs(paths)
    _validate_file_paths(file_paths)

    try:
        crawled_paths = _crawl_urls_step(
            urls,
            crawl=crawl,
            depth=depth,
            max_pages=max_pages,
            include_subdomains=include_subdomains,
        )

        if cfg.json_mode:
            _add_json_mode(file_paths, crawled_paths, force=force)
            return

        if file_paths:
            add_paths(file_paths, console, force=force)
        elif urls:
            # URLs already saved; just trigger sync
            from lilbee.data.ingest import sync

            result = asyncio.run(sync())
            console.print(result)
    except RuntimeError as exc:
        if cfg.json_mode:
            json_output({"error": str(exc)})
            raise SystemExit(1) from None
        console.print(f"[{theme.ERROR}]Error:[/{theme.ERROR}] {exc}")
        raise SystemExit(1) from None
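
# Hypothetical CLI usage (assuming the Typer wiring registers these functions
# under their natural command names; the actual registration lives outside
# this module):
#
#     lilbee add report.pdf notes/                    # copy local files, then sync
#     lilbee add https://example.com/docs --crawl     # crawl the whole site
#     lilbee add https://example.com --crawl --depth 2 --max-pages 50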

_chunks_source_argument = typer.Argument(..., help="Source name to inspect chunks for.")


def chunks(
    source: str = _chunks_source_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
) -> None:
    """Show chunks a document was split into (useful for debugging retrieval)."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    store = get_services().store
    known = {s["filename"] for s in store.get_sources()}
    if source not in known:
        if cfg.json_mode:
            json_output({"error": f"Source not found: {source}"})
            raise SystemExit(1)
        console.print(f"[{theme.ERROR}]Source not found:[/{theme.ERROR}] {source}")
        raise SystemExit(1)

    raw_chunks = store.get_chunks_by_source(source)
    cleaned = sorted(
        [clean_result(c) for c in raw_chunks],
        key=lambda c: c.get("chunk_index", 0),
    )

    if cfg.json_mode:
        json_output({"command": "chunks", "source": source, "chunks": cleaned})
        return

    console.print(
        f"[{theme.LABEL}]{len(cleaned)}[/{theme.LABEL}]"
        f" chunks from [{theme.ACCENT}]{source}[/{theme.ACCENT}]\n"
    )
    for c in cleaned:
        idx = c.get("chunk_index", "?")
        preview = c.get("chunk", "")[:CHUNK_PREVIEW_LEN]
        if len(c.get("chunk", "")) > CHUNK_PREVIEW_LEN:
            preview += "..."
        console.print(f" [{idx}] {preview}")


_remove_names_argument = typer.Argument(
    ..., help="Source name(s) to remove from the knowledge base."
)

_delete_file_option = typer.Option(
    False, "--delete", help="Also delete the file from the documents directory."
)


def remove(
    names: list[str] = _remove_names_argument,
    data_dir: Path | None = data_dir_option,
    use_global: bool = global_option,
    delete_file: bool = _delete_file_option,
) -> None:
    """Remove documents from the knowledge base by source name."""
    apply_overrides(data_dir=data_dir, use_global=use_global)

    result = get_services().store.remove_documents(
        names, delete_files=delete_file, documents_dir=cfg.documents_dir
    )

    if cfg.json_mode:
        payload: dict = {"command": "remove", "removed": result.removed}
        if result.not_found:
            payload["not_found"] = result.not_found
        json_output(payload)
        if not result.removed and result.not_found:
            raise SystemExit(1)
        return

    for name in result.removed:
        console.print(f"Removed [{theme.ACCENT}]{name}[/{theme.ACCENT}]")
    for name in result.not_found:
        console.print(f"[{theme.ERROR}]Not found:[/{theme.ERROR}] {name}")
    if not result.removed and result.not_found:
        raise SystemExit(1)
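
# Hypothetical CLI usage for the inspection and removal commands (same
# assumption about command names as above):
#
#     lilbee chunks report.pdf            # print each chunk's index and a preview
#     lilbee remove report.pdf --delete   # drop from the index and delete the file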