Coverage for src / lilbee / cli / tui / widgets / task_bar_controller.py: 100%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""TaskBarController and the per-task ProgressReporter.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import threading 

7from collections.abc import Callable 

8from enum import StrEnum 

9from typing import TYPE_CHECKING, Any 

10 

11from textual.app import App 

12 

13from lilbee.cli.tui import messages as msg 

14from lilbee.cli.tui.task_queue import TaskQueue, TaskStatus, TaskType 

15from lilbee.cli.tui.thread_safe import call_from_thread 

16from lilbee.crawler import bootstrap_chromium, chromium_installed 

17from lilbee.runtime import asyncio_loop 

18from lilbee.runtime.cancellation import TaskCancelledError 

19from lilbee.runtime.progress import EventType, SetupProgressEvent 

20 

21if TYPE_CHECKING: 

22 from lilbee.catalog import CatalogModel 

23 

log = logging.getLogger(__name__)

# Maximum number of model downloads the TaskQueue runs side by side; other
# task types use the queue's default per-type capacity.
_DOWNLOAD_CONCURRENCY = 2
# One mebibyte in bytes; used to render download sizes as whole MB.
_BYTES_PER_MB = 1 << 20

28 

29 

30class TaskOutcome(StrEnum): 

31 """How a task terminated. Passed from worker thread to finalizer.""" 

32 

33 DONE = "done" 

34 FAILED = "failed" 

35 CANCELLED = "cancelled" 

36 

37 

class ProgressReporter:
    """Per-task handle a worker thread uses for progress writes and cancel checks.

    Workers interact only with this object; they never reach for ``self.app``,
    ``call_from_thread``, or any screen. Every write goes to the controller's
    lock-protected ``TaskQueue``, so updates survive any UI navigation.
    """

    def __init__(self, controller: TaskBarController, task_id: str) -> None:
        self._controller, self._task_id = controller, task_id

    @property
    def task_id(self) -> str:
        """Identifier of the queue row this reporter writes to."""
        return self._task_id

    @property
    def cancelled(self) -> bool:
        """True when the queue still knows the task and its status is CANCELLED."""
        snapshot = self._controller.queue.get_task(self._task_id)
        if snapshot is None:
            return False
        return snapshot.status == TaskStatus.CANCELLED

    def check_cancelled(self) -> None:
        """Raise ``TaskCancelledError`` once the UI has cancelled this task."""
        if not self.cancelled:
            return
        raise TaskCancelledError

    def update(
        self, progress: float, detail: str = "", *, indeterminate: bool | None = None
    ) -> None:
        """Record a progress snapshot in the shared queue.

        The cancellation check runs before the write, so every call doubles
        as a cooperative cancel checkpoint: it raises ``TaskCancelledError``
        instead of writing when the UI cancelled the task.
        """
        self.check_cancelled()
        queue = self._controller.queue
        queue.update_task(self._task_id, progress, detail, indeterminate=indeterminate)


# Signature every ``start_task`` worker target must satisfy: it receives the
# task's ProgressReporter as its only argument and returns nothing.
TaskTarget = Callable[[ProgressReporter], None]

80 

81 

def _chromium_bootstrap_target(reporter: ProgressReporter) -> None:
    """Worker target for the SETUP task: run ``bootstrap_chromium`` with progress forwarding.

    Module-level so ``TaskBarController.ensure_chromium`` stays short and
    tests can stub the target in isolation.

    Only ``SETUP_PROGRESS`` events whose payload is a ``SetupProgressEvent``
    are forwarded; everything else is ignored. Because ``reporter.update``
    raises ``TaskCancelledError`` when the UI cancelled the task, each
    forwarded event is also a cancellation checkpoint.

    With a positive ``total_bytes`` the bar is determinate. The percentage is
    clamped to 0-100 so a downloaded count that exceeds the reported total
    cannot push the bar past full. Without a usable total a percentage is
    meaningless, so the bar is switched to indeterminate rather than sitting
    at 0%.
    """

    def _forward(event_type: EventType, data: Any) -> None:
        if event_type != EventType.SETUP_PROGRESS:
            return
        if not isinstance(data, SetupProgressEvent):
            return
        total = data.total_bytes or 0
        done_mb = data.downloaded_bytes // _BYTES_PER_MB
        if total > 0:
            pct = min(max(int(data.downloaded_bytes * 100 / total), 0), 100)
            detail = msg.SETUP_CHROMIUM_DETAIL.format(
                done=done_mb, total=total // _BYTES_PER_MB
            )
            # indeterminate=False flips the bar back if an earlier event had no total.
            reporter.update(pct, detail, indeterminate=False)
        else:
            detail = msg.SETUP_CHROMIUM_DETAIL_UNKNOWN.format(done=done_mb)
            reporter.update(0, detail, indeterminate=True)

    asyncio_loop.run(bootstrap_chromium(on_progress=_forward))

104 

105 

class TaskBarController:
    """App-level owner of the shared TaskQueue + all long-running work.

    The controller is attached as ``app.task_bar`` during
    ``LilbeeApp.__init__``. All task lifecycle methods
    (add/update/complete/fail/cancel) go through here so every ``TaskBar``
    widget sees the same state, and every long-running op is spawned by
    this controller: never by a screen that may dismiss mid-flight.
    """

    def __init__(self, app: App[Any]) -> None:
        self.app = app
        self.queue = TaskQueue(capacity={TaskType.DOWNLOAD.value: _DOWNLOAD_CONCURRENCY})
        # task_id -> (target, on_success). Worker looks up its target here
        # so we don't capture in a closure that outlives the task.
        self._task_targets: dict[str, tuple[TaskTarget, Callable[[], None] | None]] = {}
        # Number of files in documents/ that are out of date with the store.
        # Set by start_detect_pending; read by TaskBar to render the
        # "N docs to sync · S to sync" hint when no live tasks are running.
        # Atomic int writes are safe under the GIL; the bar polls at 10 Hz.
        self.pending_sync_count: int = 0
        self._detect_thread: threading.Thread | None = None
        # Roles whose worker is currently in the spawn window (1-3 s cold
        # start). Surfaced as a single TaskBar hint instead of one toast
        # per role so the chat screen isn't drowned in implementation
        # detail on first prompt.
        self.spawning_roles: set[str] = set()

    def add_task(
        self,
        name: str,
        task_type: str,
        fn: Callable[[], None] | None = None,
        *,
        indeterminate: bool = False,
    ) -> str:
        """Enqueue a task without a worker target. Returns the new task_id."""
        return self.queue.enqueue(
            fn or (lambda: None), name, task_type, indeterminate=indeterminate
        )

    def update_task(
        self,
        task_id: str,
        progress: float,
        detail: str = "",
        *,
        indeterminate: bool | None = None,
    ) -> None:
        """Update progress and detail text for a task."""
        self.queue.update_task(task_id, progress, detail, indeterminate=indeterminate)

    def complete_task(self, task_id: str) -> None:
        """Mark a task done. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.complete_task(task_id)
        self._after_done_hooks(task_type)
        self._advance_all(task_type)

    def fail_task(self, task_id: str, detail: str = "") -> None:
        """Mark a task failed. Row lingers in history until the user clears it."""
        # Resolve the type before mutating the queue, like complete_task and
        # cancel_task, so the lookup never depends on post-failure row state.
        task_type = self._task_type_of(task_id)
        self.queue.fail_task(task_id, detail)
        self._advance_all(task_type)

    def cancel_task(self, task_id: str) -> None:
        """Mark a task cancelled. Row lingers in history until the user clears it."""
        task_type = self._task_type_of(task_id)
        self.queue.cancel(task_id)
        self._advance_all(task_type)

    def _after_done_hooks(self, task_type: str | None) -> None:
        """Side effects triggered by a DONE completion.

        Callable from both the direct ``complete_task`` convenience and
        the worker-thread ``_finalize_task`` path so every success route
        stays in sync. Does NOT advance the queue; each caller picks the
        advance strategy that fits its context (``_advance_all`` vs
        ``_try_start_next``).
        """
        if task_type == TaskType.DOWNLOAD.value:
            self._notify_model_installed()

    def _task_type_of(self, task_id: str) -> str | None:
        """Return the task's type string, or None if the queue doesn't know the id."""
        task = self.queue.get_task(task_id)
        return task.task_type if task else None

    def _advance_all(self, task_type: str | None) -> None:
        """Fill freed capacity: the freed type first, then any other idle type.

        Every task the queue promotes is handed to ``_spawn_task_worker``.
        Without that, a ``start_task`` task promoted here would become active
        with no worker thread and hold its capacity slot forever. Promoted
        tasks with no registered target (plain ``add_task`` rows) are a no-op
        in ``_spawn_task_worker``.
        """
        if task_type:
            self._try_start_next(task_type)
        while (task := self.queue.advance()) is not None:
            self._spawn_task_worker(task.task_id)

    def set_pending_sync(self, count: int) -> None:
        """Update the pending-sync count surfaced in the TaskBar hint (negatives clamp to 0)."""
        self.pending_sync_count = max(count, 0)

    def clear_pending_sync(self) -> None:
        """Drop the pending hint. Called when sync starts so the bar shows live progress instead."""
        self.pending_sync_count = 0

    def mark_role_spawning(self, role: str) -> None:
        """Add *role* to the set of workers whose pool process is starting."""
        self.spawning_roles.add(role)

    def mark_role_spawned(self, role: str) -> None:
        """Drop *role* from the spawn-in-progress set; harmless if already absent."""
        self.spawning_roles.discard(role)

    def start_detect_pending(self) -> None:
        """Run the cheap sync-detection (filesystem walk + hash compare) on a daemon thread.

        Writes the result via ``set_pending_sync``. No-op if a detect job
        is already running. Errors are logged and silently swallowed: a
        failed detect just leaves the previous count in place rather
        than blocking the UI.
        """
        if self._detect_thread is not None and self._detect_thread.is_alive():
            return
        thread = threading.Thread(
            target=self._run_detect_pending, daemon=True, name="detect-pending"
        )
        self._detect_thread = thread
        thread.start()

    def _run_detect_pending(self) -> None:
        """Body of the detect-pending daemon thread."""
        # Local import: lilbee.data.ingest pulls in lancedb + the embedder
        # transitively; the TUI shouldn't pay for that just to import the
        # task bar widget.
        from lilbee.data.ingest import detect_pending

        try:
            count = detect_pending()
        except Exception:
            log.warning("detect_pending failed", exc_info=True)
            return
        self.set_pending_sync(count)

    def ensure_chromium(self, on_ready: Callable[[], None]) -> None:
        """Kick off a Chromium bootstrap if missing, then call ``on_ready``.

        If Chromium is already installed, ``on_ready`` runs immediately on
        the caller's thread. Otherwise a single SETUP task is enqueued
        that runs ``bootstrap_chromium``; on success the controller
        invokes ``on_ready`` on the worker thread via the task's
        ``on_success`` hook. On failure the SETUP task surfaces as FAILED
        and ``on_ready`` is NOT called (the follow-up work shouldn't
        proceed against a missing browser).

        bb-wq8g: the on_ready hook is how callers like ``_do_crawl`` chain
        their real work behind the one-time bootstrap.
        """
        if chromium_installed():
            on_ready()
            return

        self.start_task(
            msg.SETUP_CHROMIUM_NAME,
            TaskType.SETUP,
            _chromium_bootstrap_target,
            indeterminate=False,
            on_success=on_ready,
        )

    def start_task(
        self,
        name: str,
        task_type: TaskType,
        target: TaskTarget,
        *,
        indeterminate: bool = False,
        on_success: Callable[[], None] | None = None,
    ) -> str:
        """Enqueue a task, spawn its worker, return task_id.

        The *target* receives a ``ProgressReporter`` as its only argument.
        It should periodically call ``reporter.update(percent, detail)`` and
        may call ``reporter.check_cancelled()`` to cooperatively abort.

        On success (target returns normally) the queue marks the task DONE
        and ``on_success`` (if provided) runs after on the same worker
        thread. On ``TaskCancelledError`` the task is marked CANCELLED. On any
        other exception the task is marked FAILED with ``str(exc)`` as
        detail (or the exception class name when that string is empty).
        Rows linger in the Task Center under their final status until the
        user presses capital ``C`` to clear; the bottom bar flashes the
        outcome once and then hides when idle.

        Per-type capacity in ``TaskQueue`` (download=2, everything else=1)
        controls concurrency: a second sync queues behind the first, but a
        third download waits until one of the two active downloads finishes.
        """
        task_id = self.queue.enqueue(
            lambda: None, name, task_type.value, indeterminate=indeterminate
        )
        # Register the target before advancing so the spawn below can find it.
        self._task_targets[task_id] = (target, on_success)
        self._try_start_next(task_type.value)
        return task_id

    def _try_start_next(self, task_type: str) -> None:
        """Promote queued tasks of this type into any free capacity slots."""
        while (task := self.queue.advance(task_type)) is not None:
            self._spawn_task_worker(task.task_id)

    def _spawn_task_worker(self, task_id: str) -> None:
        """Start a daemon thread for the task; no-op if it has no registered target."""
        if task_id not in self._task_targets:
            return
        thread = threading.Thread(
            target=self._run_task_worker,
            args=(task_id,),
            daemon=True,
            name=f"task-{task_id}",
        )
        thread.start()

    def _run_task_worker(self, task_id: str) -> None:
        """Body of the daemon worker thread: run the target, then finalize."""
        entry = self._task_targets.get(task_id)
        if entry is None:
            return
        target, on_success = entry
        task = self.queue.get_task(task_id)
        task_type = task.task_type if task is not None else None
        reporter = ProgressReporter(self, task_id)
        try:
            target(reporter)
        except TaskCancelledError:
            log.info("Task %s cancelled", task_id)
            self._post_finalize(task_id, TaskOutcome.CANCELLED, "", task_type)
        except Exception as exc:
            log.warning("Task %s failed: %s", task_id, exc, exc_info=True)
            # Some exceptions stringify to "" (e.g. a bare RuntimeError());
            # fall back to the class name so the FAILED row still says something.
            detail = str(exc) or type(exc).__name__
            self._post_finalize(task_id, TaskOutcome.FAILED, detail, task_type)
        else:
            self._post_finalize(task_id, TaskOutcome.DONE, "", task_type)
            if on_success is not None:
                try:
                    on_success()
                except Exception:
                    log.warning("on_success for %s raised", task_id, exc_info=True)
        finally:
            self._task_targets.pop(task_id, None)

    def _post_finalize(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Marshal finalization back to the main thread.

        Main-thread execution matters because ``set_timer`` (used for the
        bottom-bar outcome flash) isn't safe from workers. ``call_from_thread``
        targets ``self.app``: the App is long-lived; screens are not.
        """
        call_from_thread(self.app, self._finalize_task, task_id, outcome, detail, task_type)

    def _finalize_task(
        self, task_id: str, outcome: TaskOutcome, detail: str, task_type: str | None
    ) -> None:
        """Mark the queue state, refresh dependents, promote next queued task.

        Runs on the main thread. Atomically: free the active slot, notify
        anything downstream that needs a repaint (e.g. model dropdowns
        after a download lands), and advance the queue. Rows stay in
        history; the bottom bar flash expires on its own. Users clear
        finished rows from the Task Center manually.
        """
        if outcome is TaskOutcome.DONE:
            self.queue.complete_task(task_id)
            self._after_done_hooks(task_type)
        elif outcome is TaskOutcome.FAILED:
            self.queue.fail_task(task_id, detail)
        elif outcome is TaskOutcome.CANCELLED:
            self.queue.cancel(task_id)
        if task_type:
            self._try_start_next(task_type)

    def _notify_model_installed(self) -> None:
        """Refresh any ChatScreen's ModelBar so the new model is selectable.

        The dropdowns are built once on mount from the registry; without
        this nudge, a freshly-downloaded model only appears after the
        user reopens the screen. NoMatches and similar query errors are
        silently skipped so a transient "bar not mounted yet" doesn't
        crash the finalize path; anything else is logged so a real
        failure surfaces in debug output.
        """
        # Late import to avoid a circular (ChatScreen imports this module).
        from textual.css.query import QueryError

        from lilbee.cli.tui.screens.chat import ChatScreen

        for screen in self.app.screen_stack:
            # screen_stack is typed Screen[Any]; narrow at runtime to
            # locate the one screen that owns the ModelBar.
            if isinstance(screen, ChatScreen):
                try:
                    screen.refresh_model_bar()
                except QueryError:
                    log.debug("ModelBar not mounted yet; skipping refresh", exc_info=True)
                break

    def start_download(self, model: CatalogModel) -> str:
        """Enqueue a model download and spawn a background worker."""
        return self.start_task(
            model.display_name,
            TaskType.DOWNLOAD,
            lambda reporter: _download_target(reporter, model),
        )

412 

413 

def _download_target(reporter: ProgressReporter, model: CatalogModel) -> None:
    """Worker target used by ``start_download`` to fetch a HuggingFace model.

    Each progress callback from ``pull_model_data`` is relayed through
    *reporter*, with the model's display name prefixed to the detail text.
    A ``PermissionError`` is re-raised as a ``RuntimeError`` carrying the
    gated-repo friendly message, so every call site (wizard, catalog, chat)
    shows the same error text.
    """
    # Imported at call time rather than at module load.
    from lilbee.app.models import pull_model_data
    from lilbee.catalog import DownloadProgress
    from lilbee.catalog.types import ModelSource

    def _relay(progress: DownloadProgress) -> None:
        reporter.update(progress.percent, f"{model.display_name}: {progress.detail}")

    try:
        pull_model_data(model.ref, ModelSource.NATIVE, on_update=_relay)
    except PermissionError as denied:
        friendly = msg.CATALOG_GATED_REPO.format(name=model.display_name)
        raise RuntimeError(friendly) from denied