Coverage for src / lilbee / wiki / batch.py: 100%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Per-source batched-generation helpers and legacy concept-page archival. 

2 

3The batched build (one LLM call per source that emits sections for 

4every pre-extracted entity plus 3-5 LLM-curated concepts) lives here: 

5section-finalization, label matching, source hashing, and the page 

6splitter that turns the model's response into per-section bodies. 

7Also owns the one-time migration that archives legacy concept pages 

8(written before per-source batched generation) and unwraps stale 

9``[[archived-slug]]`` links. 

10""" 

11 

12from __future__ import annotations 

13 

14import hashlib 

15import logging 

16import re 

17from collections.abc import Callable 

18from datetime import UTC, datetime 

19from pathlib import Path 

20 

21from lilbee.core.config import Config 

22from lilbee.core.text import make_slug 

23from lilbee.data.ingest import file_hash 

24from lilbee.data.store import CitationRecord, SearchChunk, Store 

25from lilbee.wiki.citation import ( 

26 ParsedCitation, 

27 parse_wiki_citations, 

28 render_citation_block, 

29 strip_citation_block, 

30) 

31from lilbee.wiki.citations import verify_citations 

32from lilbee.wiki.entity_extractor import EntityKind 

33from lilbee.wiki.page import assemble_content, build_frontmatter 

34from lilbee.wiki.persistence import ( 

35 delete_pending_marker_if_present, 

36 divert_concept_collision, 

37 persist_and_finalize, 

38) 

39from lilbee.wiki.quality import check_faithfulness 

40from lilbee.wiki.shared import ( 

41 WIKI_CONTENT_SUBDIRS, 

42 PageTarget, 

43 WikiSubdir, 

44) 

45 

# Module-level logger; callers configure handlers/levels at the app boundary.
log = logging.getLogger(__name__)

# In-body ``[^keyN]`` footnote-marker pattern (letters, digits, ``_`` and
# ``-`` in the key). Module-scope so the batched-generation hot path
# (`finalize_section`) does not recompile it on every recovered section.
_FOOTNOTE_MARKER_RE = re.compile(r"\[\^([a-zA-Z0-9_\-]+)\]")

# Sentinel file for the one-time legacy-concepts archival. Lives under
# data_dir (NOT inside wiki/) so Obsidian sync and wiki tree-walkers
# never surface it. The on-disk filename is preserved across renames so
# upgrading installs do not re-run the migration.
_LEGACY_CONCEPTS_MIGRATED_SENTINEL = ".phase-d-migrated"

# Destination subpath (relative to the wiki root) for legacy concept
# pages moved during the one-time migration: wiki/archive/concepts/.
# Source pages are matched as wiki/<WikiSubdir.CONCEPTS>/*.md recursively.
_ARCHIVE_CONCEPTS_SUBPATH = Path(WikiSubdir.ARCHIVE) / WikiSubdir.CONCEPTS

62 

63 

def hash_existing_sources(source_names: list[str], documents_dir: Path) -> dict[str, str]:
    """Map each still-on-disk source name to its content hash.

    Sources that have been deleted from *documents_dir* are silently
    omitted; callers use the result for citation-staleness checks.
    """
    hashes: dict[str, str] = {}
    for name in source_names:
        candidate = documents_dir / name
        if not candidate.exists():
            continue
        hashes[name] = file_hash(candidate)
    return hashes

72 

73 

def match_label(
    lowered_name: str,
    expected: set[str],
    kind: EntityKind,
) -> tuple[EntityKind, str] | None:
    """Match *lowered_name* against *expected* labels, case-insensitively.

    Returns ``(kind, original_label)`` for the first label that is a
    substring of *lowered_name* (or vice versa); ``None`` when nothing
    matches. Substring matching — rather than equality — tolerates the
    LLM adding qualifiers ("Brake System (hydraulic)" vs "brake system").
    Empty labels never match.
    """
    for original in expected:
        candidate = original.lower()
        if not candidate:
            continue
        if candidate in lowered_name or lowered_name in candidate:
            return (kind, original)
    return None

90 

91 

def chunks_for_source(chunks: list[SearchChunk], source: str) -> list[SearchChunk]:
    """Filter *chunks* down to those whose ``source`` equals *source*, order intact."""
    return list(filter(lambda chunk: chunk.source == source, chunks))

95 

96 

def short_source_hash(source: str) -> str:
    """Stable 8-character sha256 prefix of *source* (collision-marker suffix)."""
    digest = hashlib.sha256(source.encode("utf-8"))
    return digest.hexdigest()[:8]

100 

101 

102def _group_chunks_by_page( 

103 chunks: list[SearchChunk], 

104) -> list[tuple[int, list[SearchChunk]]]: 

105 """Group chunks by ``page_start``, preserving in-document order within a page. 

106 

107 Returns ``(page_start, chunks)`` tuples sorted ascending by page number. 

108 Chunks with ``page_start=0`` (non-paginated sources) collapse to a single 

109 entry keyed at 0, so a markdown or code source still emits exactly one 

110 summary file until structure detection arrives in a later stage. 

111 """ 

112 grouped: dict[int, list[SearchChunk]] = {} 

113 for chunk in chunks: 

114 grouped.setdefault(chunk.page_start, []).append(chunk) 

115 return sorted(grouped.items()) 

116 

117 

def archive_legacy_concept_pages(wiki_root: Path, data_dir: Path) -> None:
    """One-time migration: archive legacy concept pages.

    Idempotent, gated by ``{data_dir}/.phase-d-migrated``:

    1. Move every ``wiki/concepts/*.md`` to ``wiki/archive/concepts/``
       preserving relative subpaths, so older concept pages stay
       readable but drop out of the active wiki browse surface.
    2. Unwrap stale ``[[archived-slug]]`` references across the
       remaining pages (archived slugs become plain text) so a reader
       clicking a link does not hit a 404.
    3. Write the sentinel so future builds skip this path.

    Freshly LLM-curated concept pages written AFTER the sentinel exists
    are never touched.
    """
    sentinel_path = data_dir / _LEGACY_CONCEPTS_MIGRATED_SENTINEL
    if sentinel_path.exists():
        # Migration already ran on this install; nothing to do.
        return

    legacy_dir = wiki_root / WikiSubdir.CONCEPTS
    target_root = wiki_root / _ARCHIVE_CONCEPTS_SUBPATH
    moved_slugs: list[str] = []
    if legacy_dir.is_dir():
        # Sorted walk keeps the move order deterministic across runs.
        for page in sorted(legacy_dir.rglob("*.md")):
            relative = page.relative_to(legacy_dir)
            destination = target_root / relative
            destination.parent.mkdir(parents=True, exist_ok=True)
            page.replace(destination)
            # Record the slug in forward-slash form for link rewriting.
            moved_slugs.append(str(relative.with_suffix("")).replace("\\", "/"))

    if moved_slugs:
        _unwrap_archived_links(wiki_root, moved_slugs)

    # Sentinel is written even when nothing moved, so empty installs
    # also skip this path on subsequent builds.
    data_dir.mkdir(parents=True, exist_ok=True)
    sentinel_path.write_text(datetime.now(UTC).isoformat(), encoding="utf-8")
    if moved_slugs:
        log.info(
            "Legacy-concepts migration: archived %d concept pages, sentinel written at %s",
            len(moved_slugs),
            sentinel_path,
        )

159 

160 

def _unwrap_archived_links(wiki_root: Path, archived_slugs: list[str]) -> None:
    """Rewrite ``[[slug]]`` -> ``slug`` (plain text) across remaining wiki pages.

    The existing ``_rewrite_links_across_wiki`` path is the wrong tool
    here: it compiles an *additive* surface map, not a removal pass.
    A single alternation pattern over all archived slugs means every
    active page is scanned once, regardless of how many slugs were
    archived (the previous implementation re-ran one substitution pass
    per slug per page). Pages whose body did not change are not
    rewritten.
    """
    if not archived_slugs:
        return
    # One combined pattern: ``[[`` + (slug1|slug2|...) + ``]]``. Each
    # alternative is a literal (re.escape), and the mandatory closing
    # ``]]`` disambiguates slugs that are prefixes of one another.
    pattern = re.compile(
        r"\[\[(" + "|".join(re.escape(slug) for slug in archived_slugs) + r")\]\]"
    )
    for subdir in WIKI_CONTENT_SUBDIRS:
        subdir_path = wiki_root / subdir
        if not subdir_path.is_dir():
            continue
        for md_path in subdir_path.rglob("*.md"):
            original = md_path.read_text(encoding="utf-8")
            # Group 1 is the bare slug; replacing with it drops the brackets.
            rewritten = pattern.sub(lambda m: m.group(1), original)
            if rewritten != original:
                md_path.write_text(rewritten, encoding="utf-8")

185 

186 

def finalize_section(
    *,
    header_label: str,
    kind: EntityKind,
    body: str,
    chunks: list[SearchChunk],
    citation_resolver: Callable[[list[ParsedCitation]], list[CitationRecord]],
    source_names: list[str],
    store: Store,
    config: Config,
    source: str,
    written_concept_slugs: dict[str, str],
    drafts_dir: Path,
    shared_parsed_citations: list[ParsedCitation],
) -> Path | None:
    """Citation-check, faithfulness-check, write one batched section.

    Shared by entity and concept sections from the per-source batched
    call. Returns the written page path, or ``None`` if the section
    failed any gate (no citations, empty body, slug collision marker
    handled via side channel). ``shared_parsed_citations`` is the
    definition list parsed once over the whole response: every
    section replays it so pages other than the last one still have
    their footnotes resolved.

    Side effects: mutates *written_concept_slugs* (first-writer-wins
    concept registry), may write a collision marker under *drafts_dir*,
    deletes a stale PENDING marker for this slug, and persists the page
    via ``persist_and_finalize``.
    """
    # Gate 1: a label that slugifies to nothing cannot become a page.
    slug = make_slug(header_label)
    if not slug:
        log.info("Empty slug for batched section %r; skipping", header_label)
        return None

    # Only replay citation keys that this section actually references
    # in the body; otherwise every section would claim every citation.
    section_keys = {ref.citation_key for ref in parse_wiki_citations(body)}
    # Fall back to in-body ``[^keyN]`` references when no definitions
    # live inside the section: count occurrences of the footnote
    # marker against the shared definition set.
    section_keys.update(_FOOTNOTE_MARKER_RE.findall(body))
    relevant = [c for c in shared_parsed_citations if c.citation_key in section_keys]
    # Gate 2: at least one citation must resolve and verify against the
    # source chunks, or the section is dropped entirely.
    verified = verify_citations(citation_resolver(relevant), chunks, header_label, config)
    if not verified:
        log.info("No valid citations for batched section %s, skipping", header_label)
        return None

    # Gate 3 (soft): faithfulness below the configured threshold does
    # not drop the section — it diverts the page to drafts instead.
    score = check_faithfulness(chunks, body, header_label, config)
    threshold = config.wiki_embedding_faithfulness_threshold
    page_type = WikiSubdir.CONCEPTS if kind is EntityKind.CONCEPT else WikiSubdir.ENTITIES
    subdir = page_type if score >= threshold else WikiSubdir.DRAFTS
    if subdir == WikiSubdir.DRAFTS:
        log.info(
            "Batched section %s scored %.2f (< %.2f), sending to drafts",
            header_label,
            score,
            threshold,
        )

    # Re-render the citation block from the *verified* records so the
    # page never carries definitions that failed verification.
    clean_body = strip_citation_block(body)
    frontmatter = build_frontmatter(config, source_names, score, chunks=chunks)
    citation_block = render_citation_block(verified)
    full_content = assemble_content(frontmatter, clean_body, citation_block)

    # Concept collision: the second source proposing a slug loses
    # and writes to a drafts collision marker; the winning source's
    # page stays untouched. Draft-bound concepts skip this registry —
    # only pages landing in CONCEPTS can collide.
    if kind is EntityKind.CONCEPT and subdir == WikiSubdir.CONCEPTS:
        first_source = written_concept_slugs.get(slug)
        if first_source is not None and first_source != source:
            return divert_concept_collision(
                slug=slug,
                source=source,
                first_source=first_source,
                content=full_content,
                drafts_dir=drafts_dir,
            )
        written_concept_slugs.setdefault(slug, source)

    # Successful regen of a previously-PENDING slug: remove the old
    # marker so the drafts surface no longer lists it.
    delete_pending_marker_if_present(drafts_dir, slug)

    wiki_root = config.data_root / config.wiki_dir
    target = PageTarget(
        wiki_root=wiki_root,
        subdir=subdir,
        slug=slug,
        # Relative path recorded in the store; mirrors the on-disk layout.
        wiki_source=f"{config.wiki_dir}/{subdir}/{slug}.md",
        page_type=page_type,
        label=header_label,
    )
    page_path = persist_and_finalize(full_content, target, verified, source_names, store, config)
    log.info(
        "Generated batched page for %s -> %s (score=%.2f, citations=%d)",
        header_label,
        target.subdir,
        score,
        len(verified),
    )
    return page_path