feat(agent): expand toolbox to 37 tools across 4 tiers

This commit is contained in:
jae 2026-04-20 11:03:20 +00:00
parent b94bbe2f3e
commit 36d1ba6268
3 changed files with 923 additions and 0 deletions

View file

@ -537,3 +537,11 @@ def openai_tool_schemas(tools: list[Tool]) -> list[dict]:
'parameters': t.params, 'parameters': t.params,
}, },
} for t in tools] } for t in tools]
# ── Extended tool registry (Phase 3 — 27 additional tools) ──────────────
try:
import agent_tools_extended # noqa: F401 registers on import
except Exception as _ext_err:
import sys
print(f'[agent_tools] extended tools failed to load: {_ext_err}', file=sys.stderr)

892
api/agent_tools_extended.py Normal file
View file

@ -0,0 +1,892 @@
"""Extended JAE-AI agent tools (Phase 3).
Adds 27 new tools across anonymous / operator / elite / admin tiers.
Imported at the end of agent_tools.py so it runs at app startup.
"""
from __future__ import annotations
import json
import os
import re
import time
import subprocess
from pathlib import Path
from typing import Any
import requests as req
from agent_tools import (
Tool, register, DATA_DIR, _load_json, cache_get, cache_set,
)
MEM_DIR = DATA_DIR / 'agent_memories'
MEM_DIR.mkdir(parents=True, exist_ok=True)
ALERTS_PATH = DATA_DIR / 'alerts.json'
GUESTBOOK_PATH = DATA_DIR / 'guestbook.json'
BROADCAST_PATH = DATA_DIR / 'broadcast.json'
MAX_MEMORIES_PER_WALLET = 100
# ═══════════════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════════════
def _current_wallet() -> str | None:
"""Pull the authenticated wallet address from the request session."""
try:
from auth_routes import read_session
sess = read_session()
return sess.get('address') if sess else None
except Exception:
return None
def _is_admin(address: str | None) -> bool:
if not address:
return False
try:
from agent_tiers import ADMIN_WALLETS
return address in ADMIN_WALLETS
except Exception:
return False
def _safe_read_json(path: Path, default: Any) -> Any:
try:
if path.exists():
with open(path) as f:
return json.load(f)
except Exception:
pass
return default
def _safe_write_json(path: Path, data: Any) -> bool:
try:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix('.tmp')
with open(tmp, 'w') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
tmp.replace(path)
return True
except Exception:
return False
def _search_nested(obj: Any, q: str, limit: int = 5) -> list[dict]:
"""Walk a JSON tree and return objects whose combined text contains q."""
q = q.lower()
results: list[dict] = []
def blob_of(item) -> str:
try:
return json.dumps(item, ensure_ascii=False)[:4000].lower()
except Exception:
return str(item).lower()
def walk(node, depth=0):
if len(results) >= limit or depth > 5:
return
if isinstance(node, dict):
if q in blob_of(node):
results.append(node)
return
for v in node.values():
walk(v, depth + 1)
elif isinstance(node, list):
for v in node:
walk(v, depth + 1)
if len(results) >= limit:
return
walk(obj)
return results
def _trim_addr(addr: str) -> str:
if not addr:
return ''
return addr[:4] + '' + addr[-4:]
# ═══════════════════════════════════════════════════════════════════════════
# TIER: ANONYMOUS
# ═══════════════════════════════════════════════════════════════════════════
def tool_search_unredacted(args: dict) -> dict:
query = (args.get('query') or '').strip()
if not query:
return {'error': 'query required'}
data = _load_json('unredacted.json')
if data is None:
return {'error': 'unredacted archive unavailable', 'results': []}
matches = _search_nested(data, query, limit=5)
out = []
for m in matches:
out.append({
'title': m.get('title') or m.get('name') or m.get('document') or m.get('id') or 'Untitled',
'collection': m.get('collection') or m.get('category') or m.get('source'),
'summary': (m.get('summary') or m.get('description') or '')[:300],
'url': m.get('url') or m.get('pdf') or '/depot/unredacted',
})
return {'results': out, 'count': len(out)}
def tool_search_crimescene(args: dict) -> dict:
query = (args.get('query') or '').strip()
if not query:
return {'error': 'query required'}
data = _load_json('crimescene.json')
if data is None:
return {'error': 'crimescene archive unavailable', 'results': []}
matches = _search_nested(data, query, limit=5)
out = []
for m in matches:
out.append({
'case': m.get('case') or m.get('title') or m.get('name'),
'year': m.get('year') or m.get('date'),
'summary': (m.get('summary') or m.get('description') or '')[:300],
'status': m.get('status'),
'url': m.get('url') or '/depot/crimescene',
})
return {'results': out, 'count': len(out)}
def tool_search_radar(args: dict) -> dict:
query = (args.get('query') or '').strip().lower()
if not query:
return {'error': 'query required'}
try:
from app import get_radar_items # type: ignore
items = get_radar_items() or []
except Exception as e:
return {'error': f'radar unavailable: {e}', 'results': []}
results = []
for it in items:
blob = f"{it.get('title','')} {it.get('summary','')} {it.get('source','')}".lower()
if query in blob:
results.append({
'title': it.get('title'),
'source': it.get('source'),
'published': it.get('published'),
'url': it.get('url'),
'summary': (it.get('summary') or '')[:250],
})
if len(results) >= 5:
break
return {'results': results, 'count': len(results)}
def tool_get_gov_domains_stats(args: dict) -> dict:
data = _load_json('govdomains.json')
if data is None:
return {'error': 'govdomains data unavailable'}
domains = data.get('domains') if isinstance(data, dict) else (data if isinstance(data, list) else [])
total = len(domains) if isinstance(domains, list) else 0
now = time.time()
recent = 0
if isinstance(domains, list):
for d in domains:
try:
added = d.get('added') or d.get('first_seen') or d.get('date')
if not added:
continue
if isinstance(added, (int, float)):
if now - added < 86400:
recent += 1
else:
# ISO string
import datetime as _dt
try:
ts = _dt.datetime.fromisoformat(str(added).replace('Z', '+00:00')).timestamp()
if now - ts < 86400:
recent += 1
except Exception:
pass
except Exception:
continue
return {
'total_domains': total,
'added_last_24h': recent,
'source': 'api/data/govdomains.json',
'url': '/depot/recon',
}
def tool_search_docs(args: dict) -> dict:
query = (args.get('query') or '').strip()
if not query:
return {'error': 'query required'}
combined = []
try:
u = tool_search_unredacted({'query': query}).get('results', [])
for r in u:
r['source'] = 'unredacted'
combined.extend(u)
except Exception:
pass
try:
c = tool_search_crimescene({'query': query}).get('results', [])
for r in c:
r['source'] = 'crimescene'
combined.extend(c)
except Exception:
pass
try:
from agent_tools import tool_get_sitrep
latest = tool_get_sitrep({'date': 'latest'})
if query.lower() in (latest.get('summary') or '').lower():
combined.append({
'source': 'sitrep',
'title': f"SITREP {latest.get('date','')}",
'summary': (latest.get('summary') or '')[:300],
'url': '/transmissions/sitrep',
})
except Exception:
pass
return {'results': combined[:10], 'count': len(combined[:10])}
def tool_get_server_status(args: dict) -> dict:
cached = cache_get('server_status', 15)
if cached is not None:
return cached
try:
from telemetry_snapshot import gather
snap = gather()
except Exception as e:
return {'error': f'telemetry unavailable: {e}'}
out = {
'cpu_pct': snap.get('cpu_pct'),
'ram_pct': snap.get('ram_pct'),
'disk_pct': snap.get('disk_pct'),
'load_avg': snap.get('load_avg'),
'uptime_seconds': snap.get('uptime_seconds'),
'timestamp': snap.get('ts') or time.time(),
}
cache_set('server_status', out)
return out
_FORTUNES_SFW = [
'You will find the bug before lunch.',
'A stranger will ask you for a Solana airdrop. Decline politely.',
"Today's commit will be remembered.",
'The moon is closer than your merge conflict.',
"Don't feed the AI after midnight.",
'Debugging is twice as hard as writing the code in the first place.',
'Your next deploy will be flawless. Probably.',
'The quieter you become, the more you can grep.',
'A watched service never restarts cleanly.',
'Trust the process. Then kill it with SIGKILL.',
]
_FORTUNES_SPICY = [
'Go outside. The server will still be broken when you get back.',
'Your rubber duck has filed a grievance.',
'Stack Overflow is down. Good luck, nerd.',
'That variable name is a war crime.',
'You are not a 10x developer. You are a 10x debugger.',
]
def tool_random_fortune(args: dict) -> dict:
import random
offensive = bool(args.get('offensive'))
# Try system fortune first
for path in ('/usr/games/fortune', '/usr/bin/fortune'):
if os.path.exists(path):
try:
flags = ['-o'] if offensive else ['-s']
r = subprocess.run([path] + flags, capture_output=True, text=True, timeout=3)
line = (r.stdout or '').strip()
if line:
return {'fortune': line, 'source': 'fortune(1)'}
except Exception:
pass
pool = _FORTUNES_SPICY + _FORTUNES_SFW if offensive else _FORTUNES_SFW
return {'fortune': random.choice(pool), 'source': 'builtin'}
# Tiny built-in block-letter ASCII for A-Z 0-9 (single-row, 3x5 font)
_ASCII_FONT = {
'A': ['', '█▀█', '█ █'],
'B': ['██ ', '██ ', '██ '],
'C': ['██ ', '', '██ '],
'D': ['██ ', '█ █', '██ '],
'E': ['███', '██ ', '███'],
'F': ['███', '██ ', ''],
'G': ['██ ', '█ █', '███'],
'H': ['█ █', '███', '█ █'],
'I': ['███', '', '███'],
'J': ['███', '', '██ '],
'K': ['█ █', '██ ', '█ █'],
'L': ['', '', '███'],
'M': ['█▄█', '█▀█', '█ █'],
'N': ['█▄█', '█▀█', '█ █'],
'O': ['███', '█ █', '███'],
'P': ['██ ', '██ ', ''],
'Q': ['███', '█ █', '██▄'],
'R': ['██ ', '██ ', '█ █'],
'S': ['██ ', '', '██ '],
'T': ['███', '', ''],
'U': ['█ █', '█ █', '███'],
'V': ['█ █', '█ █', ''],
'W': ['█ █', '███', '█▀█'],
'X': ['█ █', '', '█ █'],
'Y': ['█ █', '', ''],
'Z': ['███', '', '███'],
'0': ['███', '█ █', '███'],
'1': ['', '', ''],
'2': ['██ ', '', ' ██'],
'3': ['██ ', '', '██ '],
'4': ['█ █', '███', ''],
'5': ['███', '██ ', ' ██'],
'6': ['██ ', '███', '███'],
'7': ['███', '', ''],
'8': ['███', '███', '███'],
'9': ['███', '███', ''],
' ': [' ', ' ', ' '],
'.': [' ', ' ', ''],
'!': ['', '', ''],
'?': ['██ ', '', ''],
'-': [' ', '███', ' '],
}
def tool_ascii_banner(args: dict) -> dict:
text = (args.get('text') or '').upper().strip()
if not text:
return {'error': 'text required'}
text = text[:30]
try:
import pyfiglet # type: ignore
art = pyfiglet.figlet_format(text)
return {'banner': art, 'font': 'figlet', 'lines': art.count('\n')}
except Exception:
pass
rows = ['', '', '']
for ch in text:
glyph = _ASCII_FONT.get(ch, _ASCII_FONT[' '])
for i in range(3):
rows[i] += glyph[i] + ' '
return {'banner': '\n'.join(rows), 'font': 'builtin', 'lines': 3}
def tool_get_leaderboards(args: dict) -> dict:
try:
r = req.get('http://127.0.0.1:5000/api/leaderboards', timeout=8)
if r.status_code == 200:
j = r.json()
return {
'top_countries': (j.get('countries') or j.get('top_countries') or [])[:5],
'top_pages': (j.get('pages') or j.get('top_pages') or [])[:5],
'top_referrers': (j.get('referrers') or j.get('top_referrers') or [])[:5],
'source': 'internal',
}
except Exception as e:
return {'error': f'leaderboards fetch failed: {e}'}
return {'error': 'no data'}
def tool_get_network_graph_data(args: dict) -> dict:
try:
r = req.get('http://127.0.0.1:5000/api/visitor/recent-arcs', timeout=8)
arcs = r.json() if r.status_code == 200 else []
except Exception:
arcs = []
cc_count: dict[str, int] = {}
for a in arcs if isinstance(arcs, list) else []:
cc = a.get('country_code') or a.get('cc')
if cc:
cc_count[cc] = cc_count.get(cc, 0) + 1
top = sorted(cc_count.items(), key=lambda kv: -kv[1])[:5]
return {
'total_arcs': len(arcs) if isinstance(arcs, list) else 0,
'top_countries': [{'code': c, 'count': n} for c, n in top],
'source': '/api/visitor/recent-arcs',
}
def tool_get_guestbook(args: dict) -> dict:
limit = max(1, min(int(args.get('limit') or 10), 50))
data = _safe_read_json(GUESTBOOK_PATH, {'entries': []})
entries = data.get('entries', []) if isinstance(data, dict) else []
entries = sorted(entries, key=lambda e: e.get('timestamp', 0), reverse=True)[:limit]
return {'entries': entries, 'count': len(entries)}
# ═══════════════════════════════════════════════════════════════════════════
# TIER: OPERATOR (wallet required)
# ═══════════════════════════════════════════════════════════════════════════
def _require_wallet() -> tuple[str | None, dict | None]:
addr = _current_wallet()
if not addr:
return None, {'error': 'wallet authentication required'}
return addr, None
def _user_mem_path(addr: str) -> Path:
safe = re.sub(r'[^A-Za-z0-9]', '_', addr)[:64]
return MEM_DIR / f'{safe}.json'
def _load_user_mem(addr: str) -> dict:
return _safe_read_json(_user_mem_path(addr), {
'address': addr, 'memories': [], 'watchlist': [], 'created_at': time.time(),
})
def _save_user_mem(addr: str, data: dict) -> bool:
return _safe_write_json(_user_mem_path(addr), data)
def tool_save_memory(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
fact = (args.get('fact') or '').strip()
if not fact or len(fact) < 3:
return {'error': 'fact required (min 3 chars)'}
fact = fact[:500]
mem = _load_user_mem(addr)
memories = mem.get('memories', [])
if len(memories) >= MAX_MEMORIES_PER_WALLET:
memories = memories[-(MAX_MEMORIES_PER_WALLET - 1):]
memories.append({'id': len(memories), 'fact': fact, 'timestamp': time.time()})
# Reindex ids
for i, m in enumerate(memories):
m['id'] = i
mem['memories'] = memories
if not _save_user_mem(addr, mem):
return {'error': 'failed to persist memory'}
return {'saved': True, 'id': memories[-1]['id'], 'total': len(memories)}
def tool_list_memories(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
mem = _load_user_mem(addr)
return {'memories': mem.get('memories', []), 'count': len(mem.get('memories', []))}
def tool_delete_memory(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
try:
idx = int(args.get('id'))
except (TypeError, ValueError):
return {'error': 'valid id required'}
mem = _load_user_mem(addr)
memories = mem.get('memories', [])
if idx < 0 or idx >= len(memories):
return {'error': f'id {idx} out of range (0..{len(memories)-1})'}
removed = memories.pop(idx)
# Reindex
for i, m in enumerate(memories):
m['id'] = i
mem['memories'] = memories
_save_user_mem(addr, mem)
return {'deleted': True, 'removed_fact': removed.get('fact'), 'remaining': len(memories)}
def tool_get_my_wallet_summary(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
try:
from agent_tools import tool_wallet_xray
return tool_wallet_xray({'address': addr})
except Exception as e:
return {'error': f'summary failed: {e}'}
def tool_get_my_transactions(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
limit = max(1, min(int(args.get('limit') or 5), 25))
try:
from auth_routes import get_rpc_url
rpc = get_rpc_url()
r = req.post(rpc, json={
'jsonrpc': '2.0', 'id': 1, 'method': 'getSignaturesForAddress',
'params': [addr, {'limit': limit}],
}, timeout=10)
sigs = r.json().get('result', []) or []
return {
'address': addr,
'transactions': [{
'signature': s.get('signature'),
'slot': s.get('slot'),
'block_time': s.get('blockTime'),
'err': bool(s.get('err')),
'solscan_url': f"https://solscan.io/tx/{s.get('signature')}",
} for s in sigs[:limit]],
'count': len(sigs[:limit]),
}
except Exception as e:
return {'error': f'tx fetch failed: {e}'}
def tool_post_guestbook_entry(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
message = (args.get('message') or '').strip()
if not message or len(message) < 2:
return {'error': 'message required (min 2 chars)'}
# Sanitise: strip HTML, limit length
message = re.sub(r'<[^>]+>', '', message)[:280]
data = _safe_read_json(GUESTBOOK_PATH, {'entries': []})
entries = data.get('entries', []) if isinstance(data, dict) else []
entries.append({
'address': addr,
'truncated_address': _trim_addr(addr),
'message': message,
'timestamp': time.time(),
})
entries = entries[-500:] # cap total
_safe_write_json(GUESTBOOK_PATH, {'entries': entries})
return {'posted': True, 'message': message, 'total_entries': len(entries)}
# ═══════════════════════════════════════════════════════════════════════════
# TIER: ELITE
# ═══════════════════════════════════════════════════════════════════════════
def tool_deep_research(args: dict) -> dict:
topic = (args.get('topic') or '').strip()
if not topic:
return {'error': 'topic required'}
from agent_tools import tool_search_site, tool_search_contraband
report: dict = {'topic': topic, 'sources': {}}
try:
report['sources']['site'] = tool_search_site({'query': topic}).get('results', [])
except Exception as e:
report['sources']['site_error'] = str(e)
try:
report['sources']['contraband'] = tool_search_contraband({'query': topic}).get('results', [])
except Exception as e:
report['sources']['contraband_error'] = str(e)
try:
report['sources']['unredacted'] = tool_search_unredacted({'query': topic}).get('results', [])
except Exception as e:
report['sources']['unredacted_error'] = str(e)
try:
report['sources']['radar'] = tool_search_radar({'query': topic}).get('results', [])
except Exception as e:
report['sources']['radar_error'] = str(e)
total = sum(len(v) for v in report['sources'].values() if isinstance(v, list))
report['total_results'] = total
report['summary'] = f'Gathered {total} hits across site, contraband, unredacted, and RADAR on "{topic}".'
return report
def tool_generate_report(args: dict) -> dict:
# 🟡 Placeholder: calling Venice from inside a tool handler re-entrantly is
# non-trivial. Return a stub that advises the user to ask the agent directly.
topic = (args.get('topic') or '').strip()
fmt = (args.get('format') or 'markdown').lower()
if not topic:
return {'error': 'topic required'}
return {
'status': 'placeholder',
'topic': topic,
'format': fmt,
'note': ('Long-form report generation is queued in the next release. '
'For now, ask JAE-AI directly — the Elite-tier Kimi-K2 model will '
'produce a detailed write-up inline. Use deep_research() first for sourcing.'),
}
def tool_track_token(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
mint = (args.get('mint_address') or '').strip()
if not mint or not (32 <= len(mint) <= 44):
return {'error': 'valid Solana mint address required'}
mem = _load_user_mem(addr)
watchlist = mem.get('watchlist', [])
if mint in watchlist:
return {'already_tracked': True, 'mint': mint, 'total': len(watchlist)}
if len(watchlist) >= 50:
return {'error': 'watchlist full (max 50)'}
watchlist.append(mint)
mem['watchlist'] = watchlist
_save_user_mem(addr, mem)
return {'tracked': True, 'mint': mint, 'total': len(watchlist)}
def tool_set_alert(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
alert_type = (args.get('type') or '').strip().lower()
if alert_type not in ('price_above', 'price_below', 'volume_above'):
return {'error': 'type must be one of: price_above, price_below, volume_above'}
try:
threshold = float(args.get('threshold'))
except (TypeError, ValueError):
return {'error': 'numeric threshold required'}
token = (args.get('token') or '').strip().upper()
if not token:
return {'error': 'token symbol required'}
data = _safe_read_json(ALERTS_PATH, {'alerts': []})
alerts = data.get('alerts', []) if isinstance(data, dict) else []
user_alerts = [a for a in alerts if a.get('address') == addr]
if len(user_alerts) >= 20:
return {'error': 'max 20 active alerts per wallet'}
new_alert = {
'id': f"alrt_{int(time.time()*1000)}",
'address': addr,
'type': alert_type,
'threshold': threshold,
'token': token,
'created_at': time.time(),
'active': True,
}
alerts.append(new_alert)
_safe_write_json(ALERTS_PATH, {'alerts': alerts})
return {'created': True, 'alert': new_alert, 'note': 'Notification delivery (email/discord) coming soon.'}
def tool_list_alerts(args: dict) -> dict:
addr, err = _require_wallet()
if err:
return err
data = _safe_read_json(ALERTS_PATH, {'alerts': []})
alerts = data.get('alerts', []) if isinstance(data, dict) else []
mine = [a for a in alerts if a.get('address') == addr and a.get('active', True)]
return {'alerts': mine, 'count': len(mine)}
def tool_ask_agent_chain(args: dict) -> dict:
# 🟡 Placeholder — spawning a nested tool-chain from inside a tool call
# risks infinite recursion under the current single-pass loop. Advise user.
prompt = (args.get('prompt') or '').strip()
max_steps = int(args.get('max_steps') or 3)
if not prompt:
return {'error': 'prompt required'}
return {
'status': 'placeholder',
'prompt': prompt,
'max_steps': max_steps,
'note': ('Recursive agent chains run in the superior agent loop by default — '
'ask JAE-AI directly with "plan a %d-step research on X" to get the same behaviour.' % max_steps),
}
# ═══════════════════════════════════════════════════════════════════════════
# TIER: ADMIN
# ═══════════════════════════════════════════════════════════════════════════
def tool_deploy_sitrep_now(args: dict) -> dict:
if not _is_admin(_current_wallet()):
return {'error': 'admin only'}
sitrep = Path(__file__).parent / 'sitrep_generator.py'
if not sitrep.exists():
return {'error': 'sitrep_generator.py not found'}
try:
r = subprocess.run(
['python3', str(sitrep)],
capture_output=True, text=True, timeout=180, cwd=str(sitrep.parent),
)
return {
'triggered': True,
'exit_code': r.returncode,
'stdout_tail': (r.stdout or '')[-600:],
'stderr_tail': (r.stderr or '')[-300:],
}
except subprocess.TimeoutExpired:
return {'error': 'sitrep generation timed out after 180s'}
except Exception as e:
return {'error': f'failed: {e}'}
def tool_broadcast_banner(args: dict) -> dict:
if not _is_admin(_current_wallet()):
return {'error': 'admin only'}
message = (args.get('message') or '').strip()
if not message:
return {'error': 'message required'}
try:
duration_h = float(args.get('duration_hours') or 1)
except (TypeError, ValueError):
duration_h = 1.0
duration_h = max(0.1, min(duration_h, 72.0))
expires_at = time.time() + duration_h * 3600
_safe_write_json(BROADCAST_PATH, {
'message': message[:400],
'created_at': time.time(),
'expires_at': expires_at,
'active': True,
})
return {'broadcasting': True, 'message': message, 'expires_in_hours': duration_h}
def tool_purge_cache(args: dict) -> dict:
if not _is_admin(_current_wallet()):
return {'error': 'admin only'}
name = (args.get('cache_name') or 'all').strip().lower()
cleared = []
try:
from agent_tools import _cache
if name == 'all':
n = len(_cache)
_cache.clear()
cleared.append(f'agent_tools._cache ({n} entries)')
else:
keys = [k for k in _cache if name in k.lower()]
for k in keys:
_cache.pop(k, None)
cleared.append(f'agent_tools._cache matching "{name}" ({len(keys)} entries)')
except Exception as e:
cleared.append(f'agent_tools._cache error: {e}')
# RADAR cache
if name in ('all', 'radar'):
try:
from app import RADAR_CACHE # type: ignore
RADAR_CACHE.clear() if hasattr(RADAR_CACHE, 'clear') else RADAR_CACHE.update({'items': [], 'ts': 0})
cleared.append('RADAR_CACHE')
except Exception as e:
cleared.append(f'RADAR_CACHE error: {e}')
return {'cleared': cleared, 'name': name}
def tool_view_private_stats(args: dict) -> dict:
if not _is_admin(_current_wallet()):
return {'error': 'admin only'}
out: dict = {'timestamp': time.time()}
try:
r = req.get('http://127.0.0.1:5000/api/leaderboards', timeout=5)
out['leaderboards'] = r.json() if r.status_code == 200 else None
except Exception as e:
out['leaderboards_error'] = str(e)
try:
r = req.get('http://127.0.0.1:5000/api/telemetry/overview', timeout=5)
out['telemetry'] = r.json() if r.status_code == 200 else None
except Exception as e:
out['telemetry_error'] = str(e)
try:
r = req.get('http://127.0.0.1:5000/api/telemetry/visitors', timeout=5)
out['recent_visitors'] = r.json() if r.status_code == 200 else None
except Exception as e:
out['visitors_error'] = str(e)
return out
# ═══════════════════════════════════════════════════════════════════════════
# REGISTRATION
# ═══════════════════════════════════════════════════════════════════════════
_EXTENDED = [
# Anonymous
Tool('search_unredacted', 'Search the UNREDACTED UFO/declassified archive. Top 5 matches.',
{'type': 'object', 'properties': {'query': {'type': 'string'}}, 'required': ['query']},
tool_search_unredacted, 'anonymous', '20/min', 10),
Tool('search_crimescene', 'Search the CRIMESCENE cold-case archive. Top 5 matches.',
{'type': 'object', 'properties': {'query': {'type': 'string'}}, 'required': ['query']},
tool_search_crimescene, 'anonymous', '20/min', 10),
Tool('search_radar', 'Search the live RADAR news feed aggregator. Top 5 matches.',
{'type': 'object', 'properties': {'query': {'type': 'string'}}, 'required': ['query']},
tool_search_radar, 'anonymous', '30/min', 10),
Tool('get_gov_domains_stats', 'Government domains monitor stats (total count + recent additions).',
{'type': 'object', 'properties': {}, 'required': []},
tool_get_gov_domains_stats, 'anonymous', '30/min', 8),
Tool('search_docs', 'Unified search across unredacted, crimescene, and latest SITREP. Top 10 hybrid results.',
{'type': 'object', 'properties': {'query': {'type': 'string'}}, 'required': ['query']},
tool_search_docs, 'anonymous', '10/min', 15),
Tool('get_server_status', 'Live CPU, RAM, disk, load, and uptime for jaeswift.xyz.',
{'type': 'object', 'properties': {}, 'required': []},
tool_get_server_status, 'anonymous', '30/min', 8),
Tool('random_fortune', 'Return a random one-liner fortune. Pass offensive=true for spicier pool.',
{'type': 'object', 'properties': {'offensive': {'type': 'boolean', 'default': False}}, 'required': []},
tool_random_fortune, 'anonymous', '30/min', 5),
Tool('ascii_banner', 'Generate ASCII-art banner of short text (≤30 chars).',
{'type': 'object', 'properties': {'text': {'type': 'string'}}, 'required': ['text']},
tool_ascii_banner, 'anonymous', '20/min', 5),
Tool('get_leaderboards', 'Top countries, pages, and referrers visiting jaeswift.xyz.',
{'type': 'object', 'properties': {}, 'required': []},
tool_get_leaderboards, 'anonymous', '30/min', 10),
Tool('get_network_graph_data', 'Live globe arc counts + top source countries of recent visitors.',
{'type': 'object', 'properties': {}, 'required': []},
tool_get_network_graph_data, 'anonymous', '30/min', 10),
Tool('get_guestbook', 'Read the public guestbook (latest N entries, default 10).',
{'type': 'object', 'properties': {'limit': {'type': 'integer', 'default': 10}}, 'required': []},
tool_get_guestbook, 'anonymous', '30/min', 5),
# Operator
Tool('save_memory', 'Save a personal fact keyed to your wallet (max 100, 500 chars each).',
{'type': 'object', 'properties': {'fact': {'type': 'string'}}, 'required': ['fact']},
tool_save_memory, 'operator', '10/min', 5),
Tool('list_memories', 'List all your saved memories.',
{'type': 'object', 'properties': {}, 'required': []},
tool_list_memories, 'operator', '30/min', 5),
Tool('delete_memory', 'Forget a memory by its index id.',
{'type': 'object', 'properties': {'id': {'type': 'integer'}}, 'required': ['id']},
tool_delete_memory, 'operator', '10/min', 5),
Tool('get_my_wallet_summary', 'Summary of your authenticated wallet: balance, tokens, recent tx count.',
{'type': 'object', 'properties': {}, 'required': []},
tool_get_my_wallet_summary, 'operator', '20/min', 15),
Tool('get_my_transactions', 'Your last N transactions on Solana (default 5, max 25).',
{'type': 'object', 'properties': {'limit': {'type': 'integer', 'default': 5}}, 'required': []},
tool_get_my_transactions, 'operator', '10/min', 15),
Tool('post_guestbook_entry', 'Post a message (≤280 chars) to the public guestbook, signed by your wallet.',
{'type': 'object', 'properties': {'message': {'type': 'string'}}, 'required': ['message']},
tool_post_guestbook_entry, 'operator', '3/hour', 5),
# Elite
Tool('deep_research', 'Run a multi-source research pass (site + contraband + unredacted + radar) on a topic.',
{'type': 'object', 'properties': {'topic': {'type': 'string'}}, 'required': ['topic']},
tool_deep_research, 'elite', '5/hour', 45),
Tool('generate_report', 'Request a long-form report on a topic. (Currently returns a placeholder.)',
{'type': 'object', 'properties': {
'topic': {'type': 'string'},
'format': {'type': 'string', 'default': 'markdown'},
}, 'required': ['topic']},
tool_generate_report, 'elite', '2/hour', 10),
Tool('track_token', 'Add a Solana token mint to your watchlist (max 50).',
{'type': 'object', 'properties': {'mint_address': {'type': 'string'}}, 'required': ['mint_address']},
tool_track_token, 'elite', '10/hour', 5),
Tool('set_alert', 'Create a price/volume alert for a token. type ∈ {price_above, price_below, volume_above}.',
{'type': 'object', 'properties': {
'type': {'type': 'string'},
'threshold': {'type': 'number'},
'token': {'type': 'string'},
}, 'required': ['type', 'threshold', 'token']},
tool_set_alert, 'elite', '10/hour', 5),
Tool('list_alerts', 'List your active alerts.',
{'type': 'object', 'properties': {}, 'required': []},
tool_list_alerts, 'elite', '30/min', 5),
Tool('ask_agent_chain', 'Spawn a nested sub-agent to chain up to N tool calls on a prompt. (Placeholder.)',
{'type': 'object', 'properties': {
'prompt': {'type': 'string'},
'max_steps': {'type': 'integer', 'default': 3},
}, 'required': ['prompt']},
tool_ask_agent_chain, 'elite', '3/hour', 60),
# Admin
Tool('deploy_sitrep_now', 'Trigger the SITREP generator immediately.',
{'type': 'object', 'properties': {}, 'required': []},
tool_deploy_sitrep_now, 'admin', '2/hour', 180),
Tool('broadcast_banner', 'Show a sitewide banner to all visitors (duration 0.1-72 hours).',
{'type': 'object', 'properties': {
'message': {'type': 'string'},
'duration_hours': {'type': 'number', 'default': 1},
}, 'required': ['message']},
tool_broadcast_banner, 'admin', '5/day', 5),
Tool('purge_cache', 'Clear in-memory caches. cache_name ∈ {all, radar, sol_price, crypto_price, server_status}.',
{'type': 'object', 'properties': {'cache_name': {'type': 'string', 'default': 'all'}}, 'required': []},
tool_purge_cache, 'admin', '10/hour', 5),
Tool('view_private_stats', 'Full visitor/telemetry/leaderboard data (admin only).',
{'type': 'object', 'properties': {}, 'required': []},
tool_view_private_stats, 'admin', '30/min', 15),
]
for _t in _EXTENDED:
register(_t)

View file

@ -1803,5 +1803,28 @@ def get_crimescene_category(cat_id):
return jsonify(c) return jsonify(c)
abort(404, f'Category {cat_id} not found') abort(404, f'Category {cat_id} not found')
# ── Broadcast banner (Phase 3) ──────────────────────────────────────────
@app.route('/api/broadcast/current')
def api_broadcast_current():
import time as _time
path = DATA_DIR / 'broadcast.json'
if not path.exists():
return ('', 204)
try:
with open(path) as f:
data = json.load(f)
except Exception:
return ('', 204)
if not data.get('active'):
return ('', 204)
if _time.time() > (data.get('expires_at') or 0):
return ('', 204)
return jsonify({
'message': data.get('message', ''),
'expires_at': data.get('expires_at'),
'created_at': data.get('created_at'),
})
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False) app.run(host='0.0.0.0', port=5000, debug=False)