Coverage for src/lilbee/providers/model_cache.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Llama-cpp loader-mode constants and dynamic-context / GPU-memory helpers.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import platform 

7from enum import StrEnum 

8from pathlib import Path 

9 

10log = logging.getLogger(__name__) 

11 

12 

13class LoaderMode(StrEnum): 

14 """Which task to configure llama.cpp for at load time.""" 

15 

16 CHAT = "chat" 

17 EMBED = "embed" 

18 RERANK = "rerank" 

19 

20 

# Fallback KV cache estimate when GGUF metadata can't be read.
# 2048 bytes/token undershoots real KV size for modern models (Gemma3-4B is
# ~640 KB/token f16) but is fine as a coarse pre-load eviction signal.
_KV_BYTES_PER_CTX_TOKEN = 2048

# Metal/CUDA buffer overhead as a fraction of model weight memory.
_BUFFER_OVERHEAD_FRACTION = 0.10

# Context length assumed for estimation when metadata is unavailable.
_DEFAULT_CTX_LEN = 2048

# Lower bound for the dynamic n_ctx computation (smaller is unusable for chat).
_DYNAMIC_CTX_FLOOR = 512

# Dynamic n_ctx is rounded down to a multiple of this (clean batch sizes).
_DYNAMIC_CTX_QUANTUM = 256

# Bytes per KV cache element at f16 precision. Quantized KV caches reduce this.
_KV_ELEM_BYTES_F16 = 2


def kv_bytes_per_token(meta: dict[str, str] | None, kv_elem_bytes: int = _KV_ELEM_BYTES_F16) -> int:
    """Estimate per-token KV cache size in bytes from GGUF metadata.

    Formula: 2 (K + V) * n_layers * n_kv_heads * head_dim * elem_bytes.
    Falls back to ``_KV_BYTES_PER_CTX_TOKEN`` when metadata is missing or
    malformed.
    """
    if not meta:
        return _KV_BYTES_PER_CTX_TOKEN
    try:
        layers = int(meta["block_count"])
        # GQA models publish head_count_kv; MHA models only head_count.
        kv_heads = int(meta.get("head_count_kv") or meta["head_count"])
        if "key_length" in meta and "value_length" in meta:
            # Explicit per-head K and V dims already account for both tensors.
            per_head_kv = int(meta["key_length"]) + int(meta["value_length"])
        else:
            # Derive head_dim from the embedding width, then double for K + V.
            attn_heads = int(meta.get("head_count") or kv_heads)
            per_head_kv = 2 * (int(meta["embedding_length"]) // attn_heads)
    except (KeyError, ValueError, ZeroDivisionError):
        # Incomplete or non-numeric metadata: use the coarse constant.
        return _KV_BYTES_PER_CTX_TOKEN
    return layers * kv_heads * per_head_kv * kv_elem_bytes

63 

64 

def estimate_model_memory(
    model_path: Path,
    n_ctx: int = _DEFAULT_CTX_LEN,
    kv_bytes_per_tok: int = _KV_BYTES_PER_CTX_TOKEN,
) -> int:
    """Estimate memory consumption for a GGUF model, in bytes.

    Approximation: file size (weights) + KV cache (``n_ctx`` *
    ``kv_bytes_per_tok``) + 10% buffer overhead on the weights.

    Args:
        model_path: Path to the GGUF file. A missing or unreadable file
            contributes 0 weight bytes rather than raising.
        n_ctx: Context length to budget the KV cache for.
        kv_bytes_per_tok: Per-token KV cache size estimate.

    Returns:
        Estimated total bytes required to load the model.
    """
    # EAFP: stat once and catch OSError. The previous exists()+stat() pair
    # stat'ed twice and raced if the file vanished between the two calls.
    try:
        file_bytes = model_path.stat().st_size
    except OSError:
        file_bytes = 0
    kv_bytes = n_ctx * kv_bytes_per_tok
    overhead = int(file_bytes * _BUFFER_OVERHEAD_FRACTION)
    return file_bytes + kv_bytes + overhead

77 

78 

def compute_dynamic_ctx(
    *,
    model_bytes: int,
    available_bytes: int,
    training_ctx: int,
    kv_bytes_per_tok: int,
    ceiling: int,
    floor: int = _DYNAMIC_CTX_FLOOR,
    quantum: int = _DYNAMIC_CTX_QUANTUM,
) -> int:
    """Pick the largest n_ctx that fits in available memory.

    The KV budget is ``available_bytes`` minus the model weights and a 10%
    buffer overhead; dividing by ``kv_bytes_per_tok`` yields a raw token
    count, which is clamped to ``[floor, min(training_ctx, ceiling)]`` and
    rounded down to a multiple of ``quantum``.
    """
    hard_cap = min(training_ctx, ceiling)
    if kv_bytes_per_tok <= 0:
        # Degenerate KV estimate -- nothing sensible to divide by.
        return hard_cap
    weight_overhead = int(model_bytes * _BUFFER_OVERHEAD_FRACTION)
    kv_budget = available_bytes - (model_bytes + weight_overhead)
    if kv_budget <= 0:
        # Weights alone exhaust memory; report the minimum usable context.
        return floor
    fitted = max(floor, min(kv_budget // kv_bytes_per_tok, hard_cap))
    # Snap down to a clean multiple of `quantum`, but never below the floor.
    return max(floor, quantum * (fitted // quantum))

106 

107 

def get_available_memory(fraction: float) -> int:
    """Return usable GPU/unified memory in bytes, scaled by *fraction*.

    - macOS (Apple Silicon): unified memory via psutil
    - Linux/Windows with an NVIDIA GPU: pynvml -> nvidia-smi -> psutil fallback
    - Other: psutil system memory
    """
    import psutil

    system = platform.system()

    # On Linux/Windows, prefer dedicated VRAM when an NVIDIA GPU is present.
    if system in ("Linux", "Windows"):
        gpu_bytes = _try_nvidia_memory()
        if gpu_bytes is not None:
            return int(gpu_bytes * fraction)

    # Darwin unified memory, or system-RAM fallback everywhere else.
    return int(psutil.virtual_memory().total * fraction)

129 

130 

def _try_nvidia_memory() -> int | None:
    """Probe NVIDIA GPU total memory: pynvml first, then the nvidia-smi CLI.

    Returns total bytes of device 0, or ``None`` when no NVIDIA stack is
    detectable.
    """
    try:
        import pynvml  # type: ignore[import-untyped]

        pynvml.nvmlInit()
        device = pynvml.nvmlDeviceGetHandleByIndex(0)
        mem_info = pynvml.nvmlDeviceGetMemoryInfo(device)
        pynvml.nvmlShutdown()
        return int(mem_info.total)
    except Exception:  # noqa: S110 -- optional GPU detect; absence is expected on non-NVIDIA hosts
        pass

    try:
        import subprocess

        # nvidia-smi ships with the NVIDIA driver and is always on PATH when
        # present; fully-qualifying it would break on every install layout.
        proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits"],  # noqa: S607
            capture_output=True,
            text=True,
            timeout=5,
        )
        if proc.returncode == 0:
            total_mib = int(proc.stdout.strip().split("\n")[0])
            return total_mib * 1024 * 1024
    except Exception:  # noqa: S110 -- optional GPU detect; same rationale as above
        pass

    return None