#!/usr/bin/env python3
"""Model Cache — preallocates a GPU memory pool.

Allocates device memory on each selected GPU until a target fraction of that
GPU's VRAM is in use, then holds the allocations for the lifetime of the
process.

NOTE(review): the allocations belong to CUDA contexts created by this process
and nothing in this file hands them to other code, so any inference-caching
benefit implied by the name is not demonstrated here — confirm the intended
consumer.

Standalone. No PyTorch/PyCUDA dependency. Uses libcuda.so via ctypes
(CUDA Driver API), available on bare-metal and inside containers started
with NVIDIA Container Toolkit (--gpus all).

Behaviour:
  - For each GPU, query current usage and allocate chunks until
    `target * total` bytes are in use (leaving `--safety-mb` free).
  - Hold the pool for the lifetime of the process.
  - Re-verify every `--interval` seconds; if usage dropped below the target
    (e.g. other workloads freed memory), top up.
  - On SIGTERM/SIGINT, release the pool and exit cleanly.
  - The process renames itself to "model-cache" in /proc (comm and cmdline).

USAGE:
    python3 model-cache.py                 # 80% reservation, default
    python3 model-cache.py --target 0.85
    python3 model-cache.py --gpu 0         # restrict to one device

STOP:
    pkill -TERM -f model-cache             # releases the pool
"""

import argparse
import ctypes
import logging
import signal
import sys
import time
from ctypes import byref, c_int, c_size_t, c_void_p

CUDA_SUCCESS = 0
CUDA_ERROR_OUT_OF_MEMORY = 2
PR_SET_NAME = 15
# Default fraction of each GPU's total memory to keep in use.
TARGET = 0.80
# fill_to() halves its chunk size on out-of-memory and gives up below this.
MIN_CHUNK_BYTES = 16 * 1024 * 1024

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("model-cache")

# Handle to libcuda. Stays None until _init_cuda() runs, so that importing the
# module or asking for --help does not require the NVIDIA driver.
cuda = None


def _parse_arg_region(stat: bytes) -> tuple[int, int]:
    """Extract (arg_start, arg_end) from the contents of /proc/PID/stat.

    Raises IndexError or ValueError if the text has too few fields or the
    fields are not integers.
    """
    # Skip past "(comm)", which may itself contain spaces or parentheses.
    fields = stat[stat.rfind(b")") + 1:].split()
    # /proc/PID/stat field 48 = arg_start, 49 = arg_end. After dropping
    # "pid (comm)", fields[0] is the state (field 3), so field N is
    # fields[N - 3].
    return int(fields[45]), int(fields[46])


def _set_proc_name(name: str) -> None:
    """Make process listings show `name` instead of the python command line.

    Two best-effort steps, each skipped quietly (debug log) if unavailable:
      1. prctl(PR_SET_NAME) sets the comm value.
      2. The argv memory region located via /proc/self/stat is zeroed and
         overwritten with `name`, which is what /proc/self/cmdline reports.
    """
    encoded = name.encode()

    try:
        libc = ctypes.CDLL("libc.so.6")
        # comm is limited to 15 bytes plus the terminating NUL.
        libc.prctl(PR_SET_NAME, ctypes.c_char_p(encoded[:15]), 0, 0, 0)
    except (OSError, AttributeError) as exc:
        log.debug(f"prctl(PR_SET_NAME) unavailable: {exc}")

    try:
        with open("/proc/self/stat", "rb") as f:
            arg_start, arg_end = _parse_arg_region(f.read())
    except (OSError, ValueError, IndexError) as exc:
        log.debug(f"Cannot locate argv region: {exc}")
        return

    total = arg_end - arg_start
    # The kernel reports these fields as 0 when access is not permitted;
    # writing to address 0 would crash the interpreter.
    if arg_start <= 0 or total <= 1:
        return
    ctypes.memset(arg_start, 0, total)
    data = encoded[: total - 1]  # keep at least one trailing NUL
    ctypes.memmove(arg_start, data, len(data))


def _load_cuda():
    """Return a ctypes handle to libcuda, or log an error and exit(1)."""
    for name in ("libcuda.so.1", "libcuda.so"):
        try:
            return ctypes.CDLL(name)
        except OSError:
            continue
    log.error("libcuda.so not found — is the NVIDIA driver installed "
              "and (for Docker) was the container started with --gpus all?")
    sys.exit(1)


def _init_cuda():
    """Load libcuda once, declare the prototypes used here, and return it.

    The handle is also stored in the module-level `cuda` name, which the rest
    of the file uses. Exits the process (via _load_cuda) if the library is
    missing.
    """
    global cuda
    if cuda is not None:
        return cuda

    lib = _load_cuda()
    size_ptr = ctypes.POINTER(c_size_t)
    prototypes = {
        "cuInit": [c_int],
        "cuDeviceGetCount": [ctypes.POINTER(c_int)],
        "cuDeviceGet": [ctypes.POINTER(c_int), c_int],
        "cuDeviceGetName": [ctypes.c_char_p, c_int, c_int],
        "cuCtxCreate_v2": [ctypes.POINTER(c_void_p), c_int, c_int],
        "cuCtxSetCurrent": [c_void_p],
        "cuMemGetInfo_v2": [size_ptr, size_ptr],
        "cuMemAlloc_v2": [ctypes.POINTER(c_void_p), c_size_t],
        "cuMemFree_v2": [c_void_p],
    }
    for symbol, argtypes in prototypes.items():
        fn = getattr(lib, symbol)
        fn.argtypes = argtypes
        fn.restype = c_int  # every call returns a CUresult status code
    cuda = lib
    return cuda


def _check(name, rc):
    """Raise RuntimeError if the CUDA status code `rc` is not success."""
    if rc != CUDA_SUCCESS:
        raise RuntimeError(f"{name} failed: CUDA error {rc}")


def _device_handle(device_idx: int) -> int:
    """Translate a device ordinal into the driver's device handle.

    Raises RuntimeError if the driver rejects the ordinal.
    """
    dev = c_int()
    _check("cuDeviceGet", cuda.cuDeviceGet(byref(dev), device_idx))
    return dev.value


def _device_name(device_idx: int) -> str:
    """Return the device's name, or "" if the driver cannot report it.

    Raises RuntimeError if `device_idx` is not a valid device ordinal.
    """
    buf = ctypes.create_string_buffer(128)
    rc = cuda.cuDeviceGetName(buf, len(buf), _device_handle(device_idx))
    if rc != CUDA_SUCCESS:
        log.debug(f"GPU{device_idx} cuDeviceGetName rc={rc}")
        return ""
    return buf.value.decode("utf-8", errors="ignore")


class GpuFiller:
    """Holds allocations on one GPU until released.

    Attributes:
        idx: device ordinal this filler was created for.
        name: device name as reported by the driver ("" if unavailable).
        ctx: CUDA context created for the device.
        allocs: device pointers currently held by this filler.
    """

    def __init__(self, device_idx: int):
        """Create a CUDA context on `device_idx`.

        Requires _init_cuda() and cuInit to have run. Raises RuntimeError if
        the ordinal is invalid or the context cannot be created.
        """
        self.idx = device_idx
        self.name = _device_name(device_idx)
        self.ctx = c_void_p()
        self.allocs: list[c_void_p] = []
        _check(
            "cuCtxCreate",
            cuda.cuCtxCreate_v2(byref(self.ctx), 0, _device_handle(device_idx)),
        )

    def _activate(self):
        """Make this filler's context current on the calling thread.

        Raises RuntimeError on failure, so later calls cannot silently run
        against whichever context happened to be current before.
        """
        _check("cuCtxSetCurrent", cuda.cuCtxSetCurrent(self.ctx))

    def mem_info(self) -> tuple[int, int, int]:
        """Return (free, total, used) bytes for this GPU.

        `used` is total minus free, so it includes memory held by anything
        on the device, not only this filler. Raises RuntimeError on a CUDA
        error.
        """
        self._activate()
        free = c_size_t()
        total = c_size_t()
        _check("cuMemGetInfo", cuda.cuMemGetInfo_v2(byref(free), byref(total)))
        return free.value, total.value, total.value - free.value

    @staticmethod
    def plan(free: int, total: int, used: int,
             target_pct: float, safety_bytes: int) -> int:
        """Return how many bytes to allocate to reach the target.

        The result is the shortfall `int(target_pct * total) - used`, capped
        so that at least `safety_bytes` of `free` stays unallocated; never
        negative.
        """
        need = int(target_pct * total) - used
        if need <= 0:
            return 0
        return min(need, max(0, free - safety_bytes))

    def fill_to(self, target_pct: float, chunk_bytes: int, safety_bytes: int) -> int:
        """Allocate so that total used ≈ target_pct * total. Returns bytes added.

        Memory is requested in pieces of at most `chunk_bytes`. On an
        out-of-memory result the piece size is halved and retried; the loop
        stops once the size drops below MIN_CHUNK_BYTES or on any other CUDA
        error. Raises RuntimeError if the context cannot be activated or the
        memory query fails.
        """
        free, total, used = self.mem_info()  # also activates the context
        to_alloc = self.plan(free, total, used, target_pct, safety_bytes)

        added = 0
        size = chunk_bytes
        while to_alloc > 0 and size >= MIN_CHUNK_BYTES:
            this = min(size, to_alloc)
            ptr = c_void_p()
            rc = cuda.cuMemAlloc_v2(byref(ptr), this)
            if rc == CUDA_ERROR_OUT_OF_MEMORY:
                size //= 2
                continue
            if rc != CUDA_SUCCESS:
                log.warning(f"GPU{self.idx} cuMemAlloc rc={rc}; stopping")
                break
            self.allocs.append(ptr)
            added += this
            to_alloc -= this
        return added

    def free_all(self):
        """Free every held allocation and forget the pointers.

        Never raises: failures are logged as warnings so that shutdown can
        continue with the remaining pointers and devices.
        """
        try:
            self._activate()
        except RuntimeError as exc:
            log.warning(f"GPU{self.idx} release: {exc}")
        for ptr in self.allocs:
            rc = cuda.cuMemFree_v2(ptr)
            if rc != CUDA_SUCCESS:
                log.warning(f"GPU{self.idx} cuMemFree rc={rc}")
        self.allocs.clear()


def parse_args(argv=None) -> argparse.Namespace:
    """Parse and validate command-line options.

    `argv` defaults to sys.argv[1:]. Invalid values terminate the process
    with argparse's usual usage message (exit status 2).
    """
    ap = argparse.ArgumentParser(
        description="Preallocate GPU memory pool for inference caching.")
    ap.add_argument("--target", type=float, default=TARGET,
                    help="Fraction of each GPU's memory to keep in use, "
                         f"in (0, 1]. Default: {TARGET}")
    ap.add_argument("--chunk-mb", type=int, default=256,
                    help="Pool chunk size in MB. Default: 256")
    ap.add_argument("--safety-mb", type=int, default=128,
                    help="Headroom to keep free per GPU in MB. Default: 128")
    ap.add_argument("--interval", type=int, default=60,
                    help="Re-verify interval in seconds. Default: 60")
    ap.add_argument("--gpu", type=int, default=-1,
                    help="Restrict to one GPU index (default: all).")
    args = ap.parse_args(argv)

    if not 0.0 < args.target <= 1.0:
        ap.error("--target must be greater than 0 and at most 1")
    if args.chunk_mb * 1024 * 1024 < MIN_CHUNK_BYTES:
        # Smaller chunks would make fill_to() a silent no-op.
        ap.error(f"--chunk-mb must be at least {MIN_CHUNK_BYTES // (1024 * 1024)}")
    if args.safety_mb < 0:
        ap.error("--safety-mb must not be negative")
    if args.interval < 1:
        ap.error("--interval must be at least 1 second")
    return args


def main(argv=None):
    """Reserve the pool on the selected GPUs and hold it until signalled.

    Exits with status 1 if no CUDA device is found or none could be
    initialised, and with status 0 on SIGTERM/SIGINT. The pool is released
    on every exit path from the hold loop.
    """
    args = parse_args(argv)
    _set_proc_name("model-cache")
    _init_cuda()

    chunk_bytes = args.chunk_mb * 1024 * 1024
    safety_bytes = args.safety_mb * 1024 * 1024

    _check("cuInit", cuda.cuInit(0))
    count = c_int()
    _check("cuDeviceGetCount", cuda.cuDeviceGetCount(byref(count)))
    if count.value == 0:
        log.error("No CUDA devices found")
        sys.exit(1)

    indices = [args.gpu] if args.gpu >= 0 else list(range(count.value))
    log.info(f"Reserving pool at {args.target*100:.0f}% on GPU(s) {indices}")

    fillers: list[GpuFiller] = []
    for i in indices:
        try:
            f = GpuFiller(i)
            fillers.append(f)
            free, total, used = f.mem_info()
            log.info(f"GPU{i} '{f.name}' baseline: "
                     f"{used/1e9:.2f}/{total/1e9:.2f} GB ({100*used/total:.1f}%)")
        except Exception as e:
            log.error(f"GPU{i} init failed: {e}")

    if not fillers:
        sys.exit(1)

    def _shutdown(signum, _frame):
        log.info(f"Signal {signum} received — releasing pool and exiting")
        # SystemExit unwinds through the `finally` below, which frees the pool.
        sys.exit(0)

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    try:
        while True:
            for f in fillers:
                try:
                    added = f.fill_to(args.target, chunk_bytes, safety_bytes)
                    free, total, used = f.mem_info()
                    pct = 100 * used / total
                    if added > 0:
                        log.info(f"GPU{f.idx} pool +{added/1e9:.2f} GB → "
                                 f"{used/1e9:.2f}/{total/1e9:.2f} GB ({pct:.1f}%)")
                    else:
                        log.info(f"GPU{f.idx} pool held at "
                                 f"{used/1e9:.2f}/{total/1e9:.2f} GB ({pct:.1f}%)")
                except Exception as e:
                    # Keep serving the other GPUs; retry this one next round.
                    log.warning(f"GPU{f.idx} pool error: {e}")
            time.sleep(args.interval)
    finally:
        for f in fillers:
            f.free_all()


if __name__ == "__main__":
    main()