Coverage for src / lilbee / mcp_server.py: 100%
390 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""MCP server exposing lilbee as tools for AI agents."""
3from __future__ import annotations
5import asyncio
6import concurrent.futures
7import logging
8import os
9import re
10from pathlib import Path
11from typing import Any
13from mcp.server.fastmcp import Context, FastMCP
15from lilbee.app.memory import (
16 MEMORY_DISABLED_HINT,
17 forget,
18 list_memories,
19 memory_enabled,
20 recall,
21 remember,
22)
23from lilbee.app.search import clean_result
24from lilbee.app.services import get_services, reset_services, reset_store
25from lilbee.app.settings import (
26 SettingInfo,
27 apply_settings_update,
28 get_setting,
29 list_settings,
30 reset_settings,
31)
32from lilbee.catalog.types import ModelSource
33from lilbee.core.config import cfg
34from lilbee.core.config.enums import CrawlRenderMode
35from lilbee.core.settings import overlay_persisted_settings
36from lilbee.core.system import LOCAL_ROOT_DIRNAME
37from lilbee.crawler import is_url, require_valid_crawl_url
38from lilbee.crawler.task import get_task, start_crawl
39from lilbee.data.store import (
40 MemoryKind,
41 MemorySource,
42 SearchScope,
43 agent_owner,
44 scope_to_chunk_type,
45)
46from lilbee.wiki.shared import (
47 WIKI_DISABLED_ERROR,
48 WikiSubdir,
49)
# Module-scoped logger, named after this module.
log = logging.getLogger(__name__)

# Server object that every ``@mcp.tool()`` decorator in this module registers on.
# The two adjacent literals below concatenate into a single instructions string.
mcp = FastMCP(
    "lilbee",
    instructions=(
        "Local search engine over your files, code, and crawled pages. "
        "Search indexed documents for answers with file and line citations."
    ),
)
60def _error(msg: str) -> dict[str, Any]:
61 """Uniform error envelope MCP tool handlers return on a failure path.
63 Typed as ``dict[str, Any]`` rather than a TypedDict so it composes
64 with the success-side returns under the existing handler signatures
65 without forcing every caller to widen its return type.
66 """
67 return {"error": msg}
@mcp.tool()
def search(
    query: str, top_k: int | None = None, scope: str = SearchScope.BOTH.value
) -> list[dict[str, Any]] | dict[str, Any]:
    """Search the knowledge base for relevant document chunks.

    When ``top_k`` is omitted, the ``top_k`` config field is used, so
    ``settings_set`` governs the candidate count for agents that don't pass
    it explicitly. ``scope`` picks the pool: ``"raw"`` (source chunks),
    ``"wiki"`` (wiki page bodies), or ``"both"`` (default, unfiltered).
    Returns chunks sorted by relevance. No LLM call -- uses pre-computed
    embeddings.
    """
    # Reject blank / whitespace-only queries before touching any service.
    if not (query and query.strip()):
        return _error("query must not be empty")

    try:
        chunk_type = scope_to_chunk_type(scope)
    except ValueError as exc:
        return _error(str(exc))

    limit = cfg.top_k if top_k is None else top_k
    try:
        hits = get_services().searcher.search(query, top_k=limit, chunk_type=chunk_type)
        # Keep hits that carry no distance, or whose distance is within the
        # configured ``max_distance`` cut-off.
        return [
            clean_result(hit)
            for hit in hits
            if hit.distance is None or hit.distance <= cfg.max_distance
        ]
    except Exception as exc:
        # Tool boundary: any failure is reported in the error envelope.
        return _error(str(exc))
@mcp.tool()
def status() -> dict[str, Any]:
    """Show indexed documents, configuration, and chunk counts."""
    sources = get_services().store.get_sources()

    # Snapshot of the config fields reported to the client; paths are
    # stringified and the KV-cache enum is reduced to its value.
    config_view: dict[str, Any] = {
        "documents_dir": str(cfg.documents_dir),
        "data_dir": str(cfg.data_dir),
        "chat_model": cfg.chat_model,
        "embedding_model": cfg.embedding_model,
        "vision_model": cfg.vision_model,
        "reranker_model": cfg.reranker_model,
        "enable_ocr": cfg.enable_ocr,
        "num_ctx": cfg.num_ctx,
        "num_ctx_max": cfg.num_ctx_max,
        "chat_n_ctx_target": cfg.chat_n_ctx_target,
        "flash_attention": cfg.flash_attention,
        "kv_cache_type": cfg.kv_cache_type.value,
        "n_gpu_layers": cfg.n_gpu_layers,
        "main_gpu": cfg.main_gpu,
        "gpu_devices": cfg.gpu_devices,
    }

    # One row per source, ordered by filename.
    source_rows = []
    for src in sorted(sources, key=lambda row: row["filename"]):
        source_rows.append({"filename": src["filename"], "chunk_count": src["chunk_count"]})

    chunk_total = sum(src["chunk_count"] for src in sources)
    return {"config": config_view, "sources": source_rows, "total_chunks": chunk_total}
@mcp.tool()
async def sync(force_rebuild: bool = False, retry_skipped: bool = False) -> dict[str, Any]:
    """Sync documents directory with the vector store.

    Args:
        force_rebuild: Drop every table and re-ingest from scratch (equivalent
            to ``lilbee rebuild``). Also clears the failed-file skip markers.
        retry_skipped: Clear the failed-file skip markers so files that were
            skipped on a previous sync get another attempt, without dropping
            the store.
    """
    from lilbee.data.ingest import sync as run_sync

    outcome = await run_sync(
        quiet=True, force_rebuild=force_rebuild, retry_skipped=retry_skipped
    )
    # The ingest result is a model object; hand the client its dict form.
    return outcome.model_dump()
@mcp.tool()
async def add(
    paths: list[str],
    force: bool = False,
    enable_ocr: bool | None = None,
    ocr_timeout: float | None = None,
    render_mode: CrawlRenderMode | None = None,
) -> dict[str, Any]:
    """Add files, directories, or URLs to the knowledge base and sync.
    Copies the given paths into the documents directory, then ingests them.
    URLs (http:// or https://) are fetched as markdown and saved to _web/.
    Paths must be absolute and accessible from this machine.

    Args:
        paths: Absolute file/directory paths or URLs to add.
        force: Overwrite files that already exist in the knowledge base.
        enable_ocr: Force vision OCR on (True), off (False), or auto-detect
            from chat model capabilities (None/omit).
        ocr_timeout: Per-page timeout in seconds for vision OCR. Overrides
            the configured default for this invocation only.
        render_mode: How any URLs are crawled. None (default) uses the
            configured crawl_render_mode; "http" is browserless, "browser"
            runs Chromium with JavaScript.
    """
    from lilbee.app.ingest import copy_files
    from lilbee.data.ingest import sync as run_sync

    # Partition the inputs: URLs, existing local paths, and missing paths
    # (the latter are reported back verbatim in ``errors``).
    errors: list[str] = []
    local_paths: list[Path] = []
    urls: list[str] = []
    for entry in paths:
        if is_url(entry):
            urls.append(entry)
            continue
        candidate = Path(entry)
        if candidate.exists():
            local_paths.append(candidate)
        else:
            errors.append(entry)

    # Crawl URLs
    crawled_count = 0
    if urls:
        from lilbee.crawler import crawler_available

        if not crawler_available():
            return _error("Web crawling requires: pip install 'lilbee[crawler]'")
        from lilbee.crawler import crawl_and_save

        for url in urls:
            try:
                require_valid_crawl_url(url)
            except ValueError as exc:
                # An invalid URL is recorded and skipped; the rest still run.
                errors.append(f"{url}: {exc}")
            else:
                saved = await crawl_and_save(url, render_mode=render_mode)
                crawled_count += len(saved)

    copy_result = copy_files(local_paths, force=force)

    from lilbee.app.ingest import temporary_ocr_config

    # The OCR overrides apply only for the duration of this sync.
    with temporary_ocr_config(enable_ocr, ocr_timeout):
        synced = await run_sync(quiet=True)
        sync_result = synced.model_dump()

    result: dict[str, Any] = {
        "command": "add",
        "copied": copy_result.copied,
        "skipped": copy_result.skipped,
        "crawled": crawled_count,
        "errors": errors,
        "sync": sync_result,
    }
    had_problems = bool(errors) or bool(sync_result.get("failed"))
    if had_problems:
        result["warning"] = "some files could not be processed"
    return result
@mcp.tool()
def crawl(
    url: str,
    depth: int | None = None,
    max_pages: int | None = None,
    render_mode: CrawlRenderMode | None = None,
) -> dict[str, Any]:
    """Crawl a web page and add it to the knowledge base (non-blocking).
    Launches the crawl as a background task and returns immediately with a
    task_id. Use crawl_status(task_id) to poll progress.

    Args:
        url: The URL to crawl (must start with http:// or https://).
        depth: None (default) crawls the whole site; 0 fetches only this URL;
            positive int caps link-follow depth.
        max_pages: None (default) means no page limit. Positive int caps total
            pages fetched.
        render_mode: None (default) uses the configured crawl_render_mode.
            "http" fetches without a browser (lightweight); "browser" runs
            Chromium with JavaScript enabled for client-rendered sites.
    """
    from lilbee.crawler import crawler_available

    if not crawler_available():
        return _error("Web crawling requires: pip install 'lilbee[crawler]'")

    try:
        require_valid_crawl_url(url)
    except ValueError as exc:
        return _error(str(exc))

    # Hand the work to the background task machinery and report its id.
    started_id = start_crawl(url, depth=depth, max_pages=max_pages, render_mode=render_mode)
    response: dict[str, Any] = {"status": "started"}
    response["task_id"] = started_id
    response["url"] = url
    return response
@mcp.tool()
def crawl_status(task_id: str) -> dict[str, Any]:
    """Check the status of a running crawl task.
    Returns the current state including status, pages crawled, and any error.
    Use this to poll after crawl returns a task_id.

    Args:
        task_id: The task ID returned by crawl.
    """
    if (task := get_task(task_id)) is None:
        return _error(f"No task found with id: {task_id}")

    # Flatten the task record into the wire format.
    snapshot: dict[str, Any] = {
        "task_id": task.task_id,
        "url": task.url,
        "status": task.status.value,
        "pages_crawled": task.pages_crawled,
        "pages_total": task.pages_total,
        "error": task.error,
        "started_at": task.started_at,
        "finished_at": task.finished_at,
    }
    return snapshot
@mcp.tool()
def init(path: str = "") -> dict[str, Any]:
    """Initialize a local .lilbee/ knowledge base in a directory.
    Creates .lilbee/ with documents/, data/, and .gitignore.
    If path is empty, uses the current working directory.
    Also switches the MCP session to use this knowledge base for
    subsequent tool calls.
    """
    base = Path(path) if path else Path.cwd()
    root = base / LOCAL_ROOT_DIRNAME
    documents_dir = root / "documents"
    data_dir = root / "data"

    # Only scaffold when the root directory is not there yet.
    created = not root.is_dir()
    if created:
        documents_dir.mkdir(parents=True)
        data_dir.mkdir(parents=True)
        (root / ".gitignore").write_text("data/\n")

    # Switch MCP session to this project's KB. Overlay any persisted
    # config.toml so per-vault model / generation settings take effect,
    # matching the CLI's --data-dir behaviour. Env export mirrors
    # cli/app.py::_apply_data_root for worker-log parity.
    cfg.data_root = base
    cfg.documents_dir = documents_dir
    cfg.data_dir = data_dir
    cfg.lancedb_dir = data_dir / "lancedb"
    os.environ["LILBEE_DATA"] = str(base)
    overlay_persisted_settings(base)
    reset_services()

    return {"command": "init", "path": str(root), "created": created}
@mcp.tool()
def remove(names: list[str], delete_files: bool = False) -> dict[str, Any]:
    """Remove documents from the knowledge base by source name.

    Args:
        names: Source filenames to remove (as shown by status).
        delete_files: Also delete the physical files from the documents directory.
    """
    store = get_services().store
    outcome = store.remove_documents(
        names, delete_files=delete_files, documents_dir=cfg.documents_dir
    )
    # Report which names were removed and which were not found.
    return {
        "command": "remove",
        "removed": outcome.removed,
        "not_found": outcome.not_found,
    }
@mcp.tool()
def list_documents() -> dict[str, Any]:
    """List all indexed documents with their chunk counts."""
    sources = get_services().store.get_sources()

    documents = []
    for src in sources:
        # A source row without a chunk count is reported as 0.
        documents.append({"filename": src["filename"], "chunk_count": src.get("chunk_count", 0)})

    return {"documents": documents, "total": len(sources)}
@mcp.tool()
def export_dataset(output: str, fmt: str = "", source: str = "") -> dict[str, Any]:
    """Write the per-page {source, page, text} dataset to a file (no vectors).

    ``fmt`` is "parquet" or "jsonl"; empty infers from the output suffix.
    ``source`` limits the export to one source filename. No embedding.
    """
    from lilbee.app.dataset import DatasetError, export_to_path

    # An empty ``source`` means "no filter", passed on as ``None``.
    source_filter = source or None
    try:
        summary = export_to_path(Path(output), fmt, source_filter)
    except DatasetError as exc:
        return _error(str(exc))
    else:
        return summary.model_dump()
@mcp.tool()
async def import_dataset(dataset: str, fmt: str = "", ctx: Context | None = None) -> dict[str, Any]:
    """Import a per-page text dataset file, re-embedding it under the current model.

    Replaces existing copies of each source; imported sources are detached, so
    sync won't delete them. Streams progress notifications. ``fmt`` empty
    infers from the dataset suffix (parquet or jsonl).
    """
    from lilbee.app.dataset import DatasetError, import_from_path
    from lilbee.runtime.progress import EmbedEvent, EventType, ProgressEvent

    # Captured now so the progress callback can schedule coroutines on it.
    loop = asyncio.get_running_loop()

    def on_progress(event_type: EventType, data: ProgressEvent) -> None:
        # EMBED events carry chunk/total_chunks; other event types don't map to a percent.
        if ctx is None:
            return
        if not isinstance(data, EmbedEvent):
            return
        notification = ctx.report_progress(
            progress=float(data.chunk), total=float(data.total_chunks), message=data.file
        )
        pending = asyncio.run_coroutine_threadsafe(notification, loop)
        # Progress delivery problems are logged by the callback.
        pending.add_done_callback(_log_progress_failure)

    try:
        summary = await import_from_path(Path(dataset), fmt, on_progress=on_progress)
    except DatasetError as exc:
        return _error(str(exc))
    else:
        return summary.model_dump()
@mcp.tool()
def reset(confirm: bool = False) -> dict[str, Any]:
    """Delete all documents and data (full factory reset).
    WARNING: This permanently removes all indexed documents and vector data.
    Pass confirm=true to proceed.
    """
    # Guard clause: nothing is touched unless the caller opts in explicitly.
    if confirm is False or not confirm:
        return _error("pass confirm=true to confirm deletion")

    from lilbee.app.reset import perform_reset

    outcome = perform_reset()
    dumped = outcome.model_dump()
    # Reopen LanceDB against the empty data dir; keep providers loaded.
    reset_store()
    return dumped
def wiki_lint(wiki_source: str = "") -> dict[str, Any]:
    """Lint wiki pages for citation staleness, missing sources, and unmarked claims.
    If wiki_source is provided, lint only that page. Otherwise, lint all wiki pages.

    Args:
        wiki_source: Path like "wiki/summaries/doc.md". Empty = lint all.
    """
    from lilbee.wiki.lint import lint_all, lint_wiki_page

    store = get_services().store
    # Single-page lint returns the issues directly; the full lint returns a
    # report whose ``issues`` attribute holds them.
    issues = lint_wiki_page(wiki_source, store) if wiki_source else lint_all(store).issues

    serialized = [issue.to_dict() for issue in issues]
    return {"command": "wiki_lint", "issues": serialized, "total": len(issues)}
def wiki_citations(wiki_source: str) -> dict[str, Any]:
    """Get all citations for a wiki page.

    Args:
        wiki_source: Wiki page path, e.g. "wiki/summaries/doc.md".
    """
    store = get_services().store
    records = store.get_citations_for_wiki(wiki_source)

    # Each record is copied into a plain dict for the response.
    citations = []
    for record in records:
        citations.append(dict(record))

    return {
        "command": "wiki_citations",
        "wiki_source": wiki_source,
        "citations": citations,
        "total": len(records),
    }
def wiki_status() -> dict[str, Any]:
    """Show wiki layer status: page counts, recent lint issues.

    Returns the same key set whether or not the wiki directory exists:
    ``wiki_enabled``, the per-subdirectory page counts (summaries, drafts),
    ``pages`` (their sum), ``issues`` (total lint issues), ``lint_errors``
    and ``lint_warnings``. When the wiki directory is missing, every count
    is 0 and no lint run happens.
    """
    from lilbee.wiki.lint import lint_all

    wiki_root = cfg.data_root / cfg.wiki_dir
    if not wiki_root.exists():
        # Previously this branch returned only ``pages`` and ``issues``, so
        # callers reading ``lint_errors`` / ``lint_warnings`` or the
        # per-subdirectory counts failed on an empty wiki. The legacy keys
        # are kept and the missing ones are filled with zeros.
        return {
            "wiki_enabled": cfg.wiki,
            WikiSubdir.SUMMARIES: 0,
            WikiSubdir.DRAFTS: 0,
            "pages": 0,
            "issues": 0,
            "lint_errors": 0,
            "lint_warnings": 0,
        }

    summaries_dir = wiki_root / WikiSubdir.SUMMARIES
    drafts_dir = wiki_root / WikiSubdir.DRAFTS
    # A missing subdirectory simply counts as zero pages.
    summaries = list(summaries_dir.rglob("*.md")) if summaries_dir.exists() else []
    drafts = list(drafts_dir.rglob("*.md")) if drafts_dir.exists() else []

    report = lint_all(get_services().store)
    return {
        "wiki_enabled": cfg.wiki,
        WikiSubdir.SUMMARIES: len(summaries),
        WikiSubdir.DRAFTS: len(drafts),
        "pages": len(summaries) + len(drafts),
        # Mirrors the ``issues`` key of the empty-wiki branch.
        "issues": len(report.issues),
        "lint_errors": report.error_count,
        "lint_warnings": report.warning_count,
    }
def wiki_list() -> dict[str, Any]:
    """List all wiki pages (summaries and concepts) with metadata.
    Returns page slugs, titles, types, source counts, and creation dates.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)

    from dataclasses import asdict

    from lilbee.wiki.browse import list_pages

    pages = list_pages(cfg.data_root / cfg.wiki_dir)
    # Page records are dataclasses; convert each to a plain dict.
    rows = list(map(asdict, pages))
    return {"command": "wiki_list", "pages": rows, "total": len(pages)}
def wiki_read(slug: str) -> dict[str, Any]:
    """Read a wiki page's content and frontmatter by slug.

    Args:
        slug: Page slug like "summaries/my-doc" or "concepts/typing".
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)

    from dataclasses import asdict

    from lilbee.wiki.browse import read_page

    page = read_page(cfg.data_root / cfg.wiki_dir, slug)
    if page is None:
        return _error(f"wiki page not found: {slug}")

    # Start from the command tag, then merge in the page's fields.
    payload: dict[str, Any] = {"command": "wiki_read"}
    payload.update(asdict(page))
    return payload
def wiki_build() -> dict[str, Any]:
    """Build the concept and entity wiki across all ingested sources.

    Returns ``{paths, entities, count}``.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)

    from lilbee.wiki import run_full_build

    # Tag the build result with the command name.
    payload: dict[str, Any] = {"command": "wiki_build"}
    payload.update(run_full_build(cfg))
    return payload
def wiki_update() -> dict[str, Any]:
    """Refresh the concept and entity wiki after an ingest. Currently a full rebuild."""
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)

    from lilbee.wiki import run_full_build

    # Same build as ``wiki_build``; only the command tag differs.
    payload: dict[str, Any] = {"command": "wiki_update"}
    payload.update(run_full_build(cfg))
    return payload
def wiki_synthesize() -> dict[str, Any]:
    """Generate synthesis pages for concept clusters spanning three or more sources.

    Returns the list of synthesis page paths written to disk. When no
    cluster meets the 3+ source threshold, returns an empty list and
    ``count: 0``.
    """
    if not cfg.wiki:
        return _error(WIKI_DISABLED_ERROR)

    from lilbee.wiki import run_full_synthesize

    # Tag the synthesis result with the command name.
    payload: dict[str, Any] = {"command": "wiki_synthesize"}
    payload.update(run_full_synthesize(cfg))
    return payload
def wiki_prune() -> dict[str, Any]:
    """Prune stale and orphaned wiki pages.
    Archives pages whose sources are all deleted or whose concept cluster
    dropped below 3 live sources. Flags pages with >50% stale citations
    for regeneration.
    """
    from lilbee.wiki.prune import prune_wiki

    store = get_services().store
    report = prune_wiki(store)

    # Serialize each prune record alongside the report's summary counters.
    records = [record.to_dict() for record in report.records]
    return {
        "command": "wiki_prune",
        "records": records,
        "archived": report.archived_count,
        "flagged": report.flagged_count,
    }
def _setting_info_to_dict(info: SettingInfo) -> dict[str, Any]:
    """Convert a SettingInfo into the JSON-safe dict sent over the MCP wire."""
    # ``choices`` is reported as a list when present/non-empty, else ``None``.
    choices = list(info.choices) if info.choices else None

    row: dict[str, Any] = {}
    row["key"] = info.key
    row["value"] = _json_safe(info.value)
    row["default"] = _json_safe(info.default)
    row["type"] = info.type
    row["nullable"] = info.nullable
    row["group"] = info.group.value
    row["help"] = info.help_text
    row["choices"] = choices
    row["reindex_required"] = info.reindex_required
    return row
569def _json_safe(value: Any) -> Any:
570 """Coerce Path / frozenset / tuple to JSON-friendly primitives."""
571 if isinstance(value, str | int | float | bool | list | type(None)):
572 return value
573 return str(value)
@mcp.tool()
def settings_list(group: str = "") -> dict[str, Any]:
    """List every writable lilbee setting with its current value and metadata.

    Each row carries ``key``, ``value``, ``default``, ``type`` (``int``,
    ``float``, ``bool``, ``str``, ``list``, or a ``foo|null`` union),
    ``nullable``, ``group`` (Retrieval, Generation, Models, Ingest, Wiki,
    Crawling, API-Keys, System, Display), ``help`` text, ``choices`` (for
    enum-typed fields), and ``reindex_required`` (whether changing the
    value invalidates the persisted vector store).

    Args:
        group: Filter by group name (case-insensitive). Empty = all.
    """
    # An empty group string means "no filter", passed on as ``None``.
    group_filter = group or None
    try:
        infos = list_settings(group_filter)
    except ValueError as exc:
        return _error(str(exc))

    rows = []
    for info in infos:
        rows.append(_setting_info_to_dict(info))
    return {"command": "settings_list", "settings": rows, "total": len(infos)}
@mcp.tool()
def settings_get(key: str) -> dict[str, Any]:
    """Get the current value and metadata for a single lilbee setting.

    The ``nullable`` field on the returned setting means the writer
    accepts ``None`` to delete the persisted entry; ``vision_model`` /
    ``reranker_model`` report ``nullable=false`` even though an empty
    string clears them.

    Args:
        key: Setting name (e.g. ``"top_k"``, ``"chunk_size"``,
            ``"chat_model"``). Use ``settings_list`` to discover keys.

    Returns ``{command, setting}`` on success, or the ``{"error": ...}``
    envelope when the key lookup raises ``KeyError``.
    """
    try:
        info = get_setting(key)
    except KeyError as exc:
        # ``str()`` of a single-argument KeyError is the repr of that argument,
        # which would wrap the message in an extra pair of quotes. Use the
        # argument itself so the client sees the message as written.
        detail = str(exc.args[0]) if exc.args else str(exc)
        return _error(detail)
    return {"command": "settings_get", "setting": _setting_info_to_dict(info)}
@mcp.tool()
def settings_set(updates: dict[str, Any]) -> dict[str, Any]:
    """Update one or more writable lilbee settings atomically.

    Every key and value is validated upfront; if any value fails validation
    the entire batch rolls back and nothing is persisted. Successful
    writes flush to ``config.toml`` and invalidate the in-process model
    architecture, provider load, and API-key caches so the next call
    observes the new configuration.

    Args:
        updates: Map of setting key to new value. Pass ``null`` to clear
            a nullable field (falls back to the pydantic default on next
            process start). Numeric strings are coerced to int / float by
            pydantic; booleans accept ``true`` / ``false`` / ``1`` / ``0``.

    Returns ``updated`` (the keys persisted) and ``reindex_required``
    (true when one of ``chunk_size`` / ``chunk_overlap`` changed; the
    caller should run ``sync(force_rebuild=true)`` to refresh the index).
    """
    try:
        outcome = apply_settings_update(updates)
    except (ValueError, TypeError) as exc:
        # Validation failures come back in the error envelope.
        return _error(str(exc))
    else:
        return {
            "command": "settings_set",
            "updated": outcome.updated,
            "reindex_required": outcome.reindex_required,
        }
@mcp.tool()
def settings_reset(keys: list[str]) -> dict[str, Any]:
    """Reset writable settings to their built-in defaults.

    Nullable fields are cleared (the next process start falls back to
    the pydantic default); non-nullable fields are written to disk with
    their default value. Invalidates the same caches as ``settings_set``.

    Args:
        keys: Setting keys to reset. Use ``settings_list`` to discover
            available keys.
    """
    try:
        outcome = reset_settings(keys)
    except (ValueError, TypeError) as exc:
        # Failures come back in the error envelope.
        return _error(str(exc))
    else:
        return {
            "command": "settings_reset",
            "updated": outcome.updated,
            "reindex_required": outcome.reindex_required,
        }
@mcp.tool()
def model_list(source: str = "", task: str = "") -> dict[str, Any]:
    """List installed models across native and SDK-backend sources.

    Args:
        source: Filter by source: "native", "remote", or "" for all.
        task: Filter by task: "chat", "embedding", "vision", "rerank", or "" for all.
    """
    from lilbee.app.models import list_models_data
    from lilbee.catalog.types import ModelTask

    # Parse both filters; a bad value in either yields the error envelope.
    try:
        parsed_source = ModelSource.parse(source)
        parsed_task = None if not task else ModelTask(task)
    except ValueError as exc:
        return _error(str(exc))

    listing = list_models_data(source=parsed_source, task=parsed_task)
    return listing.model_dump()
@mcp.tool()
def catalog_browse(
    task: str = "",
    search: str = "",
    size: str = "",
    installed: bool | None = None,
    featured: bool | None = None,
    sort: str = "featured",
    limit: int = 20,
    offset: int = 0,
) -> dict[str, Any]:
    """Browse the lilbee model catalog (featured entries + Hugging Face).

    Lets an agent discover candidates for any model role before pulling.
    The catalog is the same one the TUI browses: a curated featured list
    augmented with live Hugging Face results when ``featured=false``.

    Args:
        task: ``"chat"``, ``"embedding"``, ``"vision"``, ``"rerank"``, or
            ``""`` for all.
        search: Substring filter on name / repo / description.
        size: ``"small"`` (<3 GB), ``"medium"`` (3-10 GB), or ``"large"``
            (>10 GB). Empty = no size filter.
        installed: ``true`` shows only installed repos, ``false`` only
            uninstalled, ``null`` shows both.
        featured: ``true`` restricts to the curated featured list,
            ``false`` skips it (HF results only), ``null`` includes both.
        sort: ``"featured"``, ``"downloads"``, ``"name"``,
            ``"size_asc"``, or ``"size_desc"``.
        limit: Page size (default 20).
        offset: Page offset for pagination.

    Returns ``{command, total, limit, offset, has_more, models}`` where each
    model is ``{ref, display_name, task, size_gb, min_ram_gb, downloads,
    featured, description, architecture, compat}``.
    """
    from lilbee.catalog.query import get_catalog
    from lilbee.catalog.types import CatalogSize, CatalogSort, ModelTask

    # Turn the string filters into their enum forms; empty means "no filter".
    try:
        parsed_task = ModelTask(task) if task else None
        parsed_size = CatalogSize(size) if size else None
        parsed_sort = CatalogSort(sort)
    except ValueError as exc:
        return _error(str(exc))

    try:
        page = get_catalog(
            task=parsed_task,
            search=search,
            size=parsed_size,
            installed=installed,
            featured=featured,
            sort=parsed_sort,
            limit=limit,
            offset=offset,
            model_manager=get_services().model_manager,
        )
    except ValueError as exc:
        return _error(str(exc))

    # Flatten each catalog entry into the wire format.
    models: list[dict[str, Any]] = []
    for entry in page.models:
        models.append(
            {
                "ref": entry.hf_repo,
                "display_name": entry.display_name,
                "task": entry.task.value,
                "size_gb": entry.size_gb,
                "min_ram_gb": entry.min_ram_gb,
                "downloads": entry.downloads,
                "featured": entry.featured,
                "description": entry.description,
                "architecture": entry.architecture,
                "compat": entry.compat.value,
            }
        )

    return {
        "command": "catalog_browse",
        "total": page.total,
        "limit": page.limit,
        "offset": page.offset,
        "has_more": page.has_more,
        "models": models,
    }
@mcp.tool()
def model_show(model: str) -> dict[str, Any]:
    """Show catalog and installed metadata for a model ref."""
    from lilbee.app.models import show_model_data
    from lilbee.modelhub.model_manager import ModelNotFoundError

    try:
        details = show_model_data(model).model_dump()
    except ModelNotFoundError as exc:
        # An unknown ref is reported in the error envelope.
        return _error(str(exc))
    return details
797def _log_progress_failure(future: concurrent.futures.Future[None]) -> None:
798 """Log report_progress failures without raising.
800 Progress notifications are best-effort: a failure should not abort
801 an in-flight pull.
802 """
803 try:
804 future.result()
805 except Exception:
806 log.warning("MCP report_progress failed", exc_info=True)
@mcp.tool()
async def model_pull(
    model: str,
    source: str = ModelSource.NATIVE.value,
    allow_unsupported: bool = False,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Download a model, streaming progress via MCP notifications.

    Args:
        model: Model ref to pull (e.g. "Qwen/Qwen3-0.6B-GGUF" or
            "Qwen/Qwen3-0.6B-GGUF/Qwen3-0.6B-Q8_0.gguf").
        source: "native" (HuggingFace GGUF) or "remote" (SDK-managed).
        allow_unsupported: Set true to pull even when the model's architecture
            isn't supported by this lilbee build. Default refuses with a
            structured error and the list of supported architectures.
    """
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.compat import SUPPORTED_ARCHS, UnsupportedArchError

    try:
        # A falsy parse result falls back to the native source.
        resolved_source = ModelSource.parse(source) or ModelSource.NATIVE
    except ValueError as exc:
        return _error(str(exc))

    # Captured now so the worker-thread callback can schedule onto it.
    loop = asyncio.get_running_loop()

    def on_update(p: DownloadProgress) -> None:
        if ctx is None:
            return
        notification = ctx.report_progress(
            progress=float(p.percent), total=100.0, message=p.detail
        )
        pending = asyncio.run_coroutine_threadsafe(notification, loop)
        pending.add_done_callback(_log_progress_failure)

    try:
        # The pull runs in a worker thread, off the event loop.
        result = await asyncio.to_thread(
            pull_model_data,
            model,
            resolved_source,
            on_update=on_update,
            allow_unsupported=allow_unsupported,
        )
    except UnsupportedArchError as exc:
        # Structured refusal: names the architecture and a sample of supported ones.
        arch_error = {
            "code": "unsupported_arch",
            "arch": exc.architecture,
            "ref": exc.ref,
            "supported_examples": sorted(SUPPORTED_ARCHS)[:5],
            "total_supported": len(SUPPORTED_ARCHS),
        }
        return {"ok": False, "command": "model_pull", "error": arch_error}
    except (RuntimeError, PermissionError) as exc:
        return _error(str(exc))
    return result.model_dump()
@mcp.tool()
def model_rm(model: str, source: str = "") -> dict[str, Any]:
    """Remove an installed model.

    Args:
        model: Model ref to remove. lilbee removes only native models it
            downloaded; Ollama and LM Studio models are read-only.
        source: Restrict to a known source; empty = resolve from the ref.
    """
    from lilbee.app.models import remove_model_data

    # Both the source parse and the removal report ValueError the same way.
    try:
        parsed_source = ModelSource.parse(source)
        removal = remove_model_data(model, source=parsed_source)
        return removal.model_dump()
    except ValueError as exc:
        return _error(str(exc))
def wiki_drafts_list() -> dict[str, Any]:
    """List pending wiki drafts with drift, faithfulness, and pairing info.

    Read-only. Accept and reject are CLI-only (destructive, explicit).
    """
    from lilbee.wiki.drafts import list_drafts

    drafts = list_drafts(cfg.data_root / cfg.wiki_dir)
    # Serialize each draft record for the response.
    rows = [draft.to_dict() for draft in drafts]
    return {"command": "wiki_drafts_list", "drafts": rows, "total": len(drafts)}
def wiki_drafts_diff(slug: str) -> dict[str, Any]:
    """Return a unified diff of the draft against its published counterpart.

    Args:
        slug: Draft slug (e.g. ``"chevrolet"``).
    """
    from lilbee.wiki.drafts import diff_draft

    wiki_root = cfg.data_root / cfg.wiki_dir
    try:
        diff_text = diff_draft(slug, wiki_root)
    except FileNotFoundError as exc:
        # A missing file is reported in the error envelope.
        return _error(str(exc))
    else:
        return {"command": "wiki_drafts_diff", "slug": slug, "diff": diff_text}
917def _client_name(ctx: Context | None) -> str:
918 """The MCP client's self-reported name from the initialize handshake, or empty."""
919 if ctx is None:
920 return ""
921 params = ctx.session.client_params
922 return params.clientInfo.name if params is not None else ""
925def _slug(value: str) -> str:
926 """Lowercase, hyphenated id fragment; falls back to ``generic`` when empty."""
927 slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
928 return slug or "generic"
def _derive_owner(agent_id: str, ctx: Context | None) -> str:
    """Work out the stable owner namespace for the calling agent.

    Precedence: explicit ``agent_id`` argument, then the ``LILBEE_AGENT_ID`` env var
    (pinned in the client's MCP config), then the MCP client name, then ``generic``.
    """
    pinned = agent_id if agent_id else os.environ.get("LILBEE_AGENT_ID", "")
    # The client name is only consulted when nothing was pinned.
    raw_id = pinned if pinned else _client_name(ctx)
    return agent_owner(_slug(raw_id))
def memory_remember(
    text: str,
    kind: MemoryKind = MemoryKind.FACT,
    shared: bool = False,
    agent_id: str = "",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Store a durable memory in this agent's own namespace.

    Args:
        text: The fact or preference to remember.
        kind: "fact" (recalled by similarity) or "preference" (always recalled).
        shared: Set true to also expose this memory to the human's TUI/CLI.
        agent_id: Stable id for this agent's namespace; otherwise derived from
            LILBEE_AGENT_ID or the MCP client name.
    """
    if not memory_enabled():
        return _error(MEMORY_DISABLED_HINT)

    owner = _derive_owner(agent_id, ctx)
    # Memories written through this tool are tagged with the AGENT source.
    new_id = remember(text, owner=owner, kind=kind, source=MemorySource.AGENT, shared=shared)

    response: dict[str, Any] = {"ok": True}
    response["id"] = new_id
    response["owner"] = owner
    return response
def memory_recall(
    query: str, limit: int = 0, agent_id: str = "", ctx: Context | None = None
) -> dict[str, Any]:
    """Recall this agent's memories (plus any the human shared) relevant to *query*."""
    if not memory_enabled():
        return _error(MEMORY_DISABLED_HINT)

    owner = _derive_owner(agent_id, ctx)
    # A non-positive limit means "no explicit cap": pass ``None`` as top_k.
    top_k = limit if limit > 0 else None
    recalled = recall(query, owner, top_k=top_k)

    rows = []
    for memory in recalled:
        rows.append(
            {
                "id": memory.id,
                "text": memory.text,
                "kind": memory.kind.value,
                "owner": memory.owner,
            }
        )
    return {"memories": rows}
def memory_list(agent_id: str = "", ctx: Context | None = None) -> dict[str, Any]:
    """List every memory in this agent's namespace (any kind, newest first)."""
    if not memory_enabled():
        return _error(MEMORY_DISABLED_HINT)

    owner = _derive_owner(agent_id, ctx)
    rows = []
    for memory in list_memories(owner):
        # Unlike recall, rows carry the ``shared`` flag rather than the owner.
        rows.append(
            {
                "id": memory.id,
                "text": memory.text,
                "kind": memory.kind.value,
                "shared": memory.shared,
            }
        )
    return {"memories": rows}
def memory_forget(memory_id: str) -> dict[str, Any]:
    """Delete a memory by id."""
    if not memory_enabled():
        return _error(MEMORY_DISABLED_HINT)

    forget(memory_id)
    # Echo the id back so the caller can confirm what was deleted.
    acknowledgement: dict[str, Any] = {"ok": True}
    acknowledgement["id"] = memory_id
    return acknowledgement
# Wiki tool handlers, registered as a group by ``register_conditional_tools``
# only when ``cfg.wiki`` is truthy. They are registered in this order.
_WIKI_TOOLS = (
    # Read-only inspection.
    wiki_status,
    wiki_list,
    wiki_read,
    wiki_lint,
    wiki_citations,
    # Generation and maintenance.
    wiki_build,
    wiki_update,
    wiki_synthesize,
    wiki_prune,
    # Draft review.
    wiki_drafts_list,
    wiki_drafts_diff,
)

# Memory tool handlers, registered as a group only when ``memory_enabled()``.
_MEMORY_TOOLS = (
    memory_remember,
    memory_recall,
    memory_list,
    memory_forget,
)
def register_conditional_tools() -> None:
    """Register the wiki and memory tool groups whose subsystem is enabled."""
    # (enabled-check, handlers) pairs, evaluated lazily and in order so the
    # memory check only runs after the wiki group has been handled.
    groups = (
        (lambda: cfg.wiki, _WIKI_TOOLS),
        (memory_enabled, _MEMORY_TOOLS),
    )
    for is_enabled, handlers in groups:
        if not is_enabled():
            continue
        for handler in handlers:
            mcp.tool()(handler)
def main() -> None:
    """Run the MCP server (process entry point)."""
    register_conditional_tools()

    # Preload so the first tool call doesn't pay the cold-start cost
    # of provider/embedder/store init. Failures (missing model, bad
    # config) still surface on the first tool call rather than crashing
    # the server before it attaches to stdio.
    try:
        get_services()
    except Exception:
        log.debug("MCP pre-warm failed; services will init on first call", exc_info=True)

    from lilbee.parent_monitor import parse_parent_pid, watch_parent_thread

    # When a parent pid was parsed, hand it to the watcher together with a
    # callback that exits this process immediately.
    if (parent_pid := parse_parent_pid()) is not None:
        watch_parent_thread(parent_pid, lambda: os._exit(0))

    mcp.run()