Coverage for src / lilbee / data / store / lance_helpers.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""LanceDB plumbing helpers: table introspection, safe deletes, SQL escaping, error text.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import threading 

7from typing import TYPE_CHECKING 

8 

9from lilbee.catalog.refs import hf_repo_from_ref 

10from lilbee.runtime.lock import write_lock 

11 

12from .types import CHUNK_TYPE_RAW 

13 

14if TYPE_CHECKING: 

15 import lancedb 

16 import lancedb.table 

17 import pyarrow as pa 

18 

19log = logging.getLogger(__name__) 

20 

21 

def install_lancedb_thread_error_suppressor() -> None:
    """Replace ``threading.excepthook`` with a filter that hides lancedb shutdown noise.

    lancedb has no ``close()`` API, and its internal event loop thread crashes
    while the Python interpreter is tearing down. The exception is harmless
    (the process is exiting anyway) but it pollutes CLI/TUI output.

    After this call, uncaught exceptions from any thread whose name contains
    ``"LanceDB"`` are dropped; every other thread's exception is forwarded to
    the hook that was installed at the moment this function ran.

    This is opt-in so that importing ``lilbee.data.store`` has no hidden side
    effects; call it once from the CLI/TUI bootstrap.
    """
    # Capture the hook being replaced so unrelated thread errors still surface.
    previous_hook = threading.excepthook

    def _filtering_hook(args: threading.ExceptHookArgs) -> None:
        thread = args.thread
        from_lancedb = thread and "LanceDB" in thread.name
        if not from_lancedb:
            previous_hook(args)

    threading.excepthook = _filtering_hook

38 

39 

40def _table_names(db: lancedb.DBConnection) -> list[str]: 

41 """Get list of table names, handling the ListTablesResponse object.""" 

42 result = db.list_tables() 

43 try: 

44 return result.tables # type: ignore[no-any-return, union-attr] 

45 except AttributeError: 

46 return list(result) # type: ignore[arg-type] 

47 

48 

def ensure_table(db: lancedb.DBConnection, name: str, schema: pa.Schema) -> lancedb.table.Table:
    """Open the table called *name*, creating it with *schema* when it is not listed.

    If ``create_table`` raises ``ValueError`` the table is opened instead.
    NOTE(review): presumably this covers another writer creating the table
    between the listing and the create call -- confirm against lancedb.
    """
    if name not in _table_names(db):
        try:
            return db.create_table(name, schema=schema)
        except ValueError:
            # Fall through to the open below.
            pass
    return db.open_table(name)

56 

57 

58def _safe_delete_unlocked(table: lancedb.table.Table, predicate: str) -> None: 

59 """Delete rows matching predicate, logging on failure. Caller must hold write lock.""" 

60 try: 

61 table.delete(predicate) 

62 except Exception: 

63 log.warning("Failed to delete rows matching: %s", predicate, exc_info=True) 

64 

65 

def safe_delete(table: lancedb.table.Table, predicate: str) -> None:
    """Take the write lock, then best-effort delete the rows matching *predicate*.

    The delete itself is delegated to ``_safe_delete_unlocked``, which logs
    failures instead of raising them.
    """
    lock = write_lock()
    with lock:
        _safe_delete_unlocked(table, predicate)

70 

71 

def escape_sql_string(value: str) -> str:
    """Escape *value* for embedding inside a single-quoted SQL string literal.

    Every backslash and every single quote is doubled. The result does not
    include the surrounding quotes.
    """
    # One pass is enough: neither replacement produces the other's trigger character.
    doubled = {ord("\\"): "\\\\", ord("'"): "''"}
    return value.translate(doubled)

75 

76 

def _chunk_type_predicate(chunk_type: str) -> str:
    """Build a SQL predicate selecting rows of *chunk_type*, NULL-aware for raw.

    Rows written before ``chunk_type`` was populated land as NULL. They are
    semantically raw, so the ``CHUNK_TYPE_RAW`` filter also matches NULL rows;
    a filter for any other type (for example ``'wiki'``) leaves them out.
    """
    equality = f"chunk_type = '{escape_sql_string(chunk_type)}'"
    if chunk_type != CHUNK_TYPE_RAW:
        return equality
    return f"({equality} OR chunk_type IS NULL)"

88 

89 

90def _has_fts_index(table: lancedb.table.Table) -> bool: 

91 """Return True when an FTS index on the chunk column already exists.""" 

92 try: 

93 for idx in table.list_indices(): 

94 if idx.index_type == "FTS" and "chunk" in idx.columns: 

95 return True 

96 except Exception: 

97 return False 

98 return False 

99 

100 

def _sources_search_filter(search: str | None) -> str | None:
    """Build a case-insensitive substring WHERE clause on ``filename``.

    Returns ``None`` when *search* is ``None`` or empty. Otherwise the search
    text is lower-cased, escaped with ``escape_sql_string`` and wrapped in
    ``%`` wildcards for a ``LIKE`` match against ``LOWER(filename)``.
    """
    if search:
        needle = escape_sql_string(search.lower())
        return f"LOWER(filename) LIKE '%{needle}%'"
    return None

107 

108 

def refs_compatible(
    persisted_ref: str,
    current_ref: str,
    persisted_dim: int,
    current_dim: int,
) -> bool:
    """Decide whether *persisted_ref* and *current_ref* name the same embedder.

    The refs are compatible iff the dims match and one of the following holds:

    * the two raw refs are equal, or
    * the persisted ref is the legacy bare-repo form (``<org>/<repo>`` with no
      ``.gguf`` filename) and its repo matches the repo of the current
      canonical full ref.

    The legacy asymmetry exists because pre-canonical lilbee versions
    persisted only the repo, while current code persists the full
    ``<org>/<repo>/<filename>.gguf``. Two different ``.gguf`` files from the
    same repo are never lumped together (different quantizations can produce
    subtly different vectors), so when both refs are full refs strict
    identity is required.
    """
    if persisted_dim != current_dim:
        return False
    if persisted_ref == current_ref:
        return True

    persisted_is_legacy = not persisted_ref.endswith(".gguf")
    current_is_full = current_ref.endswith(".gguf")
    if not (persisted_is_legacy and current_is_full):
        return False
    # Legacy bare repo vs. full ref: compare at repo granularity.
    return hf_repo_from_ref(current_ref) == persisted_ref

135 

136 

137def _embedding_mismatch_message( 

138 persisted_model: str, 

139 persisted_dim: int, 

140 current_model: str, 

141 current_dim: int, 

142) -> str: 

143 return ( 

144 f"The vector store was built with embedding model '{persisted_model}' " 

145 f"(dim {persisted_dim}), but lilbee is now configured to use " 

146 f"'{current_model}' (dim {current_dim}). Search and ingest are disabled " 

147 "until the store is rebuilt under the new model. " 

148 'Run `lilbee rebuild` or POST /api/sync with `{"force_rebuild": true}`.' 

149 )