# jaeswift-website/api/telemetry_routes.py  (618 lines, 23 KiB, Python)
#!/usr/bin/env python3
"""JAESWIFT Telemetry Dashboard Endpoints
Provides /api/telemetry/* endpoints for the live ops dashboard at /hq/telemetry
"""
import json, os, time, subprocess, re, socket, platform
from datetime import datetime, timezone
from pathlib import Path
from functools import lru_cache
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
from flask import Blueprint, jsonify
telemetry_bp = Blueprint('telemetry', __name__)
DATA_DIR = Path(__file__).parent / 'data'
HISTORY_FILE = DATA_DIR / 'telemetry_history.json'
NGINX_LOG = '/var/log/nginx/access.log'
# ─── In-memory state ────────────────────────────────
_prev_net = {'rx': 0, 'tx': 0, 'ts': 0}
_nginx_cache = {'ts': 0, 'data': None}
_geo_cache = {'ts': 0, 'data': []}
_stack_cache = None
def shell(cmd, timeout=5):
    """Run *cmd* through the shell and return its stripped stdout.

    Best-effort: any failure (timeout, missing binary, OS error) yields ''.
    """
    try:
        proc = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=timeout
        )
    except Exception:
        return ''
    return proc.stdout.strip()
def iso_now():
    """Current UTC time as an ISO-8601 string with a 'Z' suffix (second precision)."""
    now = datetime.now(timezone.utc).replace(microsecond=0)
    return now.isoformat().replace('+00:00', 'Z')
# ─── SYSTEM METRICS ─────────────────────────────────
def gather_system():
    """Collect host metrics (CPU, memory, disk, network, uptime) via shell probes.

    Returns a flat dict of numbers plus kernel/hostname strings. Every probe
    degrades gracefully: a failed command yields 0 / '' rather than raising.
    Mutates the module-level _prev_net so network byte rates can be derived
    from the counter delta between successive calls.
    """
    # /proc/loadavg: "1m 5m 15m running/total last_pid".
    load_raw = shell("cat /proc/loadavg")
    load_parts = load_raw.split()
    load_1 = float(load_parts[0]) if load_parts else 0.0
    load_5 = float(load_parts[1]) if len(load_parts) > 1 else 0.0
    load_15 = float(load_parts[2]) if len(load_parts) > 2 else 0.0
    ncpu_raw = shell("nproc")
    ncpu = int(ncpu_raw) if ncpu_raw.isdigit() else 1
    # CPU% approximated as 1-min load normalised by core count, capped at 100.
    cpu_pct = round(min(load_1 / max(ncpu, 1) * 100, 100), 1)
    # free -b fields: $2 total, $3 used, $7 available — all in bytes.
    mem_raw = shell("free -b | awk '/Mem:/{printf \"%d %d %d\", $2,$3,$7}'").split()
    if len(mem_raw) >= 3:
        mem_total = int(mem_raw[0])
        mem_used = int(mem_raw[1])
        mem_pct = round(mem_used / mem_total * 100, 1) if mem_total else 0
    else:
        mem_total = mem_used = 0
        mem_pct = 0
    # Per-mount disk usage in whole GB; pseudo-filesystems excluded via -x.
    disk_raw = shell("df -BG --output=target,used,size,pcent -x tmpfs -x devtmpfs -x squashfs -x overlay 2>/dev/null | tail -n +2")
    disks = []
    for line in disk_raw.split('\n'):
        parts = line.split()
        if len(parts) >= 4 and parts[0].startswith('/'):
            try:
                used_gb = int(parts[1].rstrip('G'))
                total_gb = int(parts[2].rstrip('G'))
                pct = int(parts[3].rstrip('%'))
                disks.append({'mount': parts[0], 'used_gb': used_gb, 'total_gb': total_gb, 'pct': pct})
            except Exception:
                pass
    # Network rate
    # NOTE(review): /proc/net/dev often has no space after "eth0:", in which
    # case awk's $1 fuses the interface name with the RX byte counter and the
    # $2/$10 fields below would be shifted — confirm the target host's format.
    net_raw = shell("cat /proc/net/dev | awk '/eth0|ens|enp/{print $1,$2,$10; exit}'")
    net_parts = net_raw.split()
    rx_bps = tx_bps = 0
    if len(net_parts) >= 3:
        try:
            rx = int(net_parts[1]); tx = int(net_parts[2])
            now = time.time()
            global _prev_net
            # Rates are the counter deltas since the previous call, per second.
            if _prev_net['ts'] and now > _prev_net['ts']:
                dt = now - _prev_net['ts']
                rx_bps = max(0, int((rx - _prev_net['rx']) / dt))
                tx_bps = max(0, int((tx - _prev_net['tx']) / dt))
            _prev_net = {'rx': rx, 'tx': tx, 'ts': now}
        except Exception:
            pass
    # First field of /proc/uptime is seconds since boot.
    up_raw = shell("cat /proc/uptime")
    uptime = float(up_raw.split()[0]) if up_raw else 0
    return {
        'cpu_percent': cpu_pct,
        'mem_percent': mem_pct,
        'mem_used_bytes': mem_used,
        'mem_total_bytes': mem_total,
        'disk_per_mount': disks,
        'net_rx_bps': rx_bps,
        'net_tx_bps': tx_bps,
        'uptime_seconds': uptime,
        'load_1': load_1,
        'load_5': load_5,
        'load_15': load_15,
        'ncpu': ncpu,
        'kernel': shell("uname -r"),
        'hostname': socket.gethostname(),
    }
# ─── SERVICES ───────────────────────────────────────
SYSTEMD_SERVICES = ['jaeswift-api', 'nginx', 'filebrowser', 'docker', 'ssh']


def check_systemd(svc):
    """Probe one systemd unit: status, uptime and resource usage of its main PID."""
    active = shell(f"systemctl is-active {svc} 2>/dev/null") or 'unknown'
    if active == 'active':
        state = 'up'
    elif active in ('inactive', 'failed'):
        state = 'down'
    else:
        state = 'unknown'
    info = {
        'name': svc,
        'status': state,
        'uptime_seconds': 0,
        'memory_mb': 0,
        'cpu_percent': 0.0,
        'type': 'systemd',
    }
    if state != 'up':
        return info
    # Uptime from the unit's activation timestamp (strip the timezone label
    # variants systemd emits before parsing).
    stamp = shell(f"systemctl show -p ActiveEnterTimestamp --value {svc} 2>/dev/null")
    if stamp and stamp != 'n/a':
        try:
            bare = stamp.split(' UTC')[0].split(' GMT')[0].split(' BST')[0]
            started = datetime.strptime(bare, '%a %Y-%m-%d %H:%M:%S')
            info['uptime_seconds'] = max(0, int(time.time() - started.timestamp()))
        except Exception:
            pass
    # RSS / CPU of the main process, if it has one.
    pid = shell(f"systemctl show -p MainPID --value {svc} 2>/dev/null")
    if pid.isdigit() and pid != '0':
        fields = shell(f"ps -o rss,%cpu -p {pid} --no-headers 2>/dev/null").split()
        if len(fields) >= 2:
            try:
                info['memory_mb'] = round(int(fields[0]) / 1024, 1)
                info['cpu_percent'] = float(fields[1])
            except Exception:
                pass
    return info
def check_pm2():
    """List pm2-managed processes as service dicts (empty list if pm2 absent).

    Each entry mirrors check_systemd's shape with the name prefixed 'pm2:'.
    Fix over the original: exceptions are handled per process entry, so one
    malformed record no longer aborts the loop and discards every entry
    already parsed.
    """
    out = shell("pm2 jlist 2>/dev/null")
    if not out:
        return []
    try:
        procs = json.loads(out)
    except Exception:
        # pm2 emitted something that isn't JSON (e.g. an update banner).
        return []
    results = []
    for proc in procs:
        try:
            env = proc.get('pm2_env', {})
            monit = proc.get('monit', {})
            status = env.get('status', 'unknown')
            online = status == 'online'
            uptime = 0
            if online:
                # pm_uptime is a start timestamp in milliseconds.
                uptime = max(0, int((time.time() * 1000 - env.get('pm_uptime', 0)) / 1000))
            results.append({
                'name': f"pm2:{proc.get('name', 'unknown')}",
                'status': 'up' if online else 'down',
                'uptime_seconds': uptime,
                'memory_mb': round(monit.get('memory', 0) / 1024 / 1024, 1),
                'cpu_percent': float(monit.get('cpu', 0)),
                'type': 'pm2',
            })
        except Exception:
            # Skip just this record; keep the rest.
            continue
    return results
def gather_services():
    """Snapshot of all monitored services: systemd units probed in parallel, then pm2."""
    with ThreadPoolExecutor(max_workers=8) as pool:
        services = list(pool.map(check_systemd, SYSTEMD_SERVICES))
    services += check_pm2()
    return services
# ─── CRONS ──────────────────────────────────────────
CRON_JOBS = [
    {'name': 'contraband_sync', 'schedule': '0 3 * * 0', 'log': '/var/log/contraband-sync.log'},
    {'name': 'awesomelist_sync', 'schedule': '0 4 * * 0', 'log': '/var/log/awesomelist-sync.log'},
    {'name': 'govdomains_sync', 'schedule': '0 */12 * * *', 'log': '/var/log/govdomains-sync.log'},
    {'name': 'sitrep_generator', 'schedule': '0 7 * * *', 'log': '/var/log/sitrep.log'},
    {'name': 'telemetry_snapshot', 'schedule': '*/5 * * * *', 'log': '/var/log/telemetry_snapshot.log'},
]


def gather_crons():
    """Report each known cron job's last run and a rough ok/fail verdict from its log tail."""
    report = []
    for job in CRON_JOBS:
        item = {
            'name': job['name'],
            'schedule': job['schedule'],
            'last_run_iso': None,
            'last_status': 'unknown',
            'last_output_tail': '',
        }
        log_path = Path(job['log'])
        if log_path.exists():
            try:
                # Log mtime stands in for the job's last run time.
                stamp = log_path.stat().st_mtime
                item['last_run_iso'] = datetime.fromtimestamp(stamp, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
                tail = shell(f"tail -n 5 {job['log']} 2>/dev/null")
                item['last_output_tail'] = tail[-500:] if tail else ''
                # Heuristic verdict: failure keywords anywhere in the tail.
                lowered = tail.lower()
                if any(word in lowered for word in ('error', 'fail', 'traceback', 'exception')):
                    item['last_status'] = 'fail'
                elif tail:
                    item['last_status'] = 'ok'
            except Exception:
                pass
        report.append(item)
    return report
# ─── NGINX 24h STATS ────────────────────────────────
def parse_nginx_24h():
    """Aggregate the last 24h of nginx access-log traffic (result cached 60s).

    Tails up to 50k lines of NGINX_LOG and returns totals, top pages, last-octet
    redacted top IPs, 4xx/5xx counts and — when the log format appends
    $request_time — an average response time in milliseconds.

    Fix over the original: the log timestamp's own UTC offset (e.g. '+0200')
    is honoured when computing the 24h cutoff instead of always assuming UTC,
    which skewed the window on hosts logging in local time.
    """
    now = time.time()
    if _nginx_cache['data'] and (now - _nginx_cache['ts']) < 60:
        return _nginx_cache['data']
    result = {
        'total_requests': 0, 'total_bytes': 0, 'avg_rpm': 0,
        'top_pages': [], 'top_ips_redacted': [],
        'status_4xx_count': 0, 'status_5xx_count': 0, 'error_rate_pct': 0.0,
        'response_time_avg_ms': 0,
    }
    if not os.path.exists(NGINX_LOG):
        _nginx_cache.update({'ts': now, 'data': result})
        return result
    cutoff = now - 86400
    pages = Counter()
    ips = Counter()
    total_bytes = 0
    total_reqs = 0
    s4 = s5 = 0
    rt_sum = 0.0
    rt_count = 0
    # Parse last N lines (tail is efficient)
    log_data = shell(f"tail -n 50000 {NGINX_LOG} 2>/dev/null", timeout=10)
    # Regex for combined format + optional request_time at end
    pat = re.compile(
        r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+) "[^"]*" "[^"]*"(?: (\S+))?'
    )
    for line in log_data.split('\n'):
        m = pat.match(line)
        if not m:
            continue
        try:
            ts_str = m.group(2)
            # format: 19/Apr/2026:12:34:56 +0000 — parse with the offset.
            try:
                ts = datetime.strptime(ts_str, '%d/%b/%Y:%H:%M:%S %z').timestamp()
            except ValueError:
                # Offset missing/garbled: fall back to treating it as UTC.
                dt = datetime.strptime(ts_str.split()[0], '%d/%b/%Y:%H:%M:%S')
                ts = dt.replace(tzinfo=timezone.utc).timestamp()
            if ts < cutoff:
                continue
            ip = m.group(1)
            path = m.group(4)
            status = int(m.group(5))
            size = int(m.group(6))
            rt = m.group(7)
            total_reqs += 1
            total_bytes += size
            # Ignore absurdly long request paths (likely junk); trim the rest
            # to 120 chars for the counter key.
            if len(path) <= 200:
                pages[path[:120]] += 1
            # Redact the last IPv4 octet before counting; truncate non-IPv4.
            parts = ip.split('.')
            if len(parts) == 4:
                ip_masked = f"{parts[0]}.{parts[1]}.{parts[2]}.xxx"
            else:
                ip_masked = ip[:20]
            ips[ip_masked] += 1
            if 400 <= status < 500:
                s4 += 1
            elif status >= 500:
                s5 += 1
            # Optional trailing $request_time is in seconds -> milliseconds.
            if rt and rt != '-':
                try:
                    rt_sum += float(rt) * 1000
                    rt_count += 1
                except ValueError:
                    pass
        except Exception:
            continue
    result['total_requests'] = total_reqs
    result['total_bytes'] = total_bytes
    # NOTE: divides by a full day's minutes even if the tail covers less.
    result['avg_rpm'] = round(total_reqs / 1440, 1) if total_reqs else 0
    result['top_pages'] = [{'path': p, 'count': c} for p, c in pages.most_common(10)]
    result['top_ips_redacted'] = [{'ip': i, 'count': c} for i, c in ips.most_common(10)]
    result['status_4xx_count'] = s4
    result['status_5xx_count'] = s5
    result['error_rate_pct'] = round((s4 + s5) / total_reqs * 100, 2) if total_reqs else 0
    result['response_time_avg_ms'] = round(rt_sum / rt_count, 1) if rt_count else 0
    _nginx_cache.update({'ts': now, 'data': result})
    return result
# ─── SECURITY ───────────────────────────────────────
def gather_security():
    """Summarise security posture: fail2ban bans, ufw rule count, SSH failures, last reboot."""
    # Enumerate fail2ban jails, then ask each one for its current ban count.
    status_out = shell("fail2ban-client status 2>/dev/null")
    jail_names = []
    found = re.search(r'Jail list:\s*(.+)', status_out)
    if found:
        jail_names = [name.strip() for name in found.group(1).split(',') if name.strip()]
    jails = []
    total_bans = 0
    for jail in jail_names:
        detail = shell(f"fail2ban-client status {jail} 2>/dev/null")
        hit = re.search(r'Currently banned:\s*(\d+)', detail)
        banned = int(hit.group(1)) if hit else 0
        total_bans += banned
        jails.append({'name': jail, 'banned': banned})
    # Count numbered ufw rules (lines starting with "[ n]").
    ufw_out = shell("ufw status numbered 2>/dev/null")
    ufw_rules = sum(1 for row in ufw_out.split('\n') if re.match(r'^\s*\[\s*\d+', row))
    # Failed SSH logins in the last 24h from the journal.
    ssh_fails = shell("journalctl -u ssh --since '24 hours ago' --no-pager 2>/dev/null | grep -c 'Failed password'", timeout=8)
    ssh_attempts = int(ssh_fails) if ssh_fails.isdigit() else 0
    # Last boot time from `who -b`; left as None if the output doesn't parse.
    reboot_raw = shell("who -b 2>/dev/null | awk '{print $3,$4}'")
    reboot_iso = None
    try:
        booted = datetime.strptime(reboot_raw.strip(), '%Y-%m-%d %H:%M')
        reboot_iso = booted.replace(tzinfo=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    except Exception:
        pass
    return {
        'fail2ban_jails': jails,
        'fail2ban_bans_total': total_bans,
        'ufw_rules_count': ufw_rules,
        'ssh_attempts_24h': ssh_attempts,
        'last_reboot_iso': reboot_iso,
    }
# ─── SSL ────────────────────────────────────────────
SSL_DOMAINS = ['jaeswift.xyz', 'git.jaeswift.xyz', 'files.jaeswift.xyz', 'plex.jaeswift.xyz', 'agentzero.jaeswift.xyz']


def check_ssl(domain):
    """Fetch *domain*'s TLS certificate and report its expiry.

    Returns {'domain', 'expires_iso', 'days_left'}; days_left is -1 when the
    handshake or parsing fails.
    """
    fallback = {'domain': domain, 'expires_iso': None, 'days_left': -1}
    try:
        out = shell(
            f"echo | timeout 4 openssl s_client -servername {domain} -connect {domain}:443 2>/dev/null | openssl x509 -noout -enddate 2>/dev/null",
            timeout=6
        )
        hit = re.search(r'notAfter=(.+)', out)
        if not hit:
            return fallback
        # openssl prints e.g. "notAfter=Apr 19 12:00:00 2026 GMT".
        expiry = datetime.strptime(hit.group(1).strip(), '%b %d %H:%M:%S %Y %Z')
        remaining = int((expiry.timestamp() - time.time()) / 86400)
        return {'domain': domain, 'expires_iso': expiry.strftime('%Y-%m-%dT%H:%M:%SZ'), 'days_left': remaining}
    except Exception:
        return fallback
def gather_ssl():
    """Check every monitored domain's certificate concurrently."""
    with ThreadPoolExecutor(max_workers=5) as pool:
        checks = pool.map(check_ssl, SSL_DOMAINS)
        return list(checks)
# ─── STACK INVENTORY (cached forever) ───────────────
def gather_stack():
    """Installed-software inventory (versions + OS info), memoised in _stack_cache.

    The shell probes only run on the first call per process; later calls
    return the cached dict unchanged.
    """
    global _stack_cache
    if _stack_cache is None:
        _stack_cache = {
            'python_version': shell("python3 --version 2>&1").replace('Python ', ''),
            'node_version': shell("node --version 2>/dev/null").lstrip('v'),
            'nginx_version': shell("nginx -v 2>&1 | awk -F/ '{print $2}'"),
            'docker_version': shell("docker --version 2>/dev/null | awk '{print $3}' | tr -d ','"),
            'ffmpeg_version': shell("ffmpeg -version 2>/dev/null | head -1 | awk '{print $3}'"),
            'kernel': shell("uname -r"),
            'os': shell("lsb_release -ds 2>/dev/null || cat /etc/os-release | grep PRETTY_NAME | cut -d= -f2 | tr -d '\"'"),
            'uname': platform.uname().system + ' ' + platform.uname().machine,
        }
    return _stack_cache
# ─── REPOS ──────────────────────────────────────────
REPO_PATHS = ['/var/www/jaeswift-homepage']


def gather_repos():
    """Describe each tracked git checkout: HEAD sha/message/date plus dirty flag."""
    found = []
    for path in REPO_PATHS:
        # Only real checkouts (directories containing .git) are reported.
        if not os.path.isdir(os.path.join(path, '.git')):
            continue
        try:
            head_sha = shell(f"cd {path} && git rev-parse --short HEAD 2>/dev/null")
            head_msg = shell(f"cd {path} && git log -1 --pretty=%s 2>/dev/null")
            head_iso = shell(f"cd {path} && git log -1 --pretty=%cI 2>/dev/null")
            # Any porcelain output means uncommitted changes.
            is_dirty = bool(shell(f"cd {path} && git status --porcelain 2>/dev/null"))
            found.append({
                'name': os.path.basename(path),
                'path': path,
                'last_commit_sha': head_sha,
                'last_commit_msg': head_msg[:120],
                'last_commit_iso': head_iso,
                'dirty': is_dirty,
            })
        except Exception:
            pass
    return found
def gather_recent_commits(limit=10):
    """Recent deployment ticker: the last *limit* commits of the primary repo."""
    repo = REPO_PATHS[0]
    if not os.path.isdir(os.path.join(repo, '.git')):
        return []
    out = shell(f"cd {repo} && git log -n {limit} --pretty=format:'%h|%cI|%s' 2>/dev/null")
    commits = []
    for row in out.split('\n'):
        # sha|iso|message — the message itself may contain '|'.
        sha, sep1, rest = row.partition('|')
        iso, sep2, msg = rest.partition('|')
        if sep1 and sep2:
            commits.append({'sha': sha, 'iso': iso, 'msg': msg[:100]})
    return commits
# ─── ALERTS ─────────────────────────────────────────
def compute_alerts(system, services_data, crons_data, nginx_data, security_data, ssl_data):
    """Derive red/amber/info alert dicts from the gathered telemetry snapshots."""
    now_iso = iso_now()
    alerts = []

    def add(level, message):
        alerts.append({'level': level, 'message': message, 'since_iso': now_iso})

    cpu = system['cpu_percent']
    if cpu > 90:
        add('red', f"CPU CRITICAL {cpu}%")
    elif cpu > 80:
        add('amber', f"CPU HIGH {cpu}%")
    mem = system['mem_percent']
    if mem > 90:
        add('red', f"MEMORY CRITICAL {mem}%")
    elif mem > 80:
        add('amber', f"MEMORY HIGH {mem}%")
    for disk in system.get('disk_per_mount', []):
        if disk['pct'] > 90:
            add('red', f"DISK {disk['mount']} CRITICAL {disk['pct']}%")
        elif disk['pct'] > 85:
            add('amber', f"DISK {disk['mount']} HIGH {disk['pct']}%")
    for svc in services_data:
        if svc['status'] == 'down':
            add('red', f"SERVICE DOWN: {svc['name']}")
    for cron in crons_data:
        if cron['last_status'] == 'fail':
            add('amber', f"CRON FAILED: {cron['name']}")
    # NOTE(review): error_rate_pct counts 4xx+5xx combined upstream although
    # the message text says "5xx" — confirm which is intended.
    if nginx_data.get('error_rate_pct', 0) > 1:
        add('red', f"5xx RATE {nginx_data['error_rate_pct']}%")
    for cert in ssl_data:
        if 0 <= cert['days_left'] < 14:
            add('amber', f"SSL {cert['domain']} EXPIRES {cert['days_left']}d")
        # days_left == -1 means the cert fetch failed; deliberately no alert.
    if security_data['ssh_attempts_24h'] > 200:
        add('info', f"SSH brute-force: {security_data['ssh_attempts_24h']}/24h")
    return alerts
# ─── ENDPOINTS ──────────────────────────────────────
@telemetry_bp.route('/api/telemetry/overview')
def overview():
    """Single-call dashboard payload combining every telemetry gatherer."""
    try:
        payload = {
            'system': gather_system(),
            'services': gather_services(),
            'crons': gather_crons(),
            'nginx_24h': parse_nginx_24h(),
            'security': gather_security(),
            'ssl': gather_ssl(),
            'stack': gather_stack(),
            'repos': gather_repos(),
            'timestamp': iso_now(),
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@telemetry_bp.route('/api/telemetry/history')
def history():
    """Serve the persisted metrics history file, or an empty skeleton when unavailable."""
    empty = {'cpu': [], 'mem': [], 'net_rx': [], 'net_tx': [], 'timestamps': []}
    try:
        # Missing file or bad JSON both fall through to the skeleton.
        payload = json.loads(HISTORY_FILE.read_text())
    except Exception:
        return jsonify(empty)
    return jsonify(payload)
@telemetry_bp.route('/api/telemetry/nginx-tail')
def nginx_tail():
    """Last 20 access-log lines, parsed and privacy-redacted for the dashboard."""
    if not os.path.exists(NGINX_LOG):
        return jsonify([])
    raw = shell(f"tail -n 20 {NGINX_LOG} 2>/dev/null", timeout=3)
    # Combined log format with the user-agent captured.
    pat = re.compile(
        r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+) "[^"]*" "([^"]*)"'
    )
    rows = []
    for line in raw.split('\n'):
        hit = pat.match(line)
        if hit is None:
            continue
        ip = hit.group(1)
        octets = ip.split('.')
        # Mask the last octet of IPv4 addresses; truncate anything else.
        masked = f"{octets[0]}.{octets[1]}.{octets[2]}.xxx" if len(octets) == 4 else ip[:20]
        rows.append({
            'time': hit.group(2),
            'ip_masked': masked,
            'method': hit.group(3),
            'path': hit.group(4)[:80],
            'status': int(hit.group(5)),
            'size': int(hit.group(6)),
            'ua_short': hit.group(7)[:40],
        })
    return jsonify(rows)
@telemetry_bp.route('/api/telemetry/geo')
def geo():
    """GeoIP country breakdown of recent visitors (result cached for 60s).

    Needs a GeoLite2-Country database on disk and the optional geoip2
    package; returns [] (and caches it) when either is missing or the
    reader fails entirely.
    """
    now = time.time()
    if _geo_cache['data'] and (now - _geo_cache['ts']) < 60:
        return jsonify(_geo_cache['data'])
    # Try the common install locations for the MaxMind country database.
    mmdb_paths = ['/usr/share/GeoIP/GeoLite2-Country.mmdb', '/var/lib/GeoIP/GeoLite2-Country.mmdb']
    mmdb = next((p for p in mmdb_paths if os.path.exists(p)), None)
    if not mmdb or not os.path.exists(NGINX_LOG):
        _geo_cache.update({'ts': now, 'data': []})
        return jsonify([])
    try:
        # Imported lazily so the rest of the dashboard works without geoip2.
        import geoip2.database
    except ImportError:
        _geo_cache.update({'ts': now, 'data': []})
        return jsonify([])
    # Unique IPv4 source addresses from the recent log tail.
    raw = shell(f"tail -n 20000 {NGINX_LOG} 2>/dev/null | awk '{{print $1}}' | sort -u", timeout=8)
    ips = [i for i in raw.split('\n') if i and re.match(r'^\d+\.\d+\.\d+\.\d+$', i)]
    counts = Counter()
    names = {}
    try:
        reader = geoip2.database.Reader(mmdb)
        # Cap lookups to bound request latency on busy logs.
        for ip in ips[:5000]:
            try:
                rec = reader.country(ip)
                cc = rec.country.iso_code or 'XX'
                counts[cc] += 1
                names[cc] = rec.country.name or cc
            except Exception:
                # Private/unroutable addresses simply don't resolve — skip.
                continue
        reader.close()
    except Exception:
        _geo_cache.update({'ts': now, 'data': []})
        return jsonify([])
    data = [{'country_code': cc, 'country_name': names.get(cc, cc), 'count': c} for cc, c in counts.most_common(50)]
    _geo_cache.update({'ts': now, 'data': data})
    return jsonify(data)
@telemetry_bp.route('/api/telemetry/alerts')
def alerts():
    """Compute and return the current alert list from fresh telemetry snapshots."""
    try:
        snapshot = (
            gather_system(),
            gather_services(),
            gather_crons(),
            parse_nginx_24h(),
            gather_security(),
            gather_ssl(),
        )
        return jsonify(compute_alerts(*snapshot))
    except Exception as e:
        return jsonify({'error': str(e), 'alerts': []}), 500
@telemetry_bp.route('/api/telemetry/visitors')
def visitors():
    """Rough live-visitor stats: unique /24 prefixes and requests in the last 5 minutes.

    Fix over the original: the log timestamp's own UTC offset is honoured when
    computing the 5-minute cutoff instead of always assuming UTC, which made
    the window wrong on hosts logging in local time.
    """
    if not os.path.exists(NGINX_LOG):
        return jsonify({'active': 0, 'requests_5min': 0, 'req_per_min': 0})
    cutoff = time.time() - 300
    raw = shell(f"tail -n 5000 {NGINX_LOG} 2>/dev/null", timeout=4)
    pat = re.compile(r'^(\S+) \S+ \S+ \[([^\]]+)\]')
    ips = set()
    reqs = 0
    for line in raw.split('\n'):
        m = pat.match(line)
        if not m:
            continue
        try:
            ts_raw = m.group(2)
            # e.g. "19/Apr/2026:12:34:56 +0200" — parse with the offset.
            try:
                ts = datetime.strptime(ts_raw, '%d/%b/%Y:%H:%M:%S %z').timestamp()
            except ValueError:
                # Offset missing/garbled: fall back to treating it as UTC.
                dt = datetime.strptime(ts_raw.split()[0], '%d/%b/%Y:%H:%M:%S')
                ts = dt.replace(tzinfo=timezone.utc).timestamp()
            if ts < cutoff:
                continue
            reqs += 1
            # Count distinct /24 prefixes rather than full IPs (privacy).
            parts = m.group(1).split('.')
            ips.add('.'.join(parts[:3]) if len(parts) == 4 else m.group(1))
        except Exception:
            continue
    return jsonify({
        'active': len(ips),
        'requests_5min': reqs,
        'req_per_min': round(reqs / 5, 1),
    })
@telemetry_bp.route('/api/telemetry/commits')
def commits():
    """Expose the recent-deployments ticker (last 10 commits)."""
    recent = gather_recent_commits(limit=10)
    return jsonify(recent)