Coverage for src / lilbee / mcp_server.py: 100%
278 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""MCP server exposing lilbee as tools for AI agents.
3Tool handler bodies use function-local ``from lilbee.X import ...`` to keep
4``lilbee mcp`` boot fast (the same startup discipline AGENTS.md mandates for
5Typer command bodies). Heavy chains pulled in lazily here:
6``data.ingest`` / ``wiki.*`` / ``wiki.drafts`` (spaCy via the wiki ingest
7pipeline), ``crawler`` (crawl4ai + Playwright), ``app.models`` /
8``modelhub.model_manager`` / ``catalog`` (HF discovery + `huggingface_hub`).
9``app.ingest`` / ``app.reset`` are individually light but transitively reach
10``data.ingest`` via the runtime sync handlers, so they share the policy.
11"""
13from __future__ import annotations
15import asyncio
16import concurrent.futures
17import logging
18import os
19from pathlib import Path
20from typing import Any
22from mcp.server.fastmcp import Context, FastMCP
24from lilbee.app.search import clean_result
25from lilbee.app.services import get_services, reset_services, reset_store
26from lilbee.core.config import cfg
27from lilbee.core.settings import overlay_persisted_settings
28from lilbee.core.system import LOCAL_ROOT_DIRNAME
29from lilbee.crawler import is_url, require_valid_crawl_url
30from lilbee.crawler.task import get_task, start_crawl
31from lilbee.data.store import SearchScope, scope_to_chunk_type
32from lilbee.wiki.shared import (
33 WIKI_DISABLED_ERROR,
34 WikiSubdir,
35)
# Module logger: non-fatal failures (progress notifications, pre-warm) land here.
log = logging.getLogger(__name__)

# Server instance every ``@mcp.tool()`` handler below registers against.
mcp = FastMCP("lilbee", instructions="Local RAG knowledge base. Search indexed documents.")
42def _error(msg: str) -> dict[str, Any]:
43 """Uniform error envelope MCP tool handlers return on a failure path.
45 Typed as ``dict[str, Any]`` rather than a TypedDict so it composes
46 with the success-side returns under the existing handler signatures
47 without forcing every caller to widen its return type.
48 """
49 return {"error": msg}
@mcp.tool()
def search(
    query: str, top_k: int = 5, scope: str = SearchScope.BOTH.value
) -> list[dict[str, Any]] | dict[str, Any]:
    """Search the knowledge base for relevant document chunks.

    ``scope`` selects the chunk pool: ``"raw"`` (source chunks),
    ``"wiki"`` (wiki page bodies), or ``"both"`` (default, unfiltered).
    Results come back relevance-sorted from pre-computed embeddings;
    no LLM call is made.
    """
    if not (query and query.strip()):
        return _error("query must not be empty")
    try:
        chunk_type = scope_to_chunk_type(scope)
    except ValueError as exc:
        return _error(str(exc))
    try:
        hits = get_services().searcher.search(query, top_k=top_k, chunk_type=chunk_type)
        # Drop matches beyond the configured distance cutoff before cleaning.
        kept = [h for h in hits if h.distance is None or h.distance <= cfg.max_distance]
        return [clean_result(h) for h in kept]
    except Exception as exc:
        return _error(str(exc))
@mcp.tool()
def status() -> dict[str, Any]:
    """Show indexed documents, configuration, and chunk counts."""
    sources = get_services().store.get_sources()
    config = {
        "documents_dir": str(cfg.documents_dir),
        "data_dir": str(cfg.data_dir),
        "chat_model": cfg.chat_model,
        "embedding_model": cfg.embedding_model,
        "vision_model": cfg.vision_model,
        "reranker_model": cfg.reranker_model,
        "enable_ocr": cfg.enable_ocr,
        "num_ctx": cfg.num_ctx,
        "num_ctx_max": cfg.num_ctx_max,
        "flash_attention": cfg.flash_attention,
        "kv_cache_type": cfg.kv_cache_type.value,
        "n_gpu_layers": cfg.n_gpu_layers,
        "main_gpu": cfg.main_gpu,
        "gpu_devices": cfg.gpu_devices,
    }
    by_name = sorted(sources, key=lambda s: s["filename"])
    return {
        "config": config,
        "sources": [
            {"filename": s["filename"], "chunk_count": s["chunk_count"]} for s in by_name
        ],
        "total_chunks": sum(s["chunk_count"] for s in sources),
    }
@mcp.tool()
async def sync(force_rebuild: bool = False, retry_skipped: bool = False) -> dict[str, Any]:
    """Sync documents directory with the vector store.

    Args:
        force_rebuild: Drop every table and re-ingest from scratch (equivalent
            to ``lilbee rebuild``). Also clears the failed-file skip markers.
        retry_skipped: Clear the failed-file skip markers so files that were
            skipped on a previous sync get another attempt, without dropping
            the store.
    """
    # Local import keeps `lilbee mcp` boot fast (see module docstring).
    from lilbee.data.ingest import sync as run_sync

    report = await run_sync(quiet=True, force_rebuild=force_rebuild, retry_skipped=retry_skipped)
    return report.model_dump()
@mcp.tool()
async def add(
    paths: list[str],
    force: bool = False,
    enable_ocr: bool | None = None,
    ocr_timeout: float | None = None,
) -> dict[str, Any]:
    """Add files, directories, or URLs to the knowledge base and sync.

    Copies the given paths into the documents directory, then ingests them.
    URLs (http:// or https://) are fetched as markdown and saved to _web/.
    Paths must be absolute and accessible from this machine.

    Args:
        paths: Absolute file/directory paths or URLs to add.
        force: Overwrite files that already exist in the knowledge base.
        enable_ocr: Force vision OCR on (True), off (False), or auto-detect
            from chat model capabilities (None/omit).
        ocr_timeout: Per-page timeout in seconds for vision OCR. Overrides
            the configured default for this invocation only.
    """
    from lilbee.app.ingest import copy_files
    from lilbee.data.ingest import sync as run_sync

    errors: list[str] = []
    valid: list[Path] = []
    urls: list[str] = []
    for p_str in paths:
        if is_url(p_str):
            urls.append(p_str)
        else:
            p = Path(p_str)
            if not p.exists():
                errors.append(p_str)
            else:
                valid.append(p)

    # Crawl URLs
    crawled_count = 0
    if urls:
        from lilbee.crawler import crawler_available

        if not crawler_available():
            return _error("Web crawling requires: pip install 'lilbee[crawler]'")
        from lilbee.crawler import crawl_and_save

        for url in urls:
            try:
                require_valid_crawl_url(url)
            except ValueError as exc:
                errors.append(f"{url}: {exc}")
                continue
            # FIX: a single failed crawl (network error, fetch failure)
            # previously propagated and aborted the whole add, discarding
            # the valid local paths. Record it in ``errors`` like every
            # other per-path failure and keep going.
            try:
                crawled_paths = await crawl_and_save(url)
            except Exception as exc:
                errors.append(f"{url}: {exc}")
                continue
            crawled_count += len(crawled_paths)

    copy_result = copy_files(valid, force=force)

    from lilbee.app.ingest import temporary_ocr_config

    # Apply the one-shot OCR overrides only for this sync's duration.
    with temporary_ocr_config(enable_ocr, ocr_timeout):
        sync_result = (await run_sync(quiet=True)).model_dump()

    result: dict[str, Any] = {
        "command": "add",
        "copied": copy_result.copied,
        "skipped": copy_result.skipped,
        "crawled": crawled_count,
        "errors": errors,
        "sync": sync_result,
    }
    if errors or sync_result.get("failed"):
        result["warning"] = "some files could not be processed"
    return result
@mcp.tool()
def crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
) -> dict[str, Any]:
    """Crawl a web page and add it to the knowledge base (non-blocking).

    Kicks off a background crawl task and returns a task_id immediately;
    poll crawl_status(task_id) for progress.

    Args:
        url: The URL to crawl (must start with http:// or https://).
        depth: None (default) crawls the whole site; 0 fetches only this URL;
            positive int caps link-follow depth.
        max_pages: None (default) means no page limit. Positive int caps total
            pages fetched.
    """
    from lilbee.crawler import crawler_available

    if not crawler_available():
        return _error("Web crawling requires: pip install 'lilbee[crawler]'")
    try:
        require_valid_crawl_url(url)
    except ValueError as exc:
        return _error(str(exc))
    return {
        "status": "started",
        "task_id": start_crawl(url, depth=depth, max_pages=max_pages),
        "url": url,
    }
@mcp.tool()
def crawl_status(task_id: str) -> dict[str, Any]:
    """Check the status of a running crawl task.

    Returns the current state including status, pages crawled, and any error.
    Use this to poll after crawl returns a task_id.

    Args:
        task_id: The task ID returned by crawl.
    """
    state = get_task(task_id)
    if state is None:
        return _error(f"No task found with id: {task_id}")
    return {
        "task_id": state.task_id,
        "url": state.url,
        "status": state.status.value,
        "pages_crawled": state.pages_crawled,
        "pages_total": state.pages_total,
        "error": state.error,
        "started_at": state.started_at,
        "finished_at": state.finished_at,
    }
@mcp.tool()
def init(path: str = "") -> dict[str, Any]:
    """Initialize a local .lilbee/ knowledge base in a directory.

    Creates .lilbee/ with documents/, data/, and .gitignore when absent;
    an empty path means the current working directory. Also repoints the
    MCP session at this knowledge base for subsequent tool calls.
    """
    base = Path(path) if path else Path.cwd()
    root = base / LOCAL_ROOT_DIRNAME

    created = not root.is_dir()
    if created:
        (root / "documents").mkdir(parents=True)
        (root / "data").mkdir(parents=True)
        (root / ".gitignore").write_text("data/\n")

    # Switch MCP session to this project's KB. Overlay any persisted
    # config.toml so per-vault model / generation settings take effect,
    # matching the CLI's --data-dir behaviour. Env export mirrors
    # cli/app.py::_apply_data_root for worker-log parity.
    cfg.data_root = base
    cfg.documents_dir = root / "documents"
    cfg.data_dir = root / "data"
    cfg.lancedb_dir = root / "data" / "lancedb"
    os.environ["LILBEE_DATA"] = str(base)
    overlay_persisted_settings(base)
    reset_services()

    return {"command": "init", "path": str(root), "created": created}
@mcp.tool()
def remove(names: list[str], delete_files: bool = False) -> dict[str, Any]:
    """Remove documents from the knowledge base by source name.

    Args:
        names: Source filenames to remove (as shown by status).
        delete_files: Also delete the physical files from the documents directory.
    """
    outcome = get_services().store.remove_documents(
        names, delete_files=delete_files, documents_dir=cfg.documents_dir
    )
    return {
        "command": "remove",
        "removed": outcome.removed,
        "not_found": outcome.not_found,
    }
@mcp.tool()
def list_documents() -> dict[str, Any]:
    """List all indexed documents with their chunk counts."""
    sources = get_services().store.get_sources()
    docs = [
        {"filename": src["filename"], "chunk_count": src.get("chunk_count", 0)}
        for src in sources
    ]
    return {"documents": docs, "total": len(sources)}
@mcp.tool()
def reset(confirm: bool = False) -> dict[str, Any]:
    """Delete all documents and data (full factory reset).

    WARNING: permanently removes all indexed documents and vector data.
    Refuses to run unless confirm=true is passed.
    """
    if not confirm:
        return _error("pass confirm=true to confirm deletion")
    from lilbee.app.reset import perform_reset

    payload = perform_reset().model_dump()
    # Reopen LanceDB against the empty data dir; keep providers loaded.
    reset_store()
    return payload
@mcp.tool()
def wiki_lint(wiki_source: str = "") -> dict[str, Any]:
    """Lint wiki pages for citation staleness, missing sources, and unmarked claims.

    If wiki_source is provided, lint only that page. Otherwise, lint all wiki pages.

    Args:
        wiki_source: Path like "wiki/summaries/doc.md". Empty = lint all.
    """
    from lilbee.wiki.lint import lint_all, lint_wiki_page

    store = get_services().store
    issues = lint_wiki_page(wiki_source, store) if wiki_source else lint_all(store).issues
    return {
        "command": "wiki_lint",
        "issues": [issue.to_dict() for issue in issues],
        "total": len(issues),
    }
@mcp.tool()
def wiki_citations(wiki_source: str) -> dict[str, Any]:
    """Get all citations for a wiki page.

    Args:
        wiki_source: Wiki page path, e.g. "wiki/summaries/doc.md".
    """
    rows = get_services().store.get_citations_for_wiki(wiki_source)
    return {
        "command": "wiki_citations",
        "wiki_source": wiki_source,
        "citations": [dict(row) for row in rows],
        "total": len(rows),
    }
@mcp.tool()
def wiki_status() -> dict[str, Any]:
    """Show wiki layer status: page counts, recent lint issues."""
    from lilbee.wiki.lint import lint_all

    wiki_root = cfg.data_root / cfg.wiki_dir
    if not wiki_root.exists():
        return {"wiki_enabled": cfg.wiki, "pages": 0, "issues": 0}

    def page_count(subdir: WikiSubdir) -> int:
        # Count markdown pages recursively; a missing subdir counts as zero.
        folder = wiki_root / subdir
        return len(list(folder.rglob("*.md"))) if folder.exists() else 0

    n_summaries = page_count(WikiSubdir.SUMMARIES)
    n_drafts = page_count(WikiSubdir.DRAFTS)

    report = lint_all(get_services().store)
    return {
        "wiki_enabled": cfg.wiki,
        WikiSubdir.SUMMARIES: n_summaries,
        WikiSubdir.DRAFTS: n_drafts,
        "pages": n_summaries + n_drafts,
        "lint_errors": report.error_count,
        "lint_warnings": report.warning_count,
    }
@mcp.tool()
def wiki_list() -> dict[str, Any]:
    """List all wiki pages (summaries and concepts) with metadata.

    Returns page slugs, titles, types, source counts, and creation dates.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from dataclasses import asdict

    from lilbee.wiki.browse import list_pages

    pages = list_pages(cfg.data_root / cfg.wiki_dir)
    return {
        "command": "wiki_list",
        "pages": [asdict(page) for page in pages],
        "total": len(pages),
    }
@mcp.tool()
def wiki_read(slug: str) -> dict[str, Any]:
    """Read a wiki page's content and frontmatter by slug.

    Args:
        slug: Page slug like "summaries/my-doc" or "concepts/typing".
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from dataclasses import asdict

    from lilbee.wiki.browse import read_page

    page = read_page(cfg.data_root / cfg.wiki_dir, slug)
    if page is None:
        return _error(f"wiki page not found: {slug}")
    return {"command": "wiki_read", **asdict(page)}
@mcp.tool()
def wiki_build() -> dict[str, Any]:
    """Build the concept and entity wiki across all ingested sources.

    Returns ``{paths, entities, count}``.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from lilbee.wiki import run_full_build

    build = run_full_build(cfg)
    return {"command": "wiki_build", **build}
@mcp.tool()
def wiki_update() -> dict[str, Any]:
    """Refresh the concept and entity wiki after an ingest. Currently a full rebuild."""
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from lilbee.wiki import run_full_build

    rebuilt = run_full_build(cfg)
    return {"command": "wiki_update", **rebuilt}
@mcp.tool()
def wiki_synthesize() -> dict[str, Any]:
    """Generate synthesis pages for concept clusters spanning three or more sources.

    Returns the list of synthesis page paths written to disk. When no
    cluster meets the 3+ source threshold, returns an empty list and
    ``count: 0``.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)
    from lilbee.wiki import run_full_synthesize

    synthesis = run_full_synthesize(cfg)
    return {"command": "wiki_synthesize", **synthesis}
@mcp.tool()
def wiki_prune() -> dict[str, Any]:
    """Prune stale and orphaned wiki pages.

    Archives pages whose sources are all deleted or whose concept cluster
    dropped below 3 live sources. Flags pages with >50% stale citations
    for regeneration.
    """
    from lilbee.wiki.prune import prune_wiki

    outcome = prune_wiki(get_services().store)
    return {
        "command": "wiki_prune",
        "records": [record.to_dict() for record in outcome.records],
        "archived": outcome.archived_count,
        "flagged": outcome.flagged_count,
    }
@mcp.tool()
def model_list(source: str = "", task: str = "") -> dict[str, Any]:
    """List installed models across native and SDK-backend sources.

    Args:
        source: Filter by source: "native", "remote", or "" for all.
        task: Filter by task: "chat", "embedding", "vision", "rerank", or "" for all.
    """
    from lilbee.app.models import list_models_data
    from lilbee.catalog.types import ModelSource, ModelTask

    # Both parses raise ValueError on a bad filter; either way the
    # handler answers with the uniform error envelope.
    try:
        src = ModelSource.parse(source)
        parsed_task = ModelTask(task) if task else None
    except ValueError as exc:
        return _error(str(exc))
    return list_models_data(source=src, task=parsed_task).model_dump()
@mcp.tool()
def model_show(model: str) -> dict[str, Any]:
    """Show catalog and installed metadata for a model ref."""
    from lilbee.app.models import show_model_data
    from lilbee.modelhub.model_manager import ModelNotFoundError

    try:
        data = show_model_data(model)
    except ModelNotFoundError as exc:
        return _error(str(exc))
    return data.model_dump()
517def _log_progress_failure(future: concurrent.futures.Future[None]) -> None:
518 """Log report_progress failures without raising.
520 Progress notifications are best-effort: a failure should not abort
521 an in-flight pull.
522 """
523 try:
524 future.result()
525 except Exception:
526 log.warning("MCP report_progress failed", exc_info=True)
@mcp.tool()
async def model_pull(
    model: str,
    source: str = "native",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Download a model, streaming progress via MCP notifications.

    Args:
        model: Model ref to pull (e.g. "Qwen/Qwen3-0.6B-GGUF" or
            "Qwen/Qwen3-0.6B-GGUF/Qwen3-0.6B-Q8_0.gguf").
        source: "native" (HuggingFace GGUF) or "remote" (SDK-managed).
    """
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.types import ModelSource

    try:
        src = ModelSource.parse(source) or ModelSource.NATIVE
    except ValueError as exc:
        return _error(str(exc))

    loop = asyncio.get_running_loop()

    def on_update(progress: DownloadProgress) -> None:
        # Invoked from the download worker thread; hop back onto the
        # event loop to emit the MCP progress notification. Failures are
        # routed to _log_progress_failure so they never abort the pull.
        if ctx is None:
            return
        handle = asyncio.run_coroutine_threadsafe(
            ctx.report_progress(
                progress=float(progress.percent), total=100.0, message=progress.detail
            ),
            loop,
        )
        handle.add_done_callback(_log_progress_failure)

    try:
        pulled = await asyncio.to_thread(pull_model_data, model, src, on_update=on_update)
    except (RuntimeError, PermissionError) as exc:
        return _error(str(exc))
    return pulled.model_dump()
@mcp.tool()
def model_rm(model: str, source: str = "") -> dict[str, Any]:
    """Remove an installed model.

    Args:
        model: Model ref to remove.
        source: Restrict to "native" or "remote"; empty = both.
    """
    from lilbee.app.models import remove_model_data
    from lilbee.catalog.types import ModelSource

    try:
        src = ModelSource.parse(source)
    except ValueError as exc:
        return _error(str(exc))
    removal = remove_model_data(model, source=src)
    return removal.model_dump()
@mcp.tool()
def wiki_drafts_list() -> dict[str, Any]:
    """List pending wiki drafts with drift, faithfulness, and pairing info.

    Read-only. Accept and reject are CLI-only (destructive, explicit).
    """
    from lilbee.wiki.drafts import list_drafts

    pending = list_drafts(cfg.data_root / cfg.wiki_dir)
    return {
        "command": "wiki_drafts_list",
        "drafts": [draft.to_dict() for draft in pending],
        "total": len(pending),
    }
@mcp.tool()
def wiki_drafts_diff(slug: str) -> dict[str, Any]:
    """Return a unified diff of the draft against its published counterpart.

    Args:
        slug: Draft slug (e.g. ``"chevrolet"``).
    """
    from lilbee.wiki.drafts import diff_draft

    try:
        diff = diff_draft(slug, cfg.data_root / cfg.wiki_dir)
    except FileNotFoundError as exc:
        return _error(str(exc))
    return {"command": "wiki_drafts_diff", "slug": slug, "diff": diff}
def main() -> None:
    """Entry point for the MCP server."""
    # Pre-warm services so the first tool call skips provider/embedder/
    # store init. Failures (missing model, bad config) are only logged:
    # they should surface on the first tool call instead of crashing the
    # server before it attaches to stdio.
    try:
        get_services()
    except Exception:
        log.debug("MCP pre-warm failed; services will init on first call", exc_info=True)

    from lilbee.parent_monitor import parse_parent_pid, watch_parent_thread

    ppid = parse_parent_pid()
    if ppid is not None:
        # Die with the parent process so orphaned servers don't linger.
        watch_parent_thread(ppid, lambda: os._exit(0))

    mcp.run()