Coverage for src/lilbee/wiki/synthesis.py: 100%

131 statements  

coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

"""Cross-source synthesis pages and per-source batched generation.

Two related orchestrators live here:

- ``generate_synthesis_page`` and friends produce a single
  cross-source page from a concept cluster spanning 3+ documents.
- ``generate_source_batch`` issues one LLM call per source that
  emits sections for every pre-extracted entity plus 3-5 LLM-curated
  concepts; the response is split into per-section bodies and each
  section is finalized via :func:`finalize_section`.

The shared output-parsing helpers (``_split_batched_output``,
``_prefix_heading``, ``match_label``) cover both paths.
"""

from __future__ import annotations

import functools
import logging
import re
from pathlib import Path

import yaml

from lilbee.core.config import Config
from lilbee.core.text import clean_label_for_display, make_slug
from lilbee.data.store import CitationRecord, SearchChunk, Store
from lilbee.providers.base import LLMProvider
from lilbee.retrieval.reasoning import strip_reasoning
from lilbee.wiki.batch import (
    finalize_section,
    hash_existing_sources,
    match_label,
)
from lilbee.wiki.citation import ParsedCitation, parse_wiki_citations
from lilbee.wiki.citations import resolve_multi_source_citations
from lilbee.wiki.entity_extractor import EntityKind, ExtractedEntity
from lilbee.wiki.page import (
    build_wiki_messages,
    chunks_to_text,
    generate_page,
    truncate_chunks_to_budget,
)
from lilbee.wiki.persistence import write_pending_marker
from lilbee.wiki.shared import (
    PENDING_MARKER_KEYWORD_PARSE,
    PendingKind,
    WikiSubdir,
)

log = logging.getLogger(__name__)

# Regex that matches section headers the batch parser recognizes:
# H1 (``# Name``), H2 (``## Name``), or a bold-line heading
# (``**Name**``) at line start. The name capture is anchored to the
# rest of the line (stripped of trailing whitespace) so labels like
# ``## Brake System (hydraulic)`` still parse.
_SECTION_HEADER_RE = re.compile(
    r"^(?:(?:##?)\s+(?P<hashname>[^\n]+)|\*\*(?P<boldname>[^\*\n]+)\*\*)\s*$",
    re.MULTILINE,
)
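
# Illustrative only: the kinds of lines the header regex does and does
# not treat as a section boundary (a sketch, not exercised by this module):
#
#   "# Brake System"                 -> hashname captures "Brake System"
#   "## Brake System (hydraulic)"    -> hashname captures "Brake System (hydraulic)"
#   "**Brake System**"               -> boldname captures "Brake System"
#   "### Brake System"               -> no match (only one or two leading hashes)
#   "**Brake System** overview text" -> no match (a bold heading must fill the line)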

_PENDING_PARSE_MARKER_PREFIX = f"<!-- {PENDING_MARKER_KEYWORD_PARSE}"


def generate_synthesis_page(
    topic: str,
    source_names: list[str],
    chunks_by_source: dict[str, list[SearchChunk]],
    provider: LLMProvider,
    store: Store,
    config: Config,
) -> Path | None:
    """Generate a single synthesis page for a concept cluster.

    Returns the path to the generated page, or None on failure.
    """
    # Imported lazily to avoid a circular import: the ingest tooling
    # for source hashing lives outside this module.
    from lilbee.data.ingest import file_hash

    all_chunks = [c for cs in chunks_by_source.values() for c in cs]
    if not all_chunks:
        log.warning("No chunks for synthesis topic %r, skipping", topic)
        return None

    all_chunks = truncate_chunks_to_budget(all_chunks, config)
    chunks_text = chunks_to_text(all_chunks)
    source_list = "\n".join(f"- {name}" for name in sorted(source_names))
    template = config.wiki_synthesis_prompt
    display_topic = clean_label_for_display(topic)
    prompt = template.format(topic=display_topic, source_list=source_list, chunks_text=chunks_text)
    slug = make_slug(topic)

    source_hashes: dict[str, str] = {}
    for name in source_names:
        source_path = config.documents_dir / name
        if source_path.exists():
            source_hashes[name] = file_hash(source_path)

    def resolver(parsed: list[ParsedCitation]) -> list[CitationRecord]:
        return resolve_multi_source_citations(parsed, source_names, source_hashes, chunks_by_source)

    return generate_page(
        label=topic,
        prompt=prompt,
        chunks=all_chunks,
        citation_resolver=resolver,
        page_type=WikiSubdir.SYNTHESIS,
        slug=slug,
        source_names=source_names,
        provider=provider,
        store=store,
        config=config,
    )
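
# A minimal calling sketch for the synthesis path (illustrative; the real
# call sites live in the wiki build orchestration, and the chunk retrieval
# shown here is assumed to happen upstream):
#
#   chunks_by_source = {...}  # source name -> list[SearchChunk], built upstream
#   page_path = generate_synthesis_page(
#       topic="hydraulic brake system",
#       source_names=list(chunks_by_source),
#       chunks_by_source=chunks_by_source,
#       provider=provider,
#       store=store,
#       config=config,
#   )
#   # page_path is None when no chunks were available or generation failed.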


def _split_batched_output(
    text: str,
    expected_entity_labels: set[str],
    expected_concept_labels: set[str] | None = None,
) -> dict[str, tuple[EntityKind, str]]:
    """Best-effort parse of the batched LLM response into per-label bodies.

    Splits on H1/H2/bold-line headers, then matches each header
    against the expected entity and concept label sets via
    case-insensitive substring. Known labels are tagged with the
    right ``EntityKind``; unknown headers are dropped. Labels whose
    section could not be recovered at all are surfaced to the caller
    (they show up as *missing from the return dict* rather than in a
    separate list: the caller loops over the expected sets to write
    PENDING markers).
    """
    concepts = expected_concept_labels or set()
    recovered: dict[str, tuple[EntityKind, str]] = {}
    matches = list(_SECTION_HEADER_RE.finditer(text))
    if not matches:
        return recovered
    for i, match in enumerate(matches):
        name = match.group("hashname") or match.group("boldname") or ""
        name = name.strip()
        start = match.end()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        body = text[start:end].strip()
        if not body:
            continue
        lowered = name.lower()
        kind_label = match_label(lowered, expected_entity_labels, EntityKind.ENTITY)
        if kind_label is None:
            kind_label = match_label(lowered, concepts, EntityKind.CONCEPT)
        if kind_label is None:
            # Concept labels come from the LLM itself: tag any
            # unmatched section as CONCEPT only when the caller is
            # expecting concept curation; otherwise drop it as
            # noise.
            if expected_concept_labels is not None:
                recovered.setdefault(name, (EntityKind.CONCEPT, _prefix_heading(name, body)))
            continue
        kind, label = kind_label
        recovered[label] = (kind, _prefix_heading(name, body))
    return recovered
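
# Parsing sketch (illustrative labels and bodies; assumes ``match_label``
# returns the matching expected label on a case-insensitive substring hit,
# per the docstring above):
#
#   text = "## Master Cylinder\n...\n## Brake Fade\n...\n"
#   _split_batched_output(text, {"Master Cylinder"}, {"Brake Fade"})
#   -> {"Master Cylinder": (EntityKind.ENTITY, "# Master Cylinder\n\n..."),
#       "Brake Fade": (EntityKind.CONCEPT, "# Brake Fade\n\n...")}
#
# A header matching neither set is kept as a CONCEPT only when concept
# curation was requested (``expected_concept_labels`` is not None); an
# expected label with no recoverable section is simply absent from the dict.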


def _prefix_heading(name: str, body: str) -> str:
    """Ensure the extracted body starts with a ``# Name`` H1.

    The batched prompt instructs the model to emit ``## Name`` per
    section. After splitting, the per-section body has lost its
    header. Rebuild an H1 so the B3 title/body coherence gate still
    has a heading to match.
    """
    stripped = body.lstrip()
    if stripped.startswith("# "):
        return body
    return f"# {name}\n\n{body}"
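
# For example (illustrative): _prefix_heading("Brake Fade", "Body text")
# returns "# Brake Fade\n\nBody text", while a body that already starts
# with "# " comes back unchanged.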


def _build_batch_prompt(
    source: str,
    entities: list[ExtractedEntity],
    chunks_text: str,
    extract_concepts: bool,
    config: Config,
) -> str:
    """Render :attr:`Config.wiki_entity_batch_prompt` for one source call.

    ``extract_concepts`` controls whether the concept-curation
    paragraph is injected: True adds an "identify 3-5 concepts" block;
    False leaves ``{concept_instruction}`` empty so the LLM writes
    entity sections only. This keeps the per-source batched call the
    single entry point whether or not concepts are requested.
    """
    entity_labels = ", ".join(clean_label_for_display(e.label) for e in entities) or "(none)"
    if extract_concepts:
        concept_instruction = (
            "First, identify 3-5 CONCEPTS: abstract topics or domain terms "
            "from the source that deserve a standalone wiki page. Do NOT include "
            "pronouns, articles, or generic nouns.\n\n"
            "Then write a wiki section for each of the concepts you identified, "
            "PLUS one section for each NER ENTITY listed below.\n\n"
        )
    else:
        concept_instruction = ""
    return config.wiki_entity_batch_prompt.format(
        source=source,
        entity_list=entity_labels,
        chunks_text=chunks_text,
        concept_instruction=concept_instruction,
    )
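
# Illustrative call; the rendered text depends entirely on the configured
# ``wiki_entity_batch_prompt`` template, and the names below are placeholders:
#
#   prompt = _build_batch_prompt(
#       source="maintenance_manual.pdf",
#       entities=entities_for_source,
#       chunks_text=chunks_to_text(budgeted_chunks),
#       extract_concepts=True,   # injects the "identify 3-5 concepts" block
#       config=config,
#   )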


def group_entities_by_primary_source(
    entities: list[ExtractedEntity],
) -> dict[str, list[ExtractedEntity]]:
    """Group entities under the source that mentions them most.

    Primary source = source with the highest chunk-ref count;
    lexicographic tiebreak. An entity with no refs is dropped
    silently (defensive: extractor always attaches refs, but a
    future extractor might not).
    """
    grouped: dict[str, list[ExtractedEntity]] = {}
    for entity in entities:
        if not entity.chunk_refs:
            continue
        counts: dict[str, int] = {}
        for ref in entity.chunk_refs:
            counts[ref.source] = counts.get(ref.source, 0) + 1
        primary = min(counts.items(), key=lambda kv: (-kv[1], kv[0]))[0]
        grouped.setdefault(primary, []).append(entity)
    return grouped
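
# Tiebreak sketch (illustrative counts): an entity referenced twice in
# "alpha.pdf" and twice in "beta.pdf" groups under "alpha.pdf", because the
# min() key (-count, source) prefers the higher count first and the
# lexicographically smaller source name on a tie:
#
#   min({"beta.pdf": 2, "alpha.pdf": 2}.items(), key=lambda kv: (-kv[1], kv[0]))[0]
#   -> "alpha.pdf"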


def generate_source_batch(
    source: str,
    entities: list[ExtractedEntity],
    chunks: list[SearchChunk],
    provider: LLMProvider,
    store: Store,
    config: Config,
    *,
    extract_concepts: bool,
    written_concept_slugs: dict[str, str],
) -> list[Path]:
    """Issue one LLM call for *source* and finalize every recovered section.

    Returns the list of page paths written (entities + concepts
    combined). Labels not recovered by the parser become PENDING
    markers under ``wiki/drafts/`` so the next build can retry.
    Concept slugs already written by an earlier source produce a
    PENDING-COLLISION marker on the losing side (see
    :func:`_handle_concept_write`).

    ``written_concept_slugs`` is the per-build ledger of
    slug → first_source. Callers share one dict across the per-source
    loop. The second source to propose a slug is the one that gets
    diverted to a collision marker.
    """
    if not chunks:
        return []
    budgeted = truncate_chunks_to_budget(chunks, config)
    chunks_text = chunks_to_text(budgeted)
    prompt = _build_batch_prompt(source, entities, chunks_text, extract_concepts, config)
    messages = build_wiki_messages(prompt, provider, config)
    options = config.generation_options(
        temperature=config.wiki_temperature,
        max_tokens=config.wiki_summary_max_tokens,
    )
    try:
        response = provider.chat(messages, stream=False, options=options)
        text = strip_reasoning(response).strip()
    except Exception as exc:
        log.warning("Batched LLM call failed for source %s: %s", source, exc)
        return []

    if not text:
        log.warning("Batched LLM call returned empty response for source %s", source)
        return []

    expected_entity_labels = {e.label for e in entities}
    expected_concepts: set[str] | None = set() if extract_concepts else None
    parsed = _split_batched_output(text, expected_entity_labels, expected_concepts)

    wiki_root = config.data_root / config.wiki_dir
    drafts_dir = wiki_root / WikiSubdir.DRAFTS
    source_names = [source]
    source_hashes = hash_existing_sources(source_names, config.documents_dir)
    chunks_by_source = {source: budgeted}

    # Citation definitions live in the trailing block of the WHOLE
    # response, not inside any one section body. Parse once over the
    # full text and replay the same list for every section, so each
    # page sees its own citations even when only the last section
    # carries the definition trailer.
    shared_parsed_citations = parse_wiki_citations(text)

    pages: list[Path] = []
    seen_labels: set[str] = set()
    for header_label, (kind, body) in parsed.items():
        seen_labels.add(header_label)
        resolver = functools.partial(
            resolve_multi_source_citations,
            source_names=source_names,
            source_hashes=source_hashes,
            chunks_by_source=chunks_by_source,
        )
        page = finalize_section(
            header_label=header_label,
            kind=kind,
            body=body,
            chunks=budgeted,
            citation_resolver=resolver,
            source_names=source_names,
            store=store,
            config=config,
            source=source,
            written_concept_slugs=written_concept_slugs,
            drafts_dir=drafts_dir,
            shared_parsed_citations=shared_parsed_citations,
        )
        if page is not None:
            pages.append(page)

    for entity in entities:
        if entity.label not in seen_labels:
            marker = (
                f"{_PENDING_PARSE_MARKER_PREFIX} for source {source}, "
                f"entity/concept {entity.label} - "
                "run wiki build again or manually accept via wiki drafts accept -->"
            )
            # Route through ``yaml.safe_dump`` so a label or source
            # containing a colon, quote, or newline does not produce a
            # frontmatter block that ``parse_frontmatter`` silently drops.
            frontmatter_body = yaml.safe_dump(
                {
                    "pending_source": source,
                    "pending_label": entity.label,
                    "pending_kind": PendingKind.PARSE.value,
                },
                sort_keys=False,
            )
            frontmatter = f"---\n{frontmatter_body}---\n"
            path = write_pending_marker(drafts_dir, entity.slug, marker, frontmatter)
            log.info("Wrote PENDING-PARSE marker for %s -> %s", entity.slug, path)

    return pages
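
# End-to-end sketch of the per-source batched path (illustrative; the real
# orchestration lives in the wiki build code, and ``chunks_for`` below is a
# hypothetical chunk lookup, not part of this module):
#
#   grouped = group_entities_by_primary_source(all_entities)
#   written_concept_slugs: dict[str, str] = {}  # one shared ledger per build
#   for src_name, src_entities in grouped.items():
#       generate_source_batch(
#           src_name,
#           src_entities,
#           chunks_for(src_name),
#           provider,
#           store,
#           config,
#           extract_concepts=True,
#           written_concept_slugs=written_concept_slugs,
#       )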