Coverage for src / lilbee / mcp_server.py: 100%

278 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""MCP server exposing lilbee as tools for AI agents. 

2 

3Tool handler bodies use function-local ``from lilbee.X import ...`` to keep 

4``lilbee mcp`` boot fast (the same startup discipline AGENTS.md mandates for 

5Typer command bodies). Heavy chains pulled in lazily here: 

6``data.ingest`` / ``wiki.*`` / ``wiki.drafts`` (spaCy via the wiki ingest 

7pipeline), ``crawler`` (crawl4ai + Playwright), ``app.models`` / 

8``modelhub.model_manager`` / ``catalog`` (HF discovery + `huggingface_hub`). 

9``app.ingest`` / ``app.reset`` are individually light but transitively reach 

10``data.ingest`` via the runtime sync handlers, so they share the policy. 

11""" 

12 

13from __future__ import annotations 

14 

15import asyncio 

16import concurrent.futures 

17import logging 

18import os 

19from pathlib import Path 

20from typing import Any 

21 

22from mcp.server.fastmcp import Context, FastMCP 

23 

24from lilbee.app.search import clean_result 

25from lilbee.app.services import get_services, reset_services, reset_store 

26from lilbee.core.config import cfg 

27from lilbee.core.settings import overlay_persisted_settings 

28from lilbee.core.system import LOCAL_ROOT_DIRNAME 

29from lilbee.crawler import is_url, require_valid_crawl_url 

30from lilbee.crawler.task import get_task, start_crawl 

31from lilbee.data.store import SearchScope, scope_to_chunk_type 

32from lilbee.wiki.shared import ( 

33 WIKI_DISABLED_ERROR, 

34 WikiSubdir, 

35) 

36 

37log = logging.getLogger(__name__) 

38 

39mcp = FastMCP("lilbee", instructions="Local RAG knowledge base. Search indexed documents.") 

40 

41 

42def _error(msg: str) -> dict[str, Any]: 

43 """Uniform error envelope MCP tool handlers return on a failure path. 

44 

45 Typed as ``dict[str, Any]`` rather than a TypedDict so it composes 

46 with the success-side returns under the existing handler signatures 

47 without forcing every caller to widen its return type. 

48 """ 

49 return {"error": msg} 

50 

51 

@mcp.tool()
def search(
    query: str, top_k: int = 5, scope: str = SearchScope.BOTH.value
) -> list[dict[str, Any]] | dict[str, Any]:
    """Search the knowledge base for relevant document chunks.

    ``scope`` picks the pool: ``"raw"`` (source chunks), ``"wiki"`` (wiki
    page bodies), or ``"both"`` (default, unfiltered). Returns chunks
    sorted by relevance. No LLM call -- uses pre-computed embeddings.
    """
    # Reject blank/whitespace-only queries before touching the store.
    if not (query and query.strip()):
        return _error("query must not be empty")
    try:
        chunk_type = scope_to_chunk_type(scope)
    except ValueError as exc:
        return _error(str(exc))
    try:
        # Drop hits beyond the configured distance cutoff, then strip them
        # down to the client-facing shape.
        hits = get_services().searcher.search(query, top_k=top_k, chunk_type=chunk_type)
        return [
            clean_result(hit)
            for hit in hits
            if hit.distance is None or hit.distance <= cfg.max_distance
        ]
    except Exception as exc:
        # Boundary handler: any backend failure becomes an error envelope
        # rather than an MCP-level exception.
        return _error(str(exc))

74 

75 

@mcp.tool()
def status() -> dict[str, Any]:
    """Show indexed documents, configuration, and chunk counts."""
    sources = get_services().store.get_sources()
    # Snapshot of the live config relevant to indexing/inference.
    config_view = {
        "documents_dir": str(cfg.documents_dir),
        "data_dir": str(cfg.data_dir),
        "chat_model": cfg.chat_model,
        "embedding_model": cfg.embedding_model,
        "vision_model": cfg.vision_model,
        "reranker_model": cfg.reranker_model,
        "enable_ocr": cfg.enable_ocr,
        "num_ctx": cfg.num_ctx,
        "num_ctx_max": cfg.num_ctx_max,
        "flash_attention": cfg.flash_attention,
        "kv_cache_type": cfg.kv_cache_type.value,
        "n_gpu_layers": cfg.n_gpu_layers,
        "main_gpu": cfg.main_gpu,
        "gpu_devices": cfg.gpu_devices,
    }
    # Stable, filename-sorted listing for deterministic output.
    source_rows = [
        {"filename": src["filename"], "chunk_count": src["chunk_count"]}
        for src in sorted(sources, key=lambda src: src["filename"])
    ]
    return {
        "config": config_view,
        "sources": source_rows,
        "total_chunks": sum(src["chunk_count"] for src in sources),
    }

103 

104 

@mcp.tool()
async def sync(force_rebuild: bool = False, retry_skipped: bool = False) -> dict[str, Any]:
    """Sync documents directory with the vector store.

    Args:
        force_rebuild: Drop every table and re-ingest from scratch (equivalent
            to ``lilbee rebuild``). Also clears the failed-file skip markers.
        retry_skipped: Clear the failed-file skip markers so files that were
            skipped on a previous sync get another attempt, without dropping
            the store.
    """
    # Lazy import: data.ingest pulls in the heavy ingest chain (see module
    # docstring on startup discipline).
    from lilbee.data.ingest import sync as run_sync

    outcome = await run_sync(quiet=True, force_rebuild=force_rebuild, retry_skipped=retry_skipped)
    return outcome.model_dump()

121 

122 

@mcp.tool()
async def add(
    paths: list[str],
    force: bool = False,
    enable_ocr: bool | None = None,
    ocr_timeout: float | None = None,
) -> dict[str, Any]:
    """Add files, directories, or URLs to the knowledge base and sync.
    Copies the given paths into the documents directory, then ingests them.
    URLs (http:// or https://) are fetched as markdown and saved to _web/.
    Paths must be absolute and accessible from this machine.

    Args:
        paths: Absolute file/directory paths or URLs to add.
        force: Overwrite files that already exist in the knowledge base.
        enable_ocr: Force vision OCR on (True), off (False), or auto-detect
            from chat model capabilities (None/omit).
        ocr_timeout: Per-page timeout in seconds for vision OCR. Overrides
            the configured default for this invocation only.
    """
    # Lazy imports: both reach the heavy data.ingest chain (module docstring).
    from lilbee.app.ingest import copy_files, temporary_ocr_config
    from lilbee.data.ingest import sync as run_sync

    # Partition inputs: URLs go to the crawler, existing paths get copied,
    # anything else is recorded as an error without aborting the batch.
    errors: list[str] = []
    valid: list[Path] = []
    urls: list[str] = []
    for p_str in paths:
        if is_url(p_str):
            urls.append(p_str)
        else:
            p = Path(p_str)
            if p.exists():
                valid.append(p)
            else:
                errors.append(p_str)

    # Crawl URLs first so the fetched markdown is on disk before the sync.
    crawled_count = 0
    if urls:
        from lilbee.crawler import crawler_available

        if not crawler_available():
            return _error("Web crawling requires: pip install 'lilbee[crawler]'")
        from lilbee.crawler import crawl_and_save

        for url in urls:
            try:
                require_valid_crawl_url(url)
            except ValueError as exc:
                errors.append(f"{url}: {exc}")
                continue
            try:
                crawled_paths = await crawl_and_save(url)
            except Exception as exc:
                # Fix: a single failing crawl previously raised out of the
                # handler and aborted the whole add. Record it alongside the
                # other per-path errors and keep going.
                errors.append(f"{url}: {exc}")
                continue
            crawled_count += len(crawled_paths)

    copy_result = copy_files(valid, force=force)

    # Apply per-invocation OCR overrides only for the duration of this sync.
    with temporary_ocr_config(enable_ocr, ocr_timeout):
        sync_result = (await run_sync(quiet=True)).model_dump()

    result: dict[str, Any] = {
        "command": "add",
        "copied": copy_result.copied,
        "skipped": copy_result.skipped,
        "crawled": crawled_count,
        "errors": errors,
        "sync": sync_result,
    }
    if errors or sync_result.get("failed"):
        result["warning"] = "some files could not be processed"
    return result

195 

196 

@mcp.tool()
def crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> dict[str, Any]:
    """Crawl a web page and add it to the knowledge base (non-blocking).
    Launches the crawl as a background task and returns immediately with a
    task_id. Use crawl_status(task_id) to poll progress.

    Args:
        url: The URL to crawl (must start with http:// or https://).
        depth: None (default) crawls the whole site; 0 fetches only this URL;
            positive int caps link-follow depth.
        max_pages: None (default) means no page limit. Positive int caps total
            pages fetched.
    """
    # Lazy import: crawler pulls in crawl4ai + Playwright (module docstring).
    from lilbee.crawler import crawler_available

    if not crawler_available():
        return _error("Web crawling requires: pip install 'lilbee[crawler]'")
    try:
        require_valid_crawl_url(url)
    except ValueError as exc:
        return _error(str(exc))

    # Fire-and-forget: the crawl runs in the background task registry and
    # the caller polls via crawl_status.
    new_task = start_crawl(url, depth=depth, max_pages=max_pages)
    return {"status": "started", "task_id": new_task, "url": url}

225 

226 

@mcp.tool()
def crawl_status(task_id: str) -> dict[str, Any]:
    """Check the status of a running crawl task.
    Returns the current state including status, pages crawled, and any error.
    Use this to poll after crawl returns a task_id.

    Args:
        task_id: The task ID returned by crawl.
    """
    found = get_task(task_id)
    if found is None:
        return _error(f"No task found with id: {task_id}")
    # Flatten the task record into a JSON-friendly snapshot.
    snapshot: dict[str, Any] = {
        "task_id": found.task_id,
        "url": found.url,
        "status": found.status.value,
        "pages_crawled": found.pages_crawled,
        "pages_total": found.pages_total,
        "error": found.error,
        "started_at": found.started_at,
        "finished_at": found.finished_at,
    }
    return snapshot

249 

250 

@mcp.tool()
def init(path: str = "") -> dict[str, Any]:
    """Initialize a local .lilbee/ knowledge base in a directory.
    Creates .lilbee/ with documents/, data/, and .gitignore.
    If path is empty, uses the current working directory.
    Also switches the MCP session to use this knowledge base for
    subsequent tool calls.
    """
    base = Path(path) if path else Path.cwd()
    root = base / LOCAL_ROOT_DIRNAME

    # Only scaffold when the root directory doesn't exist yet.
    created = not root.is_dir()
    if created:
        for subdir in ("documents", "data"):
            (root / subdir).mkdir(parents=True)
        (root / ".gitignore").write_text("data/\n")

    # Switch MCP session to this project's KB. Overlay any persisted
    # config.toml so per-vault model / generation settings take effect,
    # matching the CLI's --data-dir behaviour. Env export mirrors
    # cli/app.py::_apply_data_root for worker-log parity.
    cfg.data_root = base
    cfg.documents_dir = root / "documents"
    cfg.data_dir = root / "data"
    cfg.lancedb_dir = root / "data" / "lancedb"
    os.environ["LILBEE_DATA"] = str(base)
    overlay_persisted_settings(base)
    reset_services()

    return {"command": "init", "path": str(root), "created": created}

282 

283 

@mcp.tool()
def remove(names: list[str], delete_files: bool = False) -> dict[str, Any]:
    """Remove documents from the knowledge base by source name.
    Args:
        names: Source filenames to remove (as shown by status).
        delete_files: Also delete the physical files from the documents directory.
    """
    store = get_services().store
    outcome = store.remove_documents(
        names, delete_files=delete_files, documents_dir=cfg.documents_dir
    )
    return {"command": "remove", "removed": outcome.removed, "not_found": outcome.not_found}

295 

296 

@mcp.tool()
def list_documents() -> dict[str, Any]:
    """List all indexed documents with their chunk counts."""
    sources = get_services().store.get_sources()
    rows: list[dict[str, Any]] = []
    for src in sources:
        # chunk_count may be absent on older records; default to 0.
        rows.append({"filename": src["filename"], "chunk_count": src.get("chunk_count", 0)})
    return {"documents": rows, "total": len(sources)}

307 

308 

@mcp.tool()
def reset(confirm: bool = False) -> dict[str, Any]:
    """Delete all documents and data (full factory reset).
    WARNING: This permanently removes all indexed documents and vector data.
    Pass confirm=true to proceed.
    """
    if not confirm:
        return _error("pass confirm=true to confirm deletion")
    # Lazy import: app.reset transitively reaches data.ingest (module docstring).
    from lilbee.app.reset import perform_reset

    payload = perform_reset().model_dump()
    # The data dir is now empty: reopen LanceDB against it, but keep the
    # already-loaded model providers alive.
    reset_store()
    return payload

323 

324 

@mcp.tool()
def wiki_lint(wiki_source: str = "") -> dict[str, Any]:
    """Lint wiki pages for citation staleness, missing sources, and unmarked claims.
    If wiki_source is provided, lint only that page. Otherwise, lint all wiki pages.

    Args:
        wiki_source: Path like "wiki/summaries/doc.md". Empty = lint all.
    """
    from lilbee.wiki.lint import lint_all, lint_wiki_page

    store = get_services().store
    # Single page when a source path is given; full-store report otherwise.
    issues = lint_wiki_page(wiki_source, store) if wiki_source else lint_all(store).issues
    return {
        "command": "wiki_lint",
        "issues": [issue.to_dict() for issue in issues],
        "total": len(issues),
    }

346 

347 

@mcp.tool()
def wiki_citations(wiki_source: str) -> dict[str, Any]:
    """Get all citations for a wiki page.
    Args:
        wiki_source: Wiki page path, e.g. "wiki/summaries/doc.md".
    """
    store = get_services().store
    records = store.get_citations_for_wiki(wiki_source)
    payload: dict[str, Any] = {"command": "wiki_citations", "wiki_source": wiki_source}
    payload["citations"] = [dict(rec) for rec in records]
    payload["total"] = len(records)
    return payload

361 

362 

@mcp.tool()
def wiki_status() -> dict[str, Any]:
    """Show wiki layer status: page counts, recent lint issues."""
    from lilbee.wiki.lint import lint_all

    wiki_root = cfg.data_root / cfg.wiki_dir
    if not wiki_root.exists():
        return {"wiki_enabled": cfg.wiki, "pages": 0, "issues": 0}

    def pages_under(subdir: WikiSubdir) -> list[Path]:
        # Recursive markdown listing; empty when the subtree is absent.
        target = wiki_root / subdir
        return list(target.rglob("*.md")) if target.exists() else []

    summaries = pages_under(WikiSubdir.SUMMARIES)
    drafts = pages_under(WikiSubdir.DRAFTS)

    report = lint_all(get_services().store)
    return {
        "wiki_enabled": cfg.wiki,
        WikiSubdir.SUMMARIES: len(summaries),
        WikiSubdir.DRAFTS: len(drafts),
        "pages": len(summaries) + len(drafts),
        "lint_errors": report.error_count,
        "lint_warnings": report.warning_count,
    }

386 

387 

@mcp.tool()
def wiki_list() -> dict[str, Any]:
    """List all wiki pages (summaries and concepts) with metadata.
    Returns page slugs, titles, types, source counts, and creation dates.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from dataclasses import asdict

    from lilbee.wiki.browse import list_pages

    pages = list_pages(cfg.data_root / cfg.wiki_dir)
    return {
        "command": "wiki_list",
        "pages": [asdict(page) for page in pages],
        "total": len(pages),
    }

406 

407 

@mcp.tool()
def wiki_read(slug: str) -> dict[str, Any]:
    """Read a wiki page's content and frontmatter by slug.
    Args:
        slug: Page slug like "summaries/my-doc" or "concepts/typing".
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from dataclasses import asdict

    from lilbee.wiki.browse import read_page

    page = read_page(cfg.data_root / cfg.wiki_dir, slug)
    if page is None:
        return _error(f"wiki page not found: {slug}")
    # Splice the page's dataclass fields directly into the envelope.
    return {"command": "wiki_read", **asdict(page)}

425 

426 

@mcp.tool()
def wiki_build() -> dict[str, Any]:
    """Build the concept and entity wiki across all ingested sources.

    Returns ``{paths, entities, count}``.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from lilbee.wiki import run_full_build

    build_stats = run_full_build(cfg)
    return {"command": "wiki_build", **build_stats}

438 

439 

@mcp.tool()
def wiki_update() -> dict[str, Any]:
    """Refresh the concept and entity wiki after an ingest. Currently a full rebuild."""
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from lilbee.wiki import run_full_build

    rebuild_stats = run_full_build(cfg)
    return {"command": "wiki_update", **rebuild_stats}

448 

449 

@mcp.tool()
def wiki_synthesize() -> dict[str, Any]:
    """Generate synthesis pages for concept clusters spanning three or more sources.

    Returns the list of synthesis page paths written to disk. When no
    cluster meets the 3+ source threshold, returns an empty list and
    ``count: 0``.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from lilbee.wiki import run_full_synthesize

    synth_stats = run_full_synthesize(cfg)
    return {"command": "wiki_synthesize", **synth_stats}

463 

464 

@mcp.tool()
def wiki_prune() -> dict[str, Any]:
    """Prune stale and orphaned wiki pages.
    Archives pages whose sources are all deleted or whose concept cluster
    dropped below 3 live sources. Flags pages with >50% stale citations
    for regeneration.
    """
    from lilbee.wiki.prune import prune_wiki

    report = prune_wiki(get_services().store)
    payload: dict[str, Any] = {
        "command": "wiki_prune",
        "records": [rec.to_dict() for rec in report.records],
        "archived": report.archived_count,
        "flagged": report.flagged_count,
    }
    return payload

481 

482 

@mcp.tool()
def model_list(source: str = "", task: str = "") -> dict[str, Any]:
    """List installed models across native and SDK-backend sources.

    Args:
        source: Filter by source: "native", "remote", or "" for all.
        task: Filter by task: "chat", "embedding", "vision", "rerank", or "" for all.
    """
    from lilbee.app.models import list_models_data
    from lilbee.catalog.types import ModelSource, ModelTask

    # Both parses raise ValueError on bad input and map to the same error
    # envelope, so a single try covers them.
    try:
        src = ModelSource.parse(source)
        parsed_task = ModelTask(task) if task else None
    except ValueError as exc:
        return _error(str(exc))
    return list_models_data(source=src, task=parsed_task).model_dump()

503 

504 

@mcp.tool()
def model_show(model: str) -> dict[str, Any]:
    """Show catalog and installed metadata for a model ref."""
    from lilbee.app.models import show_model_data
    from lilbee.modelhub.model_manager import ModelNotFoundError

    try:
        details = show_model_data(model)
    except ModelNotFoundError as exc:
        return _error(str(exc))
    return details.model_dump()

515 

516 

517def _log_progress_failure(future: concurrent.futures.Future[None]) -> None: 

518 """Log report_progress failures without raising. 

519 

520 Progress notifications are best-effort: a failure should not abort 

521 an in-flight pull. 

522 """ 

523 try: 

524 future.result() 

525 except Exception: 

526 log.warning("MCP report_progress failed", exc_info=True) 

527 

528 

@mcp.tool()
async def model_pull(
    model: str,
    source: str = "native",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Download a model, streaming progress via MCP notifications.

    Args:
        model: Model ref to pull (e.g. "Qwen/Qwen3-0.6B-GGUF" or
            "Qwen/Qwen3-0.6B-GGUF/Qwen3-0.6B-Q8_0.gguf").
        source: "native" (HuggingFace GGUF) or "remote" (SDK-managed).
    """
    # Lazy imports: the model chain pulls in HF discovery + huggingface_hub
    # (see module docstring on startup discipline).
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.types import ModelSource

    # parse("") returns a falsy value here; default to NATIVE in that case.
    try:
        src = ModelSource.parse(source) or ModelSource.NATIVE
    except ValueError as exc:
        return _error(str(exc))

    # Captured now, on the event-loop thread, so on_update can schedule
    # coroutines back onto it from the worker thread.
    loop = asyncio.get_running_loop()

    def on_update(p: DownloadProgress) -> None:
        # Runs on the asyncio.to_thread worker, not the event loop; without
        # an MCP context there is nowhere to send progress.
        if ctx is None:
            return
        # Thread-safe hand-off of the progress notification to the loop.
        future = asyncio.run_coroutine_threadsafe(
            ctx.report_progress(progress=float(p.percent), total=100.0, message=p.detail),
            loop,
        )
        # Best-effort: a failed notification is logged, never raised.
        future.add_done_callback(_log_progress_failure)

    # Run the blocking download off the event loop; RuntimeError and
    # PermissionError are the expected failure modes and map to the
    # uniform error envelope.
    try:
        result = await asyncio.to_thread(pull_model_data, model, src, on_update=on_update)
    except (RuntimeError, PermissionError) as exc:
        return _error(str(exc))
    return result.model_dump()

567 

568 

@mcp.tool()
def model_rm(model: str, source: str = "") -> dict[str, Any]:
    """Remove an installed model.

    Args:
        model: Model ref to remove.
        source: Restrict to "native" or "remote"; empty = both.
    """
    from lilbee.app.models import remove_model_data
    from lilbee.catalog.types import ModelSource

    try:
        parsed_source = ModelSource.parse(source)
    except ValueError as exc:
        return _error(str(exc))
    return remove_model_data(model, source=parsed_source).model_dump()

585 

586 

@mcp.tool()
def wiki_drafts_list() -> dict[str, Any]:
    """List pending wiki drafts with drift, faithfulness, and pairing info.

    Read-only. Accept and reject are CLI-only (destructive, explicit).
    """
    from lilbee.wiki.drafts import list_drafts

    drafts = list_drafts(cfg.data_root / cfg.wiki_dir)
    return {
        "command": "wiki_drafts_list",
        "drafts": [draft.to_dict() for draft in drafts],
        "total": len(drafts),
    }

602 

603 

@mcp.tool()
def wiki_drafts_diff(slug: str) -> dict[str, Any]:
    """Return a unified diff of the draft against its published counterpart.

    Args:
        slug: Draft slug (e.g. ``"chevrolet"``).
    """
    from lilbee.wiki.drafts import diff_draft

    try:
        diff_text = diff_draft(slug, cfg.data_root / cfg.wiki_dir)
    except FileNotFoundError as exc:
        return _error(str(exc))
    return {"command": "wiki_drafts_diff", "slug": slug, "diff": diff_text}

619 

620 

def main() -> None:
    """Entry point for the MCP server: pre-warm services, arm the parent
    watchdog, then serve over stdio until the client disconnects."""
    # Preload so the first tool call doesn't pay the cold-start cost
    # of provider/embedder/store init. Failures (missing model, bad
    # config) still surface on the first tool call rather than crashing
    # the server before it attaches to stdio.
    try:
        get_services()
    except Exception:
        log.debug("MCP pre-warm failed; services will init on first call", exc_info=True)

    # Lazy import keeps `lilbee mcp` boot fast (module docstring policy).
    from lilbee.parent_monitor import parse_parent_pid, watch_parent_thread

    # If a parent PID was handed to us (presumably by the spawning client --
    # see parent_monitor for how it's passed), exit when that parent dies so
    # we never linger as an orphan. os._exit skips interpreter teardown,
    # which avoids hanging on non-daemon threads during shutdown.
    parent_pid = parse_parent_pid()
    if parent_pid is not None:
        watch_parent_thread(parent_pid, lambda: os._exit(0))

    # Blocks serving MCP requests until the transport closes.
    mcp.run()