#!/usr/bin/env python3
"""JAESWIFT HUD Backend API"""
import datetime
import hashlib
import io
import json
import os
import random
import smtplib
import subprocess
import time
import zipfile
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from functools import wraps
from pathlib import Path

import jwt
import requests as req
from flask import Flask, request, jsonify, abort, send_file
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

DATA_DIR = Path(__file__).parent / 'data'

# Credentials can be overridden through the environment. The defaults are the
# values that used to be hard-coded, so existing deployments keep working.
# NOTE(review): set these variables in production and rotate the defaults;
# secrets should not live in source control.
JWT_SECRET = os.environ.get('JAESWIFT_JWT_SECRET', 'jaeswift-hud-s3cr3t-2026!x')
ADMIN_USER = os.environ.get('JAESWIFT_ADMIN_USER', 'jae')
ADMIN_PASS = os.environ.get('JAESWIFT_ADMIN_PASS', 'HUDAdmin2026!')

# Data files whose "missing file" default is an empty list instead of a dict.
ARRAY_FILES = {
    'posts.json',
    'tracks.json',
    'navigation.json',
    'links.json',
    'managed_services.json',
    'messages.json',
}


# ─── Helpers ─────────────────────────────────────────

def load_json(name):
    """Return the parsed content of ``DATA_DIR / name``.

    When the file does not exist, return ``[]`` for names listed in
    ``ARRAY_FILES`` and ``{}`` for every other name.
    """
    p = DATA_DIR / name
    if p.exists():
        # Explicit encoding: do not depend on the process locale.
        with open(p, encoding='utf-8') as f:
            return json.load(f)
    return [] if name in ARRAY_FILES else {}


def save_json(name, data):
    """Serialise *data* as indented JSON to ``DATA_DIR / name``.

    The parent directory is created if it is missing.
    """
    p = DATA_DIR / name
    p.parent.mkdir(parents=True, exist_ok=True)
    with open(p, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)


def require_auth(fn):
    """Decorator that rejects requests without a valid ``Bearer`` JWT.

    Aborts with 401 when the ``Authorization`` header is missing, does not
    start with ``Bearer ``, or carries a token that fails HS256 verification
    against ``JWT_SECRET``.
    """
    @wraps(fn)
    def wrapper(*a, **kw):
        auth = request.headers.get('Authorization', '')
        if not auth.startswith('Bearer '):
            abort(401, 'Missing token')
        try:
            # auth[7:] strips the "Bearer " prefix.
            jwt.decode(auth[7:], JWT_SECRET, algorithms=['HS256'])
        except Exception:
            abort(401, 'Invalid token')
        return fn(*a, **kw)
    return wrapper


def shell(cmd):
    """Run *cmd* through the shell and return its stripped stdout.

    Returns ``''`` if the command cannot be run or exceeds the 5 second
    timeout. The command is interpreted by a shell (the callers rely on
    pipes), so only pass constant, trusted strings -- never request data.
    """
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
        return r.stdout.strip()
    except Exception:
        return ''


def _to_number(text, cast=int, default=0):
    """Convert *text* with *cast*, returning *default* when it is not numeric."""
    try:
        return cast(text)
    except (TypeError, ValueError):
        return default


# ─── Auth ────────────────────────────────────────────

@app.route('/api/auth/login', methods=['POST'])
def login():
    """Exchange the admin username/password for a 24 hour JWT.

    Returns ``{'token': ...}`` on success and a 401 JSON error otherwise.
    """
    d = request.get_json(force=True, silent=True) or {}
    if not isinstance(d, dict):
        # A JSON array/string body cannot carry credentials.
        d = {}
    if d.get('username') == ADMIN_USER and d.get('password') == ADMIN_PASS:
        token = jwt.encode(
            {'user': ADMIN_USER,
             'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)},
            JWT_SECRET,
            algorithm='HS256'
        )
        return jsonify({'token': token})
    return jsonify({'error': 'Invalid credentials'}), 401


@app.route('/api/auth/check')
def auth_check():
    """Report whether the request's Bearer token is currently valid."""
    auth = request.headers.get('Authorization', '')
    if not auth.startswith('Bearer '):
        return jsonify({'valid': False}), 401
    try:
        jwt.decode(auth[7:], JWT_SECRET, algorithms=['HS256'])
        return jsonify({'valid': True, 'user': ADMIN_USER})
    except Exception:
        return jsonify({'valid': False}), 401


# ─── Blog Posts ──────────────────────────────────────

@app.route('/api/posts')
def get_posts():
    """Return every stored post."""
    posts = load_json('posts.json')
    return jsonify(posts)


@app.route('/api/posts/<slug>')
def get_post(slug):
    """Return the post whose ``slug`` matches, or 404."""
    posts = load_json('posts.json')
    for p in posts:
        if p.get('slug') == slug:
            return jsonify(p)
    abort(404)


@app.route('/api/posts', methods=['POST'])
@require_auth
def create_post():
    """Append a post built from the JSON body and return it with status 201.

    ``id`` is one more than the highest existing id, ``date`` defaults to
    today, and ``word_count`` is derived from ``content``.
    """
    d = request.get_json(force=True)
    posts = load_json('posts.json')
    d['id'] = max((p.get('id', 0) for p in posts), default=0) + 1
    d['date'] = d.get('date', datetime.date.today().isoformat())
    d['word_count'] = len(d.get('content', '').split())
    posts.append(d)
    save_json('posts.json', posts)
    return jsonify(d), 201


@app.route('/api/posts/<slug>', methods=['PUT'])
@require_auth
def update_post(slug):
    """Merge the JSON body into the post identified by *slug*.

    The stored ``id`` always wins over the request body. Returns the merged
    post, or 404 when no post has that slug.
    """
    d = request.get_json(force=True)
    posts = load_json('posts.json')
    for i, p in enumerate(posts):
        if p.get('slug') == slug:
            merged = {**p, **d}
            merged['id'] = p['id']
            # Count words on the merged post: a partial update that omits
            # 'content' keeps the old text, so it must keep a matching count.
            merged['word_count'] = len(merged.get('content', '').split())
            posts[i] = merged
            save_json('posts.json', posts)
            return jsonify(merged)
    abort(404)


@app.route('/api/posts/<slug>', methods=['DELETE'])
@require_auth
def delete_post(slug):
    """Remove every post with the given slug.

    Responds ``{'ok': True}`` whether or not a post matched.
    """
    posts = load_json('posts.json')
    posts = [p for p in posts if p.get('slug') != slug]
    save_json('posts.json', posts)
    return jsonify({'ok': True})


# ─── Server Stats ────────────────────────────────────

@app.route('/api/stats')
def server_stats():
    """Return a snapshot of host metrics gathered from shell commands.

    Any value that cannot be obtained or parsed is reported as ``0`` rather
    than failing the whole request.
    """
    # CPU: 1-minute load average expressed as a percentage of the core count.
    load = shell("cat /proc/loadavg | awk '{print $1}'")
    ncpu = shell("nproc")
    try:
        cpu_pct = round(float(load) / max(int(ncpu), 1) * 100, 1)
    except ValueError:
        cpu_pct = 0

    # Memory
    mem = shell("free | awk '/Mem:/{printf \"%.1f\", $3/$2*100}'")

    # Disk
    disk = shell("df / | awk 'NR==2{print $5}' | tr -d '%'")

    # Network (bytes since boot); only the first matching interface is used.
    net = shell("cat /proc/net/dev | awk '/eth0|ens/{print $2,$10}'")
    parts = net.split()
    rx = _to_number(parts[0]) if len(parts) >= 2 else 0
    tx = _to_number(parts[1]) if len(parts) >= 2 else 0

    # Docker
    running = shell("docker ps -q 2>/dev/null | wc -l")
    total = shell("docker ps -aq 2>/dev/null | wc -l")

    # Uptime
    up = shell("cat /proc/uptime | awk '{print $1}'")

    # Connections
    conns = shell("ss -s | awk '/TCP:/{print $2}'")

    return jsonify({
        'cpu_percent': cpu_pct,
        'memory_percent': _to_number(mem, float),
        'disk_percent': _to_number(disk),
        'network_rx_bytes': rx,
        'network_tx_bytes': tx,
        'container_running': _to_number(running),
        'container_total': _to_number(total),
        'uptime_seconds': _to_number(up, float),
        'active_connections': _to_number(conns),
        'load_avg': _to_number(load, float),
        'timestamp': time.time()
    })


# ─── Services Status ─────────────────────────────────

@app.route('/api/services')
def services():
    """Probe a fixed list of service URLs and report status and latency.

    A service is ``online`` when it answers with a status code below 500;
    any request error marks it ``offline`` with a response time of 0.
    """
    svcs = [
        {'name': 'Gitea', 'url': 'https://git.jaeswift.xyz'},
        {'name': 'Plex', 'url': 'https://plex.jaeswift.xyz'},
        {'name': 'Search', 'url': 'https://jaeswift.xyz/search'},
        {'name': 'Yoink', 'url': 'https://jaeswift.xyz/yoink/'},
        {'name': 'Archive', 'url': 'https://archive.jaeswift.xyz'},
        {'name': 'Agent Zero', 'url': 'https://agentzero.jaeswift.xyz'},
        {'name': 'Files', 'url': 'https://files.jaeswift.xyz'},
    ]
    results = []
    for s in svcs:
        try:
            t0 = time.time()
            # NOTE(review): verify=False skips TLS certificate validation.
            r = req.get(s['url'], timeout=5, verify=False, allow_redirects=True)
            ms = round((time.time() - t0) * 1000)
            results.append({
                **s,
                'status': 'online' if r.status_code < 500 else 'offline',
                'response_time_ms': ms,
            })
        except Exception:
            results.append({**s, 'status': 'offline', 'response_time_ms': 0})
    return jsonify(results)


# ─── Weather ─────────────────────────────────────────

@app.route('/api/weather')
def weather():
    """Return current Manchester conditions fetched from wttr.in.

    Any failure (network, unexpected payload) yields a 500 JSON error.
    """
    try:
        r = req.get('https://wttr.in/Manchester?format=j1', timeout=5,
                    headers={'User-Agent': 'jaeswift-hud'})
        d = r.json()
        cur = d['current_condition'][0]
        return jsonify({
            'temp_c': int(cur['temp_C']),
            'feels_like': int(cur['FeelsLikeC']),
            'condition': cur['weatherDesc'][0]['value'],
            'humidity': int(cur['humidity']),
            'wind_kph': int(cur['windspeedKmph']),
            'wind_dir': cur['winddir16Point'],
            'icon': cur.get('weatherCode', ''),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ─── Now Playing (random track) ──────────────────────

@app.route('/api/nowplaying')
def now_playing():
    """Return a randomly chosen stored track, or a placeholder if none exist."""
    tracks = load_json('tracks.json')
    if not tracks:
        return jsonify({'artist': 'Unknown', 'track': 'Silence', 'album': ''})
    t = random.choice(tracks)
    return jsonify(t)


# ─── Tracks CRUD ─────────────────────────────────────

@app.route('/api/tracks')
def get_tracks():
    """Return every stored track."""
    return jsonify(load_json('tracks.json'))


@app.route('/api/tracks', methods=['POST'])
@require_auth
def add_track():
    """Append a track (artist/track/album) and return it with status 201."""
    d = request.get_json(force=True)
    tracks = load_json('tracks.json')
    tracks.append({
        'artist': d.get('artist', ''),
        'track': d.get('track', ''),
        'album': d.get('album', ''),
    })
    save_json('tracks.json', tracks)
    return jsonify(tracks[-1]), 201


@app.route('/api/tracks/<int:index>', methods=['DELETE'])
@require_auth
def delete_track(index):
    """Remove the track at list position *index*; 404 if out of range."""
    tracks = load_json('tracks.json')
    if 0 <= index < len(tracks):
        removed = tracks.pop(index)
        save_json('tracks.json', tracks)
        return jsonify({'ok': True, 'removed': removed})
    abort(404)


# ─── Git Activity (from Gitea API) ───────────────────

@app.route('/api/git-activity')
def git_activity():
    """Return the Gitea contribution heatmap and up to five recent repos.

    On any failure the response still has the ``heatmap``/``repos`` keys
    (empty) plus an ``error`` message.
    """
    try:
        r = req.get('https://git.jaeswift.xyz/api/v1/users/jae/heatmap',
                    timeout=5, verify=False)
        heatmap = r.json() if r.status_code == 200 else []

        r2 = req.get(
            'https://git.jaeswift.xyz/api/v1/repos/search?sort=updated&limit=5&owner=jae',
            timeout=5, verify=False)
        repos = []
        if r2.status_code == 200:
            # Decode the body once; the result is either {'data': [...]} or a list.
            payload = r2.json()
            data = payload.get('data', payload) if isinstance(payload, dict) else payload
            for repo in (data if isinstance(data, list) else [])[:5]:
                repos.append({
                    'name': repo.get('name', ''),
                    'updated': repo.get('updated_at', ''),
                    'stars': repo.get('stars_count', 0),
                    'language': repo.get('language', ''),
                })
        return jsonify({'heatmap': heatmap, 'repos': repos})
    except Exception as e:
        return jsonify({'heatmap': [], 'repos': [], 'error': str(e)})


# ─── Threat Feed (CVE feed) ──────────────────────────

@app.route('/api/threats')
def threats():
    """Return up to eight recent CVEs from cve.circl.lu, or ``[]`` on failure."""
    try:
        r = req.get('https://cve.circl.lu/api/last/8', timeout=8)
        cves = []
        if r.status_code == 200:
            for item in r.json()[:8]:
                cves.append({
                    'id': item.get('id', ''),
                    # Summaries are truncated to 120 characters.
                    'summary': (item.get('summary', '') or '')[:120],
                    'published': item.get('Published', ''),
                    'cvss': item.get('cvss', 0),
                })
        return jsonify(cves)
    except Exception:
        return jsonify([])


# ─── Settings ────────────────────────────────────────

@app.route('/api/settings')
def get_settings():
    """Return the stored settings document."""
    return jsonify(load_json('settings.json'))


@app.route('/api/settings', methods=['PUT'])
@require_auth
def update_settings():
    """Replace the settings document with the JSON body and echo it back."""
    d = request.get_json(force=True)
    save_json('settings.json', d)
    return jsonify(d)


# ═════════════════════════════════════════════════════
# NEW ENDPOINTS
# ═════════════════════════════════════════════════════

# ─── Homepage Config ─────────────────────────────────

@app.route('/api/homepage')
def get_homepage():
    """Return the stored homepage configuration."""
    return jsonify(load_json('homepage.json'))


@app.route('/api/homepage', methods=['POST'])
@require_auth
def save_homepage():
    """Replace the homepage configuration with the JSON body."""
    try:
        d = request.get_json(force=True)
        save_json('homepage.json', d)
        return jsonify(d)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ─── Managed Services ───────────────────────────────

@app.route('/api/services/managed')
def get_managed_services():
    """Return the list of managed services."""
    return jsonify(load_json('managed_services.json'))


@app.route('/api/services/managed', methods=['POST'])
@require_auth
def add_managed_service():
    """Append a ``{name, url}`` service entry and return it with status 201."""
    try:
        d = request.get_json(force=True)
        svcs = load_json('managed_services.json')
        if not isinstance(svcs, list):
            svcs = []
        svcs.append({
            'name': d.get('name', ''),
            'url': d.get('url', '')
        })
        save_json('managed_services.json', svcs)
        return jsonify(svcs[-1]), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/services/managed/<int:index>', methods=['DELETE'])
@require_auth
def delete_managed_service(index):
    """Remove the managed service at position *index*; 404 if out of range."""
    svcs = load_json('managed_services.json')
    if not isinstance(svcs, list):
        svcs = []
    if 0 <= index < len(svcs):
        removed = svcs.pop(index)
        save_json('managed_services.json', svcs)
        return jsonify({'ok': True, 'removed': removed})
    abort(404)


# ─── Navigation ──────────────────────────────────────

@app.route('/api/navigation')
def get_navigation():
    """Return the navigation entries."""
    return jsonify(load_json('navigation.json'))


@app.route('/api/navigation', methods=['POST'])
@require_auth
def add_navigation():
    """Append a navigation entry and return it with status 201.

    ``order`` defaults to the entry's 1-based position in the list.
    """
    try:
        d = request.get_json(force=True)
        nav = load_json('navigation.json')
        if not isinstance(nav, list):
            nav = []
        nav.append({
            'label': d.get('label', ''),
            'url': d.get('url', ''),
            'order': d.get('order', len(nav) + 1)
        })
        save_json('navigation.json', nav)
        return jsonify(nav[-1]), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/navigation/<int:index>', methods=['DELETE'])
@require_auth
def delete_navigation(index):
    """Remove the navigation entry at position *index*; 404 if out of range."""
    nav = load_json('navigation.json')
    if not isinstance(nav, list):
        nav = []
    if 0 <= index < len(nav):
        removed = nav.pop(index)
        save_json('navigation.json', nav)
        return jsonify({'ok': True, 'removed': removed})
    abort(404)


# ─── Links ───────────────────────────────────────────

@app.route('/api/links')
def get_links():
    """Return the stored links."""
    return jsonify(load_json('links.json'))


@app.route('/api/links', methods=['POST'])
@require_auth
def add_link():
    """Append a ``{name, url, icon, category}`` link; return it with 201."""
    try:
        d = request.get_json(force=True)
        links = load_json('links.json')
        if not isinstance(links, list):
            links = []
        links.append({
            'name': d.get('name', ''),
            'url': d.get('url', ''),
            'icon': d.get('icon', ''),
            'category': d.get('category', '')
        })
        save_json('links.json', links)
        return jsonify(links[-1]), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/links/<int:index>', methods=['DELETE'])
@require_auth
def delete_link(index):
    """Remove the link at position *index*; 404 if out of range."""
    links = load_json('links.json')
    if not isinstance(links, list):
        links = []
    if 0 <= index < len(links):
        removed = links.pop(index)
        save_json('links.json', links)
        return jsonify({'ok': True, 'removed': removed})
    abort(404)


# ─── API Keys ────────────────────────────────────────

def mask_value(val):
    """Mask a string value, showing only last 4 chars if longer than 4.

    Non-string and empty values are returned unchanged.
    """
    if not isinstance(val, str) or len(val) == 0:
        return val
    if len(val) <= 4:
        return '••••'
    return '••••••' + val[-4:]


def is_masked(val):
    """Check if a value is a masked placeholder."""
    if not isinstance(val, str):
        return False
    return '••••' in val


@app.route('/api/apikeys')
@require_auth
def get_apikeys():
    """Return the stored API keys with every value masked."""
    try:
        keys = load_json('apikeys.json')
        masked = {}
        for group, fields in keys.items():
            if isinstance(fields, dict):
                masked[group] = {k: mask_value(v) for k, v in fields.items()}
            else:
                masked[group] = mask_value(fields)
        return jsonify(masked)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/apikeys', methods=['POST'])
@require_auth
def save_apikeys():
    """Update one key group from a ``{'group': ..., 'data': {...}}`` body.

    Masked placeholders and empty strings are skipped so that a client can
    post back the masked form without overwriting the stored secrets.
    """
    try:
        d = request.get_json(force=True)
        group = d.get('group', '')
        data = d.get('data', {})
        if not group or not isinstance(data, dict):
            return jsonify({'error': 'Invalid request: need group and data'}), 400
        keys = load_json('apikeys.json')
        if group not in keys:
            keys[group] = {}
        # Only update values that are not masked and not empty
        for k, v in data.items():
            if isinstance(v, str) and (is_masked(v) or v == ''):
                continue  # Skip masked or empty values
            keys[group][k] = v
        save_json('apikeys.json', keys)
        return jsonify({'ok': True, 'group': group})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ─── Theme ───────────────────────────────────────────

@app.route('/api/theme')
def get_theme():
    """Return the stored theme configuration."""
    return jsonify(load_json('theme.json'))


@app.route('/api/theme', methods=['POST'])
@require_auth
def save_theme():
    """Replace the theme configuration with the JSON body."""
    try:
        d = request.get_json(force=True)
        save_json('theme.json', d)
        return jsonify(d)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

# ─── SEO ─────────────────────────────────────────────

@app.route('/api/seo')
def get_seo():
    """Return the stored SEO configuration."""
    return jsonify(load_json('seo.json'))


@app.route('/api/seo', methods=['POST'])
@require_auth
def save_seo():
    """Replace the SEO configuration with the JSON body."""
    try:
        d = request.get_json(force=True)
        save_json('seo.json', d)
        return jsonify(d)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ─── Contact Settings ───────────────────────────────

@app.route('/api/contact-settings')
@require_auth
def get_contact_settings():
    """Return the contact-form settings (authenticated)."""
    return jsonify(load_json('contact_settings.json'))


@app.route('/api/contact-settings', methods=['POST'])
@require_auth
def save_contact_settings():
    """Replace the contact-form settings with the JSON body."""
    try:
        d = request.get_json(force=True)
        save_json('contact_settings.json', d)
        return jsonify(d)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ─── Contact Form (public) ──────────────────────────

# Upper bound, in seconds, for each blocking SMTP socket operation, so a
# hung mail server cannot stall the public contact endpoint indefinitely.
SMTP_TIMEOUT_SECONDS = 10


def _send_contact_emails(smtp_cfg, contact_email, auto_reply, name, email, message):
    """Send the owner notification and optional auto-reply; return success.

    Returns ``False`` without connecting when SMTP host/user/pass or the
    recipient address is not configured, and ``False`` (after logging) when
    any step of the delivery fails.
    """
    smtp_host = smtp_cfg.get('host', '')
    smtp_port = smtp_cfg.get('port', '587')
    smtp_user = smtp_cfg.get('user', '')
    smtp_pass = smtp_cfg.get('pass', '')
    if not (smtp_host and smtp_user and smtp_pass and contact_email):
        return False

    try:
        # Send notification to site owner
        msg = MIMEMultipart()
        msg['From'] = smtp_user
        msg['To'] = contact_email
        # Collapse whitespace: a line break inside a header value makes the
        # message unserialisable (and is a header-injection vector).
        subject_name = ' '.join(name.split())
        msg['Subject'] = f'[JAESWIFT] Contact from {subject_name}'
        body = f"Name: {name}\nEmail: {email}\n\nMessage:\n{message}"
        msg.attach(MIMEText(body, 'plain'))

        # The context manager closes the connection even when starttls,
        # login or send_message raises.
        with smtplib.SMTP(smtp_host, int(smtp_port),
                          timeout=SMTP_TIMEOUT_SECONDS) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)

            # Send auto-reply if configured
            if auto_reply:
                reply = MIMEText(auto_reply, 'plain')
                reply['From'] = smtp_user
                reply['To'] = email
                reply['Subject'] = 'Re: Your message to JAESWIFT'
                server.send_message(reply)
        return True
    except Exception:
        # Delivery is best-effort (the message is already saved), but the
        # failure is logged so it does not go unnoticed.
        app.logger.exception('Contact form email delivery failed')
        return False


@app.route('/api/contact', methods=['POST'])
def contact_form():
    """Accept a public contact-form submission.

    Validates that ``name``, ``email`` and ``message`` are non-empty (400
    otherwise), refuses with 403 when the form is disabled in the contact
    settings, appends the message to ``messages.json`` and then attempts to
    e-mail it. The JSON response reports whether the e-mail was sent.
    """
    try:
        # silent=True: a malformed body is a client error (400 below), not a 500.
        d = request.get_json(force=True, silent=True)
        if not isinstance(d, dict):
            d = {}
        # Tolerate null / non-string fields instead of crashing on .strip().
        name = str(d.get('name') or '').strip()
        email = str(d.get('email') or '').strip()
        message = str(d.get('message') or '').strip()
        if not name or not email or not message:
            return jsonify({'error': 'All fields are required'}), 400

        # Check if form is enabled
        settings = load_json('contact_settings.json')
        if not settings.get('form_enabled', True):
            return jsonify({'error': 'Contact form is currently disabled'}), 403
        contact_email = settings.get('email', '')
        auto_reply = settings.get('auto_reply', '')

        # Save message to messages.json regardless
        messages = load_json('messages.json')
        if not isinstance(messages, list):
            messages = []
        messages.append({
            'name': name,
            'email': email,
            'message': message,
            'timestamp': datetime.datetime.utcnow().isoformat() + 'Z'
        })
        save_json('messages.json', messages)

        # Try to send email via SMTP if configured
        keys = load_json('apikeys.json')
        email_sent = _send_contact_emails(
            keys.get('smtp', {}), contact_email, auto_reply, name, email, message)

        return jsonify({
            'ok': True,
            'email_sent': email_sent,
            'message': 'Message received. Thanks for reaching out!'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ─── Backups ─────────────────────────────────────────

def _send_data_file(stem):
    """Send ``DATA_DIR/<stem>.json`` as a download, or a JSON 404 if missing."""
    try:
        p = DATA_DIR / f'{stem}.json'
        if not p.exists():
            return jsonify({'error': f'No {stem} data found'}), 404
        return send_file(p, as_attachment=True, download_name=f'{stem}.json',
                         mimetype='application/json')
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/backups/posts')
@require_auth
def backup_posts():
    """Download ``posts.json``."""
    return _send_data_file('posts')


@app.route('/api/backups/tracks')
@require_auth
def backup_tracks():
    """Download ``tracks.json``."""
    return _send_data_file('tracks')


@app.route('/api/backups/settings')
@require_auth
def backup_settings():
    """Download ``settings.json``."""
    return _send_data_file('settings')

@app.route('/api/backups/all')
@require_auth
def backup_all():
    """Download every ``*.json`` file in ``DATA_DIR`` as one zip archive.

    The archive is built in memory and named ``jaeswift_backup_<UTC
    timestamp>.zip``. Any failure is reported as a 500 JSON error.
    """
    try:
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
            for json_path in DATA_DIR.glob('*.json'):
                # Store each file flat, under its bare file name.
                bundle.write(json_path, json_path.name)
        # Rewind so send_file streams the archive from its first byte.
        archive.seek(0)
        stamp = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        filename = f'jaeswift_backup_{stamp}.zip'
        return send_file(
            archive,
            as_attachment=True,
            download_name=filename,
            mimetype='application/zip'
        )
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500


# ─── Run ─────────────────────────────────────────────

if __name__ == '__main__':
    # Listen on all interfaces, port 5000, with the debugger disabled.
    app.run(host='0.0.0.0', port=5000, debug=False)