#!/usr/bin/env python3
"""JAESWIFT .GOV Domain Tracker — Sync Script

Clones/pulls cisagov/dotgov-data, parses current-full.csv,
tracks first_seen dates and new domain additions.
Designed to run via cron every 12 hours.
"""
import csv
import json
import os
import subprocess
import sys
import traceback
from datetime import datetime, date, timezone
from pathlib import Path

# ─── Configuration ─────────────────────────────────────
BASE_DIR = Path(__file__).parent
SOURCE_DIR = BASE_DIR / 'govdomains-source'
DATA_DIR = BASE_DIR / 'data'
DOMAINS_FILE = DATA_DIR / 'govdomains.json'
HISTORY_FILE = DATA_DIR / 'govdomains_history.json'
REPO_URL = 'https://github.com/cisagov/dotgov-data.git'
CSV_FILE = SOURCE_DIR / 'current-full.csv'


def log(msg):
    """Print *msg* to stdout prefixed with the current UTC timestamp."""
    ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    print(f'[{ts}] {msg}', flush=True)


def _run_git(args, timeout):
    """Run ``git <args>`` capturing text output; return the CompletedProcess."""
    return subprocess.run(
        ['git', *args],
        capture_output=True, text=True, timeout=timeout
    )


def clone_or_pull():
    """Clone the CISA dotgov-data repo or pull latest changes.

    If a fast-forward pull fails, falls back to ``fetch`` followed by
    ``reset --hard origin/main``. Exits the process with status 1 when the
    clone fails or when the fallback itself fails, so that a run never
    continues on a checkout that could not be refreshed.
    """
    if (SOURCE_DIR / '.git').exists():
        log('Pulling latest dotgov-data...')
        result = _run_git(['-C', str(SOURCE_DIR), 'pull', '--ff-only'], timeout=120)
        if result.returncode == 0:
            log(f'Pull OK: {result.stdout.strip()}')
            return

        log(f'Git pull failed: {result.stderr.strip()}')
        # Force reset. Each step is checked: previously a failed fetch/reset
        # was still reported as a successful force-reset.
        fallback_steps = (
            ('fetch', ['-C', str(SOURCE_DIR), 'fetch', 'origin'], 120),
            ('reset', ['-C', str(SOURCE_DIR), 'reset', '--hard', 'origin/main'], 60),
        )
        for label, args, timeout in fallback_steps:
            step = _run_git(args, timeout=timeout)
            if step.returncode != 0:
                log(f'Git {label} failed: {step.stderr.strip()}')
                sys.exit(1)
        log('Force-reset to origin/main')
        return

    log('Cloning dotgov-data repo...')
    SOURCE_DIR.mkdir(parents=True, exist_ok=True)
    result = _run_git(['clone', '--depth', '1', REPO_URL, str(SOURCE_DIR)], timeout=300)
    if result.returncode != 0:
        log(f'Clone failed: {result.stderr.strip()}')
        sys.exit(1)
    log('Clone complete')


def _field(row, *names):
    """Return the first non-empty value among *names* in *row*, stripped.

    Several names are tried because the CSV header capitalisation differs
    between the variants this script accepts. Returns '' when none is set.
    """
    for name in names:
        value = row.get(name)
        if value:
            return value.strip()
    return ''


def parse_csv(csv_path=None):
    """Parse current-full.csv and return list of domain dicts.

    Args:
        csv_path: Optional path to the CSV; defaults to ``CSV_FILE``.

    Returns:
        A list of dicts with keys ``domain``, ``type``, ``agency``,
        ``organization``, ``city``, ``state`` and ``security_contact``.
        Rows without a domain name are skipped; domain names are lowercased
        and given a ``.gov`` suffix if they lack one.

    Exits the process with status 1 if the file does not exist.
    """
    path = CSV_FILE if csv_path is None else Path(csv_path)
    if not path.exists():
        log(f'CSV not found: {path}')
        sys.exit(1)

    domains = []
    # newline='' is what the csv module requires so that quoted fields
    # containing line breaks are parsed correctly.
    with open(path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            domain = _field(row, 'Domain Name', 'Domain name').lower()
            if not domain:
                continue
            # Ensure .gov suffix
            if not domain.endswith('.gov'):
                domain += '.gov'
            domains.append({
                'domain': domain,
                'type': _field(row, 'Domain Type', 'Domain type'),
                'agency': _field(row, 'Agency'),
                'organization': _field(row, 'Organization', 'Organization name'),
                'city': _field(row, 'City'),
                'state': _field(row, 'State'),
                'security_contact': _field(
                    row, 'Security Contact Email', 'Security contact email'
                ),
            })

    log(f'Parsed {len(domains)} domains from CSV')
    return domains


def load_existing(path=None):
    """Load existing govdomains.json if it exists.

    Args:
        path: Optional path to the JSON file; defaults to ``DOMAINS_FILE``.

    Returns:
        The decoded JSON content, or ``None`` when the file is missing or
        cannot be read/decoded (the error is logged).
    """
    target = DOMAINS_FILE if path is None else Path(path)
    if not target.exists():
        return None
    try:
        with open(target, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log(f'Error loading existing data: {e}')
        return None


def load_history(path=None):
    """Load domain addition history.

    Args:
        path: Optional path to the JSON file; defaults to ``HISTORY_FILE``.

    Returns:
        The history dict (date string -> list of domain names), or an empty
        dict when the file is missing, unreadable, or not a JSON object.
    """
    target = HISTORY_FILE if path is None else Path(path)
    if not target.exists():
        return {}
    try:
        with open(target, 'r', encoding='utf-8') as f:
            history = json.load(f)
    except (ValueError, OSError) as e:
        log(f'Error loading history: {e}')
        return {}
    if not isinstance(history, dict):
        log('Error loading history: expected a JSON object')
        return {}
    return history


def _write_json_atomic(path, payload):
    """Write *payload* as indented JSON to *path* via a temp file + rename.

    The data is written next to the destination and moved into place with
    os.replace, so an interrupted run leaves the previous file intact rather
    than a truncated one (a truncated domains file would be read back as
    "no existing data" and reset every first_seen date).
    """
    path = Path(path)
    tmp_path = path.with_name(path.name + '.tmp')
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up after a failed write.
        try:
            tmp_path.unlink()
        except OSError:
            pass


def _merge_domains(csv_lookup, existing_lookup, today):
    """Merge fresh CSV records with previously stored ones.

    Args:
        csv_lookup: domain -> record parsed from the CSV.
        existing_lookup: domain -> previously stored record.
        today: ISO date string used as first_seen for unseen domains.

    Returns:
        ``(merged, new_names)`` where ``merged`` is the list of records with
        ``first_seen`` and ``is_new`` added, and ``new_names`` lists the
        domains absent from ``existing_lookup``, in CSV order.
    """
    merged = []
    new_names = []
    for name, record in csv_lookup.items():
        known = existing_lookup.get(name)
        if known is not None:
            # Existing domain — preserve first_seen, update other fields.
            # `or today` also covers a stored null/empty first_seen, which
            # would otherwise break the sort below.
            first_seen = known.get('first_seen') or today
            is_new = False
        else:
            # New domain
            first_seen = today
            is_new = True
            new_names.append(name)
        merged.append({**record, 'first_seen': first_seen, 'is_new': is_new})

    # Newest first_seen first; reverse=True applies to the whole key, so
    # domain names within the same date are in descending order too.
    merged.sort(key=lambda x: (x['first_seen'], x['domain']), reverse=True)
    return merged, new_names


def sync():
    """Main sync logic.

    Refreshes the source checkout, parses the CSV, merges it with the stored
    domain list (preserving first_seen), then writes the domains file and the
    per-day history of newly seen domains. Exits with status 1 when no
    domains could be parsed.
    """
    # NOTE: `today` is the local date while `last_sync` and log timestamps
    # are UTC; kept as-is so stored first_seen values stay comparable.
    today = date.today().isoformat()
    now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    # Ensure data directory exists
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Clone or pull the repo
    clone_or_pull()

    # Parse fresh CSV data
    csv_domains = parse_csv()
    if not csv_domains:
        log('No domains parsed — aborting')
        sys.exit(1)

    # Build lookup from CSV: domain -> record (later duplicates win)
    csv_lookup = {d['domain']: d for d in csv_domains}

    # Load existing data
    existing = load_existing()
    history = load_history()

    # Build lookup of existing domains: domain -> record (with first_seen).
    # Malformed entries are skipped rather than aborting the whole sync.
    existing_lookup = {}
    if isinstance(existing, dict):
        for d in existing.get('domains') or []:
            if isinstance(d, dict) and d.get('domain'):
                existing_lookup[d['domain']] = d

    # Merge: preserve first_seen for known domains, mark new ones
    merged, new_domains_today = _merge_domains(csv_lookup, existing_lookup, today)

    # On first run, don't mark everything as "new" in history
    # (all domains get first_seen=today but that's expected)
    is_first_run = existing is None

    # Build output
    output = {
        'last_sync': now_iso,
        'total': len(merged),
        'new_today': len(new_domains_today),
        'is_first_run': is_first_run,
        'domains': merged,
    }

    # Save domains file
    _write_json_atomic(DOMAINS_FILE, output)
    log(f'Saved {len(merged)} domains to {DOMAINS_FILE}')

    # Update history
    if new_domains_today and not is_first_run:
        day_entries = history.setdefault(today, [])
        # Append without duplicating
        already_recorded = set(day_entries)
        day_entries.extend(
            d for d in new_domains_today if d not in already_recorded
        )
        log(f'Recorded {len(new_domains_today)} new domains for {today}')
    elif is_first_run:
        # First run — record total as baseline, not individual domains
        history[today] = [f'__baseline__:{len(merged)}_domains']
        log(f'First run — baseline of {len(merged)} domains established')

    _write_json_atomic(HISTORY_FILE, history)
    log(f'History updated: {HISTORY_FILE}')

    # Summary
    log(f'Sync complete: {len(merged)} total domains, {len(new_domains_today)} new today')
    if new_domains_today and len(new_domains_today) <= 20:
        for d in new_domains_today:
            log(f' NEW: {d}')
    elif new_domains_today:
        log(' (too many to list individually)')


if __name__ == '__main__':
    try:
        sync()
    except Exception as e:
        # Top-level boundary: report, dump the traceback for the cron log,
        # and signal failure through the exit status.
        log(f'FATAL ERROR: {e}')
        traceback.print_exc()
        sys.exit(1)