Coverage for src / lilbee / retrieval / concepts / community.py: 100%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Concept community dataclass and PMI / Leiden helpers.""" 

2 

3from __future__ import annotations 

4 

5import math 

6from collections import Counter 

7from dataclasses import dataclass 

8from typing import Any 

9 

10_MIN_LEIDEN_WEIGHT = 0.01 

11 

12 

@dataclass
class Community:
    """One concept cluster, as produced by Leiden partitioning.

    Attributes:
        cluster_id: Integer identifier of the cluster.
        size: Size of the cluster. Presumably ``len(concepts)``, but the
            dataclass does not enforce that -- confirm against the caller.
        concepts: Names of the concepts assigned to this cluster.
    """

    cluster_id: int
    size: int
    concepts: list[str]

20 

21 

def _compute_pmi(
    cooccurrences: Counter[tuple[str, str]],
    concept_counts: Counter[str],
    total_chunks: int,
) -> dict[tuple[str, str], float]:
    """Compute PPMI (Positive PMI) weights for concept co-occurrence pairs.

    PPMI = max(0, log2(P(a,b) / (P(a) * P(b)))).
    Based on Church & Hanks 1990, "Word Association Norms, Mutual Information,
    and Lexicography." Negative values are clamped to zero to discard
    anti-correlated pairs.

    Args:
        cooccurrences: Co-occurrence count for each ``(a, b)`` concept pair.
        concept_counts: Occurrence count for each individual concept.
        total_chunks: Denominator used to turn counts into probabilities.

    Returns:
        A mapping from each scored pair to its PPMI weight (always >= 0).
        Pairs where either concept has no positive count are omitted. An
        empty mapping is returned when ``total_chunks`` is not positive.
    """
    # Without a positive denominator no probability is defined; return an
    # empty result instead of raising ZeroDivisionError.
    if total_chunks <= 0:
        return {}

    pmi: dict[tuple[str, str], float] = {}
    for (a, b), count in cooccurrences.items():
        p_a = concept_counts[a] / total_chunks
        p_b = concept_counts[b] / total_chunks
        if p_a <= 0 or p_b <= 0:
            continue
        if count <= 0:
            # P(a,b) == 0 gives log2(0) = -inf, which PPMI clamps to 0.
            # Short-circuit so math.log2 does not raise ValueError.
            pmi[(a, b)] = 0.0
            continue
        p_ab = count / total_chunks
        pmi[(a, b)] = max(0.0, math.log2(p_ab / (p_a * p_b)))
    return pmi

42 

43 

def _leiden_partition(
    edge_rows: list[dict[str, Any]],
) -> tuple[dict[str, int], dict[str, int]]:
    """Cluster the graph described by *edge_rows* with the Leiden algorithm.

    Each row is read through its ``"source"``, ``"target"`` and ``"weight"``
    keys. Weights below ``_MIN_LEIDEN_WEIGHT`` are raised to that floor
    before the edges are handed to graspologic-native's Rust implementation
    (Traag et al. 2019, "From Louvain to Leiden: guaranteeing well-connected
    communities").

    Returns:
        ``(partition, degree_map)``: ``partition`` is the second element of
        the value returned by ``leiden``; ``degree_map`` counts, per node,
        how many times it appears as a source or target in *edge_rows*.
    """
    # Imported lazily so the dependency is only needed when clustering runs.
    from graspologic_native import leiden

    weighted_edges: list[tuple[str, str, float]] = []
    degrees: Counter[str] = Counter()
    for row in edge_rows:
        src, dst = row["source"], row["target"]
        weighted_edges.append((src, dst, max(_MIN_LEIDEN_WEIGHT, row["weight"])))
        degrees[src] += 1
        degrees[dst] += 1

    _modularity, partition = leiden(edges=weighted_edges)  # type: ignore[call-arg]
    return partition, dict(degrees)