Coverage for src/lilbee/data/ingest/extract.py: 100%
108 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Document extraction: kreuzberg config, OCR fallback, markdown/document chunking."""
3from __future__ import annotations
5import asyncio
6import logging
7from collections.abc import Callable, Sequence
8from pathlib import Path
9from typing import TYPE_CHECKING, Any
11if TYPE_CHECKING:
12 from kreuzberg import ExtractionConfig, ExtractionResult
14from lilbee.app.services import get_services
15from lilbee.core.config import cfg
16from lilbee.data.chunk import build_chunking_config, chunk_text
17from lilbee.data.ingest.types import (
18 MARKDOWN_OUTPUT,
19 MIN_MEANINGFUL_CHARS,
20 PDF_CONTENT_TYPE,
21 TESSERACT_BACKEND,
22 ChunkRecord,
23 ExtractMode,
24)
25from lilbee.data.store import CHUNK_TYPE_RAW
26from lilbee.runtime.cpu import cpu_quota
27from lilbee.runtime.progress import (
28 DetailedProgressCallback,
29 EventType,
30 ExtractEvent,
31 noop_callback,
32)
34log = logging.getLogger(__name__)
def _has_meaningful_text(result: Any) -> bool:
    """Return True when the extraction result carries usable chunk text.

    A result with no ``chunks`` attribute (or an empty one) is treated as
    having no text; otherwise the stripped chunk contents must sum to more
    than ``MIN_MEANINGFUL_CHARS`` characters.
    """
    chunks = getattr(result, "chunks", None)
    if not chunks:
        return False
    stripped_lengths = (len(chunk.content.strip()) for chunk in chunks)
    return sum(stripped_lengths) > MIN_MEANINGFUL_CHARS
def content_type_to_mode(content_type: str) -> ExtractMode:
    """Map a content_type to the extraction mode.

    PDFs extract page-by-page; everything else goes through the
    markdown pipeline.
    """
    if content_type == PDF_CONTENT_TYPE:
        return ExtractMode.PAGINATED
    return ExtractMode.MARKDOWN
def extraction_config(mode: ExtractMode) -> ExtractionConfig:
    """Build ExtractionConfig for the given extraction mode.

    Raises ``KeyError`` for a mode outside the three known values, matching
    dict-dispatch behavior.
    """
    from kreuzberg import ConcurrencyConfig, ExtractionConfig, OcrConfig, PageConfig

    chunking = build_chunking_config()
    pages = PageConfig(extract_pages=True, insert_page_markers=False)
    ocr = OcrConfig(backend=TESSERACT_BACKEND)
    # Bound kreuzberg's internal pool to the same CPU budget as the
    # pipeline semaphore so the two stop competing for cores.
    concurrency = ConcurrencyConfig(max_threads=cpu_quota())

    if mode is ExtractMode.MARKDOWN:
        return ExtractionConfig(
            chunking=chunking,
            output_format=MARKDOWN_OUTPUT,
            concurrency=concurrency,
        )
    if mode is ExtractMode.PAGINATED:
        return ExtractionConfig(
            chunking=chunking,
            pages=pages,
            concurrency=concurrency,
        )
    if mode is ExtractMode.PAGINATED_OCR:
        return ExtractionConfig(
            chunking=chunking,
            pages=pages,
            ocr=ocr,
            concurrency=concurrency,
        )
    raise KeyError(mode)
82def _should_run_ocr() -> bool:
83 """Decide whether to attempt vision-based OCR on scanned PDFs.
85 Uses ``cfg.enable_ocr`` and ``cfg.vision_model``:
86 True = force on (requires ``cfg.vision_model`` to be set for a real
87 vision run; otherwise the caller falls back to Tesseract).
88 False = force off.
89 None = auto-detect: run vision OCR when ``cfg.vision_model`` is set.
90 """
91 if cfg.enable_ocr is True:
92 return True
93 if cfg.enable_ocr is False:
94 return False
95 return bool(cfg.vision_model)
async def _vision_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
    quiet: bool,
) -> list[ChunkRecord]:
    """Run vision OCR through the persistent worker pool, then chunk + embed.

    Routing through the pool amortises the multi-second vision-Llama load
    cost across PDFs. Tesseract carries no shared model state and runs
    inline in ``_tesseract_ocr_fallback``; this helper handles vision only.
    Any failure is logged and yields an empty list so the caller can skip
    the file.
    """
    try:
        # Resolve the provider inside the try so a failing service lookup
        # is handled the same way as a failing OCR run.
        run_ocr = get_services().provider.pdf_ocr
        page_texts = await asyncio.to_thread(
            run_ocr,
            path,
            backend="vision",
            model=cfg.vision_model,
            per_page_timeout_s=cfg.ocr_timeout,
            quiet=quiet,
            on_progress=on_progress,
        )
    except Exception:
        log.warning("OCR via vision backend failed for %s.", source_name, exc_info=True)
        return []
    return await _chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)
def _run_tesseract_sync(path: Path) -> Any:
    """Run kreuzberg Tesseract OCR with this worker's stderr sent to /dev/null.

    Tesseract writes "Line cannot be recognized!!", "Image too small to
    scale!!", and "Detected N diacritics" straight to fd 2 from inside libc.
    Without the redirect those lines flood the TUI log file (1 000+ entries
    per scanned PDF) and can bleed into the TUI itself. The suppression is
    held only for the duration of the extraction call so stderr writes from
    other threads still get through.
    """
    from kreuzberg import extract_file_sync

    from lilbee.providers.llama_cpp.log_dispatch import stderr_suppressed

    with stderr_suppressed():
        # Config construction stays inside the window so any stderr noise
        # it produces is suppressed too.
        ocr_config = extraction_config(ExtractMode.PAGINATED_OCR)
        return extract_file_sync(str(path), config=ocr_config)
async def _tesseract_ocr_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Run Tesseract OCR via ``asyncio.to_thread`` (no model load = no pool).

    ``cfg.tesseract_timeout`` caps the whole-document extract; 0 means
    unlimited. Failures (including timeout) log a warning and return an
    empty list so the caller can skip the file.
    """
    # wait_for(..., timeout=None) simply awaits to completion, so one code
    # path covers both the capped and the unlimited case.
    limit = cfg.tesseract_timeout if cfg.tesseract_timeout > 0 else None
    try:
        result = await asyncio.wait_for(
            asyncio.to_thread(_run_tesseract_sync, path), timeout=limit
        )
    except TimeoutError:
        log.warning(
            "Tesseract OCR exceeded %.0fs timeout on %s; skipping.",
            cfg.tesseract_timeout,
            source_name,
        )
        return []
    except Exception:
        log.warning("OCR via tesseract backend failed for %s.", source_name, exc_info=True)
        return []

    # Regroup kreuzberg's chunks by page so downstream records carry page numbers.
    grouped: dict[int, list[str]] = {}
    for chunk in result.chunks or []:
        page_num = int(chunk.metadata.get("first_page") or 1)
        grouped.setdefault(page_num, []).append(chunk.content)
    page_texts = [(num, "\n".join(parts)) for num, parts in sorted(grouped.items())]
    return await _chunk_and_embed_pages(page_texts, source_name, content_type, on_progress)
async def _chunk_and_embed_pages(
    page_texts: Sequence[tuple[int, str]],
    source_name: str,
    content_type: str,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Chunk OCR per-page text and embed every chunk through the embedder.

    Returns one ``ChunkRecord`` per produced chunk, tagged with the page it
    came from; an empty input (or pages that chunk to nothing) yields [].
    """
    if not page_texts:
        return []

    # A single OCR page rarely spans multiple topics; skip the semantic round-trip.
    paged_chunks: list[tuple[int, str]] = []
    for page_num, text in page_texts:
        for piece in chunk_text(text, use_semantic=False):
            paged_chunks.append((page_num, piece))
    if not paged_chunks:
        return []

    texts = [piece for _, piece in paged_chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records: list[ChunkRecord] = []
    for index, ((page_num, piece), vector) in enumerate(
        zip(paged_chunks, vectors, strict=True)
    ):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=CHUNK_TYPE_RAW,
                page_start=page_num,
                page_end=page_num,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=index,
                vector=vector,
            )
        )
    return records
async def _handle_scanned_pdf_fallback(
    path: Path,
    source_name: str,
    content_type: str,
    result: ExtractionResult,
    *,
    quiet: bool,
    on_progress: DetailedProgressCallback,
) -> list[ChunkRecord]:
    """Route a scanned PDF through the configured OCR backend.

    Vision OCR (pool-routed) runs when ``_should_run_ocr()`` is True and a
    vision model is configured; otherwise the file falls back to inline
    Tesseract. Both paths return chunk records; an empty list means OCR
    found no usable text and the caller should skip the file.
    """
    del result  # Both backends re-extract; the kreuzberg result is not reused.

    if _should_run_ocr() and cfg.vision_model:
        log.info(
            "Scanned PDF: using vision OCR for %s (model=%s)",
            source_name,
            cfg.vision_model,
        )
        return await _vision_ocr_fallback(
            path,
            source_name,
            content_type,
            on_progress=on_progress,
            quiet=quiet,
        )

    log.info("Scanned PDF: falling back to Tesseract OCR for %s", source_name)
    chunks = await _tesseract_ocr_fallback(
        path,
        source_name,
        content_type,
        on_progress=on_progress,
    )
    if chunks:
        return chunks
    log.warning(
        "Skipped %s: text extraction produced no usable text. "
        "For better results on scanned PDFs, configure a vision model "
        "via PUT /api/models/vision or set LILBEE_ENABLE_OCR=true.",
        source_name,
    )
    return chunks
async def ingest_document(
    path: Path,
    source_name: str,
    content_type: str,
    *,
    quiet: bool = False,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Extract and chunk a document, embed every chunk, return the records.

    Scanned PDFs (no meaningful text) are routed through the OCR fallback;
    vision OCR is controlled by ``cfg.enable_ocr`` (see ``_should_run_ocr``).
    """
    from kreuzberg import extract_file_sync

    config = extraction_config(content_type_to_mode(content_type))
    result = await asyncio.to_thread(extract_file_sync, str(path), config=config)

    if content_type == PDF_CONTENT_TYPE and not _has_meaningful_text(result):
        return await _handle_scanned_pdf_fallback(
            path,
            source_name,
            content_type,
            result,
            quiet=quiet,
            on_progress=on_progress,
        )

    if not result.chunks:
        return []

    # Fire one EXTRACT event per file so subscribers (chat /add, /sync,
    # CLI Rich progress) can show "extracted N pages" before the embed
    # phase starts; otherwise a 44MB PDF sits at file-level 0% for
    # minutes. get_page_count is the canonical PDF page count; for
    # non-paginated formats we fall back to the chunk count.
    page_count = result.get_page_count() or len(result.chunks)
    on_progress(
        EventType.EXTRACT,
        ExtractEvent(file=source_name, page=page_count, total_pages=page_count),
    )

    texts = [chunk.content for chunk in result.chunks]
    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records: list[ChunkRecord] = []
    for idx, (chunk, text, vec) in enumerate(
        zip(result.chunks, texts, vectors, strict=True)
    ):
        meta = chunk.metadata
        records.append(
            ChunkRecord(
                source=source_name,
                content_type=content_type,
                chunk_type=CHUNK_TYPE_RAW,
                page_start=meta.get("first_page") or 0,
                page_end=meta.get("last_page") or 0,
                line_start=0,
                line_end=0,
                chunk=text,
                chunk_index=meta.get("chunk_index", idx),
                vector=vec,
            )
        )
    return records
async def ingest_markdown(
    path: Path,
    source_name: str,
    on_progress: DetailedProgressCallback = noop_callback,
) -> list[ChunkRecord]:
    """Chunk a markdown file with heading context prepended to each chunk.

    Each chunk gets the heading hierarchy path (e.g. "# Setup > ## Install")
    prepended for better retrieval context. Empty files, or files that chunk
    to nothing, return an empty list.
    """
    raw_text = await asyncio.to_thread(path.read_text, encoding="utf-8", errors="replace")
    if not raw_text.strip():
        return []

    texts = chunk_text(raw_text, mime_type="text/markdown", heading_context=True)
    if not texts:
        return []

    vectors = await asyncio.to_thread(
        get_services().embedder.embed_batch, texts, source=source_name, on_progress=on_progress
    )

    records: list[ChunkRecord] = []
    for index, (piece, vector) in enumerate(zip(texts, vectors, strict=True)):
        records.append(
            ChunkRecord(
                source=source_name,
                content_type="text",
                chunk_type=CHUNK_TYPE_RAW,
                page_start=0,
                page_end=0,
                line_start=0,
                line_end=0,
                chunk=piece,
                chunk_index=index,
                vector=vector,
            )
        )
    return records