Coverage for src / lilbee / cli / sync.py: 100%
91 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Background sync, executor management, and sync status for chat mode."""
3from __future__ import annotations
5import asyncio
6from collections.abc import Callable
7from concurrent.futures import Future, ThreadPoolExecutor
8from typing import TYPE_CHECKING
10from rich.console import Console
12from lilbee.cli import theme
13from lilbee.data.ingest import sync
14from lilbee.runtime.asyncio_loop import is_executor_shutdown
15from lilbee.runtime.progress import (
16 EventType,
17 ExtractEvent,
18 FileStartEvent,
19 ProgressEvent,
20 SyncDoneEvent,
21)
23if TYPE_CHECKING:
24 from lilbee.runtime.progress import DetailedProgressCallback
27def _format_sync_summary(
28 added: int, updated: int, removed: int, failed: int, skipped: int = 0
29) -> str | None:
30 """Format sync counts into a human-readable summary, or None if nothing changed."""
31 counts = {
32 "added": added,
33 "updated": updated,
34 "removed": removed,
35 "skipped": skipped,
36 "failed": failed,
37 }
38 parts = [f"{n} {label}" for label, n in counts.items() if n]
39 return ", ".join(parts) if parts else None
def _print_file_start(con: Console, data: ProgressEvent) -> None:
    """Print a muted one-line ``Syncing [i/n]: <file>`` notice for a FILE_START event.

    Args:
        con: Console the line is printed to.
        data: Event payload; must be a ``FileStartEvent``.

    Raises:
        TypeError: If *data* is not a ``FileStartEvent``.
    """
    # Function-scope import: the file-level imports only bring in ``Console``.
    from rich.markup import escape

    if not isinstance(data, FileStartEvent):
        raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
    m = theme.MUTED
    # The file name is external data. Escape it so square brackets in a name
    # are printed literally instead of being parsed as console markup (which
    # would drop text or raise on an unmatched closing tag).
    file_label = escape(str(data.file))
    con.print(f"[{m}]Syncing [{data.current_file}/{data.total_files}]: {file_label}[/{m}]")
def _print_done(con: Console, data: ProgressEvent) -> None:
    """Print a muted ``Synced: <summary>`` line for a DONE event.

    Nothing is printed when the event reports no changes at all.

    Raises:
        TypeError: If *data* is not a ``SyncDoneEvent``.
    """
    if isinstance(data, SyncDoneEvent):
        text = _format_sync_summary(
            added=data.added,
            updated=data.updated,
            removed=data.removed,
            failed=data.failed,
            skipped=data.skipped,
        )
        if not text:
            # All counters were zero: stay silent.
            return
        muted = theme.MUTED
        con.print(f"[{muted}]Synced: {text}[/{muted}]")
        return
    raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
def _sync_progress_printer(con: Console) -> DetailedProgressCallback:
    """Build a progress callback that prints one status line per event.

    Only FILE_START and DONE events produce output; every other event type
    is ignored.
    """
    # Event type -> printer; built once per returned callback.
    dispatch: dict[EventType, Callable[[Console, ProgressEvent], None]] = {
        EventType.FILE_START: _print_file_start,
        EventType.DONE: _print_done,
    }

    def _callback(event_type: EventType, data: ProgressEvent) -> None:
        printer = dispatch.get(event_type)
        if printer is None:
            return
        printer(con, data)

    return _callback
74_bg_executor: ThreadPoolExecutor | None = None
77def _get_executor() -> ThreadPoolExecutor:
78 """Lazy-init a single-worker executor."""
79 global _bg_executor
80 if _bg_executor is None:
81 _bg_executor = ThreadPoolExecutor(max_workers=1)
82 return _bg_executor
def shutdown_executor() -> None:
    """Stop the shared background executor without waiting for it.

    Does nothing if no executor has been created. Otherwise the executor is
    shut down with ``wait=False`` and ``cancel_futures=True`` so the calling
    thread is not blocked, and the module-level reference is then reset.
    """
    global _bg_executor
    executor = _bg_executor
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
        # Forget the old executor only after it has been told to stop.
        _bg_executor = None
97def _on_sync_done(con: Console, future: Future[object], *, chat_mode: bool = False) -> None:
98 """Callback attached to background sync futures: logs errors."""
99 exc = future.exception()
100 if exc is None:
101 return
102 if isinstance(exc, asyncio.CancelledError):
103 return
104 if is_executor_shutdown(exc):
105 return
106 if chat_mode:
107 print(f"Background sync error: {exc}")
108 else:
109 con.print(f"[{theme.ERROR}]Background sync error:[/{theme.ERROR}] {exc}")
class SyncStatus:
    """Mutable holder for the background-sync status line.

    The chat-mode sync callback assigns ``text``; the original design notes
    say prompt_toolkit's ``bottom_toolbar`` reads it on every render cycle,
    which avoids cursor manipulation and flickering. ``pending`` is the
    number of submitted syncs that have not started running yet.

    NOTE(review): no lock is used here; readers and writers rely on plain
    attribute reads and assignments.
    """

    text: str
    pending: int

    def __init__(self) -> None:
        # Start idle: nothing to show, nothing queued.
        self.text, self.pending = "", 0

    def clear(self) -> None:
        """Blank the status text; ``pending`` is left untouched."""
        self.text = ""
127def _chat_sync_callback(status: SyncStatus) -> DetailedProgressCallback:
128 """Return a progress callback for chat-mode background sync.
129 FILE_START updates *status.text* (rendered by prompt_toolkit's bottom
130 toolbar). On DONE the status is cleared and the summary is printed via
131 ``print()`` (goes through StdoutProxy → appears above the prompt).
132 """
133 status.clear()
135 def _callback(event_type: EventType, data: ProgressEvent) -> None:
136 queue_suffix = f" (+{status.pending} queued)" if status.pending > 0 else ""
137 if event_type == EventType.FILE_START:
138 if not isinstance(data, FileStartEvent):
139 raise TypeError(f"Expected FileStartEvent, got {type(data).__name__}")
140 status.text = (
141 f"⟳ Syncing [{data.current_file}/{data.total_files}]: {data.file}{queue_suffix}"
142 )
143 elif event_type == EventType.EXTRACT:
144 if not isinstance(data, ExtractEvent):
145 raise TypeError(f"Expected ExtractEvent, got {type(data).__name__}")
146 status.text = (
147 f"⟳ Vision OCR [{data.page}/{data.total_pages}]: {data.file}{queue_suffix}"
148 )
149 elif event_type == EventType.DONE:
150 status.clear()
151 if not isinstance(data, SyncDoneEvent):
152 raise TypeError(f"Expected SyncDoneEvent, got {type(data).__name__}")
153 summary = _format_sync_summary(
154 data.added, data.updated, data.removed, data.failed, data.skipped
155 )
156 if summary:
157 print(f"✓ Synced: {summary}")
159 return _callback
def run_sync_background(
    con: Console,
    *,
    chat_mode: bool = False,
    sync_status: SyncStatus | None = None,
) -> Future[object]:
    """Queue a sync on the shared background executor and return its Future.

    Args:
        con: Console used for progress lines (non-chat mode) and for error
            reporting when the sync fails.
        chat_mode: When true, progress is written to the sync status object
            instead of the console, and the queued-sync counter is kept.
        sync_status: Status holder to update; a fresh one is created when
            none is supplied.

    Returns:
        The Future of the submitted sync job.
    """
    status = sync_status or SyncStatus()

    on_progress: DetailedProgressCallback
    if chat_mode:
        on_progress = _chat_sync_callback(status)
    else:
        on_progress = _sync_progress_printer(con)

    def _run() -> object:
        # The job has left the queue once it starts executing.
        if chat_mode:
            status.pending -= 1
        return asyncio.run(sync(quiet=True, on_progress=on_progress))

    def _report(done: Future[object]) -> None:
        _on_sync_done(con, done, chat_mode=chat_mode)

    # Count the job as queued before handing it to the executor.
    if chat_mode:
        status.pending += 1

    job = _get_executor().submit(_run)
    job.add_done_callback(_report)
    return job