- Add govdomains_sync.py: clones CISA dotgov-data, parses CSV, tracks first_seen dates
- Add /api/govdomains and /api/govdomains/stats Flask endpoints with range/type/search filters
- Add NEWS FEED | .GOV TRACKER toggle to RADAR page
- Add domain type badges (Federal=red, State=blue, City=green, County=amber)
- Add new-domain detection with pulsing green highlight and NEW badge
- Add responsive grid layout with stats bar and result count
228 lines
7.4 KiB
Python
228 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
"""JAESWIFT .GOV Domain Tracker — Sync Script
|
|
Clones/pulls cisagov/dotgov-data, parses current-full.csv,
|
|
tracks first_seen dates and new domain additions.
|
|
Designed to run via cron every 12 hours.
|
|
"""
|
|
import csv
import json
import os
import subprocess
import sys
from datetime import date, datetime, timezone
from pathlib import Path
|
|
|
|
# ─── Configuration ─────────────────────────────────────
# All paths are resolved relative to this script's location so the cron
# job works regardless of the current working directory.
BASE_DIR = Path(__file__).parent
SOURCE_DIR = BASE_DIR / 'govdomains-source'  # local clone of the CISA repo
DATA_DIR = BASE_DIR / 'data'  # output directory for generated JSON
DOMAINS_FILE = DATA_DIR / 'govdomains.json'  # merged domain snapshot
HISTORY_FILE = DATA_DIR / 'govdomains_history.json'  # per-day new-domain log
REPO_URL = 'https://github.com/cisagov/dotgov-data.git'
CSV_FILE = SOURCE_DIR / 'current-full.csv'  # CSV parsed on each sync
|
def log(msg):
    """Print *msg* to stdout, prefixed with a UTC timestamp, and flush.

    Uses timezone-aware ``datetime.now(timezone.utc)`` instead of
    ``datetime.utcnow()``, which is deprecated since Python 3.12.
    """
    ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    print(f'[{ts}] {msg}', flush=True)
|
|
|
|
|
def clone_or_pull():
    """Clone the CISA dotgov-data repo or pull the latest changes.

    When a fast-forward pull fails (diverged/rewritten history), falls
    back to ``git fetch`` + ``git reset --hard origin/main``. Exits the
    process with status 1 on unrecoverable git errors so cron surfaces
    the failure instead of parsing stale data.
    """
    def _git(args, timeout):
        # Run a git command, capturing decoded stdout/stderr.
        return subprocess.run(args, capture_output=True, text=True, timeout=timeout)

    if (SOURCE_DIR / '.git').exists():
        log('Pulling latest dotgov-data...')
        result = _git(['git', '-C', str(SOURCE_DIR), 'pull', '--ff-only'], 120)
        if result.returncode != 0:
            log(f'Git pull failed: {result.stderr.strip()}')
            # Recover with a hard reset to the remote branch.
            fetch = _git(['git', '-C', str(SOURCE_DIR), 'fetch', 'origin'], 120)
            if fetch.returncode != 0:
                # Previously ignored — a failed fetch means the reset below
                # would succeed against stale refs. Fail loudly instead.
                log(f'Git fetch failed: {fetch.stderr.strip()}')
                sys.exit(1)
            reset = _git(['git', '-C', str(SOURCE_DIR), 'reset', '--hard', 'origin/main'], 60)
            if reset.returncode != 0:
                # Previously ignored — "Force-reset" was logged even when
                # the reset failed.
                log(f'Git reset failed: {reset.stderr.strip()}')
                sys.exit(1)
            log('Force-reset to origin/main')
        else:
            log(f'Pull OK: {result.stdout.strip()}')
    else:
        log('Cloning dotgov-data repo...')
        SOURCE_DIR.mkdir(parents=True, exist_ok=True)
        result = _git(['git', 'clone', '--depth', '1', REPO_URL, str(SOURCE_DIR)], 300)
        if result.returncode != 0:
            log(f'Clone failed: {result.stderr.strip()}')
            sys.exit(1)
        log('Clone complete')
|
|
|
|
|
def parse_csv():
    """Parse current-full.csv and return a list of domain dicts.

    Exits with status 1 when the CSV is missing. Domain names are
    lower-cased and guaranteed to end in ``.gov``.
    """
    if not CSV_FILE.exists():
        log(f'CSV not found: {CSV_FILE}')
        sys.exit(1)

    def pick(row, *headers):
        # Return the first non-empty value among candidate header
        # spellings (the upstream CSV has changed capitalization over time).
        for header in headers:
            value = row.get(header)
            if value:
                return value.strip()
        return ''

    records = []
    with open(CSV_FILE, 'r', encoding='utf-8-sig') as handle:
        for row in csv.DictReader(handle):
            name = pick(row, 'Domain Name', 'Domain name').lower()
            if not name:
                continue
            # Ensure the .gov suffix is present.
            if not name.endswith('.gov'):
                name = f'{name}.gov'
            records.append({
                'domain': name,
                'type': pick(row, 'Domain Type', 'Domain type'),
                'agency': pick(row, 'Agency'),
                'organization': pick(row, 'Organization', 'Organization name'),
                'city': pick(row, 'City'),
                'state': pick(row, 'State'),
                'security_contact': pick(row, 'Security Contact Email', 'Security contact email'),
            })

    log(f'Parsed {len(records)} domains from CSV')
    return records
|
|
|
|
|
|
def load_existing():
    """Load existing govdomains.json, or return None if absent/unreadable."""
    if not DOMAINS_FILE.exists():
        return None
    try:
        with open(DOMAINS_FILE, 'r') as handle:
            return json.load(handle)
    except (json.JSONDecodeError, IOError) as exc:
        log(f'Error loading existing data: {exc}')
        return None
|
|
|
|
|
|
def load_history():
    """Load the domain-addition history, or an empty dict on any failure."""
    if HISTORY_FILE.exists():
        try:
            with open(HISTORY_FILE, 'r') as handle:
                return json.load(handle)
        except (json.JSONDecodeError, IOError):
            # Corrupt or unreadable history is treated as empty — best effort.
            pass
    return {}
|
|
|
|
|
|
def sync():
    """Main sync logic: refresh the repo, merge state, persist JSON.

    Steps:
      1. clone/pull the CISA repo and parse the CSV
      2. merge against the previous snapshot, preserving ``first_seen``
         and flagging domains not seen before
      3. write govdomains.json and record new arrivals in the history file
    """
    today = date.today().isoformat()
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    # Ensure data directory exists
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Clone or pull the repo
    clone_or_pull()

    # Parse fresh CSV data
    csv_domains = parse_csv()
    if not csv_domains:
        log('No domains parsed — aborting')
        sys.exit(1)

    # Lookup from CSV: domain -> record
    csv_lookup = {d['domain']: d for d in csv_domains}

    # Load previously persisted state
    existing = load_existing()
    history = load_history()

    # Lookup of existing domains: domain -> record (carries first_seen)
    existing_lookup = {}
    if existing and 'domains' in existing:
        existing_lookup = {d['domain']: d for d in existing['domains']}

    # Merge: preserve first_seen for known domains, mark new ones
    new_domains_today = []
    merged = []
    for domain_name, csv_record in csv_lookup.items():
        prior = existing_lookup.get(domain_name)
        if prior is not None:
            # Existing domain — keep first_seen, refresh all other fields.
            entry = {
                **csv_record,
                'first_seen': prior.get('first_seen', today),
                'is_new': False,
            }
        else:
            # New domain — first seen today.
            entry = {**csv_record, 'first_seen': today, 'is_new': True}
            new_domains_today.append(domain_name)
        merged.append(entry)

    # Sort by first_seen descending, then domain name descending
    # (single reverse sort on the composite key, as before).
    merged.sort(key=lambda x: (x['first_seen'], x['domain']), reverse=True)

    # On the first run every domain gets first_seen=today; don't record
    # the whole dataset as "new" in history.
    is_first_run = existing is None

    output = {
        'last_sync': now_iso,
        'total': len(merged),
        'new_today': len(new_domains_today),
        'is_first_run': is_first_run,
        'domains': merged,
    }

    # Save domains file
    with open(DOMAINS_FILE, 'w') as f:
        json.dump(output, f, indent=2)
    log(f'Saved {len(merged)} domains to {DOMAINS_FILE}')

    # Update history
    if new_domains_today and not is_first_run:
        day_list = history.setdefault(today, [])
        seen = set(day_list)
        for d in new_domains_today:
            if d not in seen:
                day_list.append(d)
                seen.add(d)  # also guards against duplicates within this batch
        log(f'Recorded {len(new_domains_today)} new domains for {today}')
    elif is_first_run:
        # First run — record total as baseline, not individual domains
        history[today] = [f'__baseline__:{len(merged)}_domains']
        log(f'First run — baseline of {len(merged)} domains established')

    with open(HISTORY_FILE, 'w') as f:
        json.dump(history, f, indent=2)
    log(f'History updated: {HISTORY_FILE}')

    # Summary
    log(f'Sync complete: {len(merged)} total domains, {len(new_domains_today)} new today')
    if new_domains_today and len(new_domains_today) <= 20:
        for d in new_domains_today:
            log(f'  NEW: {d}')
    elif new_domains_today:
        log('  (too many to list individually)')
|
|
|
|
|
|
if __name__ == '__main__':
    # Top-level boundary: log any unexpected failure with a traceback
    # and exit non-zero so cron notices.
    try:
        sync()
    except Exception as exc:
        import traceback
        log(f'FATAL ERROR: {exc}')
        traceback.print_exc()
        sys.exit(1)
|