Coverage for src / lilbee / app / dataset.py: 100%
76 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""Surface-neutral export/import use cases over the per-page text dataset."""
3from __future__ import annotations
5from dataclasses import dataclass
6from pathlib import Path
8from pydantic import BaseModel
10from lilbee.app.services import get_services
11from lilbee.data.export import (
12 DatasetFormat,
13 build_page_dataset,
14 decode_format,
15 deserialize_dataset,
16 import_dataset,
17 load_page_dataset,
18 resolve_format,
19 serialize_dataset,
20 write_dataset,
21)
22from lilbee.data.store import EmbeddingModelMismatchError, PageTextRecord
23from lilbee.runtime.progress import DetailedProgressCallback, noop_callback
class DatasetError(Exception):
    """User-facing export/import failure; surfaces render its message as-is."""
class ExportSummary(BaseModel):
    """Result of a path-based export."""

    # Fixed tag identifying which operation produced this summary.
    command: str = "export"
    # String form of the dataset format that was written.
    format: str
    # String form of the destination path.
    output: str
    # Number of page rows written.
    pages: int
    # Number of distinct ``source`` values among the written rows.
    sources: int
class ImportSummary(BaseModel):
    """Result of an import."""

    # Fixed tag identifying which operation produced this summary.
    command: str = "import"
    # The three fields below are copied from the same-named attributes of the
    # value returned by ``import_dataset``.
    sources: list[str]
    pages: int
    chunks: int
@dataclass
class ExportPayload:
    """In-memory export for byte transport (HTTP download)."""

    # Serialized dataset bytes.
    data: bytes
    # Format the bytes were encoded in.
    fmt: DatasetFormat
    # Number of page rows encoded.
    pages: int
    # Number of distinct ``source`` values among the encoded rows.
    sources: int
def require_format(value: str) -> DatasetFormat:
    """Decode an explicit *value* into a format; there is no path to infer from.

    Raises:
        DatasetError: If *value* is empty, or if ``decode_format`` rejects it
            with a ``ValueError`` (that error's message is reused verbatim).
    """
    if not value:
        raise DatasetError("format is required (parquet or jsonl)")
    try:
        return decode_format(value)
    except ValueError as exc:
        # ``from None`` keeps the original traceback out of the user-facing error.
        raise DatasetError(str(exc)) from None
def _build_validated(source: str | None) -> list[PageTextRecord]:
    """Build the dataset rows for *source* (or all), validating the request.

    Raises:
        DatasetError: If *source* is given but matches no ``filename`` among
            the store's sources, or if the built dataset has no rows.
    """
    store = get_services().store
    # Reject an unknown source up front so the caller gets a specific message
    # instead of the generic "nothing to export" below.
    if source is not None and source not in {s["filename"] for s in store.get_sources()}:
        raise DatasetError(f"Source not found: {source}")
    rows = build_page_dataset(store, source)
    if not rows:
        raise DatasetError("Nothing to export: the store has no indexed pages.")
    return rows
def export_to_path(output: Path, fmt_value: str, source: str | None) -> ExportSummary:
    """Write the per-page dataset to *output*; format from *fmt_value* or suffix.

    Raises:
        DatasetError: If ``resolve_format`` raises ``ValueError``, or if the
            request fails validation in ``_build_validated``.
    """
    # Resolve the format first so a bad format fails before the store is read.
    try:
        fmt = resolve_format(fmt_value, output)
    except ValueError as exc:
        raise DatasetError(str(exc)) from None
    rows = _build_validated(source)
    write_dataset(rows, output, fmt)
    return ExportSummary(
        format=str(fmt),
        output=str(output),
        pages=len(rows),
        # Count distinct sources, not rows: several pages share one source.
        sources=len({row["source"] for row in rows}),
    )
def export_to_bytes(fmt_value: str, source: str | None) -> ExportPayload:
    """Encode the per-page dataset to bytes; empty *fmt_value* defaults to parquet.

    Raises:
        DatasetError: If a non-empty *fmt_value* cannot be decoded, or if the
            request fails validation in ``_build_validated``.
    """
    # Only a non-empty value is decoded; an empty one falls back to parquet
    # instead of hitting ``require_format``'s "format is required" error.
    fmt = require_format(fmt_value) if fmt_value else DatasetFormat.PARQUET
    rows = _build_validated(source)
    return ExportPayload(
        data=serialize_dataset(rows, fmt),
        fmt=fmt,
        pages=len(rows),
        # Count distinct sources, not rows: several pages share one source.
        sources=len({row["source"] for row in rows}),
    )
async def _run_import(
    rows: list[PageTextRecord], on_progress: DetailedProgressCallback
) -> ImportSummary:
    """Re-embed *rows* into the store, mapping the mismatch error for surfaces.

    Raises:
        DatasetError: If *rows* is empty, or if ``import_dataset`` raises
            ``EmbeddingModelMismatchError`` (its message is reused verbatim).
    """
    if not rows:
        raise DatasetError("Dataset has no pages to import.")
    store = get_services().store
    try:
        result = await import_dataset(store, rows, on_progress=on_progress)
    except EmbeddingModelMismatchError as exc:
        # ``from None`` keeps the original traceback out of the user-facing error.
        raise DatasetError(str(exc)) from None
    return ImportSummary(sources=result.sources, pages=result.pages, chunks=result.chunks)
async def import_from_path(
    path: Path, fmt_value: str, on_progress: DetailedProgressCallback = noop_callback
) -> ImportSummary:
    """Load and import a dataset file; format from *fmt_value* or suffix.

    Raises:
        DatasetError: If resolving the format or loading the file raises
            ``ValueError``, or if the import itself fails in ``_run_import``.
    """
    try:
        fmt = resolve_format(fmt_value, path)
        rows = load_page_dataset(path, fmt)
    except ValueError as exc:
        # ``from None`` keeps the original traceback out of the user-facing error.
        raise DatasetError(str(exc)) from None
    return await _run_import(rows, on_progress)
async def import_from_bytes(
    data: bytes, fmt_value: str, on_progress: DetailedProgressCallback = noop_callback
) -> ImportSummary:
    """Decode and import dataset *data*; *fmt_value* is required (no filename).

    Raises:
        DatasetError: If *fmt_value* is empty or cannot be decoded, if
            ``deserialize_dataset`` raises ``ValueError``, or if the import
            itself fails in ``_run_import``.
    """
    # Kept outside the ``try``: ``require_format`` already raises DatasetError.
    fmt = require_format(fmt_value)
    try:
        rows = deserialize_dataset(data, fmt)
    except ValueError as exc:
        # ``from None`` keeps the original traceback out of the user-facing error.
        raise DatasetError(str(exc)) from None
    return await _run_import(rows, on_progress)