Coverage for src / lilbee / cli / tui / widgets / task_bar_controller.py: 100%
197 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-28 01:01 +0000
1"""TaskBarController and the per-task ProgressReporter."""
3from __future__ import annotations
5import logging
6import threading
7from collections.abc import Callable
8from enum import StrEnum
9from typing import TYPE_CHECKING, Any
11from textual.app import App
13from lilbee.catalog.formatting import download_task_name
14from lilbee.cli.tui import messages as msg
15from lilbee.cli.tui.task_queue import TaskQueue, TaskStatus, TaskType
16from lilbee.cli.tui.thread_safe import call_from_thread
17from lilbee.crawler import bootstrap_chromium, chromium_installed
18from lilbee.runtime import asyncio_loop
19from lilbee.runtime.cancellation import TaskCancelledError
20from lilbee.runtime.progress import EventType, SetupProgressEvent
22if TYPE_CHECKING:
23 from lilbee.catalog import CatalogModel
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Capacity handed to the TaskQueue for DOWNLOAD tasks (see TaskBarController.__init__).
_DOWNLOAD_CONCURRENCY = 2
# Divisor used to turn byte counts into whole megabytes for progress detail text.
_BYTES_PER_MB = 1024 * 1024
31class TaskOutcome(StrEnum):
32 """How a task terminated. Passed from worker thread to finalizer."""
34 DONE = "done"
35 FAILED = "failed"
36 CANCELLED = "cancelled"
class ProgressReporter:
    """Worker-side handle for publishing progress and observing cancellation.

    A worker is given only this object; it never reaches for ``self.app``,
    ``call_from_thread``, or any screen. Every write goes to the
    controller's lock-protected ``TaskQueue``, so progress survives
    whatever UI navigation happens while the worker runs.
    """

    def __init__(self, controller: TaskBarController, task_id: str) -> None:
        self._task_id = task_id
        self._controller = controller

    @property
    def task_id(self) -> str:
        """Identifier of the task this reporter is bound to."""
        return self._task_id

    @property
    def cancelled(self) -> bool:
        """True when the queue still knows the task and its status is CANCELLED."""
        current = self._controller.queue.get_task(self._task_id)
        if current is None:
            # Unknown task ids are treated as "not cancelled".
            return False
        return current.status == TaskStatus.CANCELLED

    def check_cancelled(self) -> None:
        """Raise ``TaskCancelledError`` if the task was cancelled from the UI."""
        if not self.cancelled:
            return
        raise TaskCancelledError

    def update(
        self, progress: float, detail: str = "", *, indeterminate: bool | None = None
    ) -> None:
        """Record a progress snapshot for this task in the shared queue.

        The cancellation check runs first and raises ``TaskCancelledError``
        when the UI cancelled the task, which lets a worker treat every
        ``update`` call as a cancel checkpoint as well as a progress write.
        """
        self.check_cancelled()
        queue = self._controller.queue
        queue.update_task(self._task_id, progress, detail, indeterminate=indeterminate)
# Signature of a worker target: it receives the task's ProgressReporter and
# returns nothing; the outcome is signalled by returning normally or raising.
TaskTarget = Callable[[ProgressReporter], None]
def _chromium_bootstrap_target(reporter: ProgressReporter) -> None:
    """Run ``bootstrap_chromium`` as the SETUP task's worker target.

    Setup-progress events emitted by the bootstrap are translated into
    ``reporter.update`` calls. Kept at module level so
    ``TaskBarController.ensure_chromium`` stays short and tests can stub
    the target in isolation.
    """

    def _forward(event_type: EventType, data: Any) -> None:
        # Only well-formed setup-progress events carry byte counts.
        if event_type != EventType.SETUP_PROGRESS or not isinstance(data, SetupProgressEvent):
            return
        expected = data.total_bytes or 0
        received = data.downloaded_bytes
        done_mb = received // _BYTES_PER_MB
        if expected > 0:
            percent = int(received * 100 / expected)
            text = msg.SETUP_CHROMIUM_DETAIL.format(done=done_mb, total=expected // _BYTES_PER_MB)
        else:
            # Total size unknown: report 0% and show only the downloaded amount.
            percent = 0
            text = msg.SETUP_CHROMIUM_DETAIL_UNKNOWN.format(done=done_mb)
        reporter.update(percent, text)

    asyncio_loop.run(bootstrap_chromium(on_progress=_forward))
class TaskBarController:
    """App-level owner of the shared TaskQueue + all long-running work.

    The controller is attached as ``app.task_bar`` during
    ``LilbeeApp.__init__``. All task lifecycle methods
    (add/update/complete/fail/cancel) go through here so every ``TaskBar``
    widget sees the same state, and every long-running op is spawned by
    this controller: never by a screen that may dismiss mid-flight.
    """

    def __init__(self, app: App[Any]) -> None:
        self.app = app
        self.queue = TaskQueue(capacity={TaskType.DOWNLOAD.value: _DOWNLOAD_CONCURRENCY})
        # task_id -> (target, on_success). Worker looks up its target here
        # so we don't capture in a closure that outlives the task.
        self._task_targets: dict[str, tuple[TaskTarget, Callable[[], None] | None]] = {}
        # Number of files in documents/ that are out of date with the store.
        # Set by start_detect_pending; read by TaskBar to render the
        # "N docs to sync · S to sync" hint when no live tasks are running.
        # Atomic int writes are safe under the GIL; the bar polls at 10 Hz.
        self.pending_sync_count: int = 0
        self._detect_thread: threading.Thread | None = None
        # Roles whose worker is currently in the spawn window (1-3 s cold
        # start). Surfaced as a single TaskBar hint instead of one toast
        # per role so the chat screen isn't drowned in implementation
        # detail on first prompt.
        self.spawning_roles: set[str] = set()

    def add_task(
        self,
        name: str,
        task_type: str,
        fn: Callable[[], None] | None = None,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Enqueue a task. Returns the new task_id."""
        return self.queue.enqueue(
            fn or (lambda: None), name, task_type, indeterminate=indeterminate
        )

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Update progress and detail text for a task."""
        self.queue.update_task(task_id, progress, detail, indeterminate=indeterminate)

    def complete_task(self, task_id: str) -> None:
        """Mark a task done. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.complete_task(task_id)
        self._after_done_hooks(task_type)
        self._advance_all(task_type)

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Mark a task failed. Row lingers in history until the user clears it."""
        # Resolve the type BEFORE mutating the queue, exactly like
        # complete_task and cancel_task, so advancing the freed type never
        # depends on the task still being resolvable after the failure.
        task_type = self._task_type_of(task_id)
        self.queue.fail_task(task_id, detail)
        self._advance_all(task_type)

    def cancel_task(self, task_id: str) -> None:
        """Mark a task cancelled. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.cancel(task_id)
        self._advance_all(task_type)

    def _after_done_hooks(self, task_type: str | None) -> None:
        """Side effects triggered by a DONE completion.

        Callable from both the direct ``complete_task`` convenience and
        the worker-thread ``_finalize_task`` path so every success route
        stays in sync. Does NOT advance the queue; each caller picks the
        advance strategy that fits its context (``_advance_all`` vs
        ``_try_start_next``).
        """
        if task_type == TaskType.DOWNLOAD.value:
            self._notify_model_installed()

    def _task_type_of(self, task_id: str) -> str | None:
        """Return the task's type string, or None when the queue does not know the id."""
        task = self.queue.get_task(task_id)
        return task.task_type if task else None

    def _advance_all(self, task_type: str | None) -> None:
        """Try to advance the freed type first, then any other idle type.

        Every task the queue promotes is handed to ``_spawn_task_worker``.
        That call is a no-op for ids without a registered target (tasks
        created through ``add_task``), while tasks registered through
        ``start_task`` get their worker thread instead of sitting active
        with nothing running them.
        """
        if task_type:
            promoted = self.queue.advance(task_type)
            if promoted is not None:
                self._spawn_task_worker(promoted.task_id)
        while (promoted := self.queue.advance()) is not None:
            self._spawn_task_worker(promoted.task_id)

    def downloading_label_for(self, ref: str) -> str | None:
        """Return the task name if *ref*'s download is queued or active, else None.

        ``ref`` is a model reference (catalog repo id or native GGUF
        ref); the helper maps it to the canonical
        :attr:`CatalogModel.display_name` and matches against
        in-flight DOWNLOAD tasks. The returned label is suitable for
        embedding in a user-facing toast.
        """
        label = download_task_name(ref)
        if not label:
            return None
        for task in self.queue.active_tasks + self.queue.queued_tasks:
            if task.task_type == TaskType.DOWNLOAD.value and task.name == label:
                return task.name
        return None

    def set_pending_sync(self, count: int) -> None:
        """Update the pending-sync count surfaced in the TaskBar hint.

        Negative values are clamped to zero.
        """
        self.pending_sync_count = max(count, 0)

    def clear_pending_sync(self) -> None:
        """Drop the pending hint. Called when sync starts so the bar shows live progress instead."""
        self.pending_sync_count = 0

    def mark_role_spawning(self, role: str) -> None:
        """Add *role* to the set of workers whose pool process is starting."""
        self.spawning_roles.add(role)

    def mark_role_spawned(self, role: str) -> None:
        """Drop *role* from the spawn-in-progress set; harmless if already absent."""
        self.spawning_roles.discard(role)

    def start_detect_pending(self) -> None:
        """Run the cheap sync-detection (filesystem walk + hash compare) on a daemon thread.

        Writes the result via ``set_pending_sync``. No-op if a detect job
        is already running. Errors are logged and silently swallowed: a
        failed detect just leaves the previous count in place rather
        than blocking the UI.
        """
        if self._detect_thread is not None and self._detect_thread.is_alive():
            return
        thread = threading.Thread(
            target=self._run_detect_pending, daemon=True, name="detect-pending"
        )
        self._detect_thread = thread
        thread.start()

    def _run_detect_pending(self) -> None:
        """Body of the detect thread: compute the pending count and publish it."""
        # Local import: lilbee.data.ingest pulls in lancedb + the embedder
        # transitively; the TUI shouldn't pay for that just to import the
        # task bar widget.
        from lilbee.data.ingest import detect_pending

        try:
            count = detect_pending()
        except Exception:
            # Best-effort: keep the previous count rather than surface an error.
            log.warning("detect_pending failed", exc_info=True)
            return
        self.set_pending_sync(count)

    def ensure_chromium(self, on_ready: Callable[[], None]) -> None:
        """Kick off a Chromium bootstrap if missing, then call ``on_ready``.

        If Chromium is already installed, ``on_ready`` runs immediately on
        the caller's thread. Otherwise a single SETUP task is enqueued
        that runs ``bootstrap_chromium``; on success the controller
        invokes ``on_ready`` on the worker thread via the task's
        ``on_success`` hook. On failure the SETUP task surfaces as FAILED
        and ``on_ready`` is NOT called (the follow-up work shouldn't
        proceed against a missing browser).

        bb-wq8g: the on_ready hook is how callers like ``_do_crawl`` chain
        their real work behind the one-time bootstrap.
        """
        if chromium_installed():
            on_ready()
            return

        self.start_task(
            msg.SETUP_CHROMIUM_NAME,
            TaskType.SETUP,
            _chromium_bootstrap_target,
            indeterminate=False,
            on_success=on_ready,
        )

    def start_task(
        self,
        name: str,
        task_type: TaskType,
        target: TaskTarget,
        *,
        indeterminate: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a task, spawn its worker, return task_id.

        The *target* receives a ``ProgressReporter`` as its only argument.
        It should periodically call ``reporter.update(percent, detail)`` and
        may call ``reporter.check_cancelled()`` to cooperatively abort.

        On success (target returns normally) the queue marks the task DONE
        and ``on_success`` (if provided) runs after on the same worker
        thread. On ``TaskCancelledError`` the task is marked CANCELLED. On any
        other exception the task is marked FAILED with ``str(exc)`` as
        detail. Rows linger in the Task Center under their final status
        until the user presses capital ``C`` to clear; the bottom bar
        flashes the outcome once and then hides when idle.

        Per-type capacity in ``TaskQueue`` (download=2, everything else=1)
        controls concurrency: a second sync queues behind the first, but a
        third download waits until one of the two active downloads finishes.
        """
        task_id = self.queue.enqueue(
            lambda: None, name, task_type.value, indeterminate=indeterminate
        )
        self._task_targets[task_id] = (target, on_success)
        self._try_start_next(task_type.value)
        return task_id

    def _try_start_next(self, task_type: str) -> None:
        """Promote queued tasks of this type into any free capacity slots."""
        while (task := self.queue.advance(task_type)) is not None:
            self._spawn_task_worker(task.task_id)

    def _spawn_task_worker(self, task_id: str) -> None:
        """Start a daemon thread for the task. Safe to call from any thread.

        Does nothing when no target is registered for ``task_id``.
        """
        if task_id not in self._task_targets:
            return
        thread = threading.Thread(
            target=self._run_task_worker,
            args=(task_id,),
            daemon=True,
            name=f"task-{task_id}",
        )
        thread.start()

    def _run_task_worker(self, task_id: str) -> None:
        """Body of the daemon worker thread.

        Runs the registered target, posts the matching outcome for
        finalization, runs ``on_success`` after a normal return, and always
        drops the target registration at the end.
        """
        entry = self._task_targets.get(task_id)
        if entry is None:
            return
        target, on_success = entry
        task = self.queue.get_task(task_id)
        task_type = task.task_type if task is not None else None
        reporter = ProgressReporter(self, task_id)
        try:
            target(reporter)
        except TaskCancelledError:
            log.info("Task %s cancelled", task_id)
            self._post_finalize(task_id, TaskOutcome.CANCELLED, "", task_type)
        except Exception as exc:
            log.warning("Task %s failed: %s", task_id, exc)
            self._post_finalize(task_id, TaskOutcome.FAILED, str(exc), task_type)
        else:
            self._post_finalize(task_id, TaskOutcome.DONE, "", task_type)
            if on_success is not None:
                try:
                    on_success()
                except Exception:
                    # A broken follow-up must not turn a finished task into a crash.
                    log.warning("on_success for %s raised", task_id, exc_info=True)
        finally:
            self._task_targets.pop(task_id, None)

    def _post_finalize(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Marshal finalization back to the main thread.

        Main-thread execution matters because ``set_timer`` (used for the
        flash-then-remove cycle) isn't safe from workers. ``call_from_thread``
        targets ``self.app``: the App is long-lived; screens are not.
        """
        call_from_thread(self.app, self._finalize_task, task_id, outcome, detail, task_type)

    def _finalize_task(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Mark the queue state, refresh dependents, promote next queued task.

        Runs on the main thread. Atomically: free the active slot, notify
        anything downstream that needs a repaint (e.g. model dropdowns
        after a download lands), and advance the queue. Rows stay in
        history; the bottom bar flash expires on its own. Users clear
        finished rows from the Task Center manually.
        """
        if outcome is TaskOutcome.DONE:
            self.queue.complete_task(task_id)
            self._after_done_hooks(task_type)
        elif outcome is TaskOutcome.FAILED:
            self.queue.fail_task(task_id, detail)
        elif outcome is TaskOutcome.CANCELLED:
            self.queue.cancel(task_id)
        if task_type:
            self._try_start_next(task_type)

    def _notify_model_installed(self) -> None:
        """Refresh the first ChatScreen's ModelBar so the new model is selectable.

        The dropdowns are built once on mount from the registry; without
        this nudge, a freshly-downloaded model only appears after the
        user reopens the screen. A ``QueryError`` (e.g. a transient "bar
        not mounted yet") is logged at debug level and skipped so it
        doesn't crash the finalize path. Any other exception raised by
        the refresh is NOT caught here and propagates to the caller.
        """
        # Late import to avoid a circular (ChatScreen imports this module).
        from textual.css.query import QueryError

        from lilbee.cli.tui.screens.chat import ChatScreen

        for screen in self.app.screen_stack:
            # screen_stack is typed Screen[Any]; narrow at runtime to
            # locate the one screen that owns the ModelBar.
            if isinstance(screen, ChatScreen):
                try:
                    screen.refresh_model_bar()
                except QueryError:
                    log.debug("ModelBar not mounted yet; skipping refresh", exc_info=True)
                break

    def start_download(
        self,
        model: CatalogModel,
        *,
        allow_unsupported: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a download; ``on_success`` runs on the worker thread once the file is on disk."""
        return self.start_task(
            model.display_name,
            TaskType.DOWNLOAD,
            lambda reporter: _download_target(reporter, model, allow_unsupported=allow_unsupported),
            on_success=on_success,
        )
def _download_target(
    reporter: ProgressReporter, model: CatalogModel, *, allow_unsupported: bool = False
) -> None:
    """Worker target that pulls a HuggingFace model for ``start_task``.

    Download progress is relayed to *reporter*, prefixed with the model's
    display name. ``PermissionError`` is re-raised as a ``RuntimeError``
    carrying the gated-repo message and ``UnsupportedArchError`` as a
    ``RuntimeError`` carrying an arch-mismatch message, so every call site
    surfaces the same error text.
    """
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.compat import UnsupportedArchError
    from lilbee.catalog.types import ModelSource

    def _relay(snapshot: DownloadProgress) -> None:
        percent = snapshot.percent
        detail = f"{model.display_name}: {snapshot.detail}"
        reporter.update(percent, detail)

    try:
        pull_model_data(
            model.ref,
            ModelSource.NATIVE,
            on_update=_relay,
            allow_unsupported=allow_unsupported,
        )
    except PermissionError as exc:
        gated_message = msg.CATALOG_GATED_REPO.format(name=model.display_name)
        raise RuntimeError(gated_message) from exc
    except UnsupportedArchError as exc:
        arch_message = f"Architecture {exc.architecture!r} not supported by this lilbee build."
        raise RuntimeError(arch_message) from exc