Coverage for src / lilbee / data / ingest / extract.py: 100%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Document extraction: kreuzberg config, OCR fallback, markdown/document chunking.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import logging 

7from collections.abc import Callable, Sequence 

8from pathlib import Path 

9from typing import TYPE_CHECKING, Any 

10 

11if TYPE_CHECKING: 

12 from kreuzberg import ExtractionConfig, ExtractionResult 

13 

14from lilbee.app.services import get_services 

15from lilbee.core.config import cfg 

16from lilbee.data.chunk import build_chunking_config, chunk_text 

17from lilbee.data.ingest.types import ( 

18 MARKDOWN_OUTPUT, 

19 MIN_MEANINGFUL_CHARS, 

20 PDF_CONTENT_TYPE, 

21 TESSERACT_BACKEND, 

22 ChunkRecord, 

23 ExtractMode, 

24) 

25from lilbee.data.store import CHUNK_TYPE_RAW 

26from lilbee.runtime.cpu import cpu_quota 

27from lilbee.runtime.progress import ( 

28 DetailedProgressCallback, 

29 EventType, 

30 ExtractEvent, 

31 noop_callback, 

32) 

33 

# Module-level logger named after this module (standard logging convention).
log = logging.getLogger(__name__)

35 

36 

def _has_meaningful_text(result: Any) -> bool:
    """Return True when *result* carries enough stripped text to be usable.

    Sums the whitespace-stripped length of every chunk's content and
    compares it against ``MIN_MEANINGFUL_CHARS``. A missing or empty
    ``chunks`` attribute counts as no meaningful text.
    """
    chunks = getattr(result, "chunks", None)
    if not chunks:
        return False
    stripped_total = sum(len(piece.content.strip()) for piece in chunks)
    return stripped_total > MIN_MEANINGFUL_CHARS

44 

45 

def content_type_to_mode(content_type: str) -> ExtractMode:
    """Return the extraction mode for *content_type*.

    PDFs need per-page extraction; everything else goes through the
    markdown pipeline.
    """
    if content_type == PDF_CONTENT_TYPE:
        return ExtractMode.PAGINATED
    return ExtractMode.MARKDOWN

49 

50 

def extraction_config(mode: ExtractMode) -> ExtractionConfig:
    """Build the kreuzberg ``ExtractionConfig`` for *mode*.

    Every mode shares the chunking and concurrency settings; paginated
    modes add page extraction, and the OCR mode additionally enables
    the Tesseract backend. Raises ``KeyError`` for an unknown mode.
    """
    from kreuzberg import ConcurrencyConfig, ExtractionConfig, OcrConfig, PageConfig

    chunking = build_chunking_config()
    pages = PageConfig(extract_pages=True, insert_page_markers=False)
    ocr = OcrConfig(backend=TESSERACT_BACKEND)
    # Bound kreuzberg's internal pool to the same CPU budget as the
    # pipeline semaphore so the two stop competing for cores.
    concurrency = ConcurrencyConfig(max_threads=cpu_quota())
    mode_kwargs: dict[ExtractMode, dict[str, Any]] = {
        ExtractMode.MARKDOWN: {"output_format": MARKDOWN_OUTPUT},
        ExtractMode.PAGINATED: {"pages": pages},
        ExtractMode.PAGINATED_OCR: {"pages": pages, "ocr": ocr},
    }
    return ExtractionConfig(chunking=chunking, concurrency=concurrency, **mode_kwargs[mode])

80 

81 

def _should_run_ocr() -> bool:
    """Decide whether vision-based OCR should be attempted on scanned PDFs.

    ``cfg.enable_ocr`` is a tri-state switch:
      * ``True``  -- force on (a real vision run still requires
        ``cfg.vision_model``; otherwise the caller falls back to Tesseract).
      * ``False`` -- force off.
      * ``None``  -- auto-detect: run vision OCR when ``cfg.vision_model``
        is configured.
    """
    forced = cfg.enable_ocr
    if isinstance(forced, bool):
        # An explicit override wins in either direction.
        return forced
    return bool(cfg.vision_model)

96 

97 

async def _vision_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
    quiet: bool,
) -> list[ChunkRecord]:
    """Run vision OCR via the persistent worker pool, then chunk + embed.

    Routing through the pool amortises the multi-second vision-Llama
    model load across PDFs. Tesseract has no shared model state and is
    run inline by ``_tesseract_ocr_fallback``; this helper is
    vision-only. Any failure is logged and yields an empty list.
    """
    try:
        page_texts = await asyncio.to_thread(
            get_services().provider.pdf_ocr,
            path,
            backend="vision",
            model=cfg.vision_model,
            per_page_timeout_s=cfg.ocr_timeout,
            quiet=quiet,
            on_progress=on_progress,
        )
    except Exception:
        log.warning("OCR via vision backend failed for %s.", source_name, exc_info=True)
        return []
    else:
        return await _chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)

126 

127 

def _run_tesseract_sync(path: Path) -> Any:
    """Synchronously OCR *path* with kreuzberg/Tesseract, stderr silenced.

    Tesseract writes "Line cannot be recognized!!", "Image too small to
    scale!!", and "Detected N diacritics" straight to fd 2 from inside
    libc. Without the redirect those lines flood the TUI log file
    (1 000+ entries per scanned PDF) and can bleed into the TUI itself.
    The suppression is held only for the duration of the extraction call
    so other threads' stderr writes still go through.
    """
    from kreuzberg import extract_file_sync

    from lilbee.providers.llama_cpp.log_dispatch import stderr_suppressed

    with stderr_suppressed():
        ocr_config = extraction_config(ExtractMode.PAGINATED_OCR)
        return extract_file_sync(str(path), config=ocr_config)

144 

145 

async def _tesseract_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Run Tesseract OCR off-thread, then chunk + embed the page text.

    No model load means no pool: a plain ``asyncio.to_thread`` suffices.
    ``cfg.tesseract_timeout`` caps the whole-document extraction; 0 means
    unlimited. Failures (including timeout) log a warning and return an
    empty list so the caller can skip the file.
    """
    extract_task = asyncio.to_thread(_run_tesseract_sync, path)
    limit = cfg.tesseract_timeout
    try:
        result = await (
            asyncio.wait_for(extract_task, timeout=limit) if limit > 0 else extract_task
        )
    except TimeoutError:
        log.warning(
            "Tesseract OCR exceeded %.0fs timeout on %s; skipping.",
            cfg.tesseract_timeout,
            source_name,
        )
        return []
    except Exception:
        log.warning("OCR via tesseract backend failed for %s.", source_name, exc_info=True)
        return []

    # Re-group kreuzberg's chunks by their first page so downstream
    # records carry page numbers; pages come back in ascending order.
    grouped: dict[int, list[str]] = {}
    for piece in result.chunks or []:
        page_num = int(piece.metadata.get("first_page") or 1)
        grouped.setdefault(page_num, []).append(piece.content)
    page_texts = [(num, "\n".join(grouped[num])) for num in sorted(grouped)]
    return await _chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)

182 

183 

async def _chunk_and_embed_pages(
    page_texts: Sequence[tuple[int, str]],
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Chunk OCR per-page text, embed every chunk, and build records.

    Returns an empty list when there is no page text or chunking
    produces nothing.
    """
    if not page_texts:
        return []

    # A single OCR page rarely spans multiple topics, so the semantic
    # chunking round-trip is skipped.
    paged_chunks: list[tuple[int, str]] = []
    for page_num, text in page_texts:
        for piece in chunk_text(text, use_semantic=False):
            paged_chunks.append((page_num, piece))
    if not paged_chunks:
        return []

    texts = [piece for _, piece in paged_chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )
    records: list[ChunkRecord] = []
    for i, ((page_num, piece), vec) in enumerate(zip(paged_chunks, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=CHUNK_TYPE_RAW,
                page_start=page_num,
                page_end=page_num,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=i,
                vector=vec,
            )
        )
    return records

221 

222 

async def _handle_scanned_pdf_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    result: ExtractionResult,
    *,
    quiet: bool,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Route a scanned PDF through the configured OCR backend.

    Vision OCR (pool-routed) runs when ``_should_run_ocr()`` is True and
    a vision model is configured; otherwise the file falls back to
    inline Tesseract. Both paths return chunk records; an empty list
    means OCR found no usable text and the caller should skip the file.
    """
    del result  # Both backends re-extract; the kreuzberg result is not reused.
    if _should_run_ocr() and cfg.vision_model:
        log.info(
            "Scanned PDF: using vision OCR for %s (model=%s)",
            source_name,
            cfg.vision_model,
        )
        return await _vision_ocr_fallback(
            path,
            source_name,
            content_type,
            on_progress=on_progress,
            quiet=quiet,
        )

    log.info("Scanned PDF: falling back to Tesseract OCR for %s", source_name)
    records = await _tesseract_ocr_fallback(
        path,
        source_name,
        content_type,
        on_progress=on_progress,
    )
    if not records:
        log.warning(
            "Skipped %s: text extraction produced no usable text. "
            "For better results on scanned PDFs, configure a vision model "
            "via PUT /api/models/vision or set LILBEE_ENABLE_OCR=true.",
            source_name,
        )
    return records

271 

272 

async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Extract and chunk a document, embed every chunk, return records.

    Vision OCR is controlled by ``cfg.enable_ocr`` (see ``_should_run_ocr``).
    """
    from kreuzberg import extract_file_sync

    extract_cfg = extraction_config(content_type_to_mode(content_type))
    result = await asyncio.to_thread(extract_file_sync, str(path), config=extract_cfg)

    # A PDF with no meaningful extracted text is treated as scanned and
    # re-routed through OCR.
    if content_type == PDF_CONTENT_TYPE and not _has_meaningful_text(result):
        return await _handle_scanned_pdf_fallback(
            path,
            source_name,
            content_type,
            result,
            quiet=quiet,
            on_progress=on_progress,
        )

    if not result.chunks:
        return []

    # Fire one EXTRACT event per file so subscribers (chat /add, /sync,
    # CLI Rich progress) can show "extracted N pages" before the embed
    # phase starts; otherwise a 44MB PDF sits at file-level 0% for
    # minutes. get_page_count is the canonical PDF page count; for
    # non-paginated formats we fall back to the chunk count.
    page_count = result.get_page_count() or len(result.chunks)
    on_progress(
        EventType.EXTRACT,
        ExtractEvent(file=source_name, page=page_count, total_pages=page_count),
    )

    texts = [piece.content for piece in result.chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records: list[ChunkRecord] = []
    for idx, (piece, text, vec) in enumerate(zip(result.chunks, texts, vectors, strict=True)):
        meta = piece.metadata
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=CHUNK_TYPE_RAW,
                page_start=meta.get("first_page") or 0,
                page_end=meta.get("last_page") or 0,
                line_start=0,
                line_end=0,
                chunk=text,
                chunk_index=meta.get("chunk_index", idx),
                vector=vec,
            )
        )
    return records

334 

335 

async def ingest_markdown(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Chunk a markdown file with heading context prepended to each chunk.

    Each chunk is prefixed with its heading hierarchy path (e.g.
    "# Setup > ## Install") for better retrieval context. Returns an
    empty list for blank files or when chunking yields nothing.
    """
    raw_text = await asyncio.to_thread(path.read_text, encoding="utf-8", errors="replace")
    if not raw_text.strip():
        return []

    texts = chunk_text(raw_text, mime_type="text/markdown", heading_context=True)
    if not texts:
        return []

    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )
    records: list[ChunkRecord] = []
    for idx, (text, vec) in enumerate(zip(texts, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type="text",
                chunk_type=CHUNK_TYPE_RAW,
                page_start=0,
                page_end=0,
                line_start=0,
                line_end=0,
                chunk=text,
                chunk_index=idx,
                vector=vec,
            )
        )
    return records