Coverage for src / lilbee / data / ingest / types.py: 100%
57 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Shared ingest types and constants."""
3from __future__ import annotations
5from dataclasses import dataclass
6from enum import StrEnum
7from pathlib import Path
8from typing import NamedTuple, TypedDict
10from pydantic import BaseModel
class FileToProcess(NamedTuple):
    """A file queued for ingestion with its metadata."""

    # Display name of the file (presumably the original filename — confirm against callers).
    name: str
    # Filesystem location where the file's bytes can be read.
    path: Path
    # Content-type string (e.g. PDF_CONTENT_TYPE or a DOCUMENT_EXTENSION_MAP value).
    content_type: str
    # Hash of the file contents; presumably used for change detection during sync — confirm.
    file_hash: str
    # NOTE(review): looks like this flags a temporary copy to delete after
    # processing — verify against the ingestion loop.
    needs_cleanup: bool
# Minimum total chars for extracted text to be considered meaningful.
# 50 chars ≈ 12 words: if a PDF yields less, it's almost certainly a scanned
# document with no embedded text layer. Text PDFs with even just a title page
# easily exceed this threshold; blank/scan-only PDFs yield 0 chars.
MIN_MEANINGFUL_CHARS: int = 50

# Content-type string for PDFs (also the ".pdf" value in DOCUMENT_EXTENSION_MAP).
PDF_CONTENT_TYPE: str = "pdf"
# Output-format identifier; presumably passed to the extraction layer — confirm.
MARKDOWN_OUTPUT: str = "markdown"
# OCR backend name; presumably selects tesseract in the extractor — confirm.
TESSERACT_BACKEND: str = "tesseract"
class ExtractMode(StrEnum):
    """Extraction topology: pagination / OCR / output format."""

    # Whole-document markdown output.
    MARKDOWN = "markdown"
    # Per-page extraction of the embedded text layer.
    PAGINATED = "paginated"
    # Per-page extraction with OCR — NOTE(review): semantics inferred from
    # the name and class docstring; confirm against the extraction code.
    PAGINATED_OCR = "paginated_ocr"
class ChunkRecord(TypedDict):
    """A single store-ready chunk record matching store.CHUNKS_SCHEMA."""

    source: str        # originating file/source identifier
    content_type: str  # e.g. PDF_CONTENT_TYPE or a DOCUMENT_EXTENSION_MAP value
    chunk_type: str    # kind of chunk — semantics defined by the chunker; see store.CHUNKS_SCHEMA
    page_start: int    # first page covered by this chunk
    page_end: int      # last page covered by this chunk
    line_start: int    # first line covered by this chunk
    line_end: int      # last line covered by this chunk
    chunk: str         # the chunk text itself
    chunk_index: int   # position of this chunk within its source document
    vector: list[float]  # embedding vector for the chunk text
class SyncResult(BaseModel):
    """Counters and per-file detail summarizing one sync run.

    ``added``/``updated``/``removed``/``failed``/``skipped`` hold the affected
    file names; ``unchanged`` is only a count. Pydantic copies the mutable
    class-level defaults per instance, so the empty lists are safe here.
    """

    added: list[str] = []
    updated: list[str] = []
    removed: list[str] = []
    unchanged: int = 0
    failed: list[str] = []
    skipped: list[str] = []

    def __str__(self) -> str:
        # Summary counters first, then one rich-markup line per skipped/failed file.
        report = [
            f"Added: {len(self.added)}",
            f"Updated: {len(self.updated)}",
            f"Removed: {len(self.removed)}",
            f"Unchanged: {self.unchanged}",
            f"Skipped: {len(self.skipped)}",
            f"Failed: {len(self.failed)}",
        ]
        report.extend(f" [yellow]{name}[/yellow]" for name in self.skipped)
        report.extend(f" [red]{name}[/red]" for name in self.failed)
        return "\n".join(report)

    def __repr__(self) -> str:
        counts = ", ".join(
            (
                f"added={len(self.added)}",
                f"updated={len(self.updated)}",
                f"removed={len(self.removed)}",
                f"unchanged={self.unchanged}",
                f"skipped={len(self.skipped)}",
                f"failed={len(self.failed)}",
            )
        )
        return f"SyncResult({counts})"

    def __rich__(self) -> str:
        # Rich consoles render the same markup string produced by __str__.
        return str(self)
@dataclass
class _IngestResult:
    """Outcome of a single file ingestion attempt."""

    # Display name of the file that was processed.
    name: str
    # Filesystem path the file was read from.
    path: Path
    # Number of chunks produced from this file.
    chunk_count: int
    # The exception raised during ingestion, or None on success.
    error: Exception | None
    # Content hash of the file; defaults to "" — presumably when the attempt
    # failed before hashing could happen — confirm against the ingestion loop.
    file_hash: str = ""
# Extension → content_type string for document formats handled by kreuzberg.
# Spelled out one entry per extension (same key order as before) so a given
# suffix is directly greppable.
DOCUMENT_EXTENSION_MAP: dict[str, str] = {
    ".md": "text",
    ".txt": "text",
    ".html": "text",
    ".rst": "text",
    ".yaml": "text",
    ".yml": "text",
    ".pdf": PDF_CONTENT_TYPE,
    ".docx": "docx",
    ".xlsx": "xlsx",
    ".pptx": "pptx",
    ".epub": "epub",
    ".png": "image",
    ".jpg": "image",
    ".jpeg": "image",
    ".tiff": "image",
    ".tif": "image",
    ".bmp": "image",
    ".webp": "image",
    ".csv": "data",
    ".tsv": "data",
    ".xml": "xml",
    ".json": "json",
    ".jsonl": "json",
}