Coverage for src / lilbee / cli / tui / screens / chat_helpers.py: 100%
82 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Module-level helpers used by ChatScreen: progress callbacks, file cleanup, stream close."""
3from __future__ import annotations
5import contextlib
6import logging
7import shutil
8import time
9from collections.abc import Callable
10from typing import Any
12from lilbee.cli.tui import messages as msg
13from lilbee.cli.tui.widgets.task_bar_controller import ProgressReporter
14from lilbee.core.config import cfg
15from lilbee.providers.base import ClosableIterator
16from lilbee.runtime.progress import (
17 BatchProgressEvent,
18 BatchStatus,
19 DetailedProgressCallback,
20 EmbedEvent,
21 EventType,
22 ExtractEvent,
23 FileDoneEvent,
24 FileStartEvent,
25 ProgressEvent,
26 SyncDoneEvent,
27)
# Module-level logger, named after this module per project convention.
log = logging.getLogger(__name__)

# Minimum interval (seconds) between successive EMBED reporter updates.
_ADD_EMBED_THROTTLE_SECONDS = 0.15
"""Throttle EMBED reporter updates to avoid TaskBar update storms.

The embed worker fires one EmbedEvent per sub-batch, which on a fast
laptop can be dozens per second. The Task Center only repaints at 10 Hz
anyway, so we coalesce here at the same cadence.
"""
def close_stream(stream: Any) -> None:
    """Best-effort close of *stream* when it implements ClosableIterator.

    Streams that do not satisfy the protocol are ignored. Exceptions from
    ``close()`` are swallowed deliberately: the caller is tearing the
    stream down and has nothing useful to do with a failure.
    """
    if not isinstance(stream, ClosableIterator):
        return
    try:
        stream.close()
    except Exception:  # best-effort teardown; never propagate
        pass
def detail_for_batch_progress(data: BatchProgressEvent, in_flight: list[str]) -> str:
    """Choose the user-facing detail label for a BATCH_PROGRESS tick.

    Per-page rasterization (vision OCR) is the only producer emitting
    BatchStatus.RASTERIZING; it reports an absolute path in ``data.file``
    that never matches the relative source names kept in ``in_flight``,
    so status — not file identity — is the reliable discriminator
    between per-page and per-file ticks.
    """
    if data.status == BatchStatus.RASTERIZING:
        return msg.ADD_PAGE_PROGRESS.format(
            status=data.status.capitalize(),
            current=data.current,
            total=data.total,
        )
    return (
        msg.ADD_SYNCING_FILE.format(file=in_flight[0])
        if in_flight
        else msg.ADD_FILE_DONE.format(file=data.file)
    )
def remove_copied_files(names: list[str]) -> None:
    """Delete files previously copied into documents/ by a /add invocation.

    Called on cancel or failure of the add task so a cancelled file does not
    re-appear on the next sync. Silently tolerates missing entries;
    the user may have removed them concurrently, and the goal is just to
    prevent accidental indexing.
    """
    for name in names:
        target = cfg.documents_dir / name
        try:
            if target.is_dir():
                shutil.rmtree(target, ignore_errors=True)
            else:
                # missing_ok avoids the exists()/unlink() TOCTOU race when
                # the user deletes the entry concurrently with the cleanup.
                target.unlink(missing_ok=True)
        except OSError:
            # Best-effort cleanup: log at debug and keep going so one
            # stubborn entry does not abort removal of the rest.
            log.debug("Could not remove copied file %s", target, exc_info=True)
def build_add_progress_callback(reporter: ProgressReporter) -> DetailedProgressCallback:
    """Build the on_progress callback used by /add.

    Files in flight are tracked in start order so the displayed filename
    pins to the oldest unfinished file (the pipeline runs files
    concurrently; without pinning the label flips around the queue).
    EXTRACT surfaces "extracted N pages" once per file so a 44MB scanned
    PDF doesn't read as a hang; EMBED ticks per chunk, throttled to a
    steady cadence.
    """
    active: list[str] = []
    last_embed_tick = 0.0

    def on_progress(event_type: EventType, data: ProgressEvent) -> None:
        nonlocal last_embed_tick
        # Cooperative cancellation checkpoint on every event.
        reporter.check_cancelled()

        if event_type == EventType.FILE_START and isinstance(data, FileStartEvent):
            active.append(data.file)
            # Always show the oldest in-flight file, not the newest.
            reporter.update(
                0, msg.ADD_SYNCING_FILE.format(file=active[0]), indeterminate=True
            )
            return

        if event_type == EventType.FILE_DONE and isinstance(data, FileDoneEvent):
            with contextlib.suppress(ValueError):
                active.remove(data.file)
            return

        if event_type == EventType.BATCH_PROGRESS and isinstance(data, BatchProgressEvent):
            pct = data.current / data.total * 100.0 if data.total else 0.0
            reporter.update(
                pct, detail_for_batch_progress(data, active), indeterminate=False
            )
            return

        if event_type == EventType.EXTRACT and isinstance(data, ExtractEvent):
            detail = msg.SYNC_FILE_PROGRESS.format(
                current=data.page, total=data.total_pages, file=data.file
            )
            reporter.update(0, detail, indeterminate=True)
            return

        if event_type == EventType.EMBED and isinstance(data, EmbedEvent):
            now = time.monotonic()
            if now - last_embed_tick < _ADD_EMBED_THROTTLE_SECONDS:
                return  # coalesce bursts of per-chunk ticks
            last_embed_tick = now
            pct = int(data.chunk * 100 / data.total_chunks) if data.total_chunks else 0
            reporter.update(
                pct, msg.SYNC_EMBEDDING.format(file=data.file), indeterminate=False
            )

    return on_progress
def build_sync_progress_callback(
    reporter: ProgressReporter,
) -> Callable[[EventType, ProgressEvent], None]:
    """Return the on_progress shim used by ``_do_sync``.

    EXTRACT mirrors the /add path: a 44MB scanned PDF needs a per-page
    tick or the row reads as frozen. EMBED ticks are throttled to the
    same cadence as /add via _ADD_EMBED_THROTTLE_SECONDS.
    """
    last_embed_update = 0.0

    def on_progress(event_type: EventType, data: ProgressEvent) -> None:
        nonlocal last_embed_update
        # Mirror /add: explicit cancel check on every event so a SYNC task
        # cancelled mid-batch stops at the next progress tick instead of
        # finishing the current file. update() also checks, but events
        # without a reporter.update call (e.g. BATCH_PROGRESS in the
        # ingest_batch path) would otherwise miss the cooperative checkpoint.
        reporter.check_cancelled()
        if event_type == EventType.FILE_START and isinstance(data, FileStartEvent):
            # Guard the denominator like every other pct computation here:
            # an empty sync batch would otherwise raise ZeroDivisionError.
            pct = (
                int((data.current_file - 1) * 100 / data.total_files)
                if data.total_files
                else 0
            )
            status = msg.SYNC_FILE_PROGRESS.format(
                current=data.current_file, total=data.total_files, file=data.file
            )
            reporter.update(pct, status, indeterminate=False)
        elif event_type == EventType.FILE_DONE and isinstance(data, FileDoneEvent):
            reporter.update(0, msg.SYNC_FILE_DONE.format(file=data.file), indeterminate=False)
        elif event_type == EventType.EXTRACT and isinstance(data, ExtractEvent):
            reporter.update(
                0,
                msg.SYNC_FILE_PROGRESS.format(
                    current=data.page, total=data.total_pages, file=data.file
                ),
                indeterminate=True,
            )
        elif event_type == EventType.EMBED and isinstance(data, EmbedEvent):
            # Throttle: the embed worker can emit dozens of ticks per second.
            now = time.monotonic()
            if now - last_embed_update < _ADD_EMBED_THROTTLE_SECONDS:
                return
            last_embed_update = now
            pct = int(data.chunk * 100 / data.total_chunks) if data.total_chunks else 0
            reporter.update(pct, msg.SYNC_EMBEDDING.format(file=data.file), indeterminate=False)
        elif event_type == EventType.DONE and isinstance(data, SyncDoneEvent):
            total = data.added + data.updated + data.removed
            reporter.update(100, msg.SYNC_STATUS_DONE.format(count=total), indeterminate=False)

    return on_progress