Coverage for src/lilbee/wiki/synthesis.py: 100%
131 statements
coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Cross-source synthesis pages and per-source batched generation.
3Two related orchestrators live here:
5- ``generate_synthesis_page`` and friends produce a single
6 cross-source page from a concept cluster spanning 3+ documents.
7- ``generate_source_batch`` issues one LLM call per source that
8 emits sections for every pre-extracted entity plus 3-5 LLM-curated
9 concepts; the response is split into per-section bodies and each
10 section is finalized via :func:`finalize_section`.
12The shared output-parsing helpers (``_split_batched_output``,
13``_prefix_heading``, ``match_label``) cover both paths.
14"""
from __future__ import annotations

import functools
import logging
import re
from pathlib import Path

import yaml

from lilbee.core.config import Config
from lilbee.core.text import clean_label_for_display, make_slug
from lilbee.data.store import CitationRecord, SearchChunk, Store
from lilbee.providers.base import LLMProvider
from lilbee.retrieval.reasoning import strip_reasoning
from lilbee.wiki.batch import (
    finalize_section,
    hash_existing_sources,
    match_label,
)
from lilbee.wiki.citation import ParsedCitation, parse_wiki_citations
from lilbee.wiki.citations import resolve_multi_source_citations
from lilbee.wiki.entity_extractor import EntityKind, ExtractedEntity
from lilbee.wiki.page import (
    build_wiki_messages,
    chunks_to_text,
    generate_page,
    truncate_chunks_to_budget,
)
from lilbee.wiki.persistence import write_pending_marker
from lilbee.wiki.shared import (
    PENDING_MARKER_KEYWORD_PARSE,
    PendingKind,
    WikiSubdir,
)
log = logging.getLogger(__name__)

# Regex that matches section headers the batch parser recognizes:
# H1 (``# Name``), H2 (``## Name``), or a bold-line heading
# (``**Name**``) at line start. The name capture runs to the end of
# the line (the caller strips trailing whitespace), so labels like
# ``## Brake System (hydraulic)`` still parse.
_SECTION_HEADER_RE = re.compile(
    r"^(?:(?:##?)\s+(?P<hashname>[^\n]+)|\*\*(?P<boldname>[^\*\n]+)\*\*)\s*$",
    re.MULTILINE,
)
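# Illustrative header lines the regex accepts (labels here are hypothetical,
# not taken from any real build):
#   "# Brake System"              -> hashname = "Brake System"
#   "## Brake System (hydraulic)" -> hashname = "Brake System (hydraulic)"
#   "**Master Cylinder**"         -> boldname = "Master Cylinder"
# A header with extra text on the same line (e.g. "## Brake System: notes")
# still matches; the whole remainder of the line becomes the captured name.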
_PENDING_PARSE_MARKER_PREFIX = f"<!-- {PENDING_MARKER_KEYWORD_PARSE}"
def generate_synthesis_page(
    topic: str,
    source_names: list[str],
    chunks_by_source: dict[str, list[SearchChunk]],
    provider: LLMProvider,
    store: Store,
    config: Config,
) -> Path | None:
    """Generate a single synthesis page for a concept cluster.

    Returns the path to the generated page, or None on failure.
    """
    # Imported here to avoid a circular import: ingest tooling for
    # source hashing lives outside this module.
    from lilbee.data.ingest import file_hash

    all_chunks = [c for cs in chunks_by_source.values() for c in cs]
    if not all_chunks:
        log.warning("No chunks for synthesis topic %r, skipping", topic)
        return None

    all_chunks = truncate_chunks_to_budget(all_chunks, config)
    chunks_text = chunks_to_text(all_chunks)
    source_list = "\n".join(f"- {name}" for name in sorted(source_names))
    template = config.wiki_synthesis_prompt
    display_topic = clean_label_for_display(topic)
    prompt = template.format(topic=display_topic, source_list=source_list, chunks_text=chunks_text)
    slug = make_slug(topic)

    source_hashes: dict[str, str] = {}
    for name in source_names:
        source_path = config.documents_dir / name
        if source_path.exists():
            source_hashes[name] = file_hash(source_path)

    def resolver(parsed: list[ParsedCitation]) -> list[CitationRecord]:
        return resolve_multi_source_citations(parsed, source_names, source_hashes, chunks_by_source)

    return generate_page(
        label=topic,
        prompt=prompt,
        chunks=all_chunks,
        citation_resolver=resolver,
        page_type=WikiSubdir.SYNTHESIS,
        slug=slug,
        source_names=source_names,
        provider=provider,
        store=store,
        config=config,
    )
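# A minimal caller sketch for generate_synthesis_page (hypothetical; cluster
# discovery lives elsewhere in the codebase). ``clusters`` is assumed to map
# topic -> {source: chunks} for concept clusters spanning 3+ documents:
#
#   for topic, chunks_by_source in clusters.items():
#       if len(chunks_by_source) >= 3:
#           generate_synthesis_page(
#               topic, sorted(chunks_by_source), chunks_by_source,
#               provider, store, config,
#           )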
def _split_batched_output(
    text: str,
    expected_entity_labels: set[str],
    expected_concept_labels: set[str] | None = None,
) -> dict[str, tuple[EntityKind, str]]:
    """Best-effort parse of the batched LLM response into per-label bodies.

    Splits on H1/H2/bold-line headers, then matches each header
    against the expected entity and concept label sets via
    case-insensitive substring. Known labels are tagged with the
    right ``EntityKind``. Unknown headers are dropped as noise unless
    concept curation was requested, in which case they are kept as
    LLM-curated ``CONCEPT`` sections. Labels whose section could not
    be recovered at all are surfaced to the caller as *missing from
    the return dict* rather than a separate list: the caller loops
    over the expected sets to write PENDING markers.
    """
    concepts = expected_concept_labels or set()
    recovered: dict[str, tuple[EntityKind, str]] = {}
    matches = list(_SECTION_HEADER_RE.finditer(text))
    if not matches:
        return recovered
    for i, match in enumerate(matches):
        name = match.group("hashname") or match.group("boldname") or ""
        name = name.strip()
        start = match.end()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        body = text[start:end].strip()
        if not body:
            continue
        lowered = name.lower()
        kind_label = match_label(lowered, expected_entity_labels, EntityKind.ENTITY)
        if kind_label is None:
            kind_label = match_label(lowered, concepts, EntityKind.CONCEPT)
        if kind_label is None:
            # Concept labels come from the LLM itself: tag any
            # unmatched section as CONCEPT only when the caller is
            # expecting concept curation; otherwise drop it as
            # noise.
            if concepts is not None and expected_concept_labels is not None:
                recovered.setdefault(name, (EntityKind.CONCEPT, _prefix_heading(name, body)))
            continue
        kind, label = kind_label
        recovered[label] = (kind, _prefix_heading(name, body))
    return recovered
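# Rough shape of a successful parse (labels and bodies are made up; this
# assumes ``match_label`` resolves each header back to its expected label):
#
#   text = "## Brake System\nHydraulic details [1].\n\n**Torque Curve**\nDyno notes [2]."
#   _split_batched_output(text, {"Brake System"}, {"Torque Curve"})
#   # -> {"Brake System": (EntityKind.ENTITY, "# Brake System\n\nHydraulic details [1]."),
#   #     "Torque Curve": (EntityKind.CONCEPT, "# Torque Curve\n\nDyno notes [2].")}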
def _prefix_heading(name: str, body: str) -> str:
    """Ensure the extracted body starts with a ``# Name`` H1.

    The batched prompt instructs the model to emit ``## Name`` per
    section. After splitting, the per-section body has lost its
    header. Rebuild an H1 so the B3 title/body coherence gate still
    has a heading to match.
    """
    stripped = body.lstrip()
    if stripped.startswith("# "):
        return body
    return f"# {name}\n\n{body}"
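# For example, _prefix_heading("Brake System", "Hydraulic details.") returns
# "# Brake System\n\nHydraulic details.", while a body that already opens
# with "# " is passed through unchanged.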
def _build_batch_prompt(
    source: str,
    entities: list[ExtractedEntity],
    chunks_text: str,
    extract_concepts: bool,
    config: Config,
) -> str:
    """Render :attr:`Config.wiki_entity_batch_prompt` for one source call.

    ``extract_concepts`` controls whether the concept-curation
    paragraph is injected: True adds an "identify 3-5 concepts"
    block; False leaves ``{concept_instruction}`` empty so the LLM
    writes entity sections only. This keeps the per-source batched
    call the single entry point whether or not concepts are
    requested.
    """
    entity_labels = ", ".join(clean_label_for_display(e.label) for e in entities) or "(none)"
    if extract_concepts:
        concept_instruction = (
            "First, identify 3-5 CONCEPTS: abstract topics or domain terms "
            "from the source that deserve a standalone wiki page. Do NOT include "
            "pronouns, articles, or generic nouns.\n\n"
            "Then write a wiki section for each of the concepts you identified, "
            "PLUS one section for each NER ENTITY listed below.\n\n"
        )
    else:
        concept_instruction = ""
    return config.wiki_entity_batch_prompt.format(
        source=source,
        entity_list=entity_labels,
        chunks_text=chunks_text,
        concept_instruction=concept_instruction,
    )
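# The template behind ``config.wiki_entity_batch_prompt`` is defined elsewhere;
# the ``format`` call above only requires the four placeholders it passes. A
# minimal, purely hypothetical template could look like:
#
#   Source: {source}
#   {concept_instruction}Write one wiki section per label below, as "## <label>".
#   NER ENTITIES: {entity_list}
#
#   Context:
#   {chunks_text}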
def group_entities_by_primary_source(
    entities: list[ExtractedEntity],
) -> dict[str, list[ExtractedEntity]]:
    """Group entities under the source that mentions them most.

    Primary source = source with the highest chunk-ref count;
    lexicographic tiebreak. An entity with no refs is dropped
    silently (defensive: the extractor always attaches refs, but a
    future extractor might not).
    """
    grouped: dict[str, list[ExtractedEntity]] = {}
    for entity in entities:
        if not entity.chunk_refs:
            continue
        counts: dict[str, int] = {}
        for ref in entity.chunk_refs:
            counts[ref.source] = counts.get(ref.source, 0) + 1
        primary = min(counts.items(), key=lambda kv: (-kv[1], kv[0]))[0]
        grouped.setdefault(primary, []).append(entity)
    return grouped
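# Illustrative grouping (made-up refs): an entity referenced twice from
# "alpha.pdf" and once from "beta.pdf" lands under "alpha.pdf"; a 1-1 tie
# between "alpha.pdf" and "beta.pdf" also goes to "alpha.pdf", the
# lexicographically smaller name, because ``min`` orders by (-count, source).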
def generate_source_batch(
    source: str,
    entities: list[ExtractedEntity],
    chunks: list[SearchChunk],
    provider: LLMProvider,
    store: Store,
    config: Config,
    *,
    extract_concepts: bool,
    written_concept_slugs: dict[str, str],
) -> list[Path]:
    """Issue one LLM call for *source* and finalize every recovered section.

    Returns the list of page paths written (entities + concepts
    combined). Labels not recovered by the parser become PENDING
    markers under ``wiki/drafts/`` so the next build can retry.
    Concept slugs already written by an earlier source produce a
    PENDING-COLLISION marker on the losing side (see
    :func:`_handle_concept_write`).

    ``written_concept_slugs`` is the per-build ledger of
    slug → first_source. Callers share one dict across the per-source
    loop. The second source to propose a slug is the one that gets
    diverted to a collision marker.
    """
    if not chunks:
        return []
    budgeted = truncate_chunks_to_budget(chunks, config)
    chunks_text = chunks_to_text(budgeted)
    prompt = _build_batch_prompt(source, entities, chunks_text, extract_concepts, config)
    messages = build_wiki_messages(prompt, provider, config)
    options = config.generation_options(
        temperature=config.wiki_temperature,
        max_tokens=config.wiki_summary_max_tokens,
    )
    try:
        response = provider.chat(messages, stream=False, options=options)
        text = strip_reasoning(response).strip()
    except Exception as exc:
        log.warning("Batched LLM call failed for source %s: %s", source, exc)
        return []

    if not text:
        log.warning("Batched LLM call returned empty response for source %s", source)
        return []

    expected_entity_labels = {e.label for e in entities}
    expected_concepts: set[str] | None = set() if extract_concepts else None
    parsed = _split_batched_output(text, expected_entity_labels, expected_concepts)

    wiki_root = config.data_root / config.wiki_dir
    drafts_dir = wiki_root / WikiSubdir.DRAFTS
    source_names = [source]
    source_hashes = hash_existing_sources(source_names, config.documents_dir)
    chunks_by_source = {source: budgeted}

    # Citation definitions live in the trailing block of the WHOLE
    # response, not inside any one section body. Parse once over the
    # full text and replay the same list for every section, so each
    # page sees its own citations even when only the last section
    # carries the definition trailer.
    shared_parsed_citations = parse_wiki_citations(text)

    pages: list[Path] = []
    seen_labels: set[str] = set()
    for header_label, (kind, body) in parsed.items():
        seen_labels.add(header_label)
        resolver = functools.partial(
            resolve_multi_source_citations,
            source_names=source_names,
            source_hashes=source_hashes,
            chunks_by_source=chunks_by_source,
        )
        page = finalize_section(
            header_label=header_label,
            kind=kind,
            body=body,
            chunks=budgeted,
            citation_resolver=resolver,
            source_names=source_names,
            store=store,
            config=config,
            source=source,
            written_concept_slugs=written_concept_slugs,
            drafts_dir=drafts_dir,
            shared_parsed_citations=shared_parsed_citations,
        )
        if page is not None:
            pages.append(page)

    for entity in entities:
        if entity.label not in seen_labels:
            marker = (
                f"{_PENDING_PARSE_MARKER_PREFIX} for source {source}, "
                f"entity/concept {entity.label} - "
                "run wiki build again or manually accept via wiki drafts accept -->"
            )
            # Route through ``yaml.safe_dump`` so a label or source
            # containing a colon, quote, or newline does not produce a
            # frontmatter block that ``parse_frontmatter`` silently drops.
            frontmatter_body = yaml.safe_dump(
                {
                    "pending_source": source,
                    "pending_label": entity.label,
                    "pending_kind": PendingKind.PARSE.value,
                },
                sort_keys=False,
            )
            frontmatter = f"---\n{frontmatter_body}---\n"
            path = write_pending_marker(drafts_dir, entity.slug, marker, frontmatter)
            log.info("Wrote PENDING-PARSE marker for %s -> %s", entity.slug, path)

    return pages
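# For reference, the frontmatter block built above renders roughly as below
# (values are illustrative; the kind string is whatever ``PendingKind.PARSE.value``
# resolves to, and the final file layout is up to ``write_pending_marker``):
#
#   ---
#   pending_source: manual.pdf
#   pending_label: Brake System
#   pending_kind: <PendingKind.PARSE.value>
#   ---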