Coverage for src / lilbee / retrieval / clustering_embedding / clusterer.py: 100%
50 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Chunk-level mutual-kNN clusterer with TF-IDF labels.
3Pipeline:
51. Load every chunk's (source, chunk_index, text, vector) from LanceDB.
62. Build a float32 matrix ``V`` and L2-normalize rows.
73. Compute a mutual k-nearest-neighbors graph over chunks using blocked
8 similarity. ``k`` auto-scales from corpus size unless
9 ``config.wiki_clusterer_k`` is set.
104. Run asynchronous Label Propagation (Raghavan et al. 2007) over the
11 mutual-kNN graph to obtain chunk-level communities.
125. Aggregate chunk communities into source communities, requiring a
13 source to contribute at least ``min(3, ceil(0.2 * total_chunks))``
14 chunks before it joins a cluster. This keeps a single stray chunk
15 from dragging a whole document into an unrelated cluster.
166. Filter communities that span fewer than ``min_sources`` distinct
17 sources, then label each cluster with TF-IDF scoring over member
18 chunk text against corpus-wide document frequency.
19"""
21from __future__ import annotations
23import logging
25from lilbee.core.config import CHUNKS_TABLE, Config
26from lilbee.data.store import Store
27from lilbee.retrieval.clustering import SourceCluster
28from lilbee.retrieval.clustering_embedding.helpers import (
29 _build_clusters,
30 _corpus_document_frequency,
31 _load_chunk_records,
32 _source_totals,
33 auto_k,
34 communities_by_label,
35 label_propagation,
36 mutual_knn,
37 normalize_rows,
38)
# Module-level logger named after this module, per stdlib convention.
log = logging.getLogger(__name__)
43def _warn_if_undersegmented(
44 clusters: list[SourceCluster],
45 source_totals: dict[str, int],
46) -> None:
47 """Warn when a single cluster covers more than half the corpus sources."""
48 if not clusters or not source_totals:
49 return
50 total_sources = len(source_totals)
51 for cluster in clusters:
52 if len(cluster.sources) * 2 > total_sources:
53 log.warning(
54 "wiki clustering: cluster %r covers %d/%d sources; "
55 "consider lowering wiki_clusterer_k or check embedding quality",
56 cluster.label,
57 len(cluster.sources),
58 total_sources,
59 )
60 break
class EmbeddingClusterer:
    """Cluster chunks via a mutual-kNN graph and label clusters with TF-IDF."""

    def __init__(self, config: Config, store: Store) -> None:
        self._config = config
        self._store = store

    def available(self) -> bool:
        """Report whether the chunks table exists and contains rows.

        ``count_rows()`` is a LanceDB call and can fail transiently
        (concurrent compaction, schema rewrites). On failure we choose
        to report available=True rather than False: returning False
        would silently disable wiki synthesis with no visible cause,
        whereas deferring lets ``get_clusters`` raise the real error on
        its next scan. The failure is still logged at WARNING so it
        shows up at the default log level.
        """
        chunks = self._store.open_table(CHUNKS_TABLE)
        if chunks is None:
            return False
        try:
            row_count = chunks.count_rows()
        except Exception:
            log.warning(
                "count_rows() failed on chunks table; reporting available=True "
                "optimistically and deferring the error to get_clusters",
                exc_info=True,
            )
            return True
        return bool(row_count)

    def get_clusters(self, min_sources: int = 3) -> list[SourceCluster]:
        """Return chunk-level communities projected to source clusters."""
        chunk_records, vectors = _load_chunk_records(self._store)
        if not chunk_records:
            return []

        # Drop zero-norm rows: normalize_rows returns a keep-mask aligned
        # one-to-one with the records.
        vectors, mask = normalize_rows(vectors)
        chunk_records = [
            rec for rec, kept in zip(chunk_records, mask, strict=True) if kept
        ]
        if not chunk_records:
            return []

        # An explicit wiki_clusterer_k > 0 overrides the size-based default.
        k_override = self._config.wiki_clusterer_k
        k = k_override if k_override > 0 else auto_k(len(chunk_records))
        graph = mutual_knn(vectors, k)
        if not any(graph.values()):
            # WARNING (not INFO) so users see why synthesis produced zero
            # pages at the default log level: matches the other degenerate
            # clustering outcome, ``_warn_if_undersegmented``.
            log.warning(
                "wiki clustering: N=%d k=%d no mutual edges: skipping synthesis",
                len(chunk_records),
                k,
            )
            return []

        chunk_labels = label_propagation(
            graph, order=list(range(len(chunk_records)))
        )
        groups = communities_by_label(chunk_labels)

        per_source = _source_totals(chunk_records)
        doc_freq = _corpus_document_frequency(chunk_records)
        clusters, noise = _build_clusters(
            groups, chunk_records, per_source, doc_freq, min_sources
        )
        log.info(
            "wiki clustering: N=%d k=%d communities=%d kept=%d noise=%d",
            len(chunk_records),
            k,
            len(groups),
            len(clusters),
            noise,
        )
        _warn_if_undersegmented(clusters, per_source)
        return clusters