Coverage for src / lilbee / app / dataset.py: 100%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""Surface-neutral export/import use cases over the per-page text dataset.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from pathlib import Path 

7 

8from pydantic import BaseModel 

9 

10from lilbee.app.services import get_services 

11from lilbee.data.export import ( 

12 DatasetFormat, 

13 build_page_dataset, 

14 decode_format, 

15 deserialize_dataset, 

16 import_dataset, 

17 load_page_dataset, 

18 resolve_format, 

19 serialize_dataset, 

20 write_dataset, 

21) 

22from lilbee.data.store import EmbeddingModelMismatchError, PageTextRecord 

23from lilbee.runtime.progress import DetailedProgressCallback, noop_callback 

24 

25 

26class DatasetError(Exception): 

27 """User-facing export/import failure surfaces render as-is.""" 

28 

29 

30class ExportSummary(BaseModel): 

31 """Result of a path-based export.""" 

32 

33 command: str = "export" 

34 format: str 

35 output: str 

36 pages: int 

37 sources: int 

38 

39 

40class ImportSummary(BaseModel): 

41 """Result of an import.""" 

42 

43 command: str = "import" 

44 sources: list[str] 

45 pages: int 

46 chunks: int 

47 

48 

49@dataclass 

50class ExportPayload: 

51 """In-memory export for byte transport (HTTP download).""" 

52 

53 data: bytes 

54 fmt: DatasetFormat 

55 pages: int 

56 sources: int 

57 

58 

59def require_format(value: str) -> DatasetFormat: 

60 """Decode an explicit *value* into a format; there is no path to infer from.""" 

61 if not value: 

62 raise DatasetError("format is required (parquet or jsonl)") 

63 try: 

64 return decode_format(value) 

65 except ValueError as exc: 

66 raise DatasetError(str(exc)) from None 

67 

68 

69def _build_validated(source: str | None) -> list[PageTextRecord]: 

70 """Build the dataset rows for *source* (or all), validating the request.""" 

71 store = get_services().store 

72 if source is not None and source not in {s["filename"] for s in store.get_sources()}: 

73 raise DatasetError(f"Source not found: {source}") 

74 rows = build_page_dataset(store, source) 

75 if not rows: 

76 raise DatasetError("Nothing to export: the store has no indexed pages.") 

77 return rows 

78 

79 

80def export_to_path(output: Path, fmt_value: str, source: str | None) -> ExportSummary: 

81 """Write the per-page dataset to *output*; format from *fmt_value* or suffix.""" 

82 try: 

83 fmt = resolve_format(fmt_value, output) 

84 except ValueError as exc: 

85 raise DatasetError(str(exc)) from None 

86 rows = _build_validated(source) 

87 write_dataset(rows, output, fmt) 

88 return ExportSummary( 

89 format=str(fmt), 

90 output=str(output), 

91 pages=len(rows), 

92 sources=len({row["source"] for row in rows}), 

93 ) 

94 

95 

96def export_to_bytes(fmt_value: str, source: str | None) -> ExportPayload: 

97 """Encode the per-page dataset to bytes; empty *fmt_value* defaults to parquet.""" 

98 fmt = require_format(fmt_value) if fmt_value else DatasetFormat.PARQUET 

99 rows = _build_validated(source) 

100 return ExportPayload( 

101 data=serialize_dataset(rows, fmt), 

102 fmt=fmt, 

103 pages=len(rows), 

104 sources=len({row["source"] for row in rows}), 

105 ) 

106 

107 

108async def _run_import( 

109 rows: list[PageTextRecord], on_progress: DetailedProgressCallback 

110) -> ImportSummary: 

111 """Re-embed *rows* into the store, mapping the mismatch error for surfaces.""" 

112 if not rows: 

113 raise DatasetError("Dataset has no pages to import.") 

114 store = get_services().store 

115 try: 

116 result = await import_dataset(store, rows, on_progress=on_progress) 

117 except EmbeddingModelMismatchError as exc: 

118 raise DatasetError(str(exc)) from None 

119 return ImportSummary(sources=result.sources, pages=result.pages, chunks=result.chunks) 

120 

121 

122async def import_from_path( 

123 path: Path, fmt_value: str, on_progress: DetailedProgressCallback = noop_callback 

124) -> ImportSummary: 

125 """Load and import a dataset file; format from *fmt_value* or suffix.""" 

126 try: 

127 fmt = resolve_format(fmt_value, path) 

128 rows = load_page_dataset(path, fmt) 

129 except ValueError as exc: 

130 raise DatasetError(str(exc)) from None 

131 return await _run_import(rows, on_progress) 

132 

133 

134async def import_from_bytes( 

135 data: bytes, fmt_value: str, on_progress: DetailedProgressCallback = noop_callback 

136) -> ImportSummary: 

137 """Decode and import dataset *data*; *fmt_value* is required (no filename).""" 

138 fmt = require_format(fmt_value) 

139 try: 

140 rows = deserialize_dataset(data, fmt) 

141 except ValueError as exc: 

142 raise DatasetError(str(exc)) from None 

143 return await _run_import(rows, on_progress)