jaeswift-website/api/agent_tools.py
jae e73b74cfa2 feat(agent): agentic chat with wallet auth and tiered model routing
- New /api/agent/chat endpoint with Venice tool-calling loop (max 8 iter)
- Tiered models: glm-4.7-flash default, kimi-k2-thinking for Elite+
- Wallet auth: /api/auth/{nonce,verify,whoami,logout} with Ed25519 + JWT
- 10 tools registered: site search, crypto prices, SITREP, .sol lookup,
  wallet xray, contraband/awesomelist search, changelog, trigger_effect
- Per-tool rate limits, 30s timeout, $30/mo budget guard
- Frontend: tier badge, tool call cards, wallet sign-in handshake
- Changelog v1.40.0
2026-04-20 10:40:27 +00:00

539 lines
20 KiB
Python

"""Agent tool registry + implementations for the JAE-AI agentic chat.
Register once on import. Each Tool carries:
- JSON schema for Venice function-calling
- Handler callable
- Minimum tier
- Rate limit spec ('N/min' or 'N/hour')
- Timeout (seconds)
"""
import json
import time
import re
from collections import defaultdict, deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Any
import requests as req
# Directory of JSON data files read by the tools; resolved next to this module file.
DATA_DIR = Path(__file__).parent / 'data'
@dataclass
class Tool:
    """One tool the agent can call.

    Bundles the function-calling schema with the handler that implements the
    tool and the access / throttling settings attached to it.
    """

    # Tool name; also the key it is stored under in the registry.
    name: str
    # Description text emitted in the function-calling schema.
    description: str
    # JSON-schema object describing the handler's arguments.
    params: dict
    # Callable that receives the argument dict and returns the tool result.
    handler: Callable[[dict], Any]
    # Minimum tier needed to use the tool.
    tier: str = 'anonymous'
    # Rate limit spec, e.g. '60/min'.
    rate_limit: str = '60/min'
    # Timeout in seconds.
    timeout: int = 30
# Registry of every known tool, keyed by Tool.name; filled through register().
TOOLS: dict[str, Tool] = {}
def register(tool: Tool):
    """Store ``tool`` in the ``TOOLS`` registry under ``tool.name``.

    Registering a second tool with the same name replaces the first one.
    """
    registry_key = tool.name
    TOOLS[registry_key] = tool
# ── Rate limiting (per (identity, tool) key) ─────────────────────────────
# Call timestamps per (identity, tool) key, oldest first.
# The deques are intentionally unbounded: check_rate() only appends while a
# window holds fewer than `limit` entries, so every window is already capped by
# its own limit. A fixed maxlen would silently discard the oldest timestamps
# and make any limit larger than that maxlen impossible to enforce.
_rate_state: dict[tuple, deque] = defaultdict(deque)
def parse_spec(spec: str) -> tuple[int, int]:
    """Turn a rate-limit spec into ``(count, period_seconds)``.

    '10/min' -> (10, 60). Recognised units are sec/second, min/minute,
    hour/hr and day, compared case-insensitively with surrounding whitespace
    ignored. A spec that does not split into exactly ``count/unit`` with an
    integer count yields (60, 60); an unrecognised unit counts as 60 seconds.
    """
    seconds_per_unit = {
        'sec': 1, 'second': 1,
        'min': 60, 'minute': 60,
        'hour': 3600, 'hr': 3600,
        'day': 86400,
    }
    try:
        count_text, unit_text = spec.split('/')
        count = int(count_text)
    except Exception:
        # Malformed spec: fall back to 60 calls per 60 seconds.
        return 60, 60
    unit_key = unit_text.lower().strip()
    return count, seconds_per_unit.get(unit_key, 60)
def check_rate(key: tuple, spec: str) -> bool:
    """Record a call for ``key`` and report whether it fits the rate limit.

    Args:
        key: Bucket identifier, e.g. an (identity, tool) tuple.
        spec: Rate limit spec understood by ``parse_spec`` (e.g. '10/min').

    Returns:
        True when fewer than the allowed number of calls were recorded in the
        last period; the call is then recorded. False otherwise; a rejected
        call is not recorded.
    """
    limit, period = parse_spec(spec)
    now = time.time()
    window = _rate_state[key]

    # Forget timestamps that have fallen out of the sliding window.
    cutoff = now - period
    while window and window[0] < cutoff:
        window.popleft()

    allowed = len(window) < limit
    if allowed:
        window.append(now)
    return allowed
# ── Simple TTL cache ─────────────────────────────────────────────────────
# In-process cache: key -> (time.time() when stored, value).
# cache_get() ignores stale entries but does not delete them.
_cache: dict[str, tuple[float, Any]] = {}
def cache_get(key: str, ttl: float) -> Any | None:
    """Return the cached value for ``key`` if it is younger than ``ttl`` seconds.

    Returns ``None`` when nothing is stored under ``key`` or the stored entry
    is ``ttl`` seconds old or older.
    """
    entry = _cache.get(key)
    if not entry:
        return None
    age = time.time() - entry[0]
    if age < ttl:
        return entry[1]
    return None
def cache_set(key: str, value: Any):
    """Store ``value`` under ``key`` together with the current time."""
    stamped_entry = (time.time(), value)
    _cache[key] = stamped_entry
def _load_json(name: str) -> Any:
    """Load and parse a JSON file from ``DATA_DIR``.

    Args:
        name: File name relative to ``DATA_DIR`` (e.g. 'changelog.json').

    Returns:
        The parsed JSON value, or ``None`` when the file cannot be opened,
        decoded, or parsed.
    """
    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default encoding, so non-ASCII content loads the same everywhere.
        with open(DATA_DIR / name, encoding='utf-8') as f:
            return json.load(f)
    except Exception:
        # Deliberate best-effort: callers treat None as "no data available".
        return None
# ── Tool Handlers ────────────────────────────────────────────────────────
def tool_search_site(args: dict) -> dict:
    """Substring search over the site's JSON data files.

    Each data file is serialised to one lowercase JSON string and scanned for
    the lowercase query. A file contributes at most one result, built around
    its first match.

    Args:
        args: Tool arguments; ``args['query']`` is the search text.

    Returns:
        ``{'results': [...], 'count': n}`` with at most five results, where
        ``count`` is the number of files that matched; or an error dict with an
        empty ``results`` list when the query is missing or blank.
    """
    needle = (args.get('query') or '').strip().lower()
    if not needle:
        return {'error': 'query required', 'results': []}

    # (result label, data file) pairs, searched in this order.
    indexed_files = (
        ('changelog', 'changelog.json'),
        ('navigation', 'navigation.json'),
        ('homepage', 'homepage.json'),
        ('managed_services', 'managed_services.json'),
        ('posts', 'posts.json'),
        ('links', 'links.json'),
        ('tracks', 'tracks.json'),
    )

    hits = []
    for label, filename in indexed_files:
        payload = _load_json(filename)
        if payload is None:
            continue
        haystack = json.dumps(payload, ensure_ascii=False).lower()
        pos = haystack.find(needle)
        if pos < 0:
            continue
        # Snippet: up to 80 characters before the match start, 160 from it.
        start = max(0, pos - 80)
        excerpt = haystack[start:pos + 160]
        if label == 'homepage':
            link = '/'
        else:
            link = f'/api/{label}'
        hits.append({
            'source': label,
            'title': label.replace('_', ' ').title(),
            'snippet': excerpt.replace('\n', ' '),
            'url': link,
        })
    return {'results': hits[:5], 'count': len(hits)}
def tool_get_sol_price(args: dict) -> dict:
    """Fetch the SOL/USDT 24h ticker from Binance, cached for 30 seconds.

    Args:
        args: Tool arguments; not used by this tool.

    Returns:
        A dict with ``symbol``, ``price_usd``, ``change_24h_pct``,
        ``volume_24h`` and ``source``, or ``{'error': ...}`` when the request
        or the parsing of its response fails.
    """
    cached_quote = cache_get('sol_price', 30)
    if cached_quote is not None:
        return cached_quote
    try:
        response = req.get(
            'https://api.binance.com/api/v3/ticker/24hr',
            params={'symbol': 'SOLUSDT'},
            timeout=8,
        )
        response.raise_for_status()
        ticker = response.json()
        last_price = float(ticker.get('lastPrice', 0))
        change_pct = float(ticker.get('priceChangePercent', 0))
        volume = float(ticker.get('volume', 0))
        quote = {
            'symbol': 'SOL',
            'price_usd': last_price,
            'change_24h_pct': change_pct,
            'volume_24h': volume,
            'source': 'binance',
        }
        cache_set('sol_price', quote)
        return quote
    except Exception as exc:
        return {'error': f'price fetch failed: {exc}'}
def tool_get_crypto_price(args: dict) -> dict:
    """Fetch the ``<SYMBOL>USDT`` 24h ticker from Binance, cached for 30 seconds.

    Args:
        args: Tool arguments; ``args['symbol']`` is the ticker symbol
            (2-10 letters/digits, case-insensitive). Non-string values are
            converted with ``str`` before validation.

    Returns:
        A dict with ``symbol``, ``price_usd``, ``change_24h_pct``,
        ``volume_24h`` and ``source``; or ``{'error': ...}`` when the symbol is
        invalid, unknown to Binance, or the request fails.
    """
    # str() keeps a non-string argument from raising AttributeError here.
    symbol = str(args.get('symbol') or '').upper().strip()
    if not re.fullmatch(r'[A-Z0-9]{2,10}', symbol):
        return {'error': 'valid symbol required (e.g. BTC, ETH)'}
    ck = f'crypto_price:{symbol}'
    cached = cache_get(ck, 30)
    if cached is not None:
        return cached
    pair = f'{symbol}USDT'
    try:
        r = req.get('https://api.binance.com/api/v3/ticker/24hr',
                    params={'symbol': pair}, timeout=8)
        # Only a 400 means the request itself (i.e. the symbol) was rejected.
        # Rate limiting, geo-blocking and server errors must not be reported
        # as "symbol not found"; they surface through raise_for_status().
        if r.status_code == 400:
            return {'error': f'symbol {symbol} not found on binance'}
        r.raise_for_status()
        j = r.json()
        out = {
            'symbol': symbol,
            'price_usd': float(j.get('lastPrice', 0)),
            'change_24h_pct': float(j.get('priceChangePercent', 0)),
            'volume_24h': float(j.get('volume', 0)),
            # Same key tool_get_sol_price reports, so both price tools agree.
            'source': 'binance',
        }
        cache_set(ck, out)
        return out
    except Exception as e:
        return {'error': f'price fetch failed: {e}'}
def _search_entries(entries: list, query: str, fields: list[str], limit: int = 5) -> list:
    """Return the first ``limit`` entries whose selected fields contain ``query``.

    Matching is a case-insensitive substring test against the values of
    ``fields`` joined with single spaces.

    Args:
        entries: Dict-like records to scan, in order.
        query: Text to look for.
        fields: Keys whose values are searched; missing keys count as empty.
        limit: Maximum number of matches to return; zero or less returns
            an empty list.

    Returns:
        The matching entries (the original objects), in input order.
    """
    # Without this guard the cap was only checked after the first append, so
    # limit=0 still returned one match.
    if limit <= 0:
        return []
    needle = query.lower()
    matches = []
    for entry in entries:
        haystack = ' '.join(str(entry.get(f, '')) for f in fields).lower()
        if needle in haystack:
            matches.append(entry)
            if len(matches) >= limit:
                break
    return matches
def tool_search_contraband(args: dict) -> dict:
    """Search the items in ``contraband.json`` for a substring.

    The data is walked as categories -> subcategories -> items. An item
    matches when the lowercase query occurs in its title, description or URL.
    The search stops at the fifth match.

    Args:
        args: Tool arguments. ``args['query']`` is the search text;
            ``args['category']`` optionally restricts the search to categories
            whose name or slug contains it (case-insensitive).

    Returns:
        ``{'results': [...], 'count': n}`` with at most five results, or
        ``{'error': 'query required'}`` when the query is missing or blank.
    """
    # str() keeps non-string arguments from raising AttributeError.
    query = str(args.get('query') or '').strip()
    category = str(args.get('category') or '').strip().lower()
    if not query:
        return {'error': 'query required'}
    needle = query.lower()  # lowered once instead of once per item

    data = _load_json('contraband.json') or {}
    cats = data.get('categories', []) if isinstance(data, dict) else []
    results = []
    for cat in cats or []:
        cat_name = (cat.get('name') or '').lower()
        cat_slug = (cat.get('slug') or '').lower()
        if category and category not in cat_name and category not in cat_slug:
            continue
        for sub in cat.get('subcategories', []) or []:
            for item in sub.get('items', []) or []:
                # `or ''` so that a null field is searched as empty text; an
                # f-string would otherwise render it as "None" and match the
                # query "none".
                title = item.get('title') or ''
                description = item.get('description') or ''
                url = item.get('url') or ''
                if needle not in f'{title} {description} {url}'.lower():
                    continue
                results.append({
                    'title': item.get('title'),
                    'url': item.get('url'),
                    'description': description[:200],
                    'category': cat.get('name'),
                    'subcategory': sub.get('name'),
                })
                if len(results) >= 5:
                    return {'results': results, 'count': len(results)}
    return {'results': results, 'count': len(results)}
def tool_search_awesomelist(args: dict) -> dict:
    """Search the awesome-list index and sector files for a substring.

    Sectors from ``awesomelist_index.json`` are matched on name or description
    first. If fewer than five sectors matched, up to 80 files of the
    ``awesomelist`` data directory (sorted by name) are scanned, looking at
    the first 300 entries of each, until five results are collected.

    Args:
        args: Tool arguments; ``args['query']`` is the search text.

    Returns:
        ``{'results': [...], 'count': n}`` with at most five results
        (``count`` is the number collected before truncation), or
        ``{'error': 'query required'}`` when the query is missing or blank.
    """
    # str() keeps a non-string argument from raising AttributeError.
    query = str(args.get('query') or '').strip().lower()
    if not query:
        return {'error': 'query required'}

    index = _load_json('awesomelist_index.json') or {}
    sectors = index.get('sectors', []) if isinstance(index, dict) else []
    results = []

    # Search sector names first
    for sec in sectors or []:
        if not isinstance(sec, dict):
            continue
        name = (sec.get('name') or '').lower()
        desc = (sec.get('description') or '').lower()
        if query in name or query in desc:
            results.append({
                'type': 'sector',
                'name': sec.get('name'),
                'slug': sec.get('slug'),
                'count': sec.get('count', 0),
                'url': f"/recon/awesomelist?sector={sec.get('slug')}",
            })

    # Search inside individual sector files for entry matches
    awesome_dir = DATA_DIR / 'awesomelist'
    if len(results) < 5 and awesome_dir.is_dir():
        for fname in sorted(p.name for p in awesome_dir.iterdir())[:80]:
            if len(results) >= 5:
                break
            try:
                # Explicit UTF-8 instead of the platform default encoding.
                with open(awesome_dir / fname, encoding='utf-8') as f:
                    sec_data = json.load(f)
                entries = (sec_data.get('entries') or [])[:300]
            except Exception:
                # Unreadable, non-JSON or unexpectedly shaped file: skip it.
                continue
            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                # `or ''` so a null field is not searched as the text "None".
                entry_name = entry.get('name') or ''
                entry_desc = entry.get('description') or ''
                if query not in f'{entry_name} {entry_desc}'.lower():
                    continue
                results.append({
                    'type': 'entry',
                    'name': entry.get('name'),
                    'url': entry.get('url'),
                    'description': entry_desc[:200],
                    'sector_file': fname,
                })
                if len(results) >= 5:
                    break
    return {'results': results[:5], 'count': len(results)}
def tool_get_sitrep(args: dict) -> dict:
    """Return a summary of one archived SITREP.

    SITREPs are the ``*.json`` files in the ``sitreps`` data directory. They
    are ordered by file name, descending, and the first one is treated as the
    latest.

    Args:
        args: Tool arguments. ``args['date']`` is either 'latest' (the
            default, case-insensitive) or text that must occur in the file
            name of the wanted SITREP; the first matching file is used.

    Returns:
        A dict with ``date``, ``summary`` (at most 1200 characters),
        ``source_count``, ``sectors`` and ``url``; or a dict with ``error``
        when no SITREP exists, none matches, or the file cannot be read.
    """
    date_arg = str(args.get('date') or 'latest').strip()
    sitrep_dir = DATA_DIR / 'sitreps'
    if not sitrep_dir.is_dir():
        return {'error': 'No SITREPs available yet', 'sitrep': None}

    files = sorted(
        (p.name for p in sitrep_dir.iterdir() if p.name.endswith('.json')),
        reverse=True,
    )
    if not files:
        return {'error': 'No SITREPs archived', 'sitrep': None}

    if date_arg.lower() == 'latest':
        target = files[0]
    else:
        target = next((name for name in files if date_arg in name), None)
    if not target:
        return {'error': f'No SITREP matching "{date_arg}"', 'available': files[:5]}

    try:
        # Explicit UTF-8 instead of the platform default encoding.
        with open(sitrep_dir / target, encoding='utf-8') as f:
            data = json.load(f)
        # `or ''` / `or []`: a null `content` or `sources` must not turn an
        # otherwise readable SITREP into an error.
        summary = data.get('summary') or (data.get('content') or '')[:800]
        return {
            'date': data.get('date', target.replace('.json', '')),
            'summary': summary[:1200],
            'source_count': len(data.get('sources') or []),
            'sectors': list((data.get('sectors') or {}).keys()),
            'url': '/transmissions/sitrep',
        }
    except Exception as e:
        return {'error': f'Failed to read SITREP: {e}'}
def tool_lookup_sol_domain(args: dict) -> dict:
    """Look up a ``.sol`` domain through the Bonfida SNS HTTP API.

    Args:
        args: Tool arguments; ``args['name']`` is the domain, with or without
            a trailing '.sol'. After removing that suffix the name must be
            1-63 characters from ``a-z``, ``0-9``, ``_`` and ``-``.

    Returns:
        * HTTP 200: ``name``, ``registered=True``, ``owner``, the raw response
          and a link (stored under the existing ``solscan_url`` key).
        * HTTP 404: ``name``, ``registered=False``, ``available=True`` and a
          registration link.
        * any other status: ``name``, ``status_code`` and the first 400
          characters of the response body.
        * ``{'error': ...}`` for an invalid name or a failed request.
    """
    # Strip '.sol' only as a suffix. Removing it anywhere in the string would
    # turn e.g. 'foo.solar' into 'fooar' and look up a different domain.
    name = str(args.get('name') or '').strip().lower().removesuffix('.sol')
    if not re.fullmatch(r'[a-z0-9_-]{1,63}', name):
        return {'error': 'invalid domain name'}
    try:
        # Bonfida SNS public API
        r = req.get(f'https://sns-api.bonfida.com/domain/lookup/{name}', timeout=8)
        if r.status_code == 200:
            j = r.json()
            result = j.get('result')
            # The owner is read from `result` when that is an object,
            # otherwise from the top level of the response.
            owner = result.get('owner') if isinstance(result, dict) else j.get('owner')
            return {
                'name': f'{name}.sol',
                'registered': True,
                'owner': owner,
                'raw': j,
                'solscan_url': f'https://www.sns.id/domain?domain={name}',
            }
        if r.status_code == 404:
            return {
                'name': f'{name}.sol',
                'registered': False,
                'available': True,
                'register_url': f'https://www.sns.id/search?search={name}',
            }
        return {'name': f'{name}.sol', 'status_code': r.status_code, 'raw': r.text[:400]}
    except Exception as e:
        return {'error': f'lookup failed: {e}'}
# Base58 alphabet (no 0, O, I, l), 32-44 characters.
_BASE58_ADDRESS_RE = re.compile(r'[1-9A-HJ-NP-Za-km-z]{32,44}')


def _wallet_rpc_call(rpc: str, request_id: int, method: str, params: list, timeout: int) -> Any:
    """POST one JSON-RPC 2.0 request and return its ``result`` member.

    Raises on an HTTP error status and on a JSON-RPC ``error`` response, so a
    failed call is never mistaken for an empty result.
    """
    r = req.post(rpc, json={'jsonrpc': '2.0', 'id': request_id,
                            'method': method, 'params': params}, timeout=timeout)
    r.raise_for_status()
    payload = r.json()
    if payload.get('error'):
        raise RuntimeError(f"RPC error: {payload['error']}")
    return payload.get('result')


def _token_raw_amount(token_account: dict) -> int:
    """Return the raw ``tokenAmount.amount`` of a jsonParsed token account (0 if absent)."""
    info = (token_account.get('account', {}).get('data', {})
            .get('parsed', {}).get('info', {}))
    return int(info.get('tokenAmount', {}).get('amount', 0) or 0)


def tool_wallet_xray(args: dict) -> dict:
    """Collect balance, token accounts and recent transactions of a wallet.

    Three JSON-RPC calls are made against the URL from
    ``auth_routes.get_rpc_url()``: ``getBalance``, ``getTokenAccountsByOwner``
    and ``getSignaturesForAddress`` (limit 5).

    Args:
        args: Tool arguments; ``args['address']`` is the wallet address
            (32-44 base58 characters).

    Returns:
        A dict with ``address``, ``rpc``, ``balance_sol`` and ``solscan_url``,
        plus ``token_account_count`` / ``non_zero_tokens`` and ``recent_txs``
        when those calls succeed, or ``token_error`` / ``tx_error`` with the
        failure text when they do not. Returns ``{'error': ...}`` when the
        address is invalid or the balance call fails.
    """
    address = str(args.get('address') or '').strip()
    # Check the alphabet as well as the length: the address is embedded in the
    # URL returned to the caller, so arbitrary characters must not get through.
    if not _BASE58_ADDRESS_RE.fullmatch(address):
        return {'error': 'invalid Solana address'}

    # Lazy import to avoid circular
    from auth_routes import get_rpc_url
    rpc = get_rpc_url()
    out: dict = {'address': address, 'rpc': 'helius' if 'helius' in rpc else 'public'}

    try:
        # Balance
        balance = _wallet_rpc_call(rpc, 1, 'getBalance', [address], 10) or {}
        lamports = int(balance.get('value', 0))
        out['balance_sol'] = round(lamports / 1_000_000_000, 6)
    except Exception as e:
        return {'error': f'balance fetch failed: {e}'}

    try:
        # Token accounts count
        token_result = _wallet_rpc_call(rpc, 2, 'getTokenAccountsByOwner', [
            address,
            {'programId': 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA'},
            {'encoding': 'jsonParsed'},
        ], 12) or {}
        tokens = token_result.get('value', []) or []
        non_zero = [t for t in tokens if _token_raw_amount(t) > 0]
        out['token_account_count'] = len(tokens)
        out['non_zero_tokens'] = len(non_zero)
    except Exception as e:
        out['token_error'] = str(e)

    try:
        # Last 5 signatures
        sigs = _wallet_rpc_call(rpc, 3, 'getSignaturesForAddress',
                                [address, {'limit': 5}], 10) or []
        out['recent_txs'] = [{
            'signature': s.get('signature'),
            'slot': s.get('slot'),
            'block_time': s.get('blockTime'),
            'err': bool(s.get('err')),
        } for s in sigs]
    except Exception as e:
        out['tx_error'] = str(e)

    out['solscan_url'] = f'https://solscan.io/account/{address}'
    return out
def tool_get_changelog(args: dict) -> dict:
    """Return the first N entries of ``changelog.json``.

    The file may hold either an object with an ``entries`` list or a bare
    list of entries.

    Args:
        args: Tool arguments; ``args['limit']`` is the number of entries
            wanted. It is clamped to 1-25; a missing, zero or non-numeric
            value counts as 5.

    Returns:
        ``{'entries': [...], 'count': n, 'total_available': m}`` where each
        entry carries ``version``, ``date``, ``category``, ``title`` and at
        most six ``changes``.
    """
    try:
        limit = int(args.get('limit') or 5)
    except (TypeError, ValueError, OverflowError):
        # A non-numeric limit falls back to the default instead of raising
        # out of the handler.
        limit = 5
    limit = max(1, min(limit, 25))

    data = _load_json('changelog.json') or {}
    entries = data.get('entries') if isinstance(data, dict) else data
    if not isinstance(entries, list):
        # Covers a null or wrongly typed `entries`, which cannot be sliced.
        entries = []

    out = [
        {
            'version': e.get('version'),
            'date': e.get('date'),
            'category': e.get('category'),
            'title': e.get('title'),
            'changes': (e.get('changes') or [])[:6],
        }
        for e in entries[:limit]
    ]
    return {'entries': out, 'count': len(out), 'total_available': len(entries)}
# Effect names accepted by tool_trigger_effect (alphabetical).
VALID_EFFECTS = {
    'blueprint', 'cmatrix', 'crt', 'dimensions',
    'earthquake', 'fog', 'ghostmode', 'glitch',
    'gravity', 'hacker', 'invert', 'lowgravity',
    'matrix', 'melt', 'night', 'partymode',
    'portal', 'quantum', 'rain', 'red',
    'redalert', 'retro', 'shuffle', 'sneak',
    'snow', 'typewriter', 'underwater', 'vhs',
}
def tool_trigger_effect(args: dict) -> dict:
    """Build the action payload for a visual effect.

    Args:
        args: Tool arguments; ``args['name']`` is the effect name, compared
            after trimming and lowercasing.

    Returns:
        ``{'action': 'trigger_effect', 'effect': name, 'message': ...}`` for a
        name in ``VALID_EFFECTS``; otherwise an error dict that lists the
        valid names and has ``effect`` set to ``None``.
    """
    effect = (args.get('name') or '').strip().lower()
    if effect in VALID_EFFECTS:
        return {
            'action': 'trigger_effect',
            'effect': effect,
            'message': f'Activating {effect} effect on your browser…',
        }
    return {'error': f'unknown effect. Valid: {sorted(VALID_EFFECTS)}', 'effect': None}
# ── Registration ─────────────────────────────────────────────────────────
# Site content search.
register(Tool(
    name='search_site',
    description='Full-text search across jaeswift.xyz content (changelog, posts, navigation, homepage). Returns top 5 matches with snippets.',
    params={
        'type': 'object',
        'properties': {
            'query': {'type': 'string', 'description': 'search query'},
        },
        'required': ['query'],
    },
    handler=tool_search_site,
    tier='anonymous',
    rate_limit='20/min',
))

# Price lookups.
register(Tool(
    name='get_sol_price',
    description='Get live Solana (SOL) price and 24h change from Binance.',
    params={
        'type': 'object',
        'properties': {},
        'required': [],
    },
    handler=tool_get_sol_price,
    tier='anonymous',
    rate_limit='60/min',
))

register(Tool(
    name='get_crypto_price',
    description='Get live USD price and 24h change for any crypto symbol (BTC, ETH, etc) from Binance.',
    params={
        'type': 'object',
        'properties': {
            'symbol': {'type': 'string', 'description': 'ticker symbol e.g. BTC, ETH, DOGE'},
        },
        'required': ['symbol'],
    },
    handler=tool_get_crypto_price,
    tier='anonymous',
    rate_limit='60/min',
))

# Archive searches.
register(Tool(
    name='search_contraband',
    description='Search the CONTRABAND archive of 16k+ curated resources across 24 categories (software, privacy, movies, books, games, AI, etc.). Top 5 matches.',
    params={
        'type': 'object',
        'properties': {
            'query': {'type': 'string', 'description': 'search query'},
            'category': {'type': 'string', 'description': 'optional category filter (e.g. privacy, software, ai)'},
        },
        'required': ['query'],
    },
    handler=tool_search_contraband,
    tier='anonymous',
    rate_limit='20/min',
))

register(Tool(
    name='search_awesomelist',
    description='Search 28 curated awesome-list sectors with 135k+ entries (development, datascience, security, etc.). Top 5 matches.',
    params={
        'type': 'object',
        'properties': {
            'query': {'type': 'string', 'description': 'search query'},
        },
        'required': ['query'],
    },
    handler=tool_search_awesomelist,
    tier='anonymous',
    rate_limit='20/min',
))

# SITREP briefing.
register(Tool(
    name='get_sitrep',
    description='Get the latest (or a specific date) SITREP daily AI intelligence briefing covering tech, cybersecurity, and crypto.',
    params={
        'type': 'object',
        'properties': {
            'date': {'type': 'string', 'description': 'date (YYYY-MM-DD) or "latest"', 'default': 'latest'},
        },
        'required': [],
    },
    handler=tool_get_sitrep,
    tier='anonymous',
    rate_limit='20/min',
))

# Solana lookups.
register(Tool(
    name='lookup_sol_domain',
    description='Check if a .sol domain is registered or available (via Bonfida SNS). Returns owner if registered.',
    params={
        'type': 'object',
        'properties': {
            'name': {'type': 'string', 'description': 'domain name without .sol suffix'},
        },
        'required': ['name'],
    },
    handler=tool_lookup_sol_domain,
    tier='anonymous',
    rate_limit='30/min',
))

register(Tool(
    name='wallet_xray',
    description='Scan any Solana wallet address: SOL balance, token account count, recent transactions.',
    params={
        'type': 'object',
        'properties': {
            'address': {'type': 'string', 'description': 'Solana wallet pubkey (base58)'},
        },
        'required': ['address'],
    },
    handler=tool_wallet_xray,
    tier='anonymous',
    rate_limit='10/min',
    timeout=20,
))

# Changelog.
register(Tool(
    name='get_changelog',
    description='Get the last N changelog entries from jaeswift.xyz (what has been built/fixed recently).',
    params={
        'type': 'object',
        'properties': {
            'limit': {'type': 'integer', 'description': 'number of entries (1-25)', 'default': 5},
        },
        'required': [],
    },
    handler=tool_get_changelog,
    tier='anonymous',
    rate_limit='30/min',
))

# Browser effects.
register(Tool(
    name='trigger_effect',
    description='Activate a sitewide visual effect on the user\'s browser (crt, vhs, glitch, redalert, matrix, rain, snow, partymode, etc.). Returns an action payload the frontend will execute.',
    params={
        'type': 'object',
        'properties': {
            'name': {'type': 'string', 'description': 'effect name (e.g. crt, rain, partymode)'},
        },
        'required': ['name'],
    },
    handler=tool_trigger_effect,
    tier='anonymous',
    rate_limit='30/min',
))
# ── Public API ───────────────────────────────────────────────────────────
def get_allowed_tools(user_tier: str) -> list[Tool]:
    """Return the registered tools that ``user_tier`` may use.

    A tool is included when ``agent_tiers.tier_allows(user_tier, tool.tier)``
    is truthy; registry order is kept.
    """
    # Imported at call time rather than at module import.
    from agent_tiers import tier_allows

    allowed = []
    for tool in TOOLS.values():
        if tier_allows(user_tier, tool.tier):
            allowed.append(tool)
    return allowed
def openai_tool_schemas(tools: list[Tool]) -> list[dict]:
    """Convert tools into function-calling schema dicts.

    Each tool becomes ``{'type': 'function', 'function': {...}}`` where the
    inner dict carries the tool's ``name``, ``description`` and its ``params``
    under the ``parameters`` key. Input order is kept.
    """
    schemas = []
    for tool in tools:
        function_spec = {
            'name': tool.name,
            'description': tool.description,
            'parameters': tool.params,
        }
        schemas.append({'type': 'function', 'function': function_spec})
    return schemas