Coverage for src / lilbee / runtime / _splash_runner.py: 100%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Standalone splash animation process: zero lilbee imports, stdlib only. 

2 

3Launched as a subprocess by ``splash.start()``. Reads a pipe fd from argv 

4and animates until the pipe signals EOF (parent closed its write end, or 

5parent died). This guarantees no orphan/zombie animation processes. 

6""" 

7 

8from __future__ import annotations 

9 

10import contextlib 

11import os 

12import select 

13import signal 

14import sys 

15import time 

16 

17HIDE_CURSOR = "\033[?25l" 

18SHOW_CURSOR = "\033[?25h" 

19CLEAR_LINE = "\033[2K" 

20MOVE_UP = "\033[A" 

21 

22AMBER_BRIGHT = "\033[38;5;214m" 

23AMBER_MID = "\033[38;5;172m" 

24AMBER_DIM = "\033[38;5;94m" 

25RESET = "\033[0m" 

26 

27FRAME_INTERVAL = 0.15 

28STARTUP_DELAY = 0.08 

29POLL_INTERVAL = 0.01 

30 

31# Knight-rider bar uses three falloff steps after the bright head. 

32_BAR_FALLOFF_DENSE = 1 

33_BAR_FALLOFF_LIGHT = 2 

34 

35# Subprocess entry point expects exactly ``python -m ... <pipe_fd>`` (script name + 1 arg). 

36_EXPECTED_ARGV_LEN = 2 

37 

38BEE_LINES = [ 

39 " ", 

40 "@@@ @@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@@ ", 

41 "@@@ @@@ @@@ @@@@@@@@ @@@@@@@@ @@@@@@@@ ", 

42 "@@@ @@@ @@@ @@! @@@ @@! @@! ", 

43 "@! !@! !@! !@ @!@ !@! !@! ", 

44 "@!! !!@ @!! @!@!@!@ @!!!:! @!!!:! ", 

45 "!!! !!! !!! !!!@!!!! !!!!!: !!!!!: ", 

46 "!!: !!: !!: !!: !!! !!: !!: ", 

47 " :!: :!: :!: :!: !:! :!: :!: ", 

48 " :: :::: :: :: :::: :: :::: :: :::: :: :::: ", 

49 ": :: : : : : :: : : :: : :: : :: :: : :: :: ", 

50 " ", 

51] 

52 

53LOGO_WIDTH = len(BEE_LINES[1]) 

54 

55COLOR_SEQUENCE = [AMBER_BRIGHT, AMBER_MID, AMBER_DIM, AMBER_MID] 

56 

57 

58def apply_color(line: str, color: str) -> str: 

59 """Apply color to non-empty parts of a line.""" 

60 if not line.strip(): 

61 return line 

62 return color + line + RESET 

63 

64 

65def build_logo_frames() -> list[list[str]]: 

66 """Pre-create 4 color-pulsed versions of the logo.""" 

67 return [[apply_color(line, color) for line in BEE_LINES] for color in COLOR_SEQUENCE] 

68 

69 

70def build_knight_rider_frames() -> list[str]: 

71 """Build 22-frame Knight Rider bar spanning the full logo width.""" 

72 frames: list[str] = [] 

73 sweep_range = LOGO_WIDTH - 1 

74 total_frames = sweep_range * 2 

75 

76 for pos in range(total_frames): 

77 head_pos = pos if pos < sweep_range else (total_frames - pos) 

78 

79 bar = "" 

80 for i in range(LOGO_WIDTH): 

81 dist = abs(i - head_pos) 

82 if dist == 0: 

83 bar += AMBER_BRIGHT + "\u2593" + RESET 

84 elif dist == _BAR_FALLOFF_DENSE: 

85 bar += AMBER_DIM + "\u2592" + RESET 

86 elif dist == _BAR_FALLOFF_LIGHT: 

87 bar += AMBER_DIM + "\u2591" + RESET 

88 else: 

89 bar += " " 

90 frames.append(bar) 

91 

92 return frames 

93 

94 

95def render_frame(logo_lines: list[str], loading_bar: str) -> bytes: 

96 """Build a single frame as raw bytes for os.write().""" 

97 all_lines = [*logo_lines, "", f" {loading_bar}"] 

98 return ("\n".join(all_lines) + "\n").encode() 

99 

100 

101def move_up_and_clear(n: int) -> bytes: 

102 """ANSI sequence to move cursor up n lines and clear each one.""" 

103 return ((MOVE_UP + CLEAR_LINE) * n).encode() 

104 

105 

106def clear_screen(frame_height: int) -> bytes: 

107 """Erase the splash frame area and restore the cursor to the top. 

108 

109 Uses line-by-line clear (move-up + erase) instead of ``\\033[2J\\033[H`` 

110 so the subprocess never writes a cursor-home escape. A cursor-home 

111 would land on the Textual alt-screen if the TUI starts before the 

112 subprocess has finished, leaving a stuck cursor artifact at (0,0). 

113 """ 

114 return move_up_and_clear(frame_height) + SHOW_CURSOR.encode() 

115 

116 

117def _read_eof(pipe_fd: int) -> bool: 

118 """Try to read one byte: returns True if EOF, False if data available.""" 

119 try: 

120 return len(os.read(pipe_fd, 1)) == 0 

121 except OSError: 

122 return True 

123 

124 

125def _pipe_closed_win32(pipe_fd: int) -> bool: # pragma: no cover Windows-only 

126 """Win32 pipe-EOF check using PeekNamedPipe.""" 

127 import ctypes 

128 import msvcrt 

129 

130 try: 

131 handle = msvcrt.get_osfhandle(pipe_fd) # type: ignore[attr-defined] 

132 except OSError: 

133 return True # bad fd, pipe is gone 

134 avail = ctypes.c_ulong(0) 

135 if not ctypes.windll.kernel32.PeekNamedPipe( # type: ignore[attr-defined] 

136 handle, None, 0, None, ctypes.byref(avail), None 

137 ): 

138 return True 

139 if avail.value == 0: 

140 return False 

141 return _read_eof(pipe_fd) 

142 

143 

144def _pipe_closed_posix(pipe_fd: int) -> bool: # pragma: no cover POSIX-only 

145 """POSIX pipe-EOF check using select.""" 

146 try: 

147 readable, _, _ = select.select([pipe_fd], [], [], 0) 

148 except (ValueError, OSError): 

149 return True 

150 if not readable: 

151 return False 

152 return _read_eof(pipe_fd) 

153 

154 

155def pipe_closed(pipe_fd: int) -> bool: 

156 """Check if the pipe has been closed (EOF) without blocking.""" 

157 if sys.platform == "win32": 

158 return _pipe_closed_win32(pipe_fd) 

159 return _pipe_closed_posix(pipe_fd) # pragma: no cover POSIX-only 

160 

161 

162def animation_loop(pipe_fd: int) -> None: 

163 """Run the animation, exiting when the pipe signals EOF.""" 

164 fd = 2 # stderr 

165 

166 logo_frames = build_logo_frames() 

167 knight_frames = build_knight_rider_frames() 

168 frame_height = len(BEE_LINES) + 2 

169 

170 got_signal = False 

171 

172 if sys.platform != "win32": 

173 

174 def handle_term(signum: int, frame: object) -> None: 

175 nonlocal got_signal 

176 got_signal = True 

177 

178 signal.signal(signal.SIGTERM, handle_term) 

179 

180 for _ in range(int(STARTUP_DELAY / POLL_INTERVAL)): 

181 if got_signal or pipe_closed(pipe_fd): 

182 return 

183 time.sleep(POLL_INTERVAL) 

184 

185 try: 

186 os.write(fd, HIDE_CURSOR.encode()) 

187 frame_idx = 0 

188 knight_idx = 0 

189 

190 while not got_signal and not pipe_closed(pipe_fd): 

191 logo = logo_frames[frame_idx % len(logo_frames)] 

192 knight = knight_frames[knight_idx % len(knight_frames)] 

193 rendered = render_frame(logo, knight) 

194 os.write(fd, rendered) 

195 

196 for _ in range(int(FRAME_INTERVAL / POLL_INTERVAL)): 

197 if got_signal or pipe_closed(pipe_fd): 

198 break 

199 time.sleep(POLL_INTERVAL) 

200 

201 if not got_signal and not pipe_closed(pipe_fd): 

202 os.write(fd, move_up_and_clear(frame_height)) # pragma: no cover 

203 

204 frame_idx += 1 

205 knight_idx += 1 

206 except OSError: 

207 pass # parent closed the splash pipe; just stop drawing 

208 finally: 

209 with contextlib.suppress(OSError): 

210 os.write(fd, clear_screen(frame_height)) 

211 

212 

213def main() -> None: 

214 """Entry point when run as ``python -m lilbee.runtime._splash_runner <pipe_fd>``.""" 

215 if len(sys.argv) != _EXPECTED_ARGV_LEN: 

216 sys.exit(1) 

217 

218 try: 

219 pipe_fd = int(sys.argv[1]) 

220 except ValueError: 

221 sys.exit(1) 

222 

223 try: 

224 animation_loop(pipe_fd) 

225 finally: 

226 with contextlib.suppress(OSError): 

227 os.close(pipe_fd) 

228 

229 

230if __name__ == "__main__": 

231 main()