Coverage for src / lilbee / data / ingest / extract.py: 100%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Document extraction: kreuzberg config, OCR fallback, markdown/document chunking.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import logging 

7from collections.abc import Callable, Sequence 

8from pathlib import Path 

9from typing import TYPE_CHECKING, Any 

10 

11if TYPE_CHECKING: 

12 from kreuzberg import ExtractionConfig, ExtractionResult 

13 

14from lilbee.app.services import get_services 

15from lilbee.core.config import cfg 

16from lilbee.data.chunk import build_chunking_config, chunk_text 

17from lilbee.data.ingest.types import ( 

18 MARKDOWN_OUTPUT, 

19 MIN_MEANINGFUL_CHARS, 

20 PDF_CONTENT_TYPE, 

21 TESSERACT_BACKEND, 

22 ChunkRecord, 

23 ExtractMode, 

24) 

25from lilbee.data.store import ChunkType, PageTextRecord 

26from lilbee.runtime.cpu import cpu_quota 

27from lilbee.runtime.progress import ( 

28 DetailedProgressCallback, 

29 EventType, 

30 ExtractEvent, 

31 noop_callback, 

32) 

33 

34log = logging.getLogger(__name__) 

35 

36 

def _has_meaningful_text(result: ExtractionResult) -> bool:
    """Report whether an extraction produced more than a trivial amount of text.

    The stripped ``result.content`` is checked first; only when it is too
    short are the stripped chunk contents summed and compared against the
    same ``MIN_MEANINGFUL_CHARS`` threshold.
    """
    content_chars = len(result.content.strip())
    if content_chars > MIN_MEANINGFUL_CHARS:
        return True
    chunk_chars = 0
    for piece in result.chunks or []:
        chunk_chars += len(piece.content.strip())
    return chunk_chars > MIN_MEANINGFUL_CHARS

42 

43 

def content_type_to_mode(content_type: str) -> ExtractMode:
    """Pick the extraction mode for a content type: paginated for PDFs, markdown otherwise."""
    if content_type == PDF_CONTENT_TYPE:
        return ExtractMode.PAGINATED
    return ExtractMode.MARKDOWN

47 

48 

def _page_text_record(source: str, page: int, text: str, content_type: str) -> PageTextRecord:
    """Wrap one page of extracted text in a ``PageTextRecord`` for the export dataset."""
    fields = {
        "source": source,
        "page": page,
        "text": text,
        "content_type": content_type,
    }
    return PageTextRecord(**fields)

52 

53 

def extraction_config(mode: ExtractMode) -> ExtractionConfig:
    """Assemble the kreuzberg ``ExtractionConfig`` matching ``mode``.

    Every mode shares the chunking and concurrency settings. MARKDOWN adds
    the markdown output format, PAGINATED adds per-page extraction, and
    PAGINATED_OCR adds per-page extraction plus the Tesseract OCR backend.
    A mode outside those three raises ``KeyError``.
    """
    from kreuzberg import ConcurrencyConfig, ExtractionConfig, OcrConfig, PageConfig

    chunking = build_chunking_config()
    pages = PageConfig(extract_pages=True, insert_page_markers=False)
    ocr = OcrConfig(backend=TESSERACT_BACKEND)
    # Cap kreuzberg's internal thread pool at the CPU quota, the same budget
    # the pipeline semaphore uses, so the two do not fight over cores.
    concurrency = ConcurrencyConfig(max_threads=cpu_quota())

    # Only the keyword arguments that differ between modes live in this table.
    mode_specific: dict[ExtractMode, dict[str, Any]] = {
        ExtractMode.MARKDOWN: {"output_format": MARKDOWN_OUTPUT},
        ExtractMode.PAGINATED: {"pages": pages},
        ExtractMode.PAGINATED_OCR: {"pages": pages, "ocr": ocr},
    }
    extras = mode_specific[mode]
    return ExtractionConfig(chunking=chunking, concurrency=concurrency, **extras)

83 

84 

def _should_run_ocr() -> bool:
    """Tell whether vision-based OCR should be attempted on scanned PDFs.

    The answer comes from the tri-state ``cfg.enable_ocr``:
        True  = forced on (a real vision run still needs ``cfg.vision_model``;
                without one the caller falls back to Tesseract).
        False = forced off.
        None  = auto-detect: on exactly when ``cfg.vision_model`` is set.
    """
    forced = cfg.enable_ocr
    # Identity checks keep the tri-state strict: only the real bool
    # singletons count as an explicit override.
    if forced is True or forced is False:
        return forced
    return bool(cfg.vision_model)

99 

100 

def _record_page_texts(
    page_texts: Sequence[tuple[int, str]],
    source_name: str,
    content_type: str,
    page_texts_out: list[PageTextRecord] | None,
) -> None:
    """Add one export row per OCR page to ``page_texts_out``; no-op when it is ``None``."""
    if page_texts_out is not None:
        for page, text in page_texts:
            page_texts_out.append(_page_text_record(source_name, page, text, content_type))

113 

114 

async def _vision_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
    quiet: bool,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Run vision OCR through the persistent worker pool, then chunk and embed the pages.

    Routing through the pool spreads the multi-second vision-Llama load
    cost over many PDFs. Tesseract keeps no shared model state and is
    handled inline by ``_tesseract_ocr_fallback``, so this helper covers
    the vision backend only. Any OCR failure is logged as a warning and
    yields an empty list.
    """
    try:
        run_ocr = get_services().provider.pdf_ocr
        page_texts = await asyncio.to_thread(
            run_ocr,
            path,
            backend="vision",
            model=cfg.vision_model,
            per_page_timeout_s=cfg.ocr_timeout,
            quiet=quiet,
            on_progress=on_progress,
        )
    except Exception:
        # Best effort: a failed OCR run skips the file instead of aborting ingest.
        log.warning("OCR via vision backend failed for %s.", source_name, exc_info=True)
        return []

    _record_page_texts(page_texts, source_name, content_type, page_texts_out)
    records = await chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)
    return records

145 

146 

def _run_tesseract_sync(path: Path) -> Any:
    """Run kreuzberg's Tesseract OCR on ``path`` while stderr is silenced.

    Tesseract prints "Line cannot be recognized!!", "Image too small to
    scale!!", and "Detected N diacritics" straight to fd 2 from inside
    libc. Left alone, those lines flood the TUI log file (1 000+ entries
    per scanned PDF) and can bleed into the TUI itself. The suppression
    is held only for the duration of the extraction call so that other
    threads' stderr writes still go through.
    """
    from kreuzberg import extract_file_sync

    from lilbee.providers.llama_cpp.log_dispatch import stderr_suppressed

    with stderr_suppressed():
        target = str(path)
        ocr_config = extraction_config(ExtractMode.PAGINATED_OCR)
        extracted = extract_file_sync(target, config=ocr_config)
    return extracted

163 

164 

async def _tesseract_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Run Tesseract OCR in a worker thread, then chunk and embed the pages.

    Tesseract needs no model load, so it goes through ``asyncio.to_thread``
    rather than the worker pool. ``cfg.tesseract_timeout`` bounds the
    whole-document extraction, with 0 meaning unlimited. A timeout or any
    other failure is logged as a warning and produces an empty list so the
    caller can skip the file.
    """
    extraction = asyncio.to_thread(_run_tesseract_sync, path)
    try:
        has_limit = cfg.tesseract_timeout > 0
        if has_limit:
            result = await asyncio.wait_for(extraction, timeout=cfg.tesseract_timeout)
        else:
            result = await extraction
    except TimeoutError:
        log.warning(
            "Tesseract OCR exceeded %.0fs timeout on %s; skipping.",
            cfg.tesseract_timeout,
            source_name,
        )
        return []
    except Exception:
        log.warning("OCR via tesseract backend failed for %s.", source_name, exc_info=True)
        return []

    # Regroup chunk contents by the page they start on; chunks without a
    # ``first_page`` are attributed to page 1.
    grouped: dict[int, list[str]] = {}
    for piece in result.chunks or []:
        page_no = int(piece.metadata.get("first_page") or 1)
        if page_no not in grouped:
            grouped[page_no] = []
        grouped[page_no].append(piece.content)

    page_texts = [(page_no, "\n".join(parts)) for page_no, parts in sorted(grouped.items())]
    _record_page_texts(page_texts, source_name, content_type, page_texts_out)
    records = await chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)
    return records

203 

204 

async def chunk_and_embed_pages(
    page_texts: Sequence[tuple[int, str]],
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Split each page's text into chunks and embed them all in one batch.

    Used by both OCR ingest and import. Returns an empty list when there
    are no pages or when chunking produces nothing. Each record carries
    its page number as both ``page_start`` and ``page_end``, and
    ``chunk_index`` counts across the whole document.
    """
    if not page_texts:
        return []

    # One OCR page rarely covers several topics, so semantic chunking is skipped.
    located: list[tuple[int, str]] = []
    for page_num, page_text in page_texts:
        located.extend((page_num, piece) for piece in chunk_text(page_text, use_semantic=False))
    if not located:
        return []

    texts = [piece for _, piece in located]
    embed_batch = get_services().embedder.embed_batch
    vectors = await asyncio.to_thread(
        embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records = []
    # strict=True turns a chunk/vector count mismatch into an error.
    for index, ((page_num, piece), vector) in enumerate(zip(located, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=ChunkType.RAW,
                page_start=page_num,
                page_end=page_num,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=index,
                vector=vector,
            )
        )
    return records

242 

243 

def _capture_result_page_texts(
    result: ExtractionResult,
    source_name: str,
    content_type: str,
    page_texts_out: list[PageTextRecord] | None,
) -> None:
    """Copy a regular extraction's page texts into the export accumulator.

    When ``result.pages`` is populated (paginated PDFs) each entry becomes
    its own row. Otherwise the document has no page split and the whole
    ``result.content`` is stored as page 0, provided it is not blank.
    Nothing happens when ``page_texts_out`` is ``None``.
    """
    if page_texts_out is None:
        return

    pages = result.pages
    if pages:
        for page in pages:
            row = _page_text_record(source_name, page["page_number"], page["content"], content_type)
            page_texts_out.append(row)
        return

    if result.content.strip():
        page_texts_out.append(_page_text_record(source_name, 0, result.content, content_type))

264 

265 

async def _handle_scanned_pdf_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    result: ExtractionResult,
    *,
    quiet: bool,
    on_progress: DetailedProgressCallback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Send a scanned PDF to whichever OCR backend the configuration selects.

    Pool-routed vision OCR is used when ``_should_run_ocr()`` is True and
    ``cfg.vision_model`` is set; in every other case inline Tesseract runs.
    Both paths return chunk records, and an empty list means OCR yielded
    no usable text so the caller should skip the file.
    """
    del result  # Both backends re-extract; the kreuzberg result is not reused.

    if not (_should_run_ocr() and cfg.vision_model):
        log.info("Scanned PDF: falling back to Tesseract OCR for %s", source_name)
        chunks = await _tesseract_ocr_fallback(
            path,
            source_name,
            content_type,
            on_progress=on_progress,
            page_texts_out=page_texts_out,
        )
        if not chunks:
            log.warning(
                "Skipped %s: text extraction produced no usable text. "
                "For better results on scanned PDFs, configure a vision model "
                "via PUT /api/models/vision or set LILBEE_ENABLE_OCR=true.",
                source_name,
            )
        return chunks

    log.info(
        "Scanned PDF: using vision OCR for %s (model=%s)",
        source_name,
        cfg.vision_model,
    )
    return await _vision_ocr_fallback(
        path,
        source_name,
        content_type,
        on_progress=on_progress,
        quiet=quiet,
        page_texts_out=page_texts_out,
    )

317 

318 

async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Extract a document with kreuzberg, embed its chunks, and return the records.

    A PDF whose extraction has no meaningful text is handed to the
    scanned-PDF OCR fallback; whether that uses vision OCR depends on
    ``cfg.enable_ocr`` (see ``_should_run_ocr``). A document that yields no
    chunks returns an empty list. When ``page_texts_out`` is supplied,
    per-page text is appended to it for export.
    """
    from kreuzberg import extract_file_sync

    mode = content_type_to_mode(content_type)
    config = extraction_config(mode)
    result = await asyncio.to_thread(extract_file_sync, str(path), config=config)

    looks_scanned = content_type == PDF_CONTENT_TYPE and not _has_meaningful_text(result)
    if looks_scanned:
        return await _handle_scanned_pdf_fallback(
            path,
            source_name,
            content_type,
            result,
            quiet=quiet,
            on_progress=on_progress,
            page_texts_out=page_texts_out,
        )

    chunks = result.chunks
    if not chunks:
        return []

    _capture_result_page_texts(result, source_name, content_type, page_texts_out)

    # Emit a single EXTRACT event per file so subscribers (chat /add, /sync,
    # CLI Rich progress) can report "extracted N pages" before embedding
    # begins; without it a 44MB PDF would sit at file-level 0% for minutes.
    # get_page_count is the canonical PDF page count; formats without pages
    # fall back to the chunk count.
    page_count = result.get_page_count() or len(chunks)
    event = ExtractEvent(file=source_name, page=page_count, total_pages=page_count)
    on_progress(EventType.EXTRACT, event)

    texts = [piece.content for piece in chunks]
    embed_batch = get_services().embedder.embed_batch
    vectors = await asyncio.to_thread(
        embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records = []
    # strict=True turns a chunk/vector count mismatch into an error.
    for idx, (piece, text, vec) in enumerate(zip(chunks, texts, vectors, strict=True)):
        meta = piece.metadata
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=ChunkType.RAW,
                page_start=meta.get("first_page") or 0,
                page_end=meta.get("last_page") or 0,
                line_start=0,
                line_end=0,
                chunk=text,
                chunk_index=meta.get("chunk_index", idx),
                vector=vec,
            )
        )
    return records

385 

386 

async def ingest_markdown(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Chunk a markdown file with heading context, embed it, and return the records.

    Every chunk is prefixed with its heading hierarchy path
    (e.g. "# Setup > ## Install") to give retrieval more context. The file
    is read as UTF-8 with undecodable bytes replaced. A blank file, or one
    that produces no chunks, returns an empty list. When ``page_texts_out``
    is supplied, the full text is appended to it as page 0 for export.
    """
    raw_text = await asyncio.to_thread(path.read_text, encoding="utf-8", errors="replace")
    if not raw_text.strip():
        return []

    pieces = chunk_text(raw_text, mime_type="text/markdown", heading_context=True)
    if not pieces:
        return []

    if page_texts_out is not None:
        page_texts_out.append(_page_text_record(source_name, 0, raw_text, "text"))

    embed_batch = get_services().embedder.embed_batch
    vectors = await asyncio.to_thread(
        embed_batch, pieces, source=source_name, on_progress=on_progress
    )

    records = []
    # strict=True turns a chunk/vector count mismatch into an error.
    for idx, (piece, vec) in enumerate(zip(pieces, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type="text",
                chunk_type=ChunkType.RAW,
                page_start=0,
                page_end=0,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=idx,
                vector=vec,
            )
        )
    return records