Coverage for src / lilbee / retrieval / clustering_embedding / clusterer.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Chunk-level mutual-kNN clusterer with TF-IDF labels. 

2 

3Pipeline: 

4 

51. Load every chunk's (source, chunk_index, text, vector) from LanceDB. 

62. Build a float32 matrix ``V`` and L2-normalize rows. 

73. Compute a mutual k-nearest-neighbors graph over chunks using blocked 

8 similarity. ``k`` auto-scales from corpus size unless 

9 ``config.wiki_clusterer_k`` is set. 

104. Run asynchronous Label Propagation (Raghavan et al. 2007) over the 

11 mutual-kNN graph to obtain chunk-level communities. 

125. Aggregate chunk communities into source communities, requiring a 

13 source to contribute at least ``min(3, ceil(0.2 * total_chunks))`` 

14 chunks before it joins a cluster. This keeps a single stray chunk 

15 from dragging a whole document into an unrelated cluster. 

166. Filter communities that span fewer than ``min_sources`` distinct 

17 sources, then label each cluster with TF-IDF scoring over member 

18 chunk text against corpus-wide document frequency. 

19""" 

20 

21from __future__ import annotations 

22 

23import logging 

24 

25from lilbee.core.config import CHUNKS_TABLE, Config 

26from lilbee.data.store import Store 

27from lilbee.retrieval.clustering import SourceCluster 

28from lilbee.retrieval.clustering_embedding.helpers import ( 

29 _build_clusters, 

30 _corpus_document_frequency, 

31 _load_chunk_records, 

32 _source_totals, 

33 auto_k, 

34 communities_by_label, 

35 label_propagation, 

36 mutual_knn, 

37 normalize_rows, 

38) 

39 

40log = logging.getLogger(__name__) 

41 

42 

43def _warn_if_undersegmented( 

44 clusters: list[SourceCluster], 

45 source_totals: dict[str, int], 

46) -> None: 

47 """Warn when a single cluster covers more than half the corpus sources.""" 

48 if not clusters or not source_totals: 

49 return 

50 total_sources = len(source_totals) 

51 for cluster in clusters: 

52 if len(cluster.sources) * 2 > total_sources: 

53 log.warning( 

54 "wiki clustering: cluster %r covers %d/%d sources; " 

55 "consider lowering wiki_clusterer_k or check embedding quality", 

56 cluster.label, 

57 len(cluster.sources), 

58 total_sources, 

59 ) 

60 break 

61 

62 

class EmbeddingClusterer:
    """Chunk-level mutual-kNN clusterer with TF-IDF labels."""

    def __init__(self, config: Config, store: Store) -> None:
        self._config = config
        self._store = store

    def available(self) -> bool:
        """Report whether the chunks table exists and has any rows.

        ``count_rows()`` is a LanceDB call that can raise on transient
        backend issues (concurrent compaction, schema rewrites). When
        it does, we optimistically report available=True and let
        ``get_clusters`` surface the real error on the next scan: the
        alternative would silently disable wiki synthesis without the
        user seeing why. A WARNING is emitted so the failure is still
        visible at the default log level.
        """
        chunks_table = self._store.open_table(CHUNKS_TABLE)
        if chunks_table is None:
            return False
        try:
            row_count = chunks_table.count_rows()
        except Exception:
            log.warning(
                "count_rows() failed on chunks table; reporting available=True "
                "optimistically and deferring the error to get_clusters",
                exc_info=True,
            )
            return True
        return bool(row_count)

    def get_clusters(self, min_sources: int = 3) -> list[SourceCluster]:
        """Return chunk-level communities projected to source clusters."""
        records, matrix = _load_chunk_records(self._store)
        if not records:
            return []

        # Drop any rows that could not be L2-normalized, keeping the
        # record list aligned with the surviving matrix rows.
        matrix, keep_mask = normalize_rows(matrix)
        surviving = []
        for record, kept in zip(records, keep_mask, strict=True):
            if kept:
                surviving.append(record)
        records = surviving
        if not records:
            return []

        # An explicit positive config value overrides the size-based heuristic.
        k_override = self._config.wiki_clusterer_k
        k = k_override if k_override > 0 else auto_k(len(records))

        adjacency = mutual_knn(matrix, k)
        if not any(adjacency.values()):
            # WARNING (not INFO) so users see why synthesis produced zero
            # pages at the default log level: matches the other degenerate
            # clustering outcome, ``_warn_if_undersegmented``.
            log.warning(
                "wiki clustering: N=%d k=%d no mutual edges: skipping synthesis",
                len(records),
                k,
            )
            return []

        labels = label_propagation(adjacency, order=list(range(len(records))))
        communities = communities_by_label(labels)

        totals = _source_totals(records)
        doc_freq = _corpus_document_frequency(records)
        clusters, noise = _build_clusters(
            communities, records, totals, doc_freq, min_sources
        )

        log.info(
            "wiki clustering: N=%d k=%d communities=%d kept=%d noise=%d",
            len(records),
            k,
            len(communities),
            len(clusters),
            noise,
        )
        _warn_if_undersegmented(clusters, totals)
        return clusters