Coverage for src / lilbee / cli / tui / widgets / task_bar_controller.py: 100%

197 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-28 01:01 +0000

1"""TaskBarController and the per-task ProgressReporter.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import threading 

7from collections.abc import Callable 

8from enum import StrEnum 

9from typing import TYPE_CHECKING, Any 

10 

11from textual.app import App 

12 

13from lilbee.catalog.formatting import download_task_name 

14from lilbee.cli.tui import messages as msg 

15from lilbee.cli.tui.task_queue import TaskQueue, TaskStatus, TaskType 

16from lilbee.cli.tui.thread_safe import call_from_thread 

17from lilbee.crawler import bootstrap_chromium, chromium_installed 

18from lilbee.runtime import asyncio_loop 

19from lilbee.runtime.cancellation import TaskCancelledError 

20from lilbee.runtime.progress import EventType, SetupProgressEvent 

21 

22if TYPE_CHECKING: 

23 from lilbee.catalog import CatalogModel 

24 

# Module-scoped logger; named after the module so records can be filtered per file.
log = logging.getLogger(__name__)

# Capacity handed to the TaskQueue for DOWNLOAD-type tasks.
_DOWNLOAD_CONCURRENCY = 2
# Divisor used to turn raw byte counts into whole mebibytes for progress text.
_BYTES_PER_MB = 1024**2

29 

30 

31class TaskOutcome(StrEnum): 

32 """How a task terminated. Passed from worker thread to finalizer.""" 

33 

34 DONE = "done" 

35 FAILED = "failed" 

36 CANCELLED = "cancelled" 

37 

38 

class ProgressReporter:
    """Worker-facing handle for publishing progress and observing cancellation.

    This is the only object a worker is given: it never reaches for
    ``self.app``, ``call_from_thread``, or a screen. Every write lands in the
    controller's lock-protected ``TaskQueue``, so progress survives whatever
    navigation happens in the UI meanwhile.
    """

    def __init__(self, controller: TaskBarController, task_id: str) -> None:
        self._task_id = task_id
        self._controller = controller

    @property
    def task_id(self) -> str:
        """Identifier of the task this reporter is bound to."""
        return self._task_id

    @property
    def cancelled(self) -> bool:
        """Whether the queue currently records this task as CANCELLED.

        A task the queue no longer knows about counts as not cancelled.
        """
        current = self._controller.queue.get_task(self._task_id)
        if current is None:
            return False
        return current.status == TaskStatus.CANCELLED

    def check_cancelled(self) -> None:
        """Raise ``TaskCancelledError`` if the task was cancelled from the UI."""
        if not self.cancelled:
            return
        raise TaskCancelledError

    def update(
        self, progress: float, detail: str = "", *, indeterminate: bool | None = None
    ) -> None:
        """Record a progress snapshot in the shared queue.

        The cancellation check runs first and raises ``TaskCancelledError``
        when the UI cancelled the task, which lets callers treat ``update``
        as a combined progress write and cancel checkpoint.
        """
        self.check_cancelled()
        queue = self._controller.queue
        queue.update_task(self._task_id, progress, detail, indeterminate=indeterminate)


# Signature of a worker target: it receives its reporter and returns nothing.
TaskTarget = Callable[[ProgressReporter], None]

81 

82 

def _chromium_bootstrap_target(reporter: ProgressReporter) -> None:
    """SETUP-task worker target: run ``bootstrap_chromium`` and relay its progress.

    Kept at module level so ``TaskBarController.ensure_chromium`` stays short
    and tests can stub the target in isolation.
    """

    def _forward(event_type: EventType, data: Any) -> None:
        # Only setup-progress events carrying the expected payload are relayed.
        if event_type != EventType.SETUP_PROGRESS or not isinstance(data, SetupProgressEvent):
            return
        total_bytes = data.total_bytes or 0
        done_mb = data.downloaded_bytes // _BYTES_PER_MB
        if total_bytes > 0:
            percent = int(data.downloaded_bytes * 100 / total_bytes)
            detail = msg.SETUP_CHROMIUM_DETAIL.format(
                done=done_mb, total=total_bytes // _BYTES_PER_MB
            )
        else:
            # Total size unknown: report 0% and show only the downloaded amount.
            percent = 0
            detail = msg.SETUP_CHROMIUM_DETAIL_UNKNOWN.format(done=done_mb)
        reporter.update(percent, detail)

    asyncio_loop.run(bootstrap_chromium(on_progress=_forward))

105 

106 

class TaskBarController:
    """App-level owner of the shared TaskQueue + all long-running work.

    The controller is attached as ``app.task_bar`` during
    ``LilbeeApp.__init__``. All task lifecycle methods
    (add/update/complete/fail/cancel) go through here so every ``TaskBar``
    widget sees the same state, and every long-running op is spawned by
    this controller: never by a screen that may dismiss mid-flight.
    """

    def __init__(self, app: App[Any]) -> None:
        self.app = app
        self.queue = TaskQueue(capacity={TaskType.DOWNLOAD.value: _DOWNLOAD_CONCURRENCY})
        # task_id -> (target, on_success). Worker looks up its target here
        # so we don't capture in a closure that outlives the task.
        self._task_targets: dict[str, tuple[TaskTarget, Callable[[], None] | None]] = {}
        # Number of files in documents/ that are out of date with the store.
        # Set by start_detect_pending; read by TaskBar to render the
        # "N docs to sync · S to sync" hint when no live tasks are running.
        # Atomic int writes are safe under the GIL; the bar polls at 10 Hz.
        self.pending_sync_count: int = 0
        self._detect_thread: threading.Thread | None = None
        # Roles whose worker is currently in the spawn window (1-3 s cold
        # start). Surfaced as a single TaskBar hint instead of one toast
        # per role so the chat screen isn't drowned in implementation
        # detail on first prompt.
        self.spawning_roles: set[str] = set()

    def add_task(
        self,
        name: str,
        task_type: str,
        fn: Callable[[], None] | None = None,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Enqueue a task without registering a worker target.

        ``fn`` defaults to a no-op callable. Returns the new task_id.
        """
        return self.queue.enqueue(
            fn or (lambda: None), name, task_type, indeterminate=indeterminate
        )

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Update progress and detail text for a task."""
        self.queue.update_task(task_id, progress, detail, indeterminate=indeterminate)

    def complete_task(self, task_id: str) -> None:
        """Mark a task done. Row lingers in history until the user clears it."""
        # Read the type before mutating the queue so the advance below knows
        # which capacity slot was just freed.
        task_type = self._task_type_of(task_id)
        self.queue.complete_task(task_id)
        self._after_done_hooks(task_type)
        self._advance_all(task_type)

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Mark a task failed. Row lingers in history until the user clears it."""
        # Capture the type before the status change, matching complete_task
        # and cancel_task, so the lookup never depends on post-failure state.
        task_type = self._task_type_of(task_id)
        self.queue.fail_task(task_id, detail)
        self._advance_all(task_type)

    def cancel_task(self, task_id: str) -> None:
        """Mark a task cancelled. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.cancel(task_id)
        self._advance_all(task_type)

    def _after_done_hooks(self, task_type: str | None) -> None:
        """Side effects triggered by a DONE completion.

        Callable from both the direct ``complete_task`` convenience and
        the worker-thread ``_finalize_task`` path so every success route
        stays in sync. Does NOT advance the queue; each caller picks the
        advance strategy that fits its context (``_advance_all`` vs
        ``_try_start_next``).
        """
        if task_type == TaskType.DOWNLOAD.value:
            self._notify_model_installed()

    def _task_type_of(self, task_id: str) -> str | None:
        """Return the task's type string, or None when the queue has no such task."""
        task = self.queue.get_task(task_id)
        return task.task_type if task else None

    def _advance_all(self, task_type: str | None) -> None:
        """Advance the freed type first, then any other idle type.

        Every task promoted here also gets its worker spawned. Promotion
        alone only moves the queue entry; without the spawn, a task
        registered through ``start_task`` would occupy an active slot with
        nothing running it. ``_spawn_task_worker`` ignores tasks that have
        no registered target (those created via ``add_task``), so they are
        promoted exactly as before.
        """
        if task_type:
            self._try_start_next(task_type)
        while (task := self.queue.advance()) is not None:
            self._spawn_task_worker(task.task_id)

    def downloading_label_for(self, ref: str) -> str | None:
        """Return the task name if *ref*'s download is queued or active, else None.

        ``ref`` is a model reference (catalog repo id or native GGUF
        ref); the helper maps it to the canonical
        :attr:`CatalogModel.display_name` and matches against
        in-flight DOWNLOAD tasks. The returned label is suitable for
        embedding in a user-facing toast.
        """
        label = download_task_name(ref)
        if not label:
            return None
        for task in self.queue.active_tasks + self.queue.queued_tasks:
            if task.task_type == TaskType.DOWNLOAD.value and task.name == label:
                return task.name
        return None

    def set_pending_sync(self, count: int) -> None:
        """Update the pending-sync count surfaced in the TaskBar hint.

        Negative counts are clamped to zero.
        """
        self.pending_sync_count = max(count, 0)

    def clear_pending_sync(self) -> None:
        """Drop the pending hint. Called when sync starts so the bar shows live progress instead."""
        self.pending_sync_count = 0

    def mark_role_spawning(self, role: str) -> None:
        """Add *role* to the set of workers whose pool process is starting."""
        self.spawning_roles.add(role)

    def mark_role_spawned(self, role: str) -> None:
        """Drop *role* from the spawn-in-progress set; harmless if already absent."""
        self.spawning_roles.discard(role)

    def start_detect_pending(self) -> None:
        """Run the cheap sync-detection (filesystem walk + hash compare) on a daemon thread.

        Writes the result via ``set_pending_sync``. No-op if a detect job
        is already running. Errors are logged and silently swallowed: a
        failed detect just leaves the previous count in place rather
        than blocking the UI.
        """
        if self._detect_thread is not None and self._detect_thread.is_alive():
            return
        thread = threading.Thread(
            target=self._run_detect_pending, daemon=True, name="detect-pending"
        )
        self._detect_thread = thread
        thread.start()

    def _run_detect_pending(self) -> None:
        """Thread body for ``start_detect_pending``: compute and publish the count."""
        # Local import: lilbee.data.ingest pulls in lancedb + the embedder
        # transitively; the TUI shouldn't pay for that just to import the
        # task bar widget.
        from lilbee.data.ingest import detect_pending

        try:
            count = detect_pending()
        except Exception:
            # Best effort by design: keep the previous count on failure.
            log.warning("detect_pending failed", exc_info=True)
            return
        self.set_pending_sync(count)

    def ensure_chromium(self, on_ready: Callable[[], None]) -> None:
        """Kick off a Chromium bootstrap if missing, then call ``on_ready``.

        If Chromium is already installed, ``on_ready`` runs immediately on
        the caller's thread. Otherwise a single SETUP task is enqueued
        that runs ``bootstrap_chromium``; on success the controller
        invokes ``on_ready`` on the worker thread via the task's
        ``on_success`` hook. On failure the SETUP task surfaces as FAILED
        and ``on_ready`` is NOT called (the follow-up work shouldn't
        proceed against a missing browser).

        bb-wq8g: the on_ready hook is how callers like ``_do_crawl`` chain
        their real work behind the one-time bootstrap.
        """
        if chromium_installed():
            on_ready()
            return

        self.start_task(
            msg.SETUP_CHROMIUM_NAME,
            TaskType.SETUP,
            _chromium_bootstrap_target,
            indeterminate=False,
            on_success=on_ready,
        )

    def start_task(
        self,
        name: str,
        task_type: TaskType,
        target: TaskTarget,
        *,
        indeterminate: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a task, spawn its worker, return task_id.

        The *target* receives a ``ProgressReporter`` as its only argument.
        It should periodically call ``reporter.update(percent, detail)`` and
        may call ``reporter.check_cancelled()`` to cooperatively abort.

        On success (target returns normally) the queue marks the task DONE
        and ``on_success`` (if provided) runs after on the same worker
        thread. On ``TaskCancelledError`` the task is marked CANCELLED. On any
        other exception the task is marked FAILED with ``str(exc)`` as
        detail. Rows linger in the Task Center under their final status
        until the user presses capital ``C`` to clear; the bottom bar
        flashes the outcome once and then hides when idle.

        Per-type capacity in ``TaskQueue`` (download=2, everything else=1)
        controls concurrency: a second sync queues behind the first, but a
        third download waits until one of the two active downloads finishes.
        """
        # The queue entry carries a no-op callable; the real work is the
        # registered target, run by the worker thread this controller spawns.
        task_id = self.queue.enqueue(
            lambda: None, name, task_type.value, indeterminate=indeterminate
        )
        self._task_targets[task_id] = (target, on_success)
        self._try_start_next(task_type.value)
        return task_id

    def _try_start_next(self, task_type: str) -> None:
        """Promote queued tasks of this type into any free capacity slots."""
        while (task := self.queue.advance(task_type)) is not None:
            self._spawn_task_worker(task.task_id)

    def _spawn_task_worker(self, task_id: str) -> None:
        """Start a daemon thread for the task. Safe to call from any thread.

        Does nothing when no target is registered for ``task_id``.
        """
        if task_id not in self._task_targets:
            return
        thread = threading.Thread(
            target=self._run_task_worker,
            args=(task_id,),
            daemon=True,
            name=f"task-{task_id}",
        )
        thread.start()

    def _run_task_worker(self, task_id: str) -> None:
        """Body of the daemon worker thread.

        Runs the registered target, posts the matching outcome for
        finalization, runs ``on_success`` after a DONE outcome, and always
        drops the target registration at the end.
        """
        entry = self._task_targets.get(task_id)
        if entry is None:
            return
        target, on_success = entry
        # Resolve the type up front; it is passed along with the outcome so
        # the finalizer knows which type to advance.
        task = self.queue.get_task(task_id)
        task_type = task.task_type if task is not None else None
        reporter = ProgressReporter(self, task_id)
        try:
            target(reporter)
        except TaskCancelledError:
            log.info("Task %s cancelled", task_id)
            self._post_finalize(task_id, TaskOutcome.CANCELLED, "", task_type)
        except Exception as exc:
            log.warning("Task %s failed: %s", task_id, exc)
            self._post_finalize(task_id, TaskOutcome.FAILED, str(exc), task_type)
        else:
            self._post_finalize(task_id, TaskOutcome.DONE, "", task_type)
            if on_success is not None:
                try:
                    on_success()
                except Exception:
                    # The task itself already succeeded; a failing hook is
                    # logged rather than turned into a task failure.
                    log.warning("on_success for %s raised", task_id, exc_info=True)
        finally:
            self._task_targets.pop(task_id, None)

    def _post_finalize(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Marshal finalization back to the main thread.

        Main-thread execution matters because ``set_timer`` (used for the
        flash-then-remove cycle) isn't safe from workers. ``call_from_thread``
        targets ``self.app``: the App is long-lived; screens are not.
        """
        call_from_thread(self.app, self._finalize_task, task_id, outcome, detail, task_type)

    def _finalize_task(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Mark the queue state, refresh dependents, promote next queued task.

        Runs on the main thread. Atomically: free the active slot, notify
        anything downstream that needs a repaint (e.g. model dropdowns
        after a download lands), and advance the queue. Rows stay in
        history; the bottom bar flash expires on its own. Users clear
        finished rows from the Task Center manually.
        """
        if outcome is TaskOutcome.DONE:
            self.queue.complete_task(task_id)
            self._after_done_hooks(task_type)
        elif outcome is TaskOutcome.FAILED:
            self.queue.fail_task(task_id, detail)
        elif outcome is TaskOutcome.CANCELLED:
            self.queue.cancel(task_id)
        if task_type:
            self._try_start_next(task_type)

    def _notify_model_installed(self) -> None:
        """Refresh any ChatScreen's ModelBar so the new model is selectable.

        The dropdowns are built once on mount from the registry; without
        this nudge, a freshly-downloaded model only appears after the
        user reopens the screen. ``QueryError`` (e.g. a transient "bar not
        mounted yet") is logged at debug level and skipped so it doesn't
        crash the finalize path; any other exception propagates to the
        caller. Only the first ``ChatScreen`` found on the stack is
        refreshed.
        """
        # Imported late rather than at module top; the ChatScreen import
        # would otherwise be circular (ChatScreen imports this module).
        from textual.css.query import QueryError

        from lilbee.cli.tui.screens.chat import ChatScreen

        for screen in self.app.screen_stack:
            # screen_stack is typed Screen[Any]; narrow at runtime to
            # locate the one screen that owns the ModelBar.
            if isinstance(screen, ChatScreen):
                try:
                    screen.refresh_model_bar()
                except QueryError:
                    log.debug("ModelBar not mounted yet; skipping refresh", exc_info=True)
                break

    def start_download(
        self,
        model: CatalogModel,
        *,
        allow_unsupported: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a download; ``on_success`` runs on the worker thread once the file is on disk."""
        return self.start_task(
            model.display_name,
            TaskType.DOWNLOAD,
            lambda reporter: _download_target(reporter, model, allow_unsupported=allow_unsupported),
            on_success=on_success,
        )

437 

438 

def _download_target(
    reporter: ProgressReporter, model: CatalogModel, *, allow_unsupported: bool = False
) -> None:
    """``start_task`` target for a HuggingFace model download.

    ``PermissionError`` is re-raised as a ``RuntimeError`` carrying the
    gated-repo message, and ``UnsupportedArchError`` as a ``RuntimeError``
    carrying an arch-mismatch message, so every call site surfaces the same
    user-facing error text.
    """
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.compat import UnsupportedArchError
    from lilbee.catalog.types import ModelSource

    def _relay(snapshot: DownloadProgress) -> None:
        # Prefix the detail with the model name so the row stays identifiable.
        reporter.update(snapshot.percent, f"{model.display_name}: {snapshot.detail}")

    try:
        pull_model_data(
            model.ref,
            ModelSource.NATIVE,
            on_update=_relay,
            allow_unsupported=allow_unsupported,
        )
    except PermissionError as gated:
        gated_message = msg.CATALOG_GATED_REPO.format(name=model.display_name)
        raise RuntimeError(gated_message) from gated
    except UnsupportedArchError as unsupported:
        arch_message = (
            f"Architecture {unsupported.architecture!r} not supported by this lilbee build."
        )
        raise RuntimeError(arch_message) from unsupported

468 ) from exc