Coverage for src / lilbee / data / ingest / extract.py: 100%
124 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Document extraction: kreuzberg config, OCR fallback, markdown/document chunking."""
3from __future__ import annotations
5import asyncio
6import logging
7from collections.abc import Callable, Sequence
8from pathlib import Path
9from typing import TYPE_CHECKING, Any
11if TYPE_CHECKING:
12 from kreuzberg import ExtractionConfig, ExtractionResult
14from lilbee.app.services import get_services
15from lilbee.core.config import cfg
16from lilbee.data.chunk import build_chunking_config, chunk_text
17from lilbee.data.ingest.types import (
18 MARKDOWN_OUTPUT,
19 MIN_MEANINGFUL_CHARS,
20 PDF_CONTENT_TYPE,
21 TESSERACT_BACKEND,
22 ChunkRecord,
23 ExtractMode,
24)
25from lilbee.data.store import ChunkType, PageTextRecord
26from lilbee.runtime.cpu import cpu_quota
27from lilbee.runtime.progress import (
28 DetailedProgressCallback,
29 EventType,
30 ExtractEvent,
31 noop_callback,
32)
34log = logging.getLogger(__name__)
def _has_meaningful_text(result: ExtractionResult) -> bool:
    """Report whether an extraction produced more than a trivial amount of text.

    The stripped ``result.content`` is checked first. Only when it does not
    exceed ``MIN_MEANINGFUL_CHARS`` is the combined stripped length of the
    entries in ``result.chunks`` compared against the same threshold.
    """
    if len(result.content.strip()) > MIN_MEANINGFUL_CHARS:
        return True

    # Fallback: add up the stripped chunk bodies; a missing chunk list counts as empty.
    chunk_chars = 0
    for piece in result.chunks or []:
        chunk_chars += len(piece.content.strip())
    return chunk_chars > MIN_MEANINGFUL_CHARS
def content_type_to_mode(content_type: str) -> ExtractMode:
    """Pick the extraction mode for a content type.

    ``PDF_CONTENT_TYPE`` maps to ``ExtractMode.PAGINATED``; every other
    content type maps to ``ExtractMode.MARKDOWN``.
    """
    if content_type == PDF_CONTENT_TYPE:
        return ExtractMode.PAGINATED
    return ExtractMode.MARKDOWN
def _page_text_record(source: str, page: int, text: str, content_type: str) -> PageTextRecord:
    """Create the export-dataset row that holds the text of a single page."""
    row = PageTextRecord(
        source=source,
        page=page,
        text=text,
        content_type=content_type,
    )
    return row
def extraction_config(mode: ExtractMode) -> ExtractionConfig:
    """Assemble the kreuzberg ``ExtractionConfig`` that matches ``mode``.

    Every mode shares the chunking config from ``build_chunking_config()``
    and a thread cap of ``cpu_quota()``. On top of that:

    * ``MARKDOWN`` requests ``MARKDOWN_OUTPUT`` as the output format.
    * ``PAGINATED`` enables per-page extraction without page markers.
    * ``PAGINATED_OCR`` is ``PAGINATED`` plus the ``TESSERACT_BACKEND`` OCR config.

    Raises ``KeyError`` when ``mode`` is not one of the three modes above.
    """
    from kreuzberg import ConcurrencyConfig, ExtractionConfig, OcrConfig, PageConfig

    chunking = build_chunking_config()
    page_options = PageConfig(extract_pages=True, insert_page_markers=False)
    tesseract = OcrConfig(backend=TESSERACT_BACKEND)
    # Kreuzberg's internal pool gets the same CPU budget as the pipeline
    # semaphore, so the two do not fight over cores.
    thread_cap = ConcurrencyConfig(max_threads=cpu_quota())

    # Keyword arguments that differ between modes; the shared ones are added below.
    per_mode: dict[ExtractMode, dict[str, Any]] = {
        ExtractMode.MARKDOWN: {"output_format": MARKDOWN_OUTPUT},
        ExtractMode.PAGINATED: {"pages": page_options},
        ExtractMode.PAGINATED_OCR: {"pages": page_options, "ocr": tesseract},
    }
    mode_options = per_mode[mode]
    return ExtractionConfig(chunking=chunking, concurrency=thread_cap, **mode_options)
def _should_run_ocr() -> bool:
    """Tell whether vision-based OCR should be attempted on scanned PDFs.

    ``cfg.enable_ocr`` is a tri-state switch:

    * ``True``  - always answer yes. A real vision run still needs
      ``cfg.vision_model``; without one the caller falls back to Tesseract.
    * ``False`` - always answer no.
    * anything else (normally ``None``) - auto-detect: yes exactly when
      ``cfg.vision_model`` is truthy.
    """
    forced = cfg.enable_ocr
    if forced is True or forced is False:
        return forced
    return bool(cfg.vision_model)
101def _record_page_texts(
102 page_texts: Sequence[tuple[int, str]],
103 source_name: str,
104 content_type: str,
105 page_texts_out: list[PageTextRecord] | None,
106) -> None:
107 """Append OCR page texts to the export accumulator when one is supplied."""
108 if page_texts_out is None:
109 return
110 page_texts_out.extend(
111 _page_text_record(source_name, page, text, content_type) for page, text in page_texts
112 )
async def _vision_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
    quiet: bool,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """OCR a PDF with the vision backend, then chunk and embed the pages.

    The provider's ``pdf_ocr`` is run in a worker thread with
    ``backend="vision"``, ``cfg.vision_model`` and ``cfg.ocr_timeout`` as the
    per-page timeout. Routing through the persistent worker pool is what
    spreads the multi-second vision model load across PDFs; Tesseract has no
    such shared state and is handled by ``_tesseract_ocr_fallback`` instead.

    Any exception raised by the OCR call is logged as a warning and turned
    into an empty list. On success the page texts are recorded into
    ``page_texts_out`` (when given) before chunking and embedding.
    """
    try:
        run_pdf_ocr = get_services().provider.pdf_ocr
        page_texts = await asyncio.to_thread(
            run_pdf_ocr,
            path,
            backend="vision",
            model=cfg.vision_model,
            per_page_timeout_s=cfg.ocr_timeout,
            quiet=quiet,
            on_progress=on_progress,
        )
    except Exception:
        log.warning("OCR via vision backend failed for %s.", source_name, exc_info=True)
        return []

    _record_page_texts(page_texts, source_name, content_type, page_texts_out)
    records = await chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)
    return records
def _run_tesseract_sync(path: Path) -> Any:
    """Extract ``path`` with kreuzberg's Tesseract OCR while stderr is suppressed.

    Tesseract prints messages such as "Line cannot be recognized!!", "Image
    too small to scale!!" and "Detected N diacritics" straight to fd 2.
    Left alone, those lines flood the TUI log file (1 000+ entries per
    scanned PDF) and can leak into the TUI itself. The suppression is held
    only around the extraction call so stderr output from other threads is
    affected for as short a time as possible.
    """
    from kreuzberg import extract_file_sync

    from lilbee.providers.llama_cpp.log_dispatch import stderr_suppressed

    with stderr_suppressed():
        target = str(path)
        ocr_config = extraction_config(ExtractMode.PAGINATED_OCR)
        extracted = extract_file_sync(target, config=ocr_config)
    return extracted
async def _tesseract_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """OCR a PDF with Tesseract in a worker thread, then chunk and embed it.

    Tesseract needs no model load, so it runs through ``asyncio.to_thread``
    rather than the worker pool. A positive ``cfg.tesseract_timeout`` caps
    the whole-document extraction; zero (or less) disables the cap. A timeout
    or any other failure is logged as a warning and produces an empty list so
    the caller can skip the file.

    Extracted chunks are grouped by their ``first_page`` metadata (page 1
    when absent), joined with newlines per page, recorded into
    ``page_texts_out`` when given, and finally chunked and embedded.
    """
    extraction = asyncio.to_thread(_run_tesseract_sync, path)
    try:
        if cfg.tesseract_timeout > 0:
            result = await asyncio.wait_for(extraction, timeout=cfg.tesseract_timeout)
        else:
            result = await extraction
    except TimeoutError:
        log.warning(
            "Tesseract OCR exceeded %.0fs timeout on %s; skipping.",
            cfg.tesseract_timeout,
            source_name,
        )
        return []
    except Exception:
        log.warning("OCR via tesseract backend failed for %s.", source_name, exc_info=True)
        return []

    # Collect chunk bodies per page, keeping their original order within a page.
    grouped: dict[int, list[str]] = {}
    for piece in result.chunks or []:
        page_number = int(piece.metadata.get("first_page") or 1)
        if page_number not in grouped:
            grouped[page_number] = []
        grouped[page_number].append(piece.content)

    # Keys are unique, so sorting the items orders purely by page number.
    page_texts = [(number, "\n".join(parts)) for number, parts in sorted(grouped.items())]
    _record_page_texts(page_texts, source_name, content_type, page_texts_out)
    records = await chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)
    return records
async def chunk_and_embed_pages(
    page_texts: Sequence[tuple[int, str]],
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Split per-page text into chunks and embed them all in one batch.

    Used by both OCR ingest and import. Each ``(page, text)`` pair is chunked
    on its own, so every resulting record starts and ends on that page.
    ``chunk_index`` counts across the whole document. Returns an empty list
    when there are no pages or chunking yields nothing.
    """
    if not page_texts:
        return []

    # A single OCR page rarely covers several topics, so semantic chunking is skipped.
    paged_chunks: list[tuple[int, str]] = []
    for page_num, page_text in page_texts:
        for piece in chunk_text(page_text, use_semantic=False):
            paged_chunks.append((page_num, piece))
    if not paged_chunks:
        return []

    texts = [piece for _, piece in paged_chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch,
        texts,
        source=source_name,
        on_progress=on_progress,
    )

    records: list[ChunkRecord] = []
    # strict=True makes a chunk/vector count mismatch fail loudly.
    for position, ((page_num, piece), vector) in enumerate(
        zip(paged_chunks, vectors, strict=True)
    ):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=ChunkType.RAW,
                page_start=page_num,
                page_end=page_num,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=position,
                vector=vector,
            )
        )
    return records
244def _capture_result_page_texts(
245 result: ExtractionResult,
246 source_name: str,
247 content_type: str,
248 page_texts_out: list[PageTextRecord] | None,
249) -> None:
250 """Append a normal extraction's page texts to the export accumulator.
252 Paginated PDFs yield one row per ``result.pages`` entry; other documents
253 have no page split, so the full ``result.content`` is recorded as page 0.
254 """
255 if page_texts_out is None:
256 return
257 if result.pages:
258 page_texts_out.extend(
259 _page_text_record(source_name, page["page_number"], page["content"], content_type)
260 for page in result.pages
261 )
262 elif result.content.strip():
263 page_texts_out.append(_page_text_record(source_name, 0, result.content, content_type))
async def _handle_scanned_pdf_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    result: ExtractionResult,
    *,
    quiet: bool,
    on_progress: DetailedProgressCallback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Send a scanned PDF through whichever OCR backend is configured.

    Vision OCR is chosen when ``_should_run_ocr()`` says yes and
    ``cfg.vision_model`` is set; in every other case inline Tesseract is
    used. Either way the return value is a list of chunk records, and an
    empty list tells the caller that no usable text was found and the file
    should be skipped. An empty Tesseract result is also logged as a warning.
    """
    # Neither backend reuses the kreuzberg result; each re-extracts from ``path``.
    del result

    if _should_run_ocr() and cfg.vision_model:
        log.info(
            "Scanned PDF: using vision OCR for %s (model=%s)",
            source_name,
            cfg.vision_model,
        )
        return await _vision_ocr_fallback(
            path,
            source_name,
            content_type,
            on_progress=on_progress,
            quiet=quiet,
            page_texts_out=page_texts_out,
        )

    log.info("Scanned PDF: falling back to Tesseract OCR for %s", source_name)
    chunks = await _tesseract_ocr_fallback(
        path,
        source_name,
        content_type,
        on_progress=on_progress,
        page_texts_out=page_texts_out,
    )
    if chunks:
        return chunks

    log.warning(
        "Skipped %s: text extraction produced no usable text. "
        "For better results on scanned PDFs, configure a vision model "
        "via PUT /api/models/vision or set LILBEE_ENABLE_OCR=true.",
        source_name,
    )
    return chunks
async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Extract a document, embed its chunks and return the resulting records.

    The file is extracted in a worker thread with the config for its content
    type. A PDF whose extraction has no meaningful text is treated as scanned
    and handed to the OCR fallback (vision OCR is governed by
    ``cfg.enable_ocr``; see ``_should_run_ocr``). An extraction without
    chunks yields an empty list. When ``page_texts_out`` is supplied,
    per-page text is appended to it for export.
    """
    from kreuzberg import extract_file_sync

    mode = content_type_to_mode(content_type)
    config = extraction_config(mode)
    result = await asyncio.to_thread(extract_file_sync, str(path), config=config)

    is_pdf = content_type == PDF_CONTENT_TYPE
    if is_pdf and not _has_meaningful_text(result):
        return await _handle_scanned_pdf_fallback(
            path,
            source_name,
            content_type,
            result,
            quiet=quiet,
            on_progress=on_progress,
            page_texts_out=page_texts_out,
        )

    if not result.chunks:
        return []

    _capture_result_page_texts(result, source_name, content_type, page_texts_out)

    # One EXTRACT event per file lets subscribers (chat /add, /sync, CLI Rich
    # progress) report "extracted N pages" before embedding begins; without
    # it a 44MB PDF would sit at file-level 0% for minutes. get_page_count is
    # the canonical PDF page count; formats without pages use the chunk count.
    page_count = result.get_page_count() or len(result.chunks)
    extract_event = ExtractEvent(file=source_name, page=page_count, total_pages=page_count)
    on_progress(EventType.EXTRACT, extract_event)

    texts = [chunk.content for chunk in result.chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch,
        texts,
        source=source_name,
        on_progress=on_progress,
    )

    records: list[ChunkRecord] = []
    for idx, (chunk, text, vec) in enumerate(zip(result.chunks, texts, vectors, strict=True)):
        meta = chunk.metadata
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=ChunkType.RAW,
                # Missing or empty page metadata is stored as page 0.
                page_start=meta.get("first_page") or 0,
                page_end=meta.get("last_page") or 0,
                line_start=0,
                line_end=0,
                chunk=text,
                # Use the extractor's own index when present, else the position.
                chunk_index=meta.get("chunk_index", idx),
                vector=vec,
            )
        )
    return records
async def ingest_markdown(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
    page_texts_out: list[PageTextRecord] | None = None,
) -> list[ChunkRecord]:
    """Read a markdown file, chunk it with heading context and embed the chunks.

    Chunking is asked to prepend the heading hierarchy path (for example
    "# Setup > ## Install") to each chunk for better retrieval context.
    The file is read as UTF-8 with undecodable bytes replaced. A blank file,
    or one that produces no chunks, yields an empty list. When
    ``page_texts_out`` is supplied, the full text is appended to it as page 0
    for export.
    """
    raw_text = await asyncio.to_thread(path.read_text, encoding="utf-8", errors="replace")
    if not raw_text.strip():
        return []

    texts = chunk_text(raw_text, mime_type="text/markdown", heading_context=True)
    if not texts:
        return []

    # Records and the export row share this content type.
    text_type = "text"
    if page_texts_out is not None:
        page_texts_out.append(_page_text_record(source_name, 0, raw_text, text_type))

    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch,
        texts,
        source=source_name,
        on_progress=on_progress,
    )

    records: list[ChunkRecord] = []
    for idx, (piece, vec) in enumerate(zip(texts, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=text_type,
                chunk_type=ChunkType.RAW,
                page_start=0,
                page_end=0,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=idx,
                vector=vec,
            )
        )
    return records