Coverage for src / lilbee / data / store / lance_helpers.py: 100%
78 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""LanceDB plumbing helpers: table introspection, safe deletes, SQL escaping, error text."""
3from __future__ import annotations
5import logging
6import threading
7from typing import TYPE_CHECKING
9from lilbee.catalog.refs import hf_repo_from_ref
10from lilbee.runtime.lock import write_lock
12from .types import LOCAL_OWNER, ChunkType
14if TYPE_CHECKING:
15 import lancedb
16 import lancedb.table
17 import pyarrow as pa
19log = logging.getLogger(__name__)
def install_lancedb_thread_error_suppressor() -> None:
    """Install a ``threading.excepthook`` that swallows lancedb shutdown noise.

    lancedb has no ``close()`` API and its internal event loop thread crashes
    during Python interpreter teardown. The exception is harmless (the process
    is exiting anyway) but pollutes CLI/TUI output. This is opt-in so importing
    ``lilbee.data.store`` has no hidden side effects; call it from the CLI/TUI
    bootstrap.

    Exceptions raised in threads whose name contains ``"LanceDB"`` are dropped;
    everything else is forwarded to the hook that was active before this call.
    Calling the function again while the suppressor is still the active hook is
    a no-op, so repeated bootstraps do not stack wrappers on top of each other.
    """
    previous = threading.excepthook
    # Already installed: wrapping again would only lengthen the delegate chain.
    if getattr(previous, "_lancedb_suppressor", False):
        return

    def _hook(args: threading.ExceptHookArgs) -> None:
        # ``args.thread`` may be falsy/None, so guard before reading its name.
        if args.thread and "LanceDB" in args.thread.name:
            return
        previous(args)

    # Marker used by the idempotence check above.
    _hook._lancedb_suppressor = True  # type: ignore[attr-defined]
    threading.excepthook = _hook
def _table_names(db: lancedb.DBConnection) -> list[str]:
    """Return the table names reported by ``db.list_tables()``.

    The listing is unwrapped through its ``tables`` attribute when it has one;
    otherwise the listing itself is iterated into a list.
    """
    listing = db.list_tables()
    try:
        names = listing.tables  # type: ignore[union-attr]
    except AttributeError:
        # No ``tables`` attribute: treat the result as a plain iterable of names.
        names = list(listing)  # type: ignore[arg-type]
    return names  # type: ignore[no-any-return]
def ensure_table(db: lancedb.DBConnection, name: str, schema: pa.Schema) -> lancedb.table.Table:
    """Open table *name* from *db*, creating it with *schema* when it is not listed.

    If ``create_table`` raises ``ValueError`` the table is opened by name instead
    (presumably it was created between the listing and the create call —
    TODO confirm against lancedb's error contract).
    """
    if name not in _table_names(db):
        try:
            return db.create_table(name, schema=schema)
        except ValueError:
            return db.open_table(name)
    return db.open_table(name)
def _safe_delete_unlocked(table: lancedb.table.Table, predicate: str) -> None:
    """Best-effort ``table.delete(predicate)``; the caller must already hold the write lock.

    Any ``Exception`` raised by the delete is logged as a warning (with the
    traceback) and not propagated.
    """
    try:
        table.delete(predicate)
    except Exception:
        log.warning(
            "Failed to delete rows matching: %s",
            predicate,
            exc_info=True,
        )
def safe_delete(table: lancedb.table.Table, predicate: str) -> None:
    """Acquire the write lock, then delete the rows of *table* matching *predicate*.

    Delete failures are logged rather than raised.
    """
    with write_lock():
        # Lock is held for the whole delete; the helper handles error logging.
        _safe_delete_unlocked(table, predicate)
def escape_sql_string(value: str) -> str:
    """Escape *value* for embedding in a single-quoted SQL predicate literal.

    Every backslash is doubled and every single quote is doubled.
    """
    # One pass over the string; the two substitutions never feed into each other.
    substitutions = {ord("\\"): "\\\\", ord("'"): "''"}
    return value.translate(substitutions)
def local_owner_predicate() -> str:
    """Return the SQL predicate matching rows whose ``owner`` is ``LOCAL_OWNER``."""
    owner = LOCAL_OWNER
    return f"owner = '{owner}'"
def agent_recall_predicate(owner: str) -> str:
    """Return the SQL predicate for what agent *owner* may recall.

    Matches rows owned by *owner* (escaped for SQL), plus rows owned by
    ``LOCAL_OWNER`` that are flagged ``shared``.
    """
    own_rows = f"owner = '{escape_sql_string(owner)}'"
    shared_local_rows = f"(shared = true AND owner = '{LOCAL_OWNER}')"
    return f"{own_rows} OR {shared_local_rows}"
def _chunk_type_predicate(chunk_type: ChunkType | str) -> str:
    """Build the SQL predicate selecting rows of the given ``chunk_type``.

    Rows written before ``chunk_type`` was populated are stored as NULL and
    are semantically raw. A filter for ``ChunkType.RAW`` therefore also
    accepts NULL rows, while any other filter matches on equality only.
    """
    literal = escape_sql_string(chunk_type)
    equality = f"chunk_type = '{literal}'"
    if chunk_type == ChunkType.RAW:
        # Legacy NULL rows count as raw.
        return f"({equality} OR chunk_type IS NULL)"
    return equality
def _has_fts_index(table: lancedb.table.Table) -> bool:
    """Report whether *table* lists an ``FTS`` index covering the ``chunk`` column.

    Any exception raised while listing or inspecting indices yields ``False``.
    """
    try:
        return any(
            entry.index_type == "FTS" and "chunk" in entry.columns
            for entry in table.list_indices()
        )
    except Exception:
        return False
def _has_vector_index(table: lancedb.table.Table) -> bool:
    """Report whether *table* lists an IVF-family index covering the ``vector`` column.

    LanceDB reports IVF index types as ``IvfPq`` / ``IvfFlat`` etc., so the
    index type is upper-cased before looking for ``"IVF"``. Any exception
    raised while listing or inspecting indices yields ``False``.
    """
    try:
        return any(
            "IVF" in entry.index_type.upper() and "vector" in entry.columns
            for entry in table.list_indices()
        )
    except Exception:
        return False
def _sources_search_filter(search: str | None) -> str | None:
    """Build a case-insensitive substring WHERE clause on ``filename``.

    Returns ``None`` when *search* is ``None`` or empty; otherwise the
    lower-cased, SQL-escaped term is wrapped in ``%`` wildcards for ``LIKE``.
    """
    if search:
        needle = escape_sql_string(search.lower())
        return f"LOWER(filename) LIKE '%{needle}%'"
    return None
def refs_compatible(
    persisted_ref: str,
    current_ref: str,
    persisted_dim: int,
    current_dim: int,
) -> bool:
    """Decide whether the persisted and current embedder references are interchangeable.

    The dimensions must match. Beyond that, the refs are compatible when they
    are identical strings, or when the persisted ref is the legacy bare-repo
    form (``<org>/<repo>``, no ``.gguf`` filename) and equals the repo extracted
    from the current full ``<org>/<repo>/<filename>.gguf`` ref. The asymmetry
    exists because pre-canonical lilbee versions persisted only the repo.

    Two distinct ``.gguf`` refs are never treated as compatible, even within
    one repo: different quantizations can produce subtly different vectors.
    """
    gguf_suffix = ".gguf"

    if persisted_dim != current_dim:
        return False
    if persisted_ref == current_ref:
        return True
    # Only "legacy bare repo persisted, full ref current" may still match.
    if persisted_ref.endswith(gguf_suffix) or not current_ref.endswith(gguf_suffix):
        return False
    return hf_repo_from_ref(current_ref) == persisted_ref