#!/usr/bin/env python3
"""JAESWIFT Visitor Intelligence Endpoints

Provides /api/visitor/* endpoints:
  /api/visitor/scan        - scan current visitor (IP/geo/UA/device/threat)
  /api/visitor/recent-arcs - last N visitor lat/lon pairs for globe traffic arcs
  /api/leaderboards        - aggregated 24h/7d traffic leaderboards from nginx logs
"""
import os, re, time, socket, json, random
from pathlib import Path
from datetime import datetime, timezone
from collections import OrderedDict, Counter
from functools import lru_cache
from flask import Blueprint, jsonify, request

visitor_bp = Blueprint('visitor', __name__)

DATA_DIR = Path(__file__).parent / 'data'
NGINX_LOG = '/var/log/nginx/access.log'
GEOIP_PATHS = [
    '/usr/share/GeoIP/GeoLite2-Country.mmdb',
    '/usr/share/GeoIP/GeoLite2-City.mmdb',
    '/var/lib/GeoIP/GeoLite2-Country.mmdb',
]
CENTROIDS_FILE = DATA_DIR / 'country_centroids.json'

# ─── Lazy imports (optional deps) ──────────────────────
_geoip_reader = None      # cached geoip2 Reader (first existing mmdb wins)
_geoip_has_city = False   # True when the loaded database is a City db


def _get_geoip():
    """Return a cached geoip2 Reader, or None if geoip2/mmdb is unavailable.

    The first existing path in GEOIP_PATHS is opened; `_geoip_has_city`
    records whether it supports city-level lookups.
    """
    global _geoip_reader, _geoip_has_city
    if _geoip_reader is not None:
        return _geoip_reader
    try:
        import geoip2.database
        for p in GEOIP_PATHS:
            if os.path.exists(p):
                _geoip_reader = geoip2.database.Reader(p)
                _geoip_has_city = 'City' in p
                return _geoip_reader
    except Exception:
        # geoip2 not installed or db unreadable — geo features degrade gracefully
        pass
    return None


def _parse_ua(ua_string):
    """Parse user-agent.

    Uses the user-agents lib if available, else a crude regex fallback.
    Returns a dict with browser, browser_version, os, os_version, device,
    is_bot keys (all strings except is_bot: bool).
    """
    try:
        from user_agents import parse
        ua = parse(ua_string or '')
        browser_family = ua.browser.family or 'Unknown'
        browser_version = '.'.join(str(x) for x in ua.browser.version[:2] if x is not None) or ''
        os_family = ua.os.family or 'Unknown'
        os_version = '.'.join(str(x) for x in ua.os.version[:2] if x is not None) or ''
        device = 'Mobile' if ua.is_mobile else ('Tablet' if ua.is_tablet else ('Bot' if ua.is_bot else 'Desktop'))
        return {
            'browser': browser_family,
            'browser_version': browser_version,
            'os': os_family,
            'os_version': os_version,
            'device': device,
            'is_bot': bool(ua.is_bot),
        }
    except Exception:
        # Fallback: crude substring/regex sniffing on the lowercased UA.
        s = (ua_string or '').lower()
        browser = 'Unknown'
        bver = ''
        if 'firefox' in s:
            browser = 'Firefox'
            m = re.search(r'firefox/([0-9.]+)', s)
            bver = m.group(1)[:6] if m else ''
        elif 'edg/' in s:
            # Edge must be checked before Chrome (Edge UAs contain "chrome")
            browser = 'Edge'
            m = re.search(r'edg/([0-9.]+)', s)
            bver = m.group(1)[:6] if m else ''
        elif 'chrome' in s:
            browser = 'Chrome'
            m = re.search(r'chrome/([0-9.]+)', s)
            bver = m.group(1)[:6] if m else ''
        elif 'safari' in s:
            browser = 'Safari'
        is_bot = any(k in s for k in ('bot', 'crawl', 'spider', 'wget', 'curl'))
        os_family = 'Unknown'
        if 'windows' in s:
            os_family = 'Windows'
        elif 'mac os' in s or 'macintosh' in s:
            os_family = 'macOS'
        elif 'android' in s:
            os_family = 'Android'
        elif 'iphone' in s or 'ipad' in s or 'ios' in s:
            os_family = 'iOS'
        elif 'linux' in s:
            os_family = 'Linux'
        device = 'Mobile' if ('mobile' in s or 'android' in s or 'iphone' in s) else ('Bot' if is_bot else 'Desktop')
        return {
            'browser': browser,
            'browser_version': bver,
            'os': os_family,
            'os_version': '',
            'device': device,
            'is_bot': is_bot,
        }


def _mask_ip(ip):
    """Return a privacy-masked display form of an IPv4/IPv6 address."""
    if not ip:
        return 'UNKNOWN'
    if ':' in ip:
        # IPv6 — mask middle groups, keep first and last
        parts = ip.split(':')
        if len(parts) >= 3:
            return f"{parts[0]}:****:****:{parts[-1]}"
        return ip
    parts = ip.split('.')
    if len(parts) == 4:
        return f"{parts[0]}.***.***.{parts[3]}"
    return ip


def _client_ip():
    """Best-effort client IP: X-Forwarded-For first hop, then X-Real-IP, then remote_addr."""
    xff = request.headers.get('X-Forwarded-For', '')
    if xff:
        return xff.split(',')[0].strip()
    xr = request.headers.get('X-Real-IP', '')
    if xr:
        return xr.strip()
    return request.remote_addr or ''


def _reverse_dns(ip, timeout=1.0):
    """Reverse-resolve `ip` to a hostname; '' on failure or timeout.

    NOTE(review): setdefaulttimeout mutates process-wide socket state and is
    not thread-safe; acceptable here, but confirm before adding threading.
    """
    try:
        socket.setdefaulttimeout(timeout)
        host, _, _ = socket.gethostbyaddr(ip)
        return host
    except Exception:
        return ''
    finally:
        socket.setdefaulttimeout(None)


def _isp_guess(hostname):
    """Guess ISP from reverse DNS hostname — crude but free."""
    if not hostname:
        return 'REDACTED'
    h = hostname.lower()
    known = {
        'bt.com': 'British Telecommunications',
        'btcentralplus': 'British Telecommunications',
        'virginm.net': 'Virgin Media',
        'sky.com': 'Sky Broadband',
        'talktalk': 'TalkTalk',
        'plus.net': 'Plusnet',
        'vodafone': 'Vodafone',
        'three.co.uk': 'Three UK',
        'ee.co.uk': 'EE',
        'comcast': 'Comcast',
        'verizon': 'Verizon',
        'amazonaws': 'Amazon AWS',
        'googleusercontent': 'Google Cloud',
        'googlebot': 'Google (Bot)',
        'azure': 'Microsoft Azure',
        'hetzner': 'Hetzner',
        'digitalocean': 'DigitalOcean',
        'ovh': 'OVH',
        'cloudflare': 'Cloudflare',
        'linode': 'Linode',
        'deutsche-telekom': 'Deutsche Telekom',
        'telekom': 'Deutsche Telekom',
        'orange.fr': 'Orange',
        'free.fr': 'Free',
    }
    for key, val in known.items():
        if key in h:
            return val
    # fallback: extract last 2 labels as domain
    parts = h.split('.')
    if len(parts) >= 2:
        return parts[-2].capitalize() + '.' + parts[-1]
    return 'REDACTED'


def _country_flag(cc):
    """Convert a 2-letter ISO country code to its regional-indicator emoji flag."""
    if not cc or len(cc) != 2:
        return ''
    try:
        return (chr(0x1F1E6 + ord(cc[0].upper()) - ord('A'))
                + chr(0x1F1E6 + ord(cc[1].upper()) - ord('A')))
    except Exception:
        return ''


# ─── Rate limiting (in-memory, simple) ─────────────────
_rate_cache = OrderedDict()   # ip -> last-scan monotonic-ish timestamp
_RATE_LIMIT_SECS = 10


def _rate_limited(ip):
    """True if `ip` scanned within the last _RATE_LIMIT_SECS seconds.

    Expired entries are purged on each call; the cache is capped at 1000
    entries (oldest evicted first).
    """
    now = time.time()
    # purge old
    expired = [k for k, t in _rate_cache.items() if now - t > _RATE_LIMIT_SECS]
    for k in expired:
        _rate_cache.pop(k, None)
    last = _rate_cache.get(ip)
    if last and now - last < _RATE_LIMIT_SECS:
        return True
    _rate_cache[ip] = now
    # cap size
    while len(_rate_cache) > 1000:
        _rate_cache.popitem(last=False)
    return False


# ─── Country centroids (lat/lon) for arcs ──────────────
_CENTROIDS = None   # dict: country code -> [lat, lon], loaded lazily


def _load_centroids():
    """Load country centroid coordinates from CENTROIDS_FILE (cached; {} on failure)."""
    global _CENTROIDS
    if _CENTROIDS is not None:
        return _CENTROIDS
    if CENTROIDS_FILE.exists():
        try:
            with open(CENTROIDS_FILE) as f:
                _CENTROIDS = json.load(f)
            return _CENTROIDS
        except Exception:
            pass
    _CENTROIDS = {}
    return _CENTROIDS


# ─── /api/visitor/scan ─────────────────────────────────
@visitor_bp.route('/api/visitor/scan')
def visitor_scan():
    """Scan the current visitor: masked IP, geo, ISP, UA breakdown, threat heuristic.

    Rate-limited per IP (429 with retry_after when exceeded).
    """
    ip = _client_ip()
    ua_string = request.headers.get('User-Agent', '')
    lang = request.headers.get('Accept-Language', 'en').split(',')[0].strip()
    if _rate_limited(ip):
        return jsonify({'error': 'rate_limited', 'retry_after': _RATE_LIMIT_SECS}), 429

    country = 'UNKNOWN'
    country_code = 'XX'
    city = ''
    reader = _get_geoip()
    if reader and ip:
        try:
            if _geoip_has_city:
                resp = reader.city(ip)
                country = resp.country.name or 'UNKNOWN'
                country_code = resp.country.iso_code or 'XX'
                city = resp.city.name or ''
            else:
                resp = reader.country(ip)
                country = resp.country.name or 'UNKNOWN'
                country_code = resp.country.iso_code or 'XX'
        except Exception:
            # private/unroutable IPs raise AddressNotFoundError — keep defaults
            pass

    ua_info = _parse_ua(ua_string)

    # ISP via reverse DNS (1s timeout)
    hostname = _reverse_dns(ip, timeout=1.0) if ip else ''
    isp = _isp_guess(hostname)

    # Threat heuristic: bots and TOR exits are flagged AMBER
    threat_level = 'GREEN'
    threat_reason = 'TRUSTED OPERATOR'
    if ua_info.get('is_bot'):
        threat_level = 'AMBER'
        threat_reason = 'AUTOMATED AGENT DETECTED'
    # crude TOR heuristic via hostname
    is_tor = 'tor-exit' in hostname.lower() or 'torproject' in hostname.lower()
    if is_tor:
        threat_level = 'AMBER'
        threat_reason = 'TOR EXIT NODE'

    return jsonify({
        'ip_masked': _mask_ip(ip),
        'country': country,
        'country_code': country_code,
        'country_flag': _country_flag(country_code),
        'city': city,
        'isp': isp,
        'hostname': hostname if hostname else '',
        'browser': ua_info['browser'],
        'browser_version': ua_info['browser_version'],
        'os': ua_info['os'],
        'os_version': ua_info['os_version'],
        'device': ua_info['device'],
        'language': lang,
        'threat_level': threat_level,
        'threat_reason': threat_reason,
        'is_tor': is_tor,
        'is_bot': ua_info['is_bot'],
        'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
    })


# ─── /api/visitor/recent-arcs ──────────────────────────
_arcs_cache = {'ts': 0, 'data': []}
_ARCS_CACHE_SECS = 300


def _parse_nginx_recent_ips(limit=20000):
    """Read tail of nginx log, extract (ip, timestamp, path) from recent visits.

    Only GET page views survive; API, asset, and static-file requests are
    filtered out. Returns [] if the log is missing or tail fails.
    """
    if not os.path.exists(NGINX_LOG):
        return []
    try:
        import subprocess
        r = subprocess.run(['tail', '-n', str(limit), NGINX_LOG],
                           capture_output=True, text=True, timeout=8)
        lines = r.stdout.strip().split('\n')
    except Exception:
        return []
    # Typical combined log: IP - - [dd/Mmm/yyyy:HH:MM:SS +0000] "METHOD /path HTTP/1.1" status size "ref" "ua"
    pat = re.compile(r'^(\S+) .* \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+)')
    rows = []
    for line in lines:
        m = pat.match(line)
        if not m:
            continue
        ip, ts, method, path, status = m.groups()
        if method != 'GET':
            continue
        # skip asset/api requests for arc relevance
        if path.startswith(('/api/', '/css/', '/js/', '/assets/', '/fonts/', '/mascot/')):
            continue
        if any(path.endswith(ext) for ext in ('.ico', '.png', '.jpg', '.svg', '.webp', '.woff', '.woff2')):
            continue
        rows.append((ip, ts, path))
    return rows


@visitor_bp.route('/api/visitor/recent-arcs')
def visitor_recent_arcs():
    """Return up to 50 recent unique-visitor arcs (lat/lon + page) for the globe.

    Results are cached for _ARCS_CACHE_SECS; coordinates come from GeoIP when
    available, else from country centroids, with slight random jitter.
    """
    now = time.time()
    if _arcs_cache['data'] and now - _arcs_cache['ts'] < _ARCS_CACHE_SECS:
        return jsonify(_arcs_cache['data'])

    rows = _parse_nginx_recent_ips(20000)
    reader = _get_geoip()
    centroids = _load_centroids()

    seen_ips = OrderedDict()  # preserve order, last-seen per IP
    for ip, ts, path in rows:
        seen_ips[ip] = (ts, path)

    # Build arcs — most recent 50 unique
    arcs = []
    items = list(seen_ips.items())[-200:]
    items.reverse()  # most recent first
    for ip, (ts, path) in items:
        if len(arcs) >= 50:
            break
        cc = 'XX'
        country_name = 'Unknown'
        lat = lon = None
        if reader:
            try:
                if _geoip_has_city:
                    r = reader.city(ip)
                    cc = r.country.iso_code or 'XX'
                    country_name = r.country.name or 'Unknown'
                    if r.location.latitude is not None:
                        lat, lon = r.location.latitude, r.location.longitude
                else:
                    r = reader.country(ip)
                    cc = r.country.iso_code or 'XX'
                    country_name = r.country.name or 'Unknown'
            except Exception:
                continue
        if lat is None and cc in centroids:
            lat, lon = centroids[cc][0], centroids[cc][1]
        if lat is None:
            continue
        # jitter slightly so multiple from same country don't overlap exactly
        lat += (random.random() - 0.5) * 1.5
        lon += (random.random() - 0.5) * 1.5
        arcs.append({
            'country_code': cc,
            'country_name': country_name,
            'lat': round(lat, 3),
            'lon': round(lon, 3),
            'timestamp': ts,
            'page_viewed': path,
        })

    _arcs_cache['ts'] = now
    _arcs_cache['data'] = arcs
    return jsonify(arcs)


# ─── /api/leaderboards ─────────────────────────────────
_lb_cache = {'ts': 0, 'data': None}
_LB_CACHE_SECS = 60

_UA_BROWSER_PAT = re.compile(
    r'(Firefox|Edg|Chrome|Safari|Opera|DuckDuckGo|SamsungBrowser|MSIE|Trident)/?([0-9.]*)', re.I)


def _lb_parse_log(limit_lines=50000):
    """Parse nginx log for leaderboard data.

    Returns a list of dicts with ip/ts/method/path/status/referer/ua keys;
    [] if the log is missing or tail fails.
    """
    if not os.path.exists(NGINX_LOG):
        return []
    try:
        import subprocess
        r = subprocess.run(['tail', '-n', str(limit_lines), NGINX_LOG],
                           capture_output=True, text=True, timeout=10)
        lines = r.stdout.strip().split('\n')
    except Exception:
        return []
    pat = re.compile(r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) \S+ "([^"]*)" "([^"]*)"')
    rows = []
    for line in lines:
        m = pat.match(line)
        if not m:
            continue
        ip, ts, method, path, status, referer, ua = m.groups()
        rows.append({
            'ip': ip,
            'ts': ts,
            'method': method,
            'path': path,
            'status': int(status) if status.isdigit() else 0,
            'referer': referer,
            'ua': ua,
        })
    return rows


def _parse_nginx_ts(ts_str):
    """Parse an nginx timestamp like '12/Jan/2025:10:30:00 +0000'.

    Honors the UTC offset and returns a naive UTC datetime so age math
    against UTC "now" is correct on non-UTC servers. Falls back to the
    offset-less parse (old behaviour) if the zone field is malformed.
    Returns None when the string is unparseable.
    """
    try:
        dt = datetime.strptime(ts_str, '%d/%b/%Y:%H:%M:%S %z')
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    except Exception:
        pass
    try:
        return datetime.strptime(ts_str.split()[0], '%d/%b/%Y:%H:%M:%S')
    except Exception:
        return None


@visitor_bp.route('/api/leaderboards')
def leaderboards():
    """Aggregate nginx logs into 24h/7d leaderboards (cached _LB_CACHE_SECS).

    Sections: top countries, top pages, top referrers, peak hours, browser
    breakdown, and top operator IPs (masked) with humanized last-seen.
    """
    now = time.time()
    if _lb_cache['data'] and now - _lb_cache['ts'] < _LB_CACHE_SECS:
        return jsonify(_lb_cache['data'])

    rows = _lb_parse_log(50000)
    reader = _get_geoip()
    # naive UTC "now" to match the naive-UTC datetimes from _parse_nginx_ts
    now_dt = datetime.now(timezone.utc).replace(tzinfo=None)

    # split 24h / 7d
    rows_24h = []
    rows_7d = []
    for r in rows:
        dt = _parse_nginx_ts(r['ts'])
        if not dt:
            continue
        age_h = (now_dt - dt).total_seconds() / 3600
        if age_h <= 24:
            rows_24h.append({**r, 'dt': dt})
        if age_h <= 24 * 7:
            rows_7d.append({**r, 'dt': dt})

    # Top Countries (24h)
    country_counter = Counter()
    if reader:
        for r in rows_24h:
            try:
                if _geoip_has_city:
                    resp = reader.city(r['ip'])
                else:
                    resp = reader.country(r['ip'])
                cc = resp.country.iso_code or 'XX'
                name = resp.country.name or 'Unknown'
                country_counter[(cc, name)] += 1
            except Exception:
                continue
    top_countries = [{'code': cc, 'name': name, 'count': c, 'flag': _country_flag(cc)}
                     for (cc, name), c in country_counter.most_common(15)]

    # Top Pages (24h) — exclude api/assets
    pages_counter = Counter()
    for r in rows_24h:
        p = r['path'].split('?', 1)[0]
        if p.startswith(('/api/', '/css/', '/js/', '/assets/', '/fonts/', '/mascot/')):
            continue
        if any(p.endswith(ext) for ext in ('.ico', '.png', '.jpg', '.jpeg',
                                           '.svg', '.webp', '.woff', '.woff2', '.gif', '.map')):
            continue
        if r['status'] >= 400:
            continue
        if r['method'] != 'GET':
            continue
        pages_counter[p] += 1
    top_pages = [{'path': p, 'count': c} for p, c in pages_counter.most_common(20)]

    # Top Referrers (7d) — exclude self/empty
    ref_counter = Counter()
    for r in rows_7d:
        ref = r['referer']
        if not ref or ref == '-':
            continue
        if 'jaeswift.xyz' in ref:
            continue
        # extract hostname
        m = re.match(r'https?://([^/]+)', ref)
        if m:
            host = m.group(1)
            ref_counter[host] += 1
    top_referrers = [{'host': h, 'count': c} for h, c in ref_counter.most_common(10)]

    # Peak Hours (24h)
    hour_counter = Counter()
    for r in rows_24h:
        hour_counter[r['dt'].hour] += 1
    peak_hours = [{'hour': h, 'count': hour_counter.get(h, 0)} for h in range(24)]

    # Browser Breakdown (24h)
    browser_counter = Counter()
    for r in rows_24h:
        m = _UA_BROWSER_PAT.search(r['ua'] or '')
        if m:
            name = m.group(1)
            if name == 'Edg':
                name = 'Edge'
            browser_counter[name] += 1
        else:
            s = (r['ua'] or '').lower()
            if any(k in s for k in ('bot', 'crawl', 'spider', 'curl', 'wget', 'python')):
                browser_counter['Bot/CLI'] += 1
            else:
                browser_counter['Other'] += 1
    browsers = [{'name': n, 'count': c} for n, c in browser_counter.most_common(10)]

    # Operator Leaderboard (7d) — top IPs
    ip_counter = Counter()
    ip_last_seen = {}
    for r in rows_7d:
        ip_counter[r['ip']] += 1
        ip_last_seen[r['ip']] = r['dt']
    top_ops = []
    for ip, c in ip_counter.most_common(10):
        last = ip_last_seen[ip]
        delta = (now_dt - last).total_seconds()
        if delta < 60:
            last_seen_str = f"{int(delta)}s ago"
        elif delta < 3600:
            last_seen_str = f"{int(delta/60)}m ago"
        elif delta < 86400:
            last_seen_str = f"{int(delta/3600)}h ago"
        else:
            last_seen_str = f"{int(delta/86400)}d ago"
        top_ops.append({
            'ip_masked': _mask_ip(ip),
            'count': c,
            'last_seen': last_seen_str,
        })

    data = {
        'generated_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        'total_requests_24h': len(rows_24h),
        'total_requests_7d': len(rows_7d),
        'top_countries': top_countries,
        'top_pages': top_pages,
        'top_referrers': top_referrers,
        'peak_hours': peak_hours,
        'browsers': browsers,
        'top_operators': top_ops,
    }
    _lb_cache['ts'] = now
    _lb_cache['data'] = data
    return jsonify(data)