jaeswift-website/api/app.py

982 lines
36 KiB
Python
Raw Blame History

#!/usr/bin/env python3
"""JAESWIFT HUD Backend API"""
import json, os, time, subprocess, random, datetime, hashlib, zipfile, io, smtplib
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from flask import Flask, request, jsonify, abort, send_file
from flask_cors import CORS
import jwt
import requests as req
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
app = Flask(__name__)
CORS(app)
DATA_DIR = Path(__file__).parent / 'data'
JWT_SECRET = 'jaeswift-hud-s3cr3t-2026!x'
ADMIN_USER = 'jae'
ADMIN_PASS = 'HUDAdmin2026!'
ARRAY_FILES = {
'posts.json', 'tracks.json', 'navigation.json', 'links.json',
'managed_services.json', 'messages.json'
}
# ─── JSON Error Handlers ─────────────────────────────
@app.errorhandler(400)
def bad_request(e):
return jsonify({'error': 'Bad Request', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 400
@app.errorhandler(401)
def unauthorized(e):
return jsonify({'error': 'Unauthorized', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 401
@app.errorhandler(404)
def not_found(e):
return jsonify({'error': 'Not Found', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 404
@app.errorhandler(500)
def server_error(e):
return jsonify({'error': 'Internal Server Error', 'message': str(e.description) if hasattr(e, 'description') else str(e)}), 500
# ─── Helpers ─────────────────────────────────────────
def load_json(name):
p = DATA_DIR / name
if p.exists():
with open(p) as f:
return json.load(f)
return [] if name in ARRAY_FILES else {}
def save_json(name, data):
p = DATA_DIR / name
p.parent.mkdir(parents=True, exist_ok=True)
with open(p, 'w') as f:
json.dump(data, f, indent=2)
def require_auth(fn):
@wraps(fn)
def wrapper(*a, **kw):
auth = request.headers.get('Authorization', '')
if not auth.startswith('Bearer '):
abort(401, 'Missing token')
try:
jwt.decode(auth[7:], JWT_SECRET, algorithms=['HS256'])
except Exception:
abort(401, 'Invalid token')
return fn(*a, **kw)
return wrapper
def shell(cmd):
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
return r.stdout.strip()
except Exception:
return ''
# ─── Auth ────────────────────────────────────────────
@app.route('/api/auth/login', methods=['POST'])
def login():
d = request.get_json(force=True, silent=True) or {}
if d.get('username') == ADMIN_USER and d.get('password') == ADMIN_PASS:
token = jwt.encode(
{'user': ADMIN_USER, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)},
JWT_SECRET, algorithm='HS256'
)
return jsonify({'token': token})
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/api/auth/check')
def auth_check():
auth = request.headers.get('Authorization', '')
if not auth.startswith('Bearer '):
return jsonify({'valid': False}), 401
try:
jwt.decode(auth[7:], JWT_SECRET, algorithms=['HS256'])
return jsonify({'valid': True, 'user': ADMIN_USER})
except Exception:
return jsonify({'valid': False}), 401
# ─── Blog Posts ──────────────────────────────────────
@app.route('/api/posts')
def get_posts():
posts = load_json('posts.json')
return jsonify(posts)
@app.route('/api/posts/<slug>')
def get_post(slug):
posts = load_json('posts.json')
for p in posts:
if p.get('slug') == slug:
return jsonify(p)
abort(404)
@app.route('/api/posts', methods=['POST'])
@require_auth
def create_post():
d = request.get_json(force=True)
posts = load_json('posts.json')
d['id'] = max((p.get('id', 0) for p in posts), default=0) + 1
d['date'] = d.get('date', datetime.date.today().isoformat())
d['word_count'] = len(d.get('content', '').split())
posts.append(d)
save_json('posts.json', posts)
return jsonify(d), 201
@app.route('/api/posts/<slug>', methods=['PUT'])
@require_auth
def update_post(slug):
d = request.get_json(force=True)
posts = load_json('posts.json')
for i, p in enumerate(posts):
if p.get('slug') == slug:
d['id'] = p['id']
d['word_count'] = len(d.get('content', '').split())
posts[i] = {**p, **d}
save_json('posts.json', posts)
return jsonify(posts[i])
abort(404)
@app.route('/api/posts/<slug>', methods=['DELETE'])
@require_auth
def delete_post(slug):
posts = load_json('posts.json')
posts = [p for p in posts if p.get('slug') != slug]
save_json('posts.json', posts)
return jsonify({'ok': True})
# ─── Server Stats ────────────────────────────────────
@app.route('/api/stats')
def server_stats():
# CPU
load = shell("cat /proc/loadavg | awk '{print $1}'")
ncpu = shell("nproc")
try:
cpu_pct = round(float(load) / max(int(ncpu), 1) * 100, 1)
except Exception:
cpu_pct = 0
# Memory
mem = shell("free | awk '/Mem:/{printf \"%.1f\", $3/$2*100}'")
# Disk
disk = shell("df / | awk 'NR==2{print $5}' | tr -d '%'")
# Network (bytes since boot)
net = shell("cat /proc/net/dev | awk '/eth0|ens/{print $2,$10}'")
parts = net.split()
rx = int(parts[0]) if len(parts) >= 2 else 0
tx = int(parts[1]) if len(parts) >= 2 else 0
# Docker
running = shell("docker ps -q 2>/dev/null | wc -l")
total = shell("docker ps -aq 2>/dev/null | wc -l")
# Uptime
up = shell("cat /proc/uptime | awk '{print $1}'")
# Connections
conns = shell("ss -s | awk '/TCP:/{print $2}'")
return jsonify({
'cpu_percent': cpu_pct,
'memory_percent': float(mem) if mem else 0,
'disk_percent': int(disk) if disk else 0,
'network_rx_bytes': rx,
'network_tx_bytes': tx,
'container_running': int(running) if running else 0,
'container_total': int(total) if total else 0,
'uptime_seconds': float(up) if up else 0,
'active_connections': int(conns) if conns else 0,
'load_avg': float(load) if load else 0,
'timestamp': time.time()
})
# ─── Top Processes ───────────────────────────────────
@app.route('/api/processes')
def top_processes():
try:
raw = shell("ps aux --sort=-%mem | awk 'NR>1{print $2,$3,$4,$11}' | grep -v -E '(ps aux|awk |grep |sshd:|bash -c)' | head -7")
procs = []
for line in raw.strip().split('\n'):
parts = line.split(None, 3)
if len(parts) >= 4:
name = parts[3].split('/')[-1][:20]
if name in ('ps', 'awk', 'head', 'grep', 'sh', 'bash'):
continue
procs.append({
'pid': parts[0],
'cpu': float(parts[1]),
'mem': float(parts[2]),
'name': name
})
elif len(parts) == 3:
procs.append({
'pid': parts[0],
'cpu': float(parts[1]),
'mem': float(parts[2]),
'name': 'unknown'
})
return jsonify(procs[:7])
except Exception as e:
return jsonify([]), 200
# ─── Services Status ─────────────────────────────────
@app.route('/api/services')
def services():
svcs = [
{'name': 'Gitea', 'url': 'https://git.jaeswift.xyz'},
{'name': 'Plex', 'url': 'https://plex.jaeswift.xyz'},
{'name': 'Search', 'url': 'https://jaeswift.xyz/search'},
{'name': 'Yoink', 'url': 'https://jaeswift.xyz/yoink/'},
{'name': 'Archive', 'url': 'https://archive.jaeswift.xyz'},
{'name': 'Agent Zero', 'url': 'https://agentzero.jaeswift.xyz'},
{'name': 'Files', 'url': 'https://files.jaeswift.xyz'},
]
def check_service(s):
try:
t0 = time.time()
r = req.get(s['url'], timeout=2, verify=False, allow_redirects=True)
ms = round((time.time() - t0) * 1000)
return {**s, 'status': 'online' if r.status_code < 500 else 'offline', 'response_time_ms': ms}
except Exception:
return {**s, 'status': 'offline', 'response_time_ms': 0}
results = [None] * len(svcs)
with ThreadPoolExecutor(max_workers=7) as executor:
futures = {executor.submit(check_service, s): i for i, s in enumerate(svcs)}
for future in as_completed(futures):
results[futures[future]] = future.result()
return jsonify(results)
# ─── Weather ─────────────────────────────────────────
@app.route('/api/weather')
def weather():
try:
r = req.get('https://wttr.in/Manchester?format=j1', timeout=5,
headers={'User-Agent': 'jaeswift-hud'})
d = r.json()
cur = d['current_condition'][0]
return jsonify({
'temp_c': int(cur['temp_C']),
'feels_like': int(cur['FeelsLikeC']),
'condition': cur['weatherDesc'][0]['value'],
'humidity': int(cur['humidity']),
'wind_kph': int(cur['windspeedKmph']),
'wind_dir': cur['winddir16Point'],
'icon': cur.get('weatherCode', ''),
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Now Playing (random track) ──────────────────────
@app.route('/api/nowplaying')
def now_playing():
tracks = load_json('tracks.json')
if not tracks:
return jsonify({'artist': 'Unknown', 'track': 'Silence', 'album': ''})
t = random.choice(tracks)
return jsonify(t)
# ─── Tracks CRUD ─────────────────────────────────────
@app.route('/api/tracks')
def get_tracks():
return jsonify(load_json('tracks.json'))
@app.route('/api/tracks', methods=['POST'])
@require_auth
def add_track():
d = request.get_json(force=True)
tracks = load_json('tracks.json')
tracks.append({
'artist': d.get('artist', ''),
'track': d.get('track', ''),
'album': d.get('album', ''),
})
save_json('tracks.json', tracks)
return jsonify(tracks[-1]), 201
@app.route('/api/tracks/<int:index>', methods=['DELETE'])
@require_auth
def delete_track(index):
tracks = load_json('tracks.json')
if 0 <= index < len(tracks):
removed = tracks.pop(index)
save_json('tracks.json', tracks)
return jsonify({'ok': True, 'removed': removed})
abort(404)
# ─── Git Activity (from Gitea API) ───────────────────
@app.route('/api/git-activity')
def git_activity():
try:
r = req.get('https://git.jaeswift.xyz/api/v1/users/jae/heatmap',
timeout=2, verify=False)
heatmap = r.json() if r.status_code == 200 else []
r2 = req.get('https://git.jaeswift.xyz/api/v1/repos/search?sort=updated&limit=5&owner=jae',
timeout=2, verify=False)
repos = []
if r2.status_code == 200:
data = r2.json().get('data', r2.json()) if isinstance(r2.json(), dict) else r2.json()
for repo in (data if isinstance(data, list) else [])[:5]:
repos.append({
'name': repo.get('name', ''),
'updated': repo.get('updated_at', ''),
'stars': repo.get('stars_count', 0),
'language': repo.get('language', ''),
})
return jsonify({'heatmap': heatmap, 'repos': repos})
except Exception as e:
return jsonify({'heatmap': [], 'repos': [], 'error': str(e)})
# ─── Threat Feed (CVE feed) ──────────────────────────
@app.route('/api/threats')
def threats():
try:
r = req.get('https://cve.circl.lu/api/last/8', timeout=8)
cves = []
if r.status_code == 200:
for item in r.json()[:8]:
cves.append({
'id': item.get('id', ''),
'summary': (item.get('summary', '') or '')[:120],
'published': item.get('Published', ''),
'cvss': item.get('cvss', 0),
})
return jsonify(cves)
except Exception:
return jsonify([])
# ─── Settings ────────────────────────────────────────
@app.route('/api/settings')
def get_settings():
return jsonify(load_json('settings.json'))
@app.route('/api/settings', methods=['PUT'])
@require_auth
def update_settings():
d = request.get_json(force=True)
save_json('settings.json', d)
return jsonify(d)
# ═════════════════════════════════════════════════════
# NEW ENDPOINTS
# ═════════════════════════════════════════════════════
# ─── Homepage Config ───────────────────<E29480><E29480>─────────────
@app.route('/api/homepage')
def get_homepage():
return jsonify(load_json('homepage.json'))
@app.route('/api/homepage', methods=['POST'])
@require_auth
def save_homepage():
try:
d = request.get_json(force=True)
save_json('homepage.json', d)
return jsonify(d)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Managed Services ───────────────────────────────
@app.route('/api/services/managed')
def get_managed_services():
return jsonify(load_json('managed_services.json'))
@app.route('/api/services/managed', methods=['POST'])
@require_auth
def add_managed_service():
try:
d = request.get_json(force=True)
svcs = load_json('managed_services.json')
if not isinstance(svcs, list):
svcs = []
svcs.append({
'name': d.get('name', ''),
'url': d.get('url', '')
})
save_json('managed_services.json', svcs)
return jsonify(svcs[-1]), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/services/managed/<int:index>', methods=['DELETE'])
@require_auth
def delete_managed_service(index):
svcs = load_json('managed_services.json')
if not isinstance(svcs, list):
svcs = []
if 0 <= index < len(svcs):
removed = svcs.pop(index)
save_json('managed_services.json', svcs)
return jsonify({'ok': True, 'removed': removed})
abort(404)
# ─── Navigation ──────────────────────────────────────
@app.route('/api/navigation')
def get_navigation():
return jsonify(load_json('navigation.json'))
@app.route('/api/navigation', methods=['POST'])
@require_auth
def add_navigation():
try:
d = request.get_json(force=True)
nav = load_json('navigation.json')
if not isinstance(nav, list):
nav = []
nav.append({
'label': d.get('label', ''),
'url': d.get('url', ''),
'order': d.get('order', len(nav) + 1)
})
save_json('navigation.json', nav)
return jsonify(nav[-1]), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/navigation/<int:index>', methods=['DELETE'])
@require_auth
def delete_navigation(index):
nav = load_json('navigation.json')
if not isinstance(nav, list):
nav = []
if 0 <= index < len(nav):
removed = nav.pop(index)
save_json('navigation.json', nav)
return jsonify({'ok': True, 'removed': removed})
abort(404)
# ─── Links ───────────────────────────────────────────
@app.route('/api/links')
def get_links():
return jsonify(load_json('links.json'))
@app.route('/api/links', methods=['POST'])
@require_auth
def add_link():
try:
d = request.get_json(force=True)
links = load_json('links.json')
if not isinstance(links, list):
links = []
links.append({
'name': d.get('name', ''),
'url': d.get('url', ''),
'icon': d.get('icon', ''),
'category': d.get('category', '')
})
save_json('links.json', links)
return jsonify(links[-1]), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/links/<int:index>', methods=['DELETE'])
@require_auth
def delete_link(index):
links = load_json('links.json')
if not isinstance(links, list):
links = []
if 0 <= index < len(links):
removed = links.pop(index)
save_json('links.json', links)
return jsonify({'ok': True, 'removed': removed})
abort(404)
# ─── API Keys ────────────────────────────────────────
def mask_value(val):
"""Mask a string value, showing only last 4 chars if longer than 4."""
if not isinstance(val, str) or len(val) == 0:
return val
if len(val) <= 4:
return '••••'
return '••••••' + val[-4:]
def is_masked(val):
"""Check if a value is a masked placeholder."""
if not isinstance(val, str):
return False
return '••••' in val
@app.route('/api/apikeys')
@require_auth
def get_apikeys():
try:
keys = load_json('apikeys.json')
masked = {}
for group, fields in keys.items():
if isinstance(fields, dict):
masked[group] = {}
for k, v in fields.items():
masked[group][k] = mask_value(v)
else:
masked[group] = mask_value(fields)
return jsonify(masked)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/apikeys', methods=['POST'])
@require_auth
def save_apikeys():
try:
d = request.get_json(force=True)
group = d.get('group', '')
data = d.get('data', {})
if not group or not isinstance(data, dict):
return jsonify({'error': 'Invalid request: need group and data'}), 400
keys = load_json('apikeys.json')
if group not in keys:
keys[group] = {}
# Only update values that are not masked and not empty
for k, v in data.items():
if isinstance(v, str) and (is_masked(v) or v == ''):
continue # Skip masked or empty values
keys[group][k] = v
save_json('apikeys.json', keys)
return jsonify({'ok': True, 'group': group})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Theme ───────────────────────────────────────────
@app.route('/api/theme')
def get_theme():
return jsonify(load_json('theme.json'))
@app.route('/api/theme', methods=['POST'])
@require_auth
def save_theme():
try:
d = request.get_json(force=True)
save_json('theme.json', d)
return jsonify(d)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── SEO ─────────────────────────────────────────────
@app.route('/api/seo')
def get_seo():
return jsonify(load_json('seo.json'))
@app.route('/api/seo', methods=['POST'])
@require_auth
def save_seo():
try:
d = request.get_json(force=True)
save_json('seo.json', d)
return jsonify(d)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Contact Settings ───────────────────────────────
@app.route('/api/contact-settings')
@require_auth
def get_contact_settings():
return jsonify(load_json('contact_settings.json'))
@app.route('/api/contact-settings', methods=['POST'])
@require_auth
def save_contact_settings():
try:
d = request.get_json(force=True)
save_json('contact_settings.json', d)
return jsonify(d)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Contact Form (public) ──────────────────────────
@app.route('/api/contact', methods=['POST'])
def contact_form():
try:
d = request.get_json(force=True)
name = d.get('name', '').strip()
email = d.get('email', '').strip()
message = d.get('message', '').strip()
if not name or not email or not message:
return jsonify({'error': 'All fields are required'}), 400
# Check if form is enabled
settings = load_json('contact_settings.json')
if not settings.get('form_enabled', True):
return jsonify({'error': 'Contact form is currently disabled'}), 403
contact_email = settings.get('email', '')
auto_reply = settings.get('auto_reply', '')
# Save message to messages.json regardless
messages = load_json('messages.json')
if not isinstance(messages, list):
messages = []
messages.append({
'name': name,
'email': email,
'message': message,
'timestamp': datetime.datetime.utcnow().isoformat() + 'Z'
})
save_json('messages.json', messages)
# Try to send email via SMTP if configured
keys = load_json('apikeys.json')
smtp_cfg = keys.get('smtp', {})
smtp_host = smtp_cfg.get('host', '')
smtp_port = smtp_cfg.get('port', '587')
smtp_user = smtp_cfg.get('user', '')
smtp_pass = smtp_cfg.get('pass', '')
email_sent = False
if smtp_host and smtp_user and smtp_pass and contact_email:
try:
# Send notification to site owner
msg = MIMEMultipart()
msg['From'] = smtp_user
msg['To'] = contact_email
msg['Subject'] = f'[JAESWIFT] Contact from {name}'
body = f"Name: {name}\nEmail: {email}\n\nMessage:\n{message}"
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP(smtp_host, int(smtp_port))
server.starttls()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
# Send auto-reply if configured
if auto_reply:
reply = MIMEText(auto_reply, 'plain')
reply['From'] = smtp_user
reply['To'] = email
reply['Subject'] = 'Re: Your message to JAESWIFT'
server.send_message(reply)
server.quit()
email_sent = True
except Exception:
pass # Email failed, but message is saved
return jsonify({
'ok': True,
'email_sent': email_sent,
'message': 'Message received. Thanks for reaching out!'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Venice AI Chat ──────────────────────────────────
JAE_SYSTEM_PROMPT = """You are JAE-AI, the onboard AI assistant for jaeswift.xyz — a sci-fi themed personal hub built by Jae, a developer, tinkerer, and self-hosting enthusiast based in Manchester, UK.
You speak in a slightly futuristic, concise tone — like a ship's computer but with personality. Keep responses SHORT (2-4 sentences max unless asked for detail). Use uppercase for emphasis sparingly.
You know about all public areas of jaeswift.xyz:
- Homepage (jaeswift.xyz) — Sci-fi HUD dashboard with live server stats, weather, now playing, network graphs
- Blog (jaeswift.xyz/blog) — Jae's transmissions covering dev, linux, AI, true crime, conspiracy theories, guides
- Gitea (git.jaeswift.xyz) — Self-hosted Git server with Jae's repos and projects
- Plex (plex.jaeswift.xyz) — Media server
- Search (jaeswift.xyz/search) — Self-hosted search engine (SearXNG)
- Yoink (jaeswift.xyz/yoink/) — Media downloader tool
- Archive (archive.jaeswift.xyz) — Web archiving service
- Agent Zero (agentzero.jaeswift.xyz) — AI agent framework deployment
- Files (files.jaeswift.xyz) — File browser
- WIN95 Simulator (jaeswift.xyz/win95) — A Windows 95 simulator that runs in your browser!
Jae's tech stack: Linux, Docker, Python, Flask, Nginx, self-hosted infrastructure.
Jae is into: cybersecurity, AI agents, open source, true crime, conspiracy theories, music.
When greeting visitors, be welcoming and suggest they explore. Mention interesting areas like the WIN95 simulator, blog, or search engine. If asked technical questions, answer helpfully. You can recommend blog posts or services based on what the visitor seems interested in.
Never reveal API keys, server credentials, or internal infrastructure details.
Never pretend to execute commands or access systems — you are a chat assistant only."""
@app.route('/api/chat', methods=['POST'])
def venice_chat():
try:
keys = load_json('apikeys.json')
venice_key = keys.get('venice', {}).get('api_key', '')
venice_model = keys.get('venice', {}).get('model', 'llama-3.3-70b')
if not venice_key:
return jsonify({'error': 'Venice API key not configured'}), 500
data = request.get_json(force=True, silent=True) or {}
user_msg = data.get('message', '').strip()
history = data.get('history', [])
if not user_msg:
return jsonify({'error': 'Empty message'}), 400
messages = [{'role': 'system', 'content': JAE_SYSTEM_PROMPT}]
for h in history[-20:]: # Keep last 20 exchanges max
messages.append({'role': h.get('role', 'user'), 'content': h.get('content', '')})
messages.append({'role': 'user', 'content': user_msg})
resp = req.post(
'https://api.venice.ai/api/v1/chat/completions',
headers={
'Authorization': f'Bearer {venice_key}',
'Content-Type': 'application/json'
},
json={
'model': venice_model,
'messages': messages,
'max_tokens': 512,
'temperature': 0.7
},
timeout=30
)
resp.raise_for_status()
result = resp.json()
reply = result['choices'][0]['message']['content']
return jsonify({'reply': reply})
except req.exceptions.RequestException as e:
return jsonify({'error': f'Venice API error: {str(e)}'}), 502
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Globe Config ────────────────────────────────────
@app.route('/api/globe')
def get_globe():
return jsonify(load_json('globe.json'))
@app.route('/api/globe', methods=['POST'])
@require_auth
def save_globe():
try:
d = request.get_json(force=True)
save_json('globe.json', d)
return jsonify(d)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Chat AI Config ──────────────────────────────────
@app.route('/api/chat-config')
def get_chat_config():
return jsonify(load_json('chat_config.json'))
@app.route('/api/chat-config', methods=['POST'])
@require_auth
def save_chat_config():
try:
d = request.get_json(force=True)
save_json('chat_config.json', d)
return jsonify(d)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Backups ─────────────────────────────────────────
@app.route('/api/backups/posts')
@require_auth
def backup_posts():
try:
p = DATA_DIR / 'posts.json'
if not p.exists():
return jsonify({'error': 'No posts data found'}), 404
return send_file(p, as_attachment=True, download_name='posts.json', mimetype='application/json')
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/backups/tracks')
@require_auth
def backup_tracks():
try:
p = DATA_DIR / 'tracks.json'
if not p.exists():
return jsonify({'error': 'No tracks data found'}), 404
return send_file(p, as_attachment=True, download_name='tracks.json', mimetype='application/json')
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/backups/settings')
@require_auth
def backup_settings():
try:
p = DATA_DIR / 'settings.json'
if not p.exists():
return jsonify({'error': 'No settings data found'}), 404
return send_file(p, as_attachment=True, download_name='settings.json', mimetype='application/json')
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/backups/all')
@require_auth
def backup_all():
try:
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
for f in DATA_DIR.glob('*.json'):
zf.write(f, f.name)
buf.seek(0)
ts = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S')
return send_file(
buf,
as_attachment=True,
download_name=f'jaeswift_backup_{ts}.zip',
mimetype='application/zip'
)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ─── Contraband ───────────────────────────────
_contraband_cache = None
_contraband_mtime = 0
def _load_contraband():
global _contraband_cache, _contraband_mtime
p = DATA_DIR / 'contraband.json'
if not p.exists():
return {'categories': [], 'total_entries': 0, 'total_starred': 0, 'total_categories': 0}
mt = p.stat().st_mtime
if _contraband_cache is None or mt != _contraband_mtime:
with open(p, encoding='utf-8') as f:
_contraband_cache = json.load(f)
_contraband_mtime = mt
return _contraband_cache
@app.route('/api/contraband')
def contraband_index():
db = _load_contraband()
cats = []
for c in db.get('categories', []):
cats.append({
'slug': c['slug'], 'code': c['code'], 'name': c['name'], 'icon': c['icon'],
'entry_count': c['entry_count'], 'starred_count': c['starred_count'],
'subcategory_count': c['subcategory_count']
})
return jsonify({
'total_entries': db.get('total_entries', 0),
'total_starred': db.get('total_starred', 0),
'total_categories': db.get('total_categories', 0),
'categories': cats
})
@app.route('/api/contraband/<slug>')
def contraband_category(slug):
db = _load_contraband()
for c in db.get('categories', []):
if c['slug'] == slug:
return jsonify(c)
abort(404, f'Category {slug} not found')
@app.route('/api/contraband/search')
def contraband_search():
q = request.args.get('q', '').lower().strip()
if not q or len(q) < 2:
return jsonify({'query': q, 'results': [], 'total': 0})
starred_only = request.args.get('starred', '').lower() == 'true'
limit = min(int(request.args.get('limit', 100)), 500)
db = _load_contraband()
results = []
for cat in db.get('categories', []):
for sub in cat.get('subcategories', []):
for entry in sub.get('entries', []):
if starred_only and not entry.get('starred'):
continue
searchable = f"{entry.get('name','')} {entry.get('description','')}".lower()
if q in searchable:
results.append({
'category_code': cat['code'], 'category_name': cat['name'],
'category_slug': cat['slug'],
'subcategory': sub['name'],
'name': entry.get('name', ''), 'url': entry.get('url', ''),
'description': entry.get('description', ''),
'starred': entry.get('starred', False),
'extra_links': entry.get('extra_links', [])
})
if len(results) >= limit:
break
if len(results) >= limit:
break
if len(results) >= limit:
break
return jsonify({'query': q, 'results': results, 'total': len(results)})
# ─── Run ─────────────────────────────────────────────
# ─── Changelog ────────────────────────────────────────
@app.route('/api/changelog')
def get_changelog():
try:
with open(os.path.join(DATA_DIR, 'changelog.json'), 'r') as f:
return json.load(f)
except Exception as e:
return {"error": str(e)}, 500
# ─── Awesome Lists ────────────────────────────────────
_awesomelist_index = None
_awesomelist_mtime = 0
def _load_awesomelist_index():
global _awesomelist_index, _awesomelist_mtime
p = DATA_DIR / 'awesomelist_index.json'
if not p.exists():
return None
mt = p.stat().st_mtime
if _awesomelist_index is None or mt != _awesomelist_mtime:
with open(p, 'r') as f:
_awesomelist_index = json.load(f)
_awesomelist_mtime = mt
return _awesomelist_index
@app.route('/api/awesomelist')
def awesomelist_index():
db = _load_awesomelist_index()
if not db:
return jsonify({'error': 'Data not available'}), 503
return jsonify(db)
@app.route('/api/awesomelist/<code>')
def awesomelist_sector_detail(code):
"""Serve flattened sector data (like contraband categories)"""
p = DATA_DIR / 'awesomelist' / f'sector_{code.upper()}.json'
if not p.exists():
return jsonify({'error': 'Sector not found'}), 404
with open(p, 'r') as f:
data = json.load(f)
return jsonify(data)
@app.route('/api/awesomelist/search')
def awesomelist_search():
q = request.args.get('q', '').strip().lower()
if len(q) < 2:
return jsonify({'query': q, 'results': [], 'total': 0})
results = []
limit = 100
al_dir = DATA_DIR / 'awesomelist'
if not al_dir.exists():
return jsonify({'query': q, 'results': [], 'total': 0})
for fp in sorted(al_dir.iterdir()):
if not fp.name.startswith('sector_'):
continue
try:
with open(fp) as f:
sector = json.load(f)
for sub in sector.get('subcategories', []):
for entry in sub.get('entries', []):
if q in entry.get('name', '').lower() or q in entry.get('description', '').lower():
results.append({
'sector_code': sector['code'],
'sector_name': sector['name'],
'subcategory': sub['name'],
'name': entry.get('name', ''),
'url': entry.get('url', ''),
'description': entry.get('description', ''),
'starred': entry.get('starred', False)
})
if len(results) >= limit:
break
if len(results) >= limit:
break
except:
continue
if len(results) >= limit:
break
return jsonify({'query': q, 'results': results, 'total': len(results)})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)