Coverage for src / lilbee / cli / sync.py: 100%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Background sync, executor management, and sync status for chat mode.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6from collections.abc import Callable 

7from concurrent.futures import Future, ThreadPoolExecutor 

8from typing import TYPE_CHECKING 

9 

10from rich.console import Console 

11 

12from lilbee.cli import theme 

13from lilbee.data.ingest import sync 

14from lilbee.runtime.asyncio_loop import is_executor_shutdown 

15from lilbee.runtime.progress import ( 

16 EventType, 

17 ExtractEvent, 

18 FileStartEvent, 

19 ProgressEvent, 

20 SyncDoneEvent, 

21) 

22 

23if TYPE_CHECKING: 

24 from lilbee.runtime.progress import DetailedProgressCallback 

25 

26 

def _format_sync_summary(
    added: int, updated: int, removed: int, failed: int, skipped: int = 0
) -> str | None:
    """Render the non-zero sync counters as a comma-separated phrase.

    Counters are reported in the fixed order added, updated, removed,
    skipped, failed; any counter that is zero is left out.

    Returns:
        A string such as ``"2 added, 1 failed"``, or ``None`` when every
        counter is zero.
    """
    labelled = (
        ("added", added),
        ("updated", updated),
        ("removed", removed),
        ("skipped", skipped),
        ("failed", failed),
    )
    fragments: list[str] = []
    for label, count in labelled:
        if count:
            fragments.append(f"{count} {label}")
    if not fragments:
        return None
    return ", ".join(fragments)

40 

41 

def _print_file_start(con: Console, data: ProgressEvent) -> None:
    """Print a muted one-line "Syncing [i/n]: <file>" status for a file-start event.

    Args:
        con: Console the status line is printed to.
        data: The event payload; must be a ``FileStartEvent``.

    Raises:
        TypeError: If *data* is not a ``FileStartEvent``.
    """
    if not isinstance(data, FileStartEvent):
        raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
    # Imported here so the block carries its own dependency; rich is already
    # a dependency of this module (see the Console import).
    from rich.markup import escape

    m = theme.MUTED
    # The file name is arbitrary text: escape it so bracketed parts such as
    # "[draft]" or "[/x]" are printed literally instead of being parsed as
    # Rich markup tags (which drops text or raises MarkupError).
    file_label = escape(str(data.file))
    con.print(f"[{m}]Syncing [{data.current_file}/{data.total_files}]: {file_label}[/{m}]")

47 

48 

def _print_done(con: Console, data: ProgressEvent) -> None:
    """Print a muted "Synced: ..." line for a sync-done event.

    Nothing is printed when ``_format_sync_summary`` yields no summary
    (i.e. every counter on the event is zero).

    Raises:
        TypeError: If *data* is not a ``SyncDoneEvent``.
    """
    if isinstance(data, SyncDoneEvent):
        text = _format_sync_summary(
            data.added, data.updated, data.removed, data.failed, data.skipped
        )
        if not text:
            return
        con.print(f"[{theme.MUTED}]Synced: {text}[/{theme.MUTED}]")
        return
    raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")

57 

58 

def _sync_progress_printer(con: Console) -> DetailedProgressCallback:
    """Build a progress callback that prints one line per FILE_START and DONE event.

    Events of any other type are ignored by the returned callback.
    """
    dispatch: dict[EventType, Callable[[Console, ProgressEvent], None]] = {
        EventType.FILE_START: _print_file_start,
        EventType.DONE: _print_done,
    }

    def _callback(event_type: EventType, data: ProgressEvent) -> None:
        # Only event types with a registered printer produce output.
        if event_type in dispatch:
            dispatch[event_type](con, data)

    return _callback

72 

73 

# Module-wide background executor; created on first use by _get_executor().
_bg_executor: ThreadPoolExecutor | None = None


def _get_executor() -> ThreadPoolExecutor:
    """Return the shared single-worker executor, creating it on first call."""
    global _bg_executor
    executor = _bg_executor
    if executor is None:
        # One worker only, so submitted jobs run one after another.
        executor = ThreadPoolExecutor(max_workers=1)
        _bg_executor = executor
    return executor

83 

84 

def shutdown_executor() -> None:
    """Tear down the background executor, if one exists, without waiting.

    The executor is shut down with ``wait=False`` and ``cancel_futures=True``
    so the calling thread is not blocked, then the module-level reference is
    reset to ``None``. Calling this when no executor exists is a no-op.
    """
    global _bg_executor
    executor = _bg_executor
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
        _bg_executor = None

95 

96 

def _on_sync_done(con: Console, future: Future[object], *, chat_mode: bool = False) -> None:
    """Done-callback for background sync futures: report unexpected failures.

    Args:
        con: Console used to print the error when not in chat mode.
        future: The finished (or cancelled) sync future.
        chat_mode: When true, report via plain ``print()`` instead of *con*.

    Nothing is printed when the future succeeded, was cancelled, failed with
    ``asyncio.CancelledError``, or failed with an error that
    ``is_executor_shutdown`` recognises.
    """
    # A cancelled future (e.g. one dropped by shutdown(cancel_futures=True))
    # still runs its done-callbacks, and Future.exception() raises
    # CancelledError on it, so bail out before asking for the exception.
    if future.cancelled():
        return
    exc = future.exception()
    if exc is None:
        return
    if isinstance(exc, asyncio.CancelledError):
        return
    if is_executor_shutdown(exc):
        return
    if chat_mode:
        print(f"Background sync error: {exc}")
    else:
        # Imported at the point of use; rich is already a dependency of this
        # module (see the Console import).
        from rich.markup import escape

        # Exception text is arbitrary: escape it so bracketed fragments are
        # shown literally rather than parsed as Rich markup tags.
        detail = escape(str(exc))
        con.print(f"[{theme.ERROR}]Background sync error:[/{theme.ERROR}] {detail}")

110 

111 

class SyncStatus:
    """Mutable holder for the background-sync status shown in chat mode.

    The sync progress callback writes ``text``; per the original design notes
    it is meant to be read by prompt_toolkit's ``bottom_toolbar`` on each
    render, which avoids cursor manipulation and flicker.

    NOTE(review): these are plain attributes with no lock in this class;
    confirm callers do not rely on atomic updates of ``pending``.
    """

    def __init__(self) -> None:
        # Current one-line status; empty string means "nothing to show".
        self.text: str = ""
        # Number of sync jobs submitted but not yet started.
        self.pending: int = 0

    def clear(self) -> None:
        """Blank the status text; ``pending`` is left untouched."""
        self.text = ""

125 

126 

def _chat_sync_callback(status: SyncStatus) -> DetailedProgressCallback:
    """Build the progress callback used for chat-mode background sync.

    The status text is cleared immediately when the callback is built.
    Afterwards:

    * FILE_START and EXTRACT events overwrite ``status.text`` with a one-line
      progress message (plus a "(+N queued)" suffix while ``status.pending``
      is positive).
    * DONE clears ``status.text`` and, if anything changed, prints the
      summary with plain ``print()``.
    * Any other event type is ignored.

    The returned callback raises ``TypeError`` when the payload does not
    match the event type.
    """
    status.clear()

    def _callback(event_type: EventType, data: ProgressEvent) -> None:
        queued = status.pending
        suffix = f" (+{queued} queued)" if queued > 0 else ""

        if event_type == EventType.FILE_START:
            if not isinstance(data, FileStartEvent):
                raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
            status.text = (
                f"⟳ Syncing [{data.current_file}/{data.total_files}]: {data.file}{suffix}"
            )
            return

        if event_type == EventType.EXTRACT:
            if not isinstance(data, ExtractEvent):
                raise TypeError(f"Expected ExtractEvent, got {type(data).__name__}")
            status.text = (
                f"⟳ Vision OCR [{data.page}/{data.total_pages}]: {data.file}{suffix}"
            )
            return

        if event_type == EventType.DONE:
            # Clear first so the status line is reset even if the payload
            # turns out to be of the wrong type.
            status.clear()
            if not isinstance(data, SyncDoneEvent):
                raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
            outcome = _format_sync_summary(
                data.added, data.updated, data.removed, data.failed, data.skipped
            )
            if outcome:
                print(f"✓ Synced: {outcome}")

    return _callback

160 

161 

def run_sync_background(
    con: Console,
    *,
    chat_mode: bool = False,
    sync_status: SyncStatus | None = None,
) -> Future[object]:
    """Queue a sync run on the background executor and return its Future.

    Args:
        con: Console used for progress output (non-chat mode) and for error
            reporting from the done-callback.
        chat_mode: When true, progress goes to *sync_status* rather than to
            *con*, and the pending counter on the status is maintained.
        sync_status: Status holder to update in chat mode; a fresh
            ``SyncStatus`` is used when none is given.

    Returns:
        The Future for the submitted job; ``_on_sync_done`` is attached to it
        as a done-callback.
    """
    tracker = sync_status if sync_status else SyncStatus()

    if chat_mode:
        progress_cb = _chat_sync_callback(tracker)
    else:
        progress_cb = _sync_progress_printer(con)

    def _run() -> object:
        # The job has left the queue once it starts running.
        if chat_mode:
            tracker.pending -= 1
        return asyncio.run(sync(quiet=True, on_progress=progress_cb))

    def _report(done: Future[object]) -> None:
        _on_sync_done(con, done, chat_mode=chat_mode)

    # Count the job as queued before handing it to the executor.
    if chat_mode:
        tracker.pending += 1

    job = _get_executor().submit(_run)
    job.add_done_callback(_report)
    return job