Coverage for src / lilbee / data / store / lance_helpers.py: 100%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""LanceDB plumbing helpers: table introspection, safe deletes, SQL escaping, error text.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import threading 

7from typing import TYPE_CHECKING 

8 

9from lilbee.catalog.refs import hf_repo_from_ref 

10from lilbee.runtime.lock import write_lock 

11 

12from .types import LOCAL_OWNER, ChunkType 

13 

14if TYPE_CHECKING: 

15 import lancedb 

16 import lancedb.table 

17 import pyarrow as pa 

18 

19log = logging.getLogger(__name__) 

20 

21 

def install_lancedb_thread_error_suppressor() -> None:
    """Wrap ``threading.excepthook`` so LanceDB thread crashes are dropped.

    lancedb offers no ``close()`` API, and its internal event-loop thread
    raises while the interpreter is tearing down. The error is harmless (the
    process is already exiting) but it clutters CLI/TUI output.

    Installation is explicit rather than an import-time side effect of
    ``lilbee.data.store``: call this once from the CLI/TUI bootstrap. The hook
    that was active at call time is kept and still receives every exception
    from threads whose name does not contain ``"LanceDB"``.
    """
    previous_hook = threading.excepthook

    def _suppress_lancedb(args: threading.ExceptHookArgs) -> None:
        crashed = args.thread
        # ``args.thread`` can be None, so test it before touching ``.name``.
        if not (crashed and "LanceDB" in crashed.name):
            previous_hook(args)

    threading.excepthook = _suppress_lancedb

38 

39 

40def _table_names(db: lancedb.DBConnection) -> list[str]: 

41 """Get list of table names, handling the ListTablesResponse object.""" 

42 result = db.list_tables() 

43 try: 

44 return result.tables # type: ignore[no-any-return, union-attr] 

45 except AttributeError: 

46 return list(result) # type: ignore[arg-type] 

47 

48 

def ensure_table(db: lancedb.DBConnection, name: str, schema: pa.Schema) -> lancedb.table.Table:
    """Return the table called *name*, creating it with *schema* if needed.

    An already-listed table is opened as-is (its schema is not compared with
    *schema*). Otherwise creation is attempted, and a ``ValueError`` from
    ``create_table`` falls back to opening the table.
    """
    if name not in _table_names(db):
        try:
            return db.create_table(name, schema=schema)
        except ValueError:
            # NOTE(review): presumably raised when the table appeared between
            # the listing and the create call -- confirm against lancedb.
            pass
    return db.open_table(name)

56 

57 

58def _safe_delete_unlocked(table: lancedb.table.Table, predicate: str) -> None: 

59 """Delete rows matching predicate, logging on failure. Caller must hold write lock.""" 

60 try: 

61 table.delete(predicate) 

62 except Exception: 

63 log.warning("Failed to delete rows matching: %s", predicate, exc_info=True) 

64 

65 

def safe_delete(table: lancedb.table.Table, predicate: str) -> None:
    """Delete the rows in *table* selected by *predicate* under the write lock.

    Acquires ``write_lock()`` for the duration of the delete and delegates to
    ``_safe_delete_unlocked``, which logs failures instead of raising.
    """
    with write_lock():
        _safe_delete_unlocked(table, predicate)

70 

71 

def escape_sql_string(value: str) -> str:
    """Escape *value* for embedding inside a single-quoted SQL literal.

    Every backslash is doubled and every single quote is doubled; all other
    characters pass through unchanged.
    """
    # One pass over the string; neither replacement introduces a character
    # the other one rewrites, so this matches the two chained replaces.
    return value.translate(str.maketrans({"\\": "\\\\", "'": "''"}))

75 

76 

def local_owner_predicate() -> str:
    """Return the SQL predicate that selects the local human's memories.

    The predicate is an equality test of the ``owner`` column against the
    ``LOCAL_OWNER`` constant.
    """
    predicate = f"owner = '{LOCAL_OWNER}'"
    return predicate

80 

81 

def agent_recall_predicate(owner: str) -> str:
    """Return the SQL predicate for what agent *owner* may recall.

    Matches rows owned by *owner* (escaped for use in a SQL literal) plus
    rows owned by ``LOCAL_OWNER`` that are flagged ``shared``.
    """
    # NOTE(review): the top-level OR is not parenthesised, so a caller that
    # ANDs this with another clause should wrap it in parentheses first.
    escaped_owner = escape_sql_string(owner)
    return f"owner = '{escaped_owner}' OR (shared = true AND owner = '{LOCAL_OWNER}')"

85 

86 

def _chunk_type_predicate(chunk_type: ChunkType | str) -> str:
    """Return the SQL predicate selecting rows of the given ``chunk_type``.

    Rows written before ``chunk_type`` was populated hold NULL in that
    column. Those rows are semantically raw, so filtering for
    ``ChunkType.RAW`` also accepts NULL; any other value (for example
    ``'wiki'``) is a plain equality test and therefore excludes NULL rows.
    """
    escaped = escape_sql_string(chunk_type)
    if chunk_type != ChunkType.RAW:
        return f"chunk_type = '{escaped}'"
    # Parenthesised so the OR stays self-contained when combined with AND.
    return f"(chunk_type = '{escaped}' OR chunk_type IS NULL)"

98 

99 

100def _has_fts_index(table: lancedb.table.Table) -> bool: 

101 """Return True when an FTS index on the chunk column already exists.""" 

102 try: 

103 for idx in table.list_indices(): 

104 if idx.index_type == "FTS" and "chunk" in idx.columns: 

105 return True 

106 except Exception: 

107 return False 

108 return False 

109 

110 

111def _has_vector_index(table: lancedb.table.Table) -> bool: 

112 """Return True when an ANN index on the vector column already exists. 

113 

114 LanceDB reports IVF index types as ``IvfPq`` / ``IvfFlat`` etc., so the 

115 family match is case-insensitive. 

116 """ 

117 try: 

118 for idx in table.list_indices(): 

119 if "IVF" in idx.index_type.upper() and "vector" in idx.columns: 

120 return True 

121 except Exception: 

122 return False 

123 return False 

124 

125 

126def _sources_search_filter(search: str | None) -> str | None: 

127 """Case-insensitive filename WHERE clause, or ``None`` for empty *search*.""" 

128 if not search: 

129 return None 

130 escaped = escape_sql_string(search.lower()) 

131 return f"LOWER(filename) LIKE '%{escaped}%'" 

132 

133 

def refs_compatible(
    persisted_ref: str,
    current_ref: str,
    persisted_dim: int,
    current_dim: int,
) -> bool:
    """Decide whether *persisted_ref* and *current_ref* name the same embedder.

    The refs are compatible only when the dimensions match and one of these
    holds:

    * the two refs are equal as raw strings; or
    * *persisted_ref* is the legacy bare-repo form (``<org>/<repo>`` with no
      ``.gguf`` filename), *current_ref* is a full
      ``<org>/<repo>/<filename>.gguf`` ref, and the repo of *current_ref*
      equals *persisted_ref*.

    The legacy case is one-directional because pre-canonical lilbee versions
    persisted only the repo, while current code persists the full ref. Two
    different ``.gguf`` files in one repo are never treated as equal
    (different quantizations can produce subtly different vectors), so two
    full refs must match exactly.
    """
    if persisted_dim != current_dim:
        return False
    if persisted_ref == current_ref:
        return True

    gguf_suffix = ".gguf"
    persisted_is_legacy = not persisted_ref.endswith(gguf_suffix)
    current_is_full = current_ref.endswith(gguf_suffix)
    # Only the legacy-persisted / full-current pairing gets the repo check.
    if not (persisted_is_legacy and current_is_full):
        return False
    return hf_repo_from_ref(current_ref) == persisted_ref