#!/usr/bin/env python3 """JAESWIFT HUD Backend API""" import json, os, time, subprocess, random, datetime, hashlib, zipfile, io, smtplib from functools import wraps from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from flask import Flask, request, jsonify, abort, send_file from flask_cors import CORS import jwt import requests as req import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) app = Flask(__name__) CORS(app) DATA_DIR = Path(__file__).parent / 'data' JWT_SECRET = 'jaeswift-hud-s3cr3t-2026!x' ADMIN_USER = 'jae' ADMIN_PASS = 'HUDAdmin2026!' ARRAY_FILES = { 'posts.json', 'tracks.json', 'navigation.json', 'links.json', 'managed_services.json', 'messages.json' } # ─── JSON Error Handlers ───────────────────────────── @app.errorhandler(400) def bad_request(e): return jsonify({'error': 'Bad Request', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 400 @app.errorhandler(401) def unauthorized(e): return jsonify({'error': 'Unauthorized', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 401 @app.errorhandler(404) def not_found(e): return jsonify({'error': 'Not Found', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 404 @app.errorhandler(500) def server_error(e): return jsonify({'error': 'Internal Server Error', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 500 # ─── Helpers ───────────────────────────────────────── def load_json(name): p = DATA_DIR / name if p.exists(): with open(p) as f: return json.load(f) return [] if name in ARRAY_FILES else {} def save_json(name, data): p = DATA_DIR / name p.parent.mkdir(parents=True, exist_ok=True) with open(p, 'w') as f: json.dump(data, f, indent=2) def require_auth(fn): @wraps(fn) def wrapper(*a, **kw): auth = request.headers.get('Authorization', '') if not auth.startswith('Bearer '): abort(401, 'Missing token') try: jwt.decode(auth[7:], JWT_SECRET, algorithms=['HS256']) except Exception: abort(401, 'Invalid token') return fn(*a, **kw) return wrapper def shell(cmd): try: r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5) return r.stdout.strip() except Exception: return '' # ─── Auth ──────────────────────────────────────────── @app.route('/api/auth/login', methods=['POST']) def login(): d = request.get_json(force=True, silent=True) or {} if d.get('username') == ADMIN_USER and d.get('password') == ADMIN_PASS: token = jwt.encode( {'user': ADMIN_USER, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)}, JWT_SECRET, algorithm='HS256' ) return jsonify({'token': token}) return jsonify({'error': 'Invalid credentials'}), 401 @app.route('/api/auth/check') def auth_check(): auth = request.headers.get('Authorization', '') if not auth.startswith('Bearer '): return jsonify({'valid': False}), 401 try: jwt.decode(auth[7:], JWT_SECRET, algorithms=['HS256']) return jsonify({'valid': True, 'user': ADMIN_USER}) except Exception: return jsonify({'valid': False}), 401 # ─── Blog Posts ────────────────────────────────────── @app.route('/api/posts') def get_posts(): posts = load_json('posts.json') return jsonify(posts) @app.route('/api/posts/') def get_post(slug): posts = load_json('posts.json') for p in posts: if p.get('slug') == slug: return jsonify(p) abort(404) @app.route('/api/posts', methods=['POST']) @require_auth def create_post(): d = request.get_json(force=True) posts = load_json('posts.json') d['id'] = max((p.get('id', 0) for p in posts), default=0) + 1 d['date'] = d.get('date', datetime.date.today().isoformat()) d['word_count'] = len(d.get('content', '').split()) posts.append(d) save_json('posts.json', posts) return jsonify(d), 201 @app.route('/api/posts/', methods=['PUT']) @require_auth def update_post(slug): d = request.get_json(force=True) posts = load_json('posts.json') for i, p in enumerate(posts): if p.get('slug') == slug: d['id'] = p['id'] d['word_count'] = len(d.get('content', '').split()) posts[i] = {**p, **d} save_json('posts.json', posts) return jsonify(posts[i]) abort(404) @app.route('/api/posts/', methods=['DELETE']) @require_auth def delete_post(slug): posts = load_json('posts.json') posts = [p for p in posts if p.get('slug') != slug] save_json('posts.json', posts) return jsonify({'ok': True}) # ─── Server Stats ──────────────────────────────────── @app.route('/api/stats') def server_stats(): # CPU load = shell("cat /proc/loadavg | awk '{print $1}'") ncpu = shell("nproc") try: cpu_pct = round(float(load) / max(int(ncpu), 1) * 100, 1) except Exception: cpu_pct = 0 # Memory mem = shell("free | awk '/Mem:/{printf \"%.1f\", $3/$2*100}'") # Disk disk = shell("df / | awk 'NR==2{print $5}' | tr -d '%'") # Network (bytes since boot) net = shell("cat /proc/net/dev | awk '/eth0|ens/{print $2,$10}'") parts = net.split() rx = int(parts[0]) if len(parts) >= 2 else 0 tx = int(parts[1]) if len(parts) >= 2 else 0 # Docker running = shell("docker ps -q 2>/dev/null | wc -l") total = shell("docker ps -aq 2>/dev/null | wc -l") # Uptime up = shell("cat /proc/uptime | awk '{print $1}'") # Connections conns = shell("ss -s | awk '/TCP:/{print $2}'") return jsonify({ 'cpu_percent': cpu_pct, 'memory_percent': float(mem) if mem else 0, 'disk_percent': int(disk) if disk else 0, 'network_rx_bytes': rx, 'network_tx_bytes': tx, 'container_running': int(running) if running else 0, 'container_total': int(total) if total else 0, 'uptime_seconds': float(up) if up else 0, 'active_connections': int(conns) if conns else 0, 'load_avg': float(load) if load else 0, 'timestamp': time.time() }) # ─── Services Status ───────────────────────────────── @app.route('/api/services') def services(): svcs = [ {'name': 'Gitea', 'url': 'https://git.jaeswift.xyz'}, {'name': 'Plex', 'url': 'https://plex.jaeswift.xyz'}, {'name': 'Search', 'url': 'https://jaeswift.xyz/search'}, {'name': 'Yoink', 'url': 'https://jaeswift.xyz/yoink/'}, {'name': 'Archive', 'url': 'https://archive.jaeswift.xyz'}, {'name': 'Agent Zero', 'url': 'https://agentzero.jaeswift.xyz'}, {'name': 'Files', 'url': 'https://files.jaeswift.xyz'}, ] def check_service(s): try: t0 = time.time() r = req.get(s['url'], timeout=2, verify=False, allow_redirects=True) ms = round((time.time() - t0) * 1000) return {**s, 'status': 'online' if r.status_code < 500 else 'offline', 'response_time_ms': ms} except Exception: return {**s, 'status': 'offline', 'response_time_ms': 0} results = [None] * len(svcs) with ThreadPoolExecutor(max_workers=7) as executor: futures = {executor.submit(check_service, s): i for i, s in enumerate(svcs)} for future in as_completed(futures): results[futures[future]] = future.result() return jsonify(results) # ─── Weather ───────────────────────────────────────── @app.route('/api/weather') def weather(): try: r = req.get('https://wttr.in/Manchester?format=j1', timeout=5, headers={'User-Agent': 'jaeswift-hud'}) d = r.json() cur = d['current_condition'][0] return jsonify({ 'temp_c': int(cur['temp_C']), 'feels_like': int(cur['FeelsLikeC']), 'condition': cur['weatherDesc'][0]['value'], 'humidity': int(cur['humidity']), 'wind_kph': int(cur['windspeedKmph']), 'wind_dir': cur['winddir16Point'], 'icon': cur.get('weatherCode', ''), }) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Now Playing (random track) ────────────────────── @app.route('/api/nowplaying') def now_playing(): tracks = load_json('tracks.json') if not tracks: return jsonify({'artist': 'Unknown', 'track': 'Silence', 'album': ''}) t = random.choice(tracks) return jsonify(t) # ─── Tracks CRUD ───────────────────────────────────── @app.route('/api/tracks') def get_tracks(): return jsonify(load_json('tracks.json')) @app.route('/api/tracks', methods=['POST']) @require_auth def add_track(): d = request.get_json(force=True) tracks = load_json('tracks.json') tracks.append({ 'artist': d.get('artist', ''), 'track': d.get('track', ''), 'album': d.get('album', ''), }) save_json('tracks.json', tracks) return jsonify(tracks[-1]), 201 @app.route('/api/tracks/', methods=['DELETE']) @require_auth def delete_track(index): tracks = load_json('tracks.json') if 0 <= index < len(tracks): removed = tracks.pop(index) save_json('tracks.json', tracks) return jsonify({'ok': True, 'removed': removed}) abort(404) # ─── Git Activity (from Gitea API) ─────────────────── @app.route('/api/git-activity') def git_activity(): try: r = req.get('https://git.jaeswift.xyz/api/v1/users/jae/heatmap', timeout=2, verify=False) heatmap = r.json() if r.status_code == 200 else [] r2 = req.get('https://git.jaeswift.xyz/api/v1/repos/search?sort=updated&limit=5&owner=jae', timeout=2, verify=False) repos = [] if r2.status_code == 200: data = r2.json().get('data', r2.json()) if isinstance(r2.json(), dict) else r2.json() for repo in (data if isinstance(data, list) else [])[:5]: repos.append({ 'name': repo.get('name', ''), 'updated': repo.get('updated_at', ''), 'stars': repo.get('stars_count', 0), 'language': repo.get('language', ''), }) return jsonify({'heatmap': heatmap, 'repos': repos}) except Exception as e: return jsonify({'heatmap': [], 'repos': [], 'error': str(e)}) # ─── Threat Feed (CVE feed) ────────────────────────── @app.route('/api/threats') def threats(): try: r = req.get('https://cve.circl.lu/api/last/8', timeout=8) cves = [] if r.status_code == 200: for item in r.json()[:8]: cves.append({ 'id': item.get('id', ''), 'summary': (item.get('summary', '') or '')[:120], 'published': item.get('Published', ''), 'cvss': item.get('cvss', 0), }) return jsonify(cves) except Exception: return jsonify([]) # ─── Settings ──────────────────────────────────────── @app.route('/api/settings') def get_settings(): return jsonify(load_json('settings.json')) @app.route('/api/settings', methods=['PUT']) @require_auth def update_settings(): d = request.get_json(force=True) save_json('settings.json', d) return jsonify(d) # ═════════════════════════════════════════════════════ # NEW ENDPOINTS # ═════════════════════════════════════════════════════ # ─── Homepage Config ───────────────────��───────────── @app.route('/api/homepage') def get_homepage(): return jsonify(load_json('homepage.json')) @app.route('/api/homepage', methods=['POST']) @require_auth def save_homepage(): try: d = request.get_json(force=True) save_json('homepage.json', d) return jsonify(d) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Managed Services ─────────────────────────────── @app.route('/api/services/managed') def get_managed_services(): return jsonify(load_json('managed_services.json')) @app.route('/api/services/managed', methods=['POST']) @require_auth def add_managed_service(): try: d = request.get_json(force=True) svcs = load_json('managed_services.json') if not isinstance(svcs, list): svcs = [] svcs.append({ 'name': d.get('name', ''), 'url': d.get('url', '') }) save_json('managed_services.json', svcs) return jsonify(svcs[-1]), 201 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/services/managed/', methods=['DELETE']) @require_auth def delete_managed_service(index): svcs = load_json('managed_services.json') if not isinstance(svcs, list): svcs = [] if 0 <= index < len(svcs): removed = svcs.pop(index) save_json('managed_services.json', svcs) return jsonify({'ok': True, 'removed': removed}) abort(404) # ─── Navigation ────────────────────────────────────── @app.route('/api/navigation') def get_navigation(): return jsonify(load_json('navigation.json')) @app.route('/api/navigation', methods=['POST']) @require_auth def add_navigation(): try: d = request.get_json(force=True) nav = load_json('navigation.json') if not isinstance(nav, list): nav = [] nav.append({ 'label': d.get('label', ''), 'url': d.get('url', ''), 'order': d.get('order', len(nav) + 1) }) save_json('navigation.json', nav) return jsonify(nav[-1]), 201 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/navigation/', methods=['DELETE']) @require_auth def delete_navigation(index): nav = load_json('navigation.json') if not isinstance(nav, list): nav = [] if 0 <= index < len(nav): removed = nav.pop(index) save_json('navigation.json', nav) return jsonify({'ok': True, 'removed': removed}) abort(404) # ─── Links ─────────────────────────────────────────── @app.route('/api/links') def get_links(): return jsonify(load_json('links.json')) @app.route('/api/links', methods=['POST']) @require_auth def add_link(): try: d = request.get_json(force=True) links = load_json('links.json') if not isinstance(links, list): links = [] links.append({ 'name': d.get('name', ''), 'url': d.get('url', ''), 'icon': d.get('icon', ''), 'category': d.get('category', '') }) save_json('links.json', links) return jsonify(links[-1]), 201 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/links/', methods=['DELETE']) @require_auth def delete_link(index): links = load_json('links.json') if not isinstance(links, list): links = [] if 0 <= index < len(links): removed = links.pop(index) save_json('links.json', links) return jsonify({'ok': True, 'removed': removed}) abort(404) # ─── API Keys ──────────────────────────────────────── def mask_value(val): """Mask a string value, showing only last 4 chars if longer than 4.""" if not isinstance(val, str) or len(val) == 0: return val if len(val) <= 4: return '••••' return '••••••' + val[-4:] def is_masked(val): """Check if a value is a masked placeholder.""" if not isinstance(val, str): return False return '••••' in val @app.route('/api/apikeys') @require_auth def get_apikeys(): try: keys = load_json('apikeys.json') masked = {} for group, fields in keys.items(): if isinstance(fields, dict): masked[group] = {} for k, v in fields.items(): masked[group][k] = mask_value(v) else: masked[group] = mask_value(fields) return jsonify(masked) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/apikeys', methods=['POST']) @require_auth def save_apikeys(): try: d = request.get_json(force=True) group = d.get('group', '') data = d.get('data', {}) if not group or not isinstance(data, dict): return jsonify({'error': 'Invalid request: need group and data'}), 400 keys = load_json('apikeys.json') if group not in keys: keys[group] = {} # Only update values that are not masked and not empty for k, v in data.items(): if isinstance(v, str) and (is_masked(v) or v == ''): continue # Skip masked or empty values keys[group][k] = v save_json('apikeys.json', keys) return jsonify({'ok': True, 'group': group}) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Theme ─────────────────────────────────────────── @app.route('/api/theme') def get_theme(): return jsonify(load_json('theme.json')) @app.route('/api/theme', methods=['POST']) @require_auth def save_theme(): try: d = request.get_json(force=True) save_json('theme.json', d) return jsonify(d) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── SEO ───────────────────────────────────────────── @app.route('/api/seo') def get_seo(): return jsonify(load_json('seo.json')) @app.route('/api/seo', methods=['POST']) @require_auth def save_seo(): try: d = request.get_json(force=True) save_json('seo.json', d) return jsonify(d) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Contact Settings ─────────────────────────────── @app.route('/api/contact-settings') @require_auth def get_contact_settings(): return jsonify(load_json('contact_settings.json')) @app.route('/api/contact-settings', methods=['POST']) @require_auth def save_contact_settings(): try: d = request.get_json(force=True) save_json('contact_settings.json', d) return jsonify(d) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Contact Form (public) ────────────────────────── @app.route('/api/contact', methods=['POST']) def contact_form(): try: d = request.get_json(force=True) name = d.get('name', '').strip() email = d.get('email', '').strip() message = d.get('message', '').strip() if not name or not email or not message: return jsonify({'error': 'All fields are required'}), 400 # Check if form is enabled settings = load_json('contact_settings.json') if not settings.get('form_enabled', True): return jsonify({'error': 'Contact form is currently disabled'}), 403 contact_email = settings.get('email', '') auto_reply = settings.get('auto_reply', '') # Save message to messages.json regardless messages = load_json('messages.json') if not isinstance(messages, list): messages = [] messages.append({ 'name': name, 'email': email, 'message': message, 'timestamp': datetime.datetime.utcnow().isoformat() + 'Z' }) save_json('messages.json', messages) # Try to send email via SMTP if configured keys = load_json('apikeys.json') smtp_cfg = keys.get('smtp', {}) smtp_host = smtp_cfg.get('host', '') smtp_port = smtp_cfg.get('port', '587') smtp_user = smtp_cfg.get('user', '') smtp_pass = smtp_cfg.get('pass', '') email_sent = False if smtp_host and smtp_user and smtp_pass and contact_email: try: # Send notification to site owner msg = MIMEMultipart() msg['From'] = smtp_user msg['To'] = contact_email msg['Subject'] = f'[JAESWIFT] Contact from {name}' body = f"Name: {name}\nEmail: {email}\n\nMessage:\n{message}" msg.attach(MIMEText(body, 'plain')) server = smtplib.SMTP(smtp_host, int(smtp_port)) server.starttls() server.login(smtp_user, smtp_pass) server.send_message(msg) # Send auto-reply if configured if auto_reply: reply = MIMEText(auto_reply, 'plain') reply['From'] = smtp_user reply['To'] = email reply['Subject'] = 'Re: Your message to JAESWIFT' server.send_message(reply) server.quit() email_sent = True except Exception: pass # Email failed, but message is saved return jsonify({ 'ok': True, 'email_sent': email_sent, 'message': 'Message received. Thanks for reaching out!' }) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Backups ───────────────────────────────────────── @app.route('/api/backups/posts') @require_auth def backup_posts(): try: p = DATA_DIR / 'posts.json' if not p.exists(): return jsonify({'error': 'No posts data found'}), 404 return send_file(p, as_attachment=True, download_name='posts.json', mimetype='application/json') except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/backups/tracks') @require_auth def backup_tracks(): try: p = DATA_DIR / 'tracks.json' if not p.exists(): return jsonify({'error': 'No tracks data found'}), 404 return send_file(p, as_attachment=True, download_name='tracks.json', mimetype='application/json') except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/backups/settings') @require_auth def backup_settings(): try: p = DATA_DIR / 'settings.json' if not p.exists(): return jsonify({'error': 'No settings data found'}), 404 return send_file(p, as_attachment=True, download_name='settings.json', mimetype='application/json') except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/backups/all') @require_auth def backup_all(): try: buf = io.BytesIO() with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf: for f in DATA_DIR.glob('*.json'): zf.write(f, f.name) buf.seek(0) ts = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') return send_file( buf, as_attachment=True, download_name=f'jaeswift_backup_{ts}.zip', mimetype='application/zip' ) except Exception as e: return jsonify({'error': str(e)}), 500 # ─── Run ───────────────────────────────────────────── if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=False)