Coverage for src / lilbee / providers / model_cache.py: 100%
79 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Llama-cpp loader-mode constants and dynamic-context / GPU-memory helpers."""
3from __future__ import annotations
5import logging
6import platform
7from enum import StrEnum
8from pathlib import Path
10log = logging.getLogger(__name__)
13class LoaderMode(StrEnum):
14 """Which task to configure llama.cpp for at load time."""
16 CHAT = "chat"
17 EMBED = "embed"
18 RERANK = "rerank"
# Fallback KV cache estimate when GGUF metadata can't be read.
# 2048 bytes/token undershoots real KV size for modern models (Gemma3-4B is
# ~640 KB/token f16) but is fine as a coarse pre-load eviction signal.
_KV_BYTES_PER_CTX_TOKEN = 2048

# Metal/CUDA buffer overhead as fraction of model weight memory
# (applied in estimate_model_memory and compute_dynamic_ctx).
_BUFFER_OVERHEAD_FRACTION = 0.10

# Default context length for estimation when metadata unavailable
_DEFAULT_CTX_LEN = 2048

# Floor for the dynamic n_ctx computation (smaller is unusable for chat)
_DYNAMIC_CTX_FLOOR = 512

# Round dynamic n_ctx down to a multiple of this (clean batch sizes)
_DYNAMIC_CTX_QUANTUM = 256

# KV cache element size for f16 (bytes). Quantized KV reduces this.
_KV_ELEM_BYTES_F16 = 2
def kv_bytes_per_token(meta: dict[str, str] | None, kv_elem_bytes: int = _KV_ELEM_BYTES_F16) -> int:
    """Estimate per-token KV cache size in bytes from GGUF metadata.

    Formula: 2 (K + V) * n_layers * n_kv_heads * head_dim * elem_bytes.
    Falls back to ``_KV_BYTES_PER_CTX_TOKEN`` when metadata is missing.
    """
    if not meta:
        return _KV_BYTES_PER_CTX_TOKEN
    try:
        layers = int(meta["block_count"])
        # GQA models store a separate KV head count; fall back to full MHA.
        kv_heads = int(meta.get("head_count_kv") or meta["head_count"])
        if "key_length" in meta and "value_length" in meta:
            # Explicit per-head K and V widths are available.
            per_head_kv = int(meta["key_length"]) + int(meta["value_length"])
        else:
            # Derive head_dim from embedding width / attention head count.
            attn_heads = int(meta.get("head_count") or kv_heads)
            per_head_kv = 2 * (int(meta["embedding_length"]) // attn_heads)
    except (KeyError, ValueError, ZeroDivisionError):
        # Missing or malformed metadata: coarse fallback estimate.
        return _KV_BYTES_PER_CTX_TOKEN
    return layers * kv_heads * per_head_kv * kv_elem_bytes
def estimate_model_memory(
    model_path: Path,
    n_ctx: int = _DEFAULT_CTX_LEN,
    kv_bytes_per_tok: int = _KV_BYTES_PER_CTX_TOKEN,
) -> int:
    """Estimate memory consumption for a GGUF model.
    Approximation: file_size (weights) + KV cache + 10% buffer overhead.
    """
    # Missing files estimate as zero-weight (KV + overhead only).
    weights = model_path.stat().st_size if model_path.exists() else 0
    # KV cache grows linearly with the requested context window.
    kv_cache = n_ctx * kv_bytes_per_tok
    # Metal/CUDA scratch buffers scale with the weight footprint.
    return weights + kv_cache + int(weights * _BUFFER_OVERHEAD_FRACTION)
def compute_dynamic_ctx(
    *,
    model_bytes: int,
    available_bytes: int,
    training_ctx: int,
    kv_bytes_per_tok: int,
    ceiling: int,
    floor: int = _DYNAMIC_CTX_FLOOR,
    quantum: int = _DYNAMIC_CTX_QUANTUM,
) -> int:
    """Pick the largest n_ctx that fits in available memory.

    Subtracts model weights and a 10% buffer overhead from ``available_bytes``,
    then divides the remainder by ``kv_bytes_per_tok``. Clamps to
    ``[floor, min(training_ctx, ceiling)]`` and rounds down to ``quantum``.
    """
    upper = min(training_ctx, ceiling)
    if kv_bytes_per_tok <= 0:
        # No per-token KV estimate available: just honor the hard caps.
        return upper
    remaining = available_bytes - model_bytes - int(model_bytes * _BUFFER_OVERHEAD_FRACTION)
    if remaining <= 0:
        # Weights alone exhaust the budget; return the minimum usable ctx.
        return floor
    fit_ctx = remaining // kv_bytes_per_tok
    clamped = max(floor, min(fit_ctx, upper))
    # Snap down to a clean multiple, but never dip below the floor.
    return max(floor, quantum * (clamped // quantum))
def get_available_memory(fraction: float) -> int:
    """Return usable GPU/unified memory in bytes, scaled by *fraction*.
    - macOS (Apple Silicon): unified memory via psutil
    - Linux with NVIDIA GPU: pynvml -> nvidia-smi -> psutil fallback
    - Other: psutil system memory
    """
    import psutil

    # Only non-Apple platforms are probed for a discrete NVIDIA GPU;
    # Darwin's unified memory falls through to the psutil path below.
    if platform.system() in ("Linux", "Windows"):
        gpu_total = _try_nvidia_memory()
        if gpu_total is not None:
            return int(gpu_total * fraction)

    return int(psutil.virtual_memory().total * fraction)
131def _try_nvidia_memory() -> int | None:
132 """Try to get NVIDIA GPU total memory via pynvml, then nvidia-smi."""
133 try:
134 import pynvml # type: ignore[import-untyped]
136 pynvml.nvmlInit()
137 handle = pynvml.nvmlDeviceGetHandleByIndex(0)
138 info = pynvml.nvmlDeviceGetMemoryInfo(handle)
139 pynvml.nvmlShutdown()
140 return int(info.total)
141 except Exception: # noqa: S110 -- optional GPU detect; absence is expected on non-NVIDIA hosts
142 pass
144 try:
145 import subprocess
147 # nvidia-smi ships with the NVIDIA driver and is always on PATH when
148 # present; fully-qualifying it would break on every install layout.
149 result = subprocess.run(
150 ["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits"], # noqa: S607
151 capture_output=True,
152 text=True,
153 timeout=5,
154 )
155 if result.returncode == 0:
156 mib = int(result.stdout.strip().split("\n")[0])
157 return mib * 1024 * 1024
158 except Exception: # noqa: S110 -- optional GPU detect; same rationale as above
159 pass
161 return None