Coverage for src / lilbee / cli / tui / screens / chat_helpers.py: 100%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Module-level helpers used by ChatScreen: progress callbacks, file cleanup, stream close.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6import logging 

7import shutil 

8import time 

9from collections.abc import Callable 

10from typing import Any 

11 

12from lilbee.cli.tui import messages as msg 

13from lilbee.cli.tui.widgets.task_bar_controller import ProgressReporter 

14from lilbee.core.config import cfg 

15from lilbee.providers.base import ClosableIterator 

16from lilbee.runtime.progress import ( 

17 BatchProgressEvent, 

18 BatchStatus, 

19 DetailedProgressCallback, 

20 EmbedEvent, 

21 EventType, 

22 ExtractEvent, 

23 FileDoneEvent, 

24 FileStartEvent, 

25 ProgressEvent, 

26 SyncDoneEvent, 

27) 

28 

29log = logging.getLogger(__name__) 

30 

31_ADD_EMBED_THROTTLE_SECONDS = 0.15 

32"""Throttle EMBED reporter updates to avoid TaskBar update storms. 

33 

34The embed worker fires one EmbedEvent per sub-batch, which on a fast 

35laptop can be dozens per second. The Task Center only repaints at 10 Hz 

36anyway, so we coalesce here at the same cadence. 

37""" 

38 

39 

def close_stream(stream: Any) -> None:
    """Best-effort close of a streaming iterator.

    Only streams satisfying the ClosableIterator protocol are closed;
    any other object is left untouched. Exceptions raised by close()
    are swallowed — the stream is being discarded anyway.
    """
    if not isinstance(stream, ClosableIterator):
        return
    with contextlib.suppress(Exception):
        stream.close()

45 

46 

def detail_for_batch_progress(data: BatchProgressEvent, in_flight: list[str]) -> str:
    """Return the user-facing detail label for a BATCH_PROGRESS tick.

    Dispatch is by status rather than by file identity: per-page
    rasterization (vision OCR) is the only producer of
    BatchStatus.RASTERIZING, and it reports an absolute path in
    ``data.file`` which never equals the relative source names kept in
    ``in_flight`` — so identity-based detection of the per-page case
    could never fire.
    """
    if data.status == BatchStatus.RASTERIZING:
        # Per-page tick from the rasterizer.
        return msg.ADD_PAGE_PROGRESS.format(
            status=data.status.capitalize(), current=data.current, total=data.total
        )
    # Per-file tick: pin the label to the oldest unfinished file, if any.
    if not in_flight:
        return msg.ADD_FILE_DONE.format(file=data.file)
    return msg.ADD_SYNCING_FILE.format(file=in_flight[0])

63 

64 

def remove_copied_files(names: list[str]) -> None:
    """Delete files previously copied into documents/ by a /add invocation.

    Called on cancel or failure of the add task so a cancelled file does not
    re-appear on the next sync. Silently tolerates missing entries;
    the user may have removed them concurrently, and the goal is just to
    prevent accidental indexing.

    Args:
        names: Entries relative to ``cfg.documents_dir`` to remove.
    """
    for name in names:
        target = cfg.documents_dir / name
        try:
            if target.is_dir():
                shutil.rmtree(target, ignore_errors=True)
            else:
                # unlink(missing_ok=True) instead of an exists() pre-check:
                # avoids the TOCTOU race with a concurrent user delete, and
                # also removes dangling symlinks, for which exists() returns
                # False (it follows the link) and the old check left behind.
                target.unlink(missing_ok=True)
        except OSError:
            log.debug("Could not remove copied file %s", target, exc_info=True)

82 

83 

def build_add_progress_callback(reporter: ProgressReporter) -> DetailedProgressCallback:
    """Build the on_progress callback used by /add.

    Files are recorded in start order so the displayed name always pins
    to the oldest unfinished file: the pipeline runs files concurrently,
    and without pinning the label would flip around the queue. EXTRACT
    surfaces "extracted N pages" once per file so a 44MB scanned PDF
    doesn't read as a hang; EMBED ticks per chunk, throttled to a
    steady cadence.
    """
    pending: list[str] = []
    last_embed_tick = 0.0

    def callback(event_type: EventType, data: ProgressEvent) -> None:
        nonlocal last_embed_tick
        # Cooperative cancellation checkpoint on every event.
        reporter.check_cancelled()
        if event_type == EventType.FILE_START and isinstance(data, FileStartEvent):
            pending.append(data.file)
            reporter.update(0, msg.ADD_SYNCING_FILE.format(file=pending[0]), indeterminate=True)
        elif event_type == EventType.FILE_DONE and isinstance(data, FileDoneEvent):
            with contextlib.suppress(ValueError):
                pending.remove(data.file)
        elif event_type == EventType.BATCH_PROGRESS and isinstance(data, BatchProgressEvent):
            pct = 0.0 if not data.total else data.current / data.total * 100.0
            reporter.update(pct, detail_for_batch_progress(data, pending), indeterminate=False)
        elif event_type == EventType.EXTRACT and isinstance(data, ExtractEvent):
            page_label = msg.SYNC_FILE_PROGRESS.format(
                current=data.page, total=data.total_pages, file=data.file
            )
            reporter.update(0, page_label, indeterminate=True)
        elif event_type == EventType.EMBED and isinstance(data, EmbedEvent):
            now = time.monotonic()
            if now - last_embed_tick < _ADD_EMBED_THROTTLE_SECONDS:
                return  # coalesce: Task Center repaints at ~10 Hz anyway
            last_embed_tick = now
            pct = int(data.chunk * 100 / data.total_chunks) if data.total_chunks else 0
            reporter.update(pct, msg.SYNC_EMBEDDING.format(file=data.file), indeterminate=False)

    return callback

125 

126 

def build_sync_progress_callback(
    reporter: ProgressReporter,
) -> Callable[[EventType, ProgressEvent], None]:
    """Return the on_progress shim used by ``_do_sync``.

    EXTRACT mirrors the /add path: a 44MB scanned PDF needs a per-page
    tick or the row reads as frozen.
    """
    last_embed_tick = 0.0

    def callback(event_type: EventType, data: ProgressEvent) -> None:
        nonlocal last_embed_tick
        # Explicit cancel check on every event (mirrors /add) so a SYNC task
        # cancelled mid-batch stops at the next progress tick instead of
        # finishing the current file. update() also checks, but events that
        # never call reporter.update (e.g. BATCH_PROGRESS in the
        # ingest_batch path) would otherwise miss the cooperative checkpoint.
        reporter.check_cancelled()
        if event_type == EventType.FILE_START and isinstance(data, FileStartEvent):
            done_pct = int((data.current_file - 1) * 100 / data.total_files)
            reporter.update(
                done_pct,
                msg.SYNC_FILE_PROGRESS.format(
                    current=data.current_file, total=data.total_files, file=data.file
                ),
                indeterminate=False,
            )
        elif event_type == EventType.FILE_DONE and isinstance(data, FileDoneEvent):
            reporter.update(0, msg.SYNC_FILE_DONE.format(file=data.file), indeterminate=False)
        elif event_type == EventType.EXTRACT and isinstance(data, ExtractEvent):
            page_label = msg.SYNC_FILE_PROGRESS.format(
                current=data.page, total=data.total_pages, file=data.file
            )
            reporter.update(0, page_label, indeterminate=True)
        elif event_type == EventType.EMBED and isinstance(data, EmbedEvent):
            now = time.monotonic()
            if now - last_embed_tick < _ADD_EMBED_THROTTLE_SECONDS:
                return  # coalesce: same throttle cadence as the /add path
            last_embed_tick = now
            pct = int(data.chunk * 100 / data.total_chunks) if data.total_chunks else 0
            reporter.update(pct, msg.SYNC_EMBEDDING.format(file=data.file), indeterminate=False)
        elif event_type == EventType.DONE and isinstance(data, SyncDoneEvent):
            changed = data.added + data.updated + data.removed
            reporter.update(100, msg.SYNC_STATUS_DONE.format(count=changed), indeterminate=False)

    return callback