Coverage for src / lilbee / cli / tui / widgets / task_bar_controller.py: 100%
185 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""TaskBarController and the per-task ProgressReporter."""
3from __future__ import annotations
5import logging
6import threading
7from collections.abc import Callable
8from enum import StrEnum
9from typing import TYPE_CHECKING, Any
11from textual.app import App
13from lilbee.cli.tui import messages as msg
14from lilbee.cli.tui.task_queue import TaskQueue, TaskStatus, TaskType
15from lilbee.cli.tui.thread_safe import call_from_thread
16from lilbee.crawler import bootstrap_chromium, chromium_installed
17from lilbee.runtime import asyncio_loop
18from lilbee.runtime.cancellation import TaskCancelledError
19from lilbee.runtime.progress import EventType, SetupProgressEvent
21if TYPE_CHECKING:
22 from lilbee.catalog import CatalogModel
# Module-scoped logger, named after this module's import path.
log = logging.getLogger(__name__)

# Capacity passed to TaskQueue for download tasks.
_DOWNLOAD_CONCURRENCY = 2
# Divisor used to express raw byte counts in whole megabytes.
_BYTES_PER_MB = 1024**2
30class TaskOutcome(StrEnum):
31 """How a task terminated. Passed from worker thread to finalizer."""
33 DONE = "done"
34 FAILED = "failed"
35 CANCELLED = "cancelled"
class ProgressReporter:
    """Handle given to a worker for progress reporting and cancel checks.

    A worker talks only to this object: it never reaches for ``self.app``,
    ``call_from_thread``, or a screen. Every write lands in the controller's
    ``TaskQueue``, so progress is kept even while the UI navigates elsewhere.
    """

    def __init__(self, controller: TaskBarController, task_id: str) -> None:
        self._task_id = task_id
        self._controller = controller

    @property
    def task_id(self) -> str:
        """Identifier of the task this reporter is bound to."""
        return self._task_id

    @property
    def cancelled(self) -> bool:
        """True when the task still exists in the queue and is marked cancelled."""
        entry = self._controller.queue.get_task(self._task_id)
        if entry is None:
            # A task the queue no longer knows about is not reported as cancelled.
            return False
        return entry.status == TaskStatus.CANCELLED

    def check_cancelled(self) -> None:
        """Raise ``TaskCancelledError`` when the UI has cancelled this task."""
        if not self.cancelled:
            return
        raise TaskCancelledError

    def update(
        self, progress: float, detail: str = "", *, indeterminate: bool | None = None
    ) -> None:
        """Record a progress snapshot in the shared queue.

        The cancel check runs first and raises ``TaskCancelledError`` if the
        UI cancelled the task, so one call works as both a progress write and
        a cooperative cancel checkpoint.
        """
        self.check_cancelled()
        queue = self._controller.queue
        queue.update_task(self._task_id, progress, detail, indeterminate=indeterminate)
# Signature of a worker target: it receives the task's ProgressReporter and returns nothing.
TaskTarget = Callable[[ProgressReporter], None]
def _chromium_bootstrap_target(reporter: ProgressReporter) -> None:
    """Worker target for the SETUP task: bootstrap Chromium and relay its progress.

    Kept at module level so ``TaskBarController.ensure_chromium`` stays short
    and tests can stub the target in isolation.
    """

    def _forward(event_type: EventType, data: Any) -> None:
        # Only setup-progress events with the expected payload are relayed.
        if event_type != EventType.SETUP_PROGRESS or not isinstance(data, SetupProgressEvent):
            return
        total_bytes = data.total_bytes or 0
        done_mb = data.downloaded_bytes // _BYTES_PER_MB
        if total_bytes > 0:
            percent = int(data.downloaded_bytes * 100 / total_bytes)
            detail = msg.SETUP_CHROMIUM_DETAIL.format(
                done=done_mb, total=total_bytes // _BYTES_PER_MB
            )
        else:
            # Unknown total size: report 0% and only the downloaded amount.
            percent = 0
            detail = msg.SETUP_CHROMIUM_DETAIL_UNKNOWN.format(done=done_mb)
        reporter.update(percent, detail)

    asyncio_loop.run(bootstrap_chromium(on_progress=_forward))
class TaskBarController:
    """App-level owner of the shared TaskQueue + all long-running work.

    The controller is attached as ``app.task_bar`` during
    ``LilbeeApp.__init__``. All task lifecycle methods
    (add/update/complete/fail/cancel) go through here so every ``TaskBar``
    widget sees the same state, and every long-running op is spawned by
    this controller: never by a screen that may dismiss mid-flight.
    """

    def __init__(self, app: App[Any]) -> None:
        self.app = app
        self.queue = TaskQueue(capacity={TaskType.DOWNLOAD.value: _DOWNLOAD_CONCURRENCY})
        # task_id -> (target, on_success). Worker looks up its target here
        # so we don't capture in a closure that outlives the task.
        self._task_targets: dict[str, tuple[TaskTarget, Callable[[], None] | None]] = {}
        # Number of files in documents/ that are out of date with the store.
        # Set by start_detect_pending; read by TaskBar to render the
        # "N docs to sync · S to sync" hint when no live tasks are running.
        # Atomic int writes are safe under the GIL; the bar polls at 10 Hz.
        self.pending_sync_count: int = 0
        self._detect_thread: threading.Thread | None = None
        # Roles whose worker is currently in the spawn window (1-3 s cold
        # start). Surfaced as a single TaskBar hint instead of one toast
        # per role so the chat screen isn't drowned in implementation
        # detail on first prompt.
        self.spawning_roles: set[str] = set()

    def add_task(
        self,
        name: str,
        task_type: str,
        fn: Callable[[], None] | None = None,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Enqueue a task. Returns the new task_id.

        No worker target is registered for the task; a missing *fn* is
        replaced by a no-op callable before it is handed to the queue.
        """
        return self.queue.enqueue(
            fn or (lambda: None), name, task_type, indeterminate=indeterminate
        )

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Update progress and detail text for a task."""
        self.queue.update_task(task_id, progress, detail, indeterminate=indeterminate)

    def complete_task(self, task_id: str) -> None:
        """Mark a task done. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.complete_task(task_id)
        self._after_done_hooks(task_type)
        self._advance_all(task_type)

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Mark a task failed. Row lingers in history until the user clears it."""
        # Read the type before mutating the queue, matching complete_task
        # and cancel_task, so the lookup never depends on post-failure state.
        task_type = self._task_type_of(task_id)
        self.queue.fail_task(task_id, detail)
        self._advance_all(task_type)

    def cancel_task(self, task_id: str) -> None:
        """Mark a task cancelled. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.cancel(task_id)
        self._advance_all(task_type)

    def _after_done_hooks(self, task_type: str | None) -> None:
        """Side effects triggered by a DONE completion.

        Callable from both the direct ``complete_task`` convenience and
        the worker-thread ``_finalize_task`` path so every success route
        stays in sync. Does NOT advance the queue; each caller picks the
        advance strategy that fits its context (``_advance_all`` vs
        ``_try_start_next``).
        """
        if task_type == TaskType.DOWNLOAD.value:
            self._notify_model_installed()

    def _task_type_of(self, task_id: str) -> str | None:
        """Return the task's type string, or ``None`` if the queue has no such task."""
        task = self.queue.get_task(task_id)
        return task.task_type if task else None

    def _advance_all(self, task_type: str | None) -> None:
        """Try to advance the freed type first, then any other idle type.

        Every task the queue promotes is handed to ``_spawn_task_worker``.
        That call does nothing for tasks without a registered target (those
        created via ``add_task``), but it starts the worker for a queued
        ``start_task`` task; without it such a task would be promoted with
        no thread to run it.
        """
        if task_type:
            promoted = self.queue.advance(task_type)
            if promoted is not None:
                self._spawn_task_worker(promoted.task_id)
        while (promoted := self.queue.advance()) is not None:
            self._spawn_task_worker(promoted.task_id)

    def set_pending_sync(self, count: int) -> None:
        """Update the pending-sync count surfaced in the TaskBar hint.

        Negative counts are clamped to zero.
        """
        self.pending_sync_count = max(count, 0)

    def clear_pending_sync(self) -> None:
        """Drop the pending hint. Called when sync starts so the bar shows live progress instead."""
        self.pending_sync_count = 0

    def mark_role_spawning(self, role: str) -> None:
        """Add *role* to the set of workers whose pool process is starting."""
        self.spawning_roles.add(role)

    def mark_role_spawned(self, role: str) -> None:
        """Drop *role* from the spawn-in-progress set; harmless if already absent."""
        self.spawning_roles.discard(role)

    def start_detect_pending(self) -> None:
        """Run the cheap sync-detection (filesystem walk + hash compare) on a daemon thread.

        Writes the result via ``set_pending_sync``. No-op if a detect job
        is already running. Errors are logged and silently swallowed: a
        failed detect just leaves the previous count in place rather
        than blocking the UI.
        """
        if self._detect_thread is not None and self._detect_thread.is_alive():
            return
        thread = threading.Thread(
            target=self._run_detect_pending, daemon=True, name="detect-pending"
        )
        self._detect_thread = thread
        thread.start()

    def _run_detect_pending(self) -> None:
        """Thread body for ``start_detect_pending``: compute and store the count."""
        # Local import: lilbee.data.ingest pulls in lancedb + the embedder
        # transitively; the TUI shouldn't pay for that just to import the
        # task bar widget.
        from lilbee.data.ingest import detect_pending

        try:
            count = detect_pending()
        except Exception:
            # Best effort by design: keep the previous count on failure.
            log.warning("detect_pending failed", exc_info=True)
            return
        self.set_pending_sync(count)

    def ensure_chromium(self, on_ready: Callable[[], None]) -> None:
        """Kick off a Chromium bootstrap if missing, then call ``on_ready``.

        If Chromium is already installed, ``on_ready`` runs immediately on
        the caller's thread. Otherwise a single SETUP task is enqueued
        that runs ``bootstrap_chromium``; on success the controller
        invokes ``on_ready`` on the worker thread via the task's
        ``on_success`` hook. On failure the SETUP task surfaces as FAILED
        and ``on_ready`` is NOT called (the follow-up work shouldn't
        proceed against a missing browser).

        bb-wq8g: the on_ready hook is how callers like ``_do_crawl`` chain
        their real work behind the one-time bootstrap.
        """
        if chromium_installed():
            on_ready()
            return

        self.start_task(
            msg.SETUP_CHROMIUM_NAME,
            TaskType.SETUP,
            _chromium_bootstrap_target,
            indeterminate=False,
            on_success=on_ready,
        )

    def start_task(
        self,
        name: str,
        task_type: TaskType,
        target: TaskTarget,
        *,
        indeterminate: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a task, spawn its worker, return task_id.

        The *target* receives a ``ProgressReporter`` as its only argument.
        It should periodically call ``reporter.update(percent, detail)`` and
        may call ``reporter.check_cancelled()`` to cooperatively abort.

        On success (target returns normally) the queue marks the task DONE
        and ``on_success`` (if provided) runs after on the same worker
        thread. On ``TaskCancelledError`` the task is marked CANCELLED. On any
        other exception the task is marked FAILED with ``str(exc)`` as
        detail. Rows linger in the Task Center under their final status
        until the user presses capital ``C`` to clear; the bottom bar
        flashes the outcome once and then hides when idle.

        Per-type capacity in ``TaskQueue`` (download=2, everything else=1)
        controls concurrency: a second sync queues behind the first, but a
        third download waits until one of the two active downloads finishes.
        """
        task_id = self.queue.enqueue(
            lambda: None, name, task_type.value, indeterminate=indeterminate
        )
        # Register the target before advancing so the spawned worker finds it.
        self._task_targets[task_id] = (target, on_success)
        self._try_start_next(task_type.value)
        return task_id

    def _try_start_next(self, task_type: str) -> None:
        """Promote queued tasks of this type into any free capacity slots."""
        while (task := self.queue.advance(task_type)) is not None:
            self._spawn_task_worker(task.task_id)

    def _spawn_task_worker(self, task_id: str) -> None:
        """Start a daemon thread for the task. Safe to call from any thread.

        Does nothing when no target is registered for *task_id*.
        """
        if task_id not in self._task_targets:
            return
        thread = threading.Thread(
            target=self._run_task_worker,
            args=(task_id,),
            daemon=True,
            name=f"task-{task_id}",
        )
        thread.start()

    def _run_task_worker(self, task_id: str) -> None:
        """Body of the daemon worker thread.

        Runs the registered target, posts the matching ``TaskOutcome`` to
        the finalizer, then (on success only) invokes ``on_success``. The
        target registration is always dropped when the worker exits.
        """
        entry = self._task_targets.get(task_id)
        if entry is None:
            return
        target, on_success = entry
        task = self.queue.get_task(task_id)
        task_type = task.task_type if task is not None else None
        reporter = ProgressReporter(self, task_id)
        try:
            target(reporter)
        except TaskCancelledError:
            log.info("Task %s cancelled", task_id)
            self._post_finalize(task_id, TaskOutcome.CANCELLED, "", task_type)
        except Exception as exc:
            log.warning("Task %s failed: %s", task_id, exc)
            self._post_finalize(task_id, TaskOutcome.FAILED, str(exc), task_type)
        else:
            self._post_finalize(task_id, TaskOutcome.DONE, "", task_type)
            if on_success is not None:
                try:
                    on_success()
                except Exception:
                    # A failing follow-up is logged, not propagated.
                    log.warning("on_success for %s raised", task_id, exc_info=True)
        finally:
            self._task_targets.pop(task_id, None)

    def _post_finalize(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Marshal finalization back to the main thread.

        Main-thread execution matters because ``set_timer`` (used for the
        flash-then-remove cycle) isn't safe from workers. ``call_from_thread``
        targets ``self.app``: the App is long-lived; screens are not.
        """
        call_from_thread(self.app, self._finalize_task, task_id, outcome, detail, task_type)

    def _finalize_task(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Mark the queue state, refresh dependents, promote next queued task.

        Runs on the main thread. Atomically: free the active slot, notify
        anything downstream that needs a repaint (e.g. model dropdowns
        after a download lands), and advance the queue. Rows stay in
        history; the bottom bar flash expires on its own. Users clear
        finished rows from the Task Center manually.
        """
        if outcome is TaskOutcome.DONE:
            self.queue.complete_task(task_id)
            self._after_done_hooks(task_type)
        elif outcome is TaskOutcome.FAILED:
            self.queue.fail_task(task_id, detail)
        elif outcome is TaskOutcome.CANCELLED:
            self.queue.cancel(task_id)
        if task_type:
            self._try_start_next(task_type)

    def _notify_model_installed(self) -> None:
        """Refresh any ChatScreen's ModelBar so the new model is selectable.

        The dropdowns are built once on mount from the registry; without
        this nudge, a freshly-downloaded model only appears after the
        user reopens the screen. Query errors (e.g. the bar is not mounted
        yet) are logged at debug level and skipped so they don't crash the
        finalize path; any other exception propagates to the caller.
        """
        # Late import to avoid a circular (ChatScreen imports this module).
        from textual.css.query import QueryError

        from lilbee.cli.tui.screens.chat import ChatScreen

        for screen in self.app.screen_stack:
            # screen_stack is typed Screen[Any]; narrow at runtime to
            # locate the one screen that owns the ModelBar.
            if isinstance(screen, ChatScreen):
                try:
                    screen.refresh_model_bar()
                except QueryError:
                    log.debug("ModelBar not mounted yet; skipping refresh", exc_info=True)
                break

    def start_download(self, model: CatalogModel) -> str:
        """Enqueue a model download and spawn a background worker."""
        return self.start_task(
            model.display_name,
            TaskType.DOWNLOAD,
            lambda reporter: _download_target(reporter, model),
        )
def _download_target(reporter: ProgressReporter, model: CatalogModel) -> None:
    """``start_task`` target that downloads a HuggingFace model.

    A ``PermissionError`` from the pull is re-raised as a ``RuntimeError``
    carrying the gated-repo friendly message, so every call site (wizard,
    catalog, chat) gets consistent error UX.
    """
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.types import ModelSource

    def _relay(snapshot: DownloadProgress) -> None:
        # Prefix the detail with the model name before forwarding it.
        detail = f"{model.display_name}: {snapshot.detail}"
        reporter.update(snapshot.percent, detail)

    try:
        pull_model_data(model.ref, ModelSource.NATIVE, on_update=_relay)
    except PermissionError as denied:
        friendly = msg.CATALOG_GATED_REPO.format(name=model.display_name)
        raise RuntimeError(friendly) from denied