jaeswift-website/api/govdomains_sync.py
jae 2bc40ac285 feat: .GOV Domain Tracker tab on RADAR page
- Add govdomains_sync.py: clones CISA dotgov-data, parses CSV, tracks first_seen dates
- Add /api/govdomains and /api/govdomains/stats Flask endpoints with range/type/search filters
- Add NEWS FEED | .GOV TRACKER toggle to RADAR page
- Domain type badges (Federal=red, State=blue, City=green, County=amber)
- New domain detection with pulsing green highlight and NEW badge
- Responsive grid layout with stats bar and result count
2026-04-15 15:52:32 +00:00

228 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""JAESWIFT .GOV Domain Tracker — Sync Script
Clones/pulls cisagov/dotgov-data, parses current-full.csv,
tracks first_seen dates and new domain additions.
Designed to run via cron every 12 hours.
"""
import csv
import json
import os
import subprocess
import sys
import traceback
from datetime import date, datetime, timezone
from pathlib import Path
# ─── Configuration ─────────────────────────────────────
BASE_DIR = Path(__file__).parent
SOURCE_DIR = BASE_DIR / 'govdomains-source'  # local clone of the CISA repo
DATA_DIR = BASE_DIR / 'data'  # output directory for generated JSON artifacts
DOMAINS_FILE = DATA_DIR / 'govdomains.json'  # merged dataset consumed by the API
HISTORY_FILE = DATA_DIR / 'govdomains_history.json'  # per-day log of new domains
REPO_URL = 'https://github.com/cisagov/dotgov-data.git'
CSV_FILE = SOURCE_DIR / 'current-full.csv'  # authoritative .gov list inside the clone
def log(msg):
    """Print *msg* prefixed with a UTC timestamp.

    Uses timezone-aware ``datetime.now(timezone.utc)`` rather than the
    deprecated naive ``datetime.utcnow()``; the formatted output is
    identical. ``flush=True`` matters because cron/systemd buffer stdout
    and unflushed lines can be lost on a crash.
    """
    ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    print(f'[{ts}] {msg}', flush=True)
def clone_or_pull():
    """Clone the CISA dotgov-data repo, or update an existing checkout.

    A fast-forward pull is attempted first; if it fails (e.g. upstream
    force-pushed), the checkout is fetched and hard-reset to
    ``origin/main``. Exits the process if the initial clone fails.
    """
    def run_git(args, timeout):
        # Single place for the capture/text/timeout settings every git
        # invocation shares.
        return subprocess.run(args, capture_output=True, text=True, timeout=timeout)

    if (SOURCE_DIR / '.git').exists():
        log('Pulling latest dotgov-data...')
        result = run_git(['git', '-C', str(SOURCE_DIR), 'pull', '--ff-only'], 120)
        if result.returncode != 0:
            log(f'Git pull failed: {result.stderr.strip()}')
            # Recovery path: discard local state and re-point at origin/main.
            fetch = run_git(['git', '-C', str(SOURCE_DIR), 'fetch', 'origin'], 120)
            reset = run_git(
                ['git', '-C', str(SOURCE_DIR), 'reset', '--hard', 'origin/main'], 60
            )
            if fetch.returncode != 0 or reset.returncode != 0:
                # BUGFIX: success used to be logged unconditionally even when
                # the recovery fetch/reset itself failed. Best-effort continues
                # with whatever CSV is on disk; parse_csv() aborts if none.
                log(f'Force-reset failed: {(fetch.stderr + reset.stderr).strip()}')
            else:
                log('Force-reset to origin/main')
        else:
            log(f'Pull OK: {result.stdout.strip()}')
    else:
        log('Cloning dotgov-data repo...')
        SOURCE_DIR.mkdir(parents=True, exist_ok=True)
        result = run_git(['git', 'clone', '--depth', '1', REPO_URL, str(SOURCE_DIR)], 300)
        if result.returncode != 0:
            log(f'Clone failed: {result.stderr.strip()}')
            sys.exit(1)
        log('Clone complete')
def parse_csv():
    """Parse current-full.csv and return a list of domain dicts.

    Exits the process if the CSV is missing. Header capitalization in
    the CISA data has varied over time ('Domain Name' vs 'Domain name'),
    so each field is looked up under every known spelling.
    """
    def field(row, *names):
        # First non-empty value among the candidate header spellings,
        # stripped; '' when none is present.
        for name in names:
            value = row.get(name)
            if value:
                return value.strip()
        return ''

    if not CSV_FILE.exists():
        log(f'CSV not found: {CSV_FILE}')
        sys.exit(1)
    domains = []
    # utf-8-sig transparently drops the BOM GitHub-exported CSVs may carry
    with open(CSV_FILE, 'r', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            domain = field(row, 'Domain Name', 'Domain name').lower()
            if not domain:
                continue
            # Normalize bare registrations to their .gov form
            if not domain.endswith('.gov'):
                domain += '.gov'
            domains.append({
                'domain': domain,
                'type': field(row, 'Domain Type', 'Domain type'),
                'agency': field(row, 'Agency'),
                'organization': field(row, 'Organization', 'Organization name'),
                'city': field(row, 'City'),
                'state': field(row, 'State'),
                'security_contact': field(row, 'Security Contact Email',
                                          'Security contact email'),
            })
    log(f'Parsed {len(domains)} domains from CSV')
    return domains
def load_existing():
    """Return the previously saved govdomains.json payload, or None.

    None signals either a first run (file absent) or an unreadable/corrupt
    file; the caller treats both the same way.
    """
    if not DOMAINS_FILE.exists():
        return None
    try:
        return json.loads(DOMAINS_FILE.read_text())
    except (json.JSONDecodeError, IOError) as e:
        log(f'Error loading existing data: {e}')
        return None
def load_history():
    """Return the per-day map of newly added domains, or {} as fallback."""
    if HISTORY_FILE.exists():
        try:
            return json.loads(HISTORY_FILE.read_text())
        except (json.JSONDecodeError, IOError):
            # Corrupt or unreadable history: start over with an empty map
            pass
    return {}
def sync():
    """Merge the freshly pulled CISA CSV with stored state and persist it.

    - Preserves each known domain's original ``first_seen`` date.
    - Flags domains absent from the previous snapshot as new.
    - Appends new domains to a per-day history file, except on the first
      run, where every domain would otherwise look new.
    Exits non-zero if the CSV yields no domains.
    """
    today = date.today().isoformat()
    # Timezone-aware replacement for deprecated utcnow(); same output string
    now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    # Ensure the output directory exists before any writes
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    clone_or_pull()
    csv_domains = parse_csv()
    if not csv_domains:
        log('No domains parsed — aborting')
        sys.exit(1)
    # domain name -> freshest CSV record (also dedupes repeated rows)
    csv_lookup = {d['domain']: d for d in csv_domains}
    existing = load_existing()
    history = load_history()
    # domain name -> previously stored record (carries first_seen)
    existing_lookup = {}
    if existing and 'domains' in existing:
        existing_lookup = {d['domain']: d for d in existing['domains']}
    # Merge: preserve first_seen for known domains, mark the rest as new
    new_domains_today = []
    merged = []
    for domain_name, csv_record in csv_lookup.items():
        prior = existing_lookup.get(domain_name)
        if prior is not None:
            # Known domain — keep its original first_seen, refresh the rest
            entry = {
                **csv_record,
                'first_seen': prior.get('first_seen', today),
                'is_new': False,
            }
        else:
            entry = {**csv_record, 'first_seen': today, 'is_new': True}
            new_domains_today.append(domain_name)
        merged.append(entry)
    # Newest first_seen first, alphabetical within a date. Two stable
    # passes — BUGFIX: the original single sort with reverse=True also
    # reversed the alphabetical tiebreak (z→a).
    merged.sort(key=lambda x: x['domain'])
    merged.sort(key=lambda x: x['first_seen'], reverse=True)
    # On the first run every domain gets first_seen=today; the flag lets
    # consumers avoid rendering the whole dataset as "new".
    is_first_run = existing is None
    output = {
        'last_sync': now_iso,
        'total': len(merged),
        'new_today': len(new_domains_today),
        'is_first_run': is_first_run,
        'domains': merged,
    }
    with open(DOMAINS_FILE, 'w') as f:
        json.dump(output, f, indent=2)
    log(f'Saved {len(merged)} domains to {DOMAINS_FILE}')
    # Update the per-day addition history
    if new_domains_today and not is_first_run:
        day_entries = history.setdefault(today, [])
        # Append without duplicating (re-runs within the same day)
        already_recorded = set(day_entries)
        for d in new_domains_today:
            if d not in already_recorded:
                day_entries.append(d)
        log(f'Recorded {len(new_domains_today)} new domains for {today}')
    elif is_first_run:
        # Record only a baseline marker, not thousands of "new" domains
        history[today] = [f'__baseline__:{len(merged)}_domains']
        log(f'First run — baseline of {len(merged)} domains established')
    with open(HISTORY_FILE, 'w') as f:
        json.dump(history, f, indent=2)
    log(f'History updated: {HISTORY_FILE}')
    # Summary
    log(f'Sync complete: {len(merged)} total domains, {len(new_domains_today)} new today')
    if new_domains_today and len(new_domains_today) <= 20:
        for d in new_domains_today:
            log(f' NEW: {d}')
    elif new_domains_today:
        log(f' (too many to list individually)')
if __name__ == '__main__':
    # Entry point for cron: any unhandled failure is logged with a full
    # traceback and converted into a non-zero exit status. traceback is
    # imported at the top of the file rather than inside the handler.
    try:
        sync()
    except Exception as e:
        log(f'FATAL ERROR: {e}')
        traceback.print_exc()
        sys.exit(1)