#!/usr/bin/env python3
"""JAESWIFT Telemetry Dashboard Endpoints

Provides /api/telemetry/* endpoints for the live ops dashboard at /hq/telemetry
"""
import json
import os
import platform
import re
import socket
import subprocess
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path

from flask import Blueprint, jsonify

telemetry_bp = Blueprint('telemetry', __name__)

DATA_DIR = Path(__file__).parent / 'data'
HISTORY_FILE = DATA_DIR / 'telemetry_history.json'
NGINX_LOG = '/var/log/nginx/access.log'

# ─── In-memory state ────────────────────────────────
# Previous /proc/net/dev cumulative byte counters; rates are derived by
# diffing against these between successive gather_system() calls.
_prev_net = {'rx': 0, 'tx': 0, 'ts': 0}
# 60-second caches for the expensive log-parsing endpoints.
_nginx_cache = {'ts': 0, 'data': None}
_geo_cache = {'ts': 0, 'data': []}
# Stack inventory cannot change while the process lives; cached forever.
_stack_cache = None


def shell(cmd, timeout=5):
    """Run *cmd* through the shell and return stripped stdout ('' on any failure).

    Best-effort by design: timeouts, missing binaries and non-zero exits all
    collapse to the empty string so callers can degrade gracefully.
    """
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
        return r.stdout.strip()
    except Exception:
        return ''


def iso_now():
    """Current UTC time as an ISO-8601 string with a trailing 'Z'."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


# ─── SYSTEM METRICS ─────────────────────────────────

def gather_system():
    """Collect host metrics: CPU load, memory, per-mount disk, net rates, uptime."""
    load_raw = shell("cat /proc/loadavg")
    load_parts = load_raw.split()
    load_1 = float(load_parts[0]) if load_parts else 0.0
    load_5 = float(load_parts[1]) if len(load_parts) > 1 else 0.0
    load_15 = float(load_parts[2]) if len(load_parts) > 2 else 0.0

    ncpu_raw = shell("nproc")
    ncpu = int(ncpu_raw) if ncpu_raw.isdigit() else 1
    # Approximate CPU% as the 1-min load normalised by core count, capped at 100.
    cpu_pct = round(min(load_1 / max(ncpu, 1) * 100, 100), 1)

    mem_raw = shell("free -b | awk '/Mem:/{printf \"%d %d %d\", $2,$3,$7}'").split()
    if len(mem_raw) >= 3:
        mem_total = int(mem_raw[0])
        mem_used = int(mem_raw[1])
        mem_pct = round(mem_used / mem_total * 100, 1) if mem_total else 0
    else:
        mem_total = mem_used = 0
        mem_pct = 0

    # Real filesystems only; pseudo/overlay mounts are excluded via -x.
    disk_raw = shell("df -BG --output=target,used,size,pcent -x tmpfs -x devtmpfs -x squashfs -x overlay 2>/dev/null | tail -n +2")
    disks = []
    for line in disk_raw.split('\n'):
        parts = line.split()
        if len(parts) >= 4 and parts[0].startswith('/'):
            try:
                used_gb = int(parts[1].rstrip('G'))
                total_gb = int(parts[2].rstrip('G'))
                pct = int(parts[3].rstrip('%'))
                disks.append({'mount': parts[0], 'used_gb': used_gb, 'total_gb': total_gb, 'pct': pct})
            except Exception:
                pass  # skip malformed df rows rather than fail the whole snapshot

    # Network rate: first matching interface's cumulative rx ($2) / tx ($10)
    # byte counters, diffed against the previous call.  First call yields 0.
    net_raw = shell("cat /proc/net/dev | awk '/eth0|ens|enp/{print $1,$2,$10; exit}'")
    net_parts = net_raw.split()
    rx_bps = tx_bps = 0
    if len(net_parts) >= 3:
        try:
            rx = int(net_parts[1])
            tx = int(net_parts[2])
            now = time.time()
            global _prev_net
            if _prev_net['ts'] and now > _prev_net['ts']:
                dt = now - _prev_net['ts']
                rx_bps = max(0, int((rx - _prev_net['rx']) / dt))
                tx_bps = max(0, int((tx - _prev_net['tx']) / dt))
            _prev_net = {'rx': rx, 'tx': tx, 'ts': now}
        except Exception:
            pass

    up_raw = shell("cat /proc/uptime")
    uptime = float(up_raw.split()[0]) if up_raw else 0

    return {
        'cpu_percent': cpu_pct,
        'mem_percent': mem_pct,
        'mem_used_bytes': mem_used,
        'mem_total_bytes': mem_total,
        'disk_per_mount': disks,
        'net_rx_bps': rx_bps,
        'net_tx_bps': tx_bps,
        'uptime_seconds': uptime,
        'load_1': load_1,
        'load_5': load_5,
        'load_15': load_15,
        'ncpu': ncpu,
        'kernel': shell("uname -r"),
        'hostname': socket.gethostname(),
    }


# ─── SERVICES ───────────────────────────────────────

SYSTEMD_SERVICES = ['jaeswift-api', 'matty-lol', 'nginx', 'filebrowser', 'caddy',
                    'n8n', 'docker', 'ssh']


def check_systemd(svc):
    """Return status/uptime/memory/cpu for one systemd unit.

    Units that are not 'active' skip the (slower) uptime/PID probes.
    """
    status = shell(f"systemctl is-active {svc} 2>/dev/null") or 'unknown'
    result = {
        'name': svc,
        'status': 'up' if status == 'active' else ('down' if status in ('inactive', 'failed') else 'unknown'),
        'uptime_seconds': 0,
        'memory_mb': 0,
        'cpu_percent': 0.0,
        'type': 'systemd',
    }
    if result['status'] != 'up':
        return result

    ts = shell(f"systemctl show -p ActiveEnterTimestamp --value {svc} 2>/dev/null")
    if ts and ts != 'n/a':
        try:
            # Strip the trailing tz word systemd appends (e.g. 'UTC'/'GMT'/'BST')
            # before parsing; the value is then treated as local time.
            dt = datetime.strptime(
                ts.split(' UTC')[0].split(' GMT')[0].split(' BST')[0],
                '%a %Y-%m-%d %H:%M:%S')
            result['uptime_seconds'] = max(0, int(time.time() - dt.timestamp()))
        except Exception:
            pass

    pid = shell(f"systemctl show -p MainPID --value {svc} 2>/dev/null")
    if pid and pid.isdigit() and pid != '0':
        ps = shell(f"ps -o rss,%cpu -p {pid} --no-headers 2>/dev/null")
        parts = ps.split()
        if len(parts) >= 2:
            try:
                result['memory_mb'] = round(int(parts[0]) / 1024, 1)  # rss is in KiB
                result['cpu_percent'] = float(parts[1])
            except Exception:
                pass
    return result


def check_pm2():
    """Return a service dict per pm2-managed process ([] if pm2 is absent)."""
    out = shell("pm2 jlist 2>/dev/null")
    results = []
    if not out:
        return results
    try:
        data = json.loads(out)
        for proc in data:
            status = proc.get('pm2_env', {}).get('status', 'unknown')
            results.append({
                'name': f"pm2:{proc.get('name', 'unknown')}",
                'status': 'up' if status == 'online' else 'down',
                # pm_uptime is an epoch-ms start time, not a duration.
                'uptime_seconds': max(0, int((time.time() * 1000 - proc.get('pm2_env', {}).get('pm_uptime', 0)) / 1000)) if status == 'online' else 0,
                'memory_mb': round(proc.get('monit', {}).get('memory', 0) / 1024 / 1024, 1),
                'cpu_percent': float(proc.get('monit', {}).get('cpu', 0)),
                'type': 'pm2',
            })
    except Exception:
        pass
    return results


def gather_services():
    """Probe all systemd units in parallel, then append pm2 processes."""
    results = []
    with ThreadPoolExecutor(max_workers=8) as ex:
        for r in ex.map(check_systemd, SYSTEMD_SERVICES):
            results.append(r)
    results.extend(check_pm2())
    return results


# ─── CRONS ──────────────────────────────────────────

CRON_JOBS = [
    {'name': 'contraband_sync', 'schedule': '0 3 * * 0', 'log': '/var/log/contraband-sync.log'},
    {'name': 'awesomelist_sync', 'schedule': '0 4 * * 0', 'log': '/var/log/awesomelist-sync.log'},
    {'name': 'govdomains_sync', 'schedule': '0 */12 * * *', 'log': '/var/log/govdomains-sync.log'},
    {'name': 'sitrep_generator', 'schedule': '0 7 * * *', 'log': '/var/log/sitrep.log'},
    {'name': 'telemetry_snapshot', 'schedule': '*/5 * * * *', 'log': '/var/log/telemetry_snapshot.log'},
]


def gather_crons():
    """Infer each cron job's last run (log mtime) and status (log-tail keywords)."""
    out = []
    for job in CRON_JOBS:
        entry = {'name': job['name'], 'schedule': job['schedule'],
                 'last_run_iso': None, 'last_status': 'unknown', 'last_output_tail': ''}
        p = Path(job['log'])
        if p.exists():
            try:
                mtime = p.stat().st_mtime
                entry['last_run_iso'] = datetime.fromtimestamp(mtime, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
                tail = shell(f"tail -n 5 {job['log']} 2>/dev/null")
                entry['last_output_tail'] = tail[-500:] if tail else ''
                low = tail.lower()
                # Keyword heuristic: any error-ish word in the tail marks a failure.
                if any(w in low for w in ['error', 'fail', 'traceback', 'exception']):
                    entry['last_status'] = 'fail'
                elif tail:
                    entry['last_status'] = 'ok'
            except Exception:
                pass
        out.append(entry)
    return out


# ─── NGINX 24h STATS ────────────────────────────────

def parse_nginx_24h():
    """Aggregate the last 24h of nginx access-log traffic (cached 60s).

    Tails up to 50k lines, filters to the last 24h, and returns request/byte
    totals, top pages, last-octet-masked top IPs, 4xx/5xx counts and (when the
    log format appends request_time) average response time in ms.
    """
    now = time.time()
    if _nginx_cache['data'] and (now - _nginx_cache['ts']) < 60:
        return _nginx_cache['data']

    result = {
        'total_requests': 0, 'total_bytes': 0, 'avg_rpm': 0,
        'top_pages': [], 'top_ips_redacted': [],
        'status_4xx_count': 0, 'status_5xx_count': 0,
        'error_rate_pct': 0.0, 'response_time_avg_ms': 0,
    }
    if not os.path.exists(NGINX_LOG):
        _nginx_cache.update({'ts': now, 'data': result})
        return result

    cutoff = now - 86400
    pages = Counter()
    ips = Counter()
    total_bytes = 0
    total_reqs = 0
    s4 = s5 = 0
    rt_sum = 0.0
    rt_count = 0

    # Parse last N lines (tail is efficient)
    log_data = shell(f"tail -n 50000 {NGINX_LOG} 2>/dev/null", timeout=10)
    # Regex for combined format + optional request_time at end
    pat = re.compile(
        r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+) "[^"]*" "[^"]*"(?: (\S+))?'
    )
    for line in log_data.split('\n'):
        m = pat.match(line)
        if not m:
            continue
        try:
            ts_str = m.group(2)  # format: 19/Apr/2026:12:34:56 +0000
            # FIX: honour the log's own UTC offset via %z instead of discarding
            # it and assuming UTC — otherwise the 24h cutoff is skewed on any
            # host whose nginx logs in local time.
            ts = datetime.strptime(ts_str, '%d/%b/%Y:%H:%M:%S %z').timestamp()
            if ts < cutoff:
                continue
            ip = m.group(1)
            path = m.group(4)
            status = int(m.group(5))
            size = int(m.group(6))
            rt = m.group(7)
            total_reqs += 1
            total_bytes += size
            # Skip noisy paths
            if len(path) <= 200:
                pages[path[:120]] += 1
            # Mask last octet
            parts = ip.split('.')
            if len(parts) == 4:
                ip_masked = f"{parts[0]}.{parts[1]}.{parts[2]}.xxx"
            else:
                ip_masked = ip[:20] + '…'
            ips[ip_masked] += 1
            if 400 <= status < 500:
                s4 += 1
            elif status >= 500:
                s5 += 1
            if rt and rt != '-':
                try:
                    rt_sum += float(rt) * 1000  # request_time is in seconds
                    rt_count += 1
                except Exception:
                    pass
        except Exception:
            continue

    result['total_requests'] = total_reqs
    result['total_bytes'] = total_bytes
    result['avg_rpm'] = round(total_reqs / 1440, 1) if total_reqs else 0
    result['top_pages'] = [{'path': p, 'count': c} for p, c in pages.most_common(10)]
    result['top_ips_redacted'] = [{'ip': i, 'count': c} for i, c in ips.most_common(10)]
    result['status_4xx_count'] = s4
    result['status_5xx_count'] = s5
    result['error_rate_pct'] = round((s4 + s5) / total_reqs * 100, 2) if total_reqs else 0
    result['response_time_avg_ms'] = round(rt_sum / rt_count, 1) if rt_count else 0
    _nginx_cache.update({'ts': now, 'data': result})
    return result


# ─── SECURITY ───────────────────────────────────────

def gather_security():
    """Summarise fail2ban jails, UFW rule count, SSH failures and last reboot."""
    jails = []
    total_bans = 0
    f2b_status = shell("fail2ban-client status 2>/dev/null")
    jail_list = []
    m = re.search(r'Jail list:\s*(.+)', f2b_status)
    if m:
        jail_list = [j.strip() for j in m.group(1).split(',') if j.strip()]
    for jail in jail_list:
        js = shell(f"fail2ban-client status {jail} 2>/dev/null")
        banned = 0
        m2 = re.search(r'Currently banned:\s*(\d+)', js)
        if m2:
            banned = int(m2.group(1))
        total_bans += banned
        jails.append({'name': jail, 'banned': banned})

    ufw_out = shell("ufw status numbered 2>/dev/null")
    # Numbered rules look like '[ 1] ...' — count those lines.
    ufw_rules = len([l for l in ufw_out.split('\n') if re.match(r'^\s*\[\s*\d+', l)])

    # NOTE(review): unit is 'ssh' here (Debian/Ubuntu); other distros use 'sshd'.
    ssh_fails = shell("journalctl -u ssh --since '24 hours ago' --no-pager 2>/dev/null | grep -c 'Failed password'", timeout=8)
    try:
        ssh_attempts = int(ssh_fails) if ssh_fails.isdigit() else 0
    except Exception:
        ssh_attempts = 0

    reboot = shell("who -b 2>/dev/null | awk '{print $3,$4}'")
    reboot_iso = None
    try:
        # 'who -b' reports local time; labelled UTC here as elsewhere in the file.
        dt = datetime.strptime(reboot.strip(), '%Y-%m-%d %H:%M')
        reboot_iso = dt.replace(tzinfo=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    except Exception:
        pass

    return {
        'fail2ban_jails': jails,
        'fail2ban_bans_total': total_bans,
        'ufw_rules_count': ufw_rules,
        'ssh_attempts_24h': ssh_attempts,
        'last_reboot_iso': reboot_iso,
    }


# ─── SSL ────────────────────────────────────────────

SSL_DOMAINS = ['jaeswift.xyz', 'git.jaeswift.xyz', 'files.jaeswift.xyz',
               'plex.jaeswift.xyz', 'agentzero.jaeswift.xyz']


def check_ssl(domain):
    """Fetch *domain*'s live cert expiry via openssl; days_left == -1 on failure."""
    try:
        out = shell(
            f"echo | timeout 4 openssl s_client -servername {domain} -connect {domain}:443 2>/dev/null | openssl x509 -noout -enddate 2>/dev/null",
            timeout=6
        )
        m = re.search(r'notAfter=(.+)', out)
        if m:
            dt = datetime.strptime(m.group(1).strip(), '%b %d %H:%M:%S %Y %Z')
            days_left = int((dt.timestamp() - time.time()) / 86400)
            return {'domain': domain, 'expires_iso': dt.strftime('%Y-%m-%dT%H:%M:%SZ'), 'days_left': days_left}
    except Exception:
        pass
    return {'domain': domain, 'expires_iso': None, 'days_left': -1}


def gather_ssl():
    """Check every monitored domain's certificate in parallel."""
    with ThreadPoolExecutor(max_workers=5) as ex:
        return list(ex.map(check_ssl, SSL_DOMAINS))


# ─── STACK INVENTORY (cached forever) ───────────────

def gather_stack():
    """Report installed tool versions; computed once per process lifetime."""
    global _stack_cache
    if _stack_cache is not None:
        return _stack_cache
    _stack_cache = {
        'python_version': shell("python3 --version 2>&1").replace('Python ', ''),
        'node_version': shell("node --version 2>/dev/null").lstrip('v'),
        'nginx_version': shell("nginx -v 2>&1 | awk -F/ '{print $2}'"),
        'docker_version': shell("docker --version 2>/dev/null | awk '{print $3}' | tr -d ','"),
        'ffmpeg_version': shell("ffmpeg -version 2>/dev/null | head -1 | awk '{print $3}'"),
        'kernel': shell("uname -r"),
        'os': shell("lsb_release -ds 2>/dev/null || cat /etc/os-release | grep PRETTY_NAME | cut -d= -f2 | tr -d '\"'"),
        'uname': platform.uname().system + ' ' + platform.uname().machine,
    }
    return _stack_cache


# ─── REPOS ──────────────────────────────────────────

REPO_PATHS = ['/var/www/jaeswift-homepage']


def gather_repos():
    """Return HEAD sha/message/date and dirty flag for each monitored git repo."""
    repos = []
    for path in REPO_PATHS:
        if not os.path.isdir(os.path.join(path, '.git')):
            continue
        try:
            sha = shell(f"cd {path} && git rev-parse --short HEAD 2>/dev/null")
            msg = shell(f"cd {path} && git log -1 --pretty=%s 2>/dev/null")
            iso = shell(f"cd {path} && git log -1 --pretty=%cI 2>/dev/null")
            dirty = bool(shell(f"cd {path} && git status --porcelain 2>/dev/null"))
            repos.append({
                'name': os.path.basename(path),
                'path': path,
                'last_commit_sha': sha,
                'last_commit_msg': msg[:120],
                'last_commit_iso': iso,
                'dirty': dirty,
            })
        except Exception:
            pass
    return repos


def gather_recent_commits(limit=10):
    """Recent deployment ticker"""
    commits = []
    path = REPO_PATHS[0]
    if os.path.isdir(os.path.join(path, '.git')):
        out = shell(f"cd {path} && git log -n {limit} --pretty=format:'%h|%cI|%s' 2>/dev/null")
        for line in out.split('\n'):
            parts = line.split('|', 2)
            if len(parts) == 3:
                commits.append({'sha': parts[0], 'iso': parts[1], 'msg': parts[2][:100]})
    return commits


# ─── ALERTS ─────────────────────────────────────────

def compute_alerts(system, services_data, crons_data, nginx_data, security_data, ssl_data):
    """Derive red/amber/info alerts from the gathered telemetry snapshots."""
    alerts = []
    now_iso = iso_now()

    if system['cpu_percent'] > 90:
        alerts.append({'level': 'red', 'message': f"CPU CRITICAL {system['cpu_percent']}%", 'since_iso': now_iso})
    elif system['cpu_percent'] > 80:
        alerts.append({'level': 'amber', 'message': f"CPU HIGH {system['cpu_percent']}%", 'since_iso': now_iso})

    if system['mem_percent'] > 90:
        alerts.append({'level': 'red', 'message': f"MEMORY CRITICAL {system['mem_percent']}%", 'since_iso': now_iso})
    elif system['mem_percent'] > 80:
        alerts.append({'level': 'amber', 'message': f"MEMORY HIGH {system['mem_percent']}%", 'since_iso': now_iso})

    for d in system.get('disk_per_mount', []):
        if d['pct'] > 90:
            alerts.append({'level': 'red', 'message': f"DISK {d['mount']} CRITICAL {d['pct']}%", 'since_iso': now_iso})
        elif d['pct'] > 85:
            alerts.append({'level': 'amber', 'message': f"DISK {d['mount']} HIGH {d['pct']}%", 'since_iso': now_iso})

    for s in services_data:
        if s['status'] == 'down':
            alerts.append({'level': 'red', 'message': f"SERVICE DOWN: {s['name']}", 'since_iso': now_iso})

    for c in crons_data:
        if c['last_status'] == 'fail':
            alerts.append({'level': 'amber', 'message': f"CRON FAILED: {c['name']}", 'since_iso': now_iso})

    # NOTE(review): error_rate_pct combines 4xx AND 5xx, but the message says
    # "5xx RATE" — kept verbatim so dashboard consumers don't break; confirm
    # whether the intent was 5xx-only before relabelling.
    if nginx_data.get('error_rate_pct', 0) > 1:
        alerts.append({'level': 'red', 'message': f"5xx RATE {nginx_data['error_rate_pct']}%", 'since_iso': now_iso})

    for s in ssl_data:
        if 0 <= s['days_left'] < 14:
            alerts.append({'level': 'amber', 'message': f"SSL {s['domain']} EXPIRES {s['days_left']}d", 'since_iso': now_iso})
        elif s['days_left'] == -1:
            pass  # fetch failed, skip

    if security_data['ssh_attempts_24h'] > 200:
        alerts.append({'level': 'info', 'message': f"SSH brute-force: {security_data['ssh_attempts_24h']}/24h", 'since_iso': now_iso})

    return alerts


# ─── ENDPOINTS ──────────────────────────────────────

@telemetry_bp.route('/api/telemetry/overview')
def overview():
    """Full telemetry snapshot: system, services, crons, nginx, security, ssl, stack, repos."""
    try:
        system = gather_system()
        services_data = gather_services()
        crons_data = gather_crons()
        nginx = parse_nginx_24h()
        security_data = gather_security()
        ssl_data = gather_ssl()
        stack = gather_stack()
        repos = gather_repos()
        return jsonify({
            'system': system,
            'services': services_data,
            'crons': crons_data,
            'nginx_24h': nginx,
            'security': security_data,
            'ssl': ssl_data,
            'stack': stack,
            'repos': repos,
            'timestamp': iso_now(),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@telemetry_bp.route('/api/telemetry/history')
def history():
    """Serve the sparkline history file written by the snapshot cron (empty shape on failure)."""
    default = {'cpu': [], 'mem': [], 'net_rx': [], 'net_tx': [], 'timestamps': []}
    if not HISTORY_FILE.exists():
        return jsonify(default)
    try:
        with open(HISTORY_FILE) as f:
            return jsonify(json.load(f))
    except Exception:
        return jsonify(default)


@telemetry_bp.route('/api/telemetry/nginx-tail')
def nginx_tail():
    """Last 20 access-log entries with IPs masked and user agents truncated."""
    if not os.path.exists(NGINX_LOG):
        return jsonify([])
    raw = shell(f"tail -n 20 {NGINX_LOG} 2>/dev/null", timeout=3)
    pat = re.compile(
        r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+) "[^"]*" "([^"]*)"'
    )
    out = []
    for line in raw.split('\n'):
        m = pat.match(line)
        if not m:
            continue
        ip = m.group(1)
        parts = ip.split('.')
        ip_masked = f"{parts[0]}.{parts[1]}.{parts[2]}.xxx" if len(parts) == 4 else ip[:20] + '…'
        ua = m.group(7)
        ua_short = ua[:40]
        out.append({
            'time': m.group(2),
            'ip_masked': ip_masked,
            'method': m.group(3),
            'path': m.group(4)[:80],
            'status': int(m.group(5)),
            'size': int(m.group(6)),
            'ua_short': ua_short,
        })
    return jsonify(out)


@telemetry_bp.route('/api/telemetry/geo')
def geo():
    """Country counts for recent unique visitor IPs via GeoLite2 (cached 60s).

    Gracefully returns [] when the mmdb, the log or the geoip2 package is missing.
    """
    now = time.time()
    if _geo_cache['data'] and (now - _geo_cache['ts']) < 60:
        return jsonify(_geo_cache['data'])

    mmdb_paths = ['/usr/share/GeoIP/GeoLite2-Country.mmdb', '/var/lib/GeoIP/GeoLite2-Country.mmdb']
    mmdb = next((p for p in mmdb_paths if os.path.exists(p)), None)
    if not mmdb or not os.path.exists(NGINX_LOG):
        _geo_cache.update({'ts': now, 'data': []})
        return jsonify([])
    try:
        import geoip2.database  # optional dependency; endpoint degrades without it
    except ImportError:
        _geo_cache.update({'ts': now, 'data': []})
        return jsonify([])

    raw = shell(f"tail -n 20000 {NGINX_LOG} 2>/dev/null | awk '{{print $1}}' | sort -u", timeout=8)
    ips = [i for i in raw.split('\n') if i and re.match(r'^\d+\.\d+\.\d+\.\d+$', i)]
    counts = Counter()
    names = {}
    try:
        reader = geoip2.database.Reader(mmdb)
        for ip in ips[:5000]:  # bound lookup work per refresh
            try:
                rec = reader.country(ip)
                cc = rec.country.iso_code or 'XX'
                counts[cc] += 1
                names[cc] = rec.country.name or cc
            except Exception:
                continue  # unroutable/private IPs raise AddressNotFoundError
        reader.close()
    except Exception:
        _geo_cache.update({'ts': now, 'data': []})
        return jsonify([])

    data = [{'country_code': cc, 'country_name': names.get(cc, cc), 'count': c}
            for cc, c in counts.most_common(50)]
    _geo_cache.update({'ts': now, 'data': data})
    return jsonify(data)


@telemetry_bp.route('/api/telemetry/alerts')
def alerts():
    """Recompute and return the current alert list."""
    try:
        system = gather_system()
        services_data = gather_services()
        crons_data = gather_crons()
        nginx = parse_nginx_24h()
        security_data = gather_security()
        ssl_data = gather_ssl()
        return jsonify(compute_alerts(system, services_data, crons_data, nginx, security_data, ssl_data))
    except Exception as e:
        return jsonify({'error': str(e), 'alerts': []}), 500


@telemetry_bp.route('/api/telemetry/visitors')
def visitors():
    """Approximate live visitors: unique /24 prefixes and requests in the last 5 min."""
    if not os.path.exists(NGINX_LOG):
        return jsonify({'active': 0, 'requests_5min': 0, 'req_per_min': 0})
    cutoff = time.time() - 300
    raw = shell(f"tail -n 5000 {NGINX_LOG} 2>/dev/null", timeout=4)
    pat = re.compile(r'^(\S+) \S+ \S+ \[([^\]]+)\]')
    ips = set()
    reqs = 0
    for line in raw.split('\n'):
        m = pat.match(line)
        if not m:
            continue
        try:
            # FIX: parse the full timestamp including its UTC offset (%z) rather
            # than discarding it and assuming UTC.
            ts = datetime.strptime(m.group(2), '%d/%b/%Y:%H:%M:%S %z').timestamp()
            if ts < cutoff:
                continue
            reqs += 1
            parts = m.group(1).split('.')
            # Count by /24 prefix to avoid storing full visitor IPs.
            ips.add('.'.join(parts[:3]) if len(parts) == 4 else m.group(1))
        except Exception:
            continue
    return jsonify({
        'active': len(ips),
        'requests_5min': reqs,
        'req_per_min': round(reqs / 5, 1),
    })


@telemetry_bp.route('/api/telemetry/commits')
def commits():
    """Recent commits for the deployment ticker."""
    return jsonify(gather_recent_commits(10))