- New /api/agent/chat endpoint with Venice tool-calling loop (max 8 iter)
- Tiered models: glm-4.7-flash default, kimi-k2-thinking for Elite+
- Wallet auth: /api/auth/{nonce,verify,whoami,logout} with Ed25519 + JWT
- 10 tools registered: site search, crypto prices, SITREP, .sol lookup,
wallet xray, contraband/awesomelist search, changelog, trigger_effect
- Per-tool rate limits, 30s timeout, \$30/mo budget guard
- Frontend: tier badge, tool call cards, wallet sign-in handshake
- Changelog v1.40.0
539 lines
20 KiB
Python
539 lines
20 KiB
Python
"""Agent tool registry + implementations for the JAE-AI agentic chat.
|
|
|
|
Register once on import. Each Tool carries:
|
|
- JSON schema for Venice function-calling
|
|
- Handler callable
|
|
- Minimum tier
|
|
- Rate limit spec ('N/min' or 'N/hour')
|
|
- Timeout (seconds)
|
|
"""
|
|
import json
|
|
import time
|
|
import re
|
|
from collections import defaultdict, deque
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Callable, Any
|
|
|
|
import requests as req
|
|
|
|
DATA_DIR = Path(__file__).parent / 'data'
|
|
|
|
|
|
@dataclass
|
|
class Tool:
|
|
name: str
|
|
description: str
|
|
params: dict
|
|
handler: Callable[[dict], Any]
|
|
tier: str = 'anonymous'
|
|
rate_limit: str = '60/min'
|
|
timeout: int = 30
|
|
|
|
|
|
TOOLS: dict[str, Tool] = {}
|
|
|
|
|
|
def register(tool: Tool):
|
|
TOOLS[tool.name] = tool
|
|
|
|
|
|
# ── Rate limiting (per (identity, tool) key) ─────────────────────────────
|
|
_rate_state: dict[tuple, deque] = defaultdict(lambda: deque(maxlen=200))
|
|
|
|
|
|
def parse_spec(spec: str) -> tuple[int, int]:
|
|
"""'10/min' -> (10, 60). Supports /sec /min /hour /day."""
|
|
try:
|
|
n, unit = spec.split('/')
|
|
n = int(n)
|
|
except Exception:
|
|
return 60, 60
|
|
unit = unit.lower().strip()
|
|
period = {'sec': 1, 'second': 1, 'min': 60, 'minute': 60,
|
|
'hour': 3600, 'hr': 3600, 'day': 86400}.get(unit, 60)
|
|
return n, period
|
|
|
|
|
|
def check_rate(key: tuple, spec: str) -> bool:
|
|
limit, period = parse_spec(spec)
|
|
now = time.time()
|
|
q = _rate_state[key]
|
|
while q and q[0] < now - period:
|
|
q.popleft()
|
|
if len(q) >= limit:
|
|
return False
|
|
q.append(now)
|
|
return True
|
|
|
|
|
|
# ── Simple TTL cache ─────────────────────────────────────────────────────
|
|
_cache: dict[str, tuple[float, Any]] = {}
|
|
|
|
|
|
def cache_get(key: str, ttl: float) -> Any | None:
|
|
entry = _cache.get(key)
|
|
if entry and time.time() - entry[0] < ttl:
|
|
return entry[1]
|
|
return None
|
|
|
|
|
|
def cache_set(key: str, value: Any):
|
|
_cache[key] = (time.time(), value)
|
|
|
|
|
|
def _load_json(name: str) -> Any:
|
|
try:
|
|
with open(DATA_DIR / name) as f:
|
|
return json.load(f)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ── Tool Handlers ────────────────────────────────────────────────────────
|
|
|
|
def tool_search_site(args: dict) -> dict:
|
|
query = (args.get('query') or '').strip().lower()
|
|
if not query:
|
|
return {'error': 'query required', 'results': []}
|
|
results = []
|
|
# Index main content files
|
|
sources = [
|
|
('changelog', 'changelog.json'),
|
|
('navigation', 'navigation.json'),
|
|
('homepage', 'homepage.json'),
|
|
('managed_services', 'managed_services.json'),
|
|
('posts', 'posts.json'),
|
|
('links', 'links.json'),
|
|
('tracks', 'tracks.json'),
|
|
]
|
|
for label, fname in sources:
|
|
data = _load_json(fname)
|
|
if data is None:
|
|
continue
|
|
blob = json.dumps(data, ensure_ascii=False).lower()
|
|
if query in blob:
|
|
# Find first match snippet
|
|
idx = blob.find(query)
|
|
snippet = blob[max(0, idx - 80): idx + 160]
|
|
results.append({
|
|
'source': label,
|
|
'title': label.replace('_', ' ').title(),
|
|
'snippet': snippet.replace('\n', ' '),
|
|
'url': f'/api/{label}' if label != 'homepage' else '/',
|
|
})
|
|
return {'results': results[:5], 'count': len(results)}
|
|
|
|
|
|
def tool_get_sol_price(args: dict) -> dict:
|
|
cached = cache_get('sol_price', 30)
|
|
if cached is not None:
|
|
return cached
|
|
try:
|
|
r = req.get('https://api.binance.com/api/v3/ticker/24hr',
|
|
params={'symbol': 'SOLUSDT'}, timeout=8)
|
|
r.raise_for_status()
|
|
j = r.json()
|
|
out = {
|
|
'symbol': 'SOL',
|
|
'price_usd': float(j.get('lastPrice', 0)),
|
|
'change_24h_pct': float(j.get('priceChangePercent', 0)),
|
|
'volume_24h': float(j.get('volume', 0)),
|
|
'source': 'binance',
|
|
}
|
|
cache_set('sol_price', out)
|
|
return out
|
|
except Exception as e:
|
|
return {'error': f'price fetch failed: {e}'}
|
|
|
|
|
|
def tool_get_crypto_price(args: dict) -> dict:
|
|
symbol = (args.get('symbol') or '').upper().strip()
|
|
if not symbol or not re.match(r'^[A-Z0-9]{2,10}$', symbol):
|
|
return {'error': 'valid symbol required (e.g. BTC, ETH)'}
|
|
ck = f'crypto_price:{symbol}'
|
|
cached = cache_get(ck, 30)
|
|
if cached is not None:
|
|
return cached
|
|
pair = f'{symbol}USDT'
|
|
try:
|
|
r = req.get('https://api.binance.com/api/v3/ticker/24hr',
|
|
params={'symbol': pair}, timeout=8)
|
|
if r.status_code != 200:
|
|
return {'error': f'symbol {symbol} not found on binance'}
|
|
j = r.json()
|
|
out = {
|
|
'symbol': symbol,
|
|
'price_usd': float(j.get('lastPrice', 0)),
|
|
'change_24h_pct': float(j.get('priceChangePercent', 0)),
|
|
'volume_24h': float(j.get('volume', 0)),
|
|
}
|
|
cache_set(ck, out)
|
|
return out
|
|
except Exception as e:
|
|
return {'error': f'price fetch failed: {e}'}
|
|
|
|
|
|
def _search_entries(entries: list, query: str, fields: list[str], limit: int = 5) -> list:
|
|
q = query.lower()
|
|
matches = []
|
|
for e in entries:
|
|
blob = ' '.join(str(e.get(f, '')) for f in fields).lower()
|
|
if q in blob:
|
|
matches.append(e)
|
|
if len(matches) >= limit:
|
|
break
|
|
return matches
|
|
|
|
|
|
def tool_search_contraband(args: dict) -> dict:
|
|
query = (args.get('query') or '').strip()
|
|
category = (args.get('category') or '').strip().lower()
|
|
if not query:
|
|
return {'error': 'query required'}
|
|
data = _load_json('contraband.json') or {}
|
|
cats = data.get('categories', []) if isinstance(data, dict) else []
|
|
results = []
|
|
for cat in cats:
|
|
cat_name = (cat.get('name') or '').lower()
|
|
if category and category not in cat_name and category not in (cat.get('slug') or '').lower():
|
|
continue
|
|
for sub in cat.get('subcategories', []) or []:
|
|
for item in sub.get('items', []) or []:
|
|
blob = f"{item.get('title','')} {item.get('description','')} {item.get('url','')}".lower()
|
|
if query.lower() in blob:
|
|
results.append({
|
|
'title': item.get('title'),
|
|
'url': item.get('url'),
|
|
'description': (item.get('description') or '')[:200],
|
|
'category': cat.get('name'),
|
|
'subcategory': sub.get('name'),
|
|
})
|
|
if len(results) >= 5:
|
|
return {'results': results, 'count': len(results)}
|
|
return {'results': results, 'count': len(results)}
|
|
|
|
|
|
def tool_search_awesomelist(args: dict) -> dict:
|
|
query = (args.get('query') or '').strip().lower()
|
|
if not query:
|
|
return {'error': 'query required'}
|
|
index = _load_json('awesomelist_index.json') or {}
|
|
sectors = index.get('sectors', []) if isinstance(index, dict) else []
|
|
results = []
|
|
# Search sector names first
|
|
for sec in sectors:
|
|
name = (sec.get('name') or '').lower()
|
|
desc = (sec.get('description') or '').lower()
|
|
if query in name or query in desc:
|
|
results.append({
|
|
'type': 'sector',
|
|
'name': sec.get('name'),
|
|
'slug': sec.get('slug'),
|
|
'count': sec.get('count', 0),
|
|
'url': f"/recon/awesomelist?sector={sec.get('slug')}",
|
|
})
|
|
# Search inside individual sector files for entry matches
|
|
awesome_dir = DATA_DIR / 'awesomelist'
|
|
if awesome_dir.exists() and len(results) < 5:
|
|
import os
|
|
for fname in sorted(os.listdir(awesome_dir))[:80]:
|
|
if len(results) >= 5:
|
|
break
|
|
try:
|
|
with open(awesome_dir / fname) as f:
|
|
sec_data = json.load(f)
|
|
for entry in (sec_data.get('entries') or [])[:300]:
|
|
blob = f"{entry.get('name','')} {entry.get('description','')}".lower()
|
|
if query in blob:
|
|
results.append({
|
|
'type': 'entry',
|
|
'name': entry.get('name'),
|
|
'url': entry.get('url'),
|
|
'description': (entry.get('description') or '')[:200],
|
|
'sector_file': fname,
|
|
})
|
|
if len(results) >= 5:
|
|
break
|
|
except Exception:
|
|
continue
|
|
return {'results': results[:5], 'count': len(results)}
|
|
|
|
|
|
def tool_get_sitrep(args: dict) -> dict:
|
|
date_arg = (args.get('date') or 'latest').strip()
|
|
sitrep_dir = DATA_DIR / 'sitreps'
|
|
if not sitrep_dir.exists():
|
|
return {'error': 'No SITREPs available yet', 'sitrep': None}
|
|
import os
|
|
files = sorted([f for f in os.listdir(sitrep_dir) if f.endswith('.json')], reverse=True)
|
|
if not files:
|
|
return {'error': 'No SITREPs archived', 'sitrep': None}
|
|
target = None
|
|
if date_arg == 'latest':
|
|
target = files[0]
|
|
else:
|
|
for f in files:
|
|
if date_arg in f:
|
|
target = f
|
|
break
|
|
if not target:
|
|
return {'error': f'No SITREP matching "{date_arg}"', 'available': files[:5]}
|
|
try:
|
|
with open(sitrep_dir / target) as f:
|
|
data = json.load(f)
|
|
summary = data.get('summary') or data.get('content', '')[:800]
|
|
return {
|
|
'date': data.get('date', target.replace('.json', '')),
|
|
'summary': summary[:1200],
|
|
'source_count': len(data.get('sources', [])),
|
|
'sectors': list((data.get('sectors') or {}).keys()),
|
|
'url': '/transmissions/sitrep',
|
|
}
|
|
except Exception as e:
|
|
return {'error': f'Failed to read SITREP: {e}'}
|
|
|
|
|
|
def tool_lookup_sol_domain(args: dict) -> dict:
|
|
name = (args.get('name') or '').strip().lower().replace('.sol', '')
|
|
if not name or not re.match(r'^[a-z0-9_-]{1,63}$', name):
|
|
return {'error': 'invalid domain name'}
|
|
try:
|
|
# Bonfida SNS public API
|
|
r = req.get(f'https://sns-api.bonfida.com/domain/lookup/{name}', timeout=8)
|
|
if r.status_code == 200:
|
|
j = r.json()
|
|
return {
|
|
'name': f'{name}.sol',
|
|
'registered': True,
|
|
'owner': j.get('result', {}).get('owner') if isinstance(j.get('result'), dict) else j.get('owner'),
|
|
'raw': j,
|
|
'solscan_url': f'https://www.sns.id/domain?domain={name}',
|
|
}
|
|
if r.status_code == 404:
|
|
return {
|
|
'name': f'{name}.sol',
|
|
'registered': False,
|
|
'available': True,
|
|
'register_url': f'https://www.sns.id/search?search={name}',
|
|
}
|
|
return {'name': f'{name}.sol', 'status_code': r.status_code, 'raw': r.text[:400]}
|
|
except Exception as e:
|
|
return {'error': f'lookup failed: {e}'}
|
|
|
|
|
|
def tool_wallet_xray(args: dict) -> dict:
|
|
address = (args.get('address') or '').strip()
|
|
if not address or not (32 <= len(address) <= 44):
|
|
return {'error': 'invalid Solana address'}
|
|
# Lazy import to avoid circular
|
|
from auth_routes import get_rpc_url
|
|
rpc = get_rpc_url()
|
|
out: dict = {'address': address, 'rpc': 'helius' if 'helius' in rpc else 'public'}
|
|
try:
|
|
# Balance
|
|
r = req.post(rpc, json={'jsonrpc': '2.0', 'id': 1, 'method': 'getBalance',
|
|
'params': [address]}, timeout=10)
|
|
r.raise_for_status()
|
|
lamports = int(r.json().get('result', {}).get('value', 0))
|
|
out['balance_sol'] = round(lamports / 1_000_000_000, 6)
|
|
except Exception as e:
|
|
return {'error': f'balance fetch failed: {e}'}
|
|
try:
|
|
# Token accounts count
|
|
r = req.post(rpc, json={
|
|
'jsonrpc': '2.0', 'id': 2, 'method': 'getTokenAccountsByOwner',
|
|
'params': [address, {'programId': 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA'},
|
|
{'encoding': 'jsonParsed'}]
|
|
}, timeout=12)
|
|
tokens = r.json().get('result', {}).get('value', []) or []
|
|
non_zero = [t for t in tokens if int(t.get('account', {}).get('data', {}).get('parsed', {}).get('info', {}).get('tokenAmount', {}).get('amount', 0) or 0) > 0]
|
|
out['token_account_count'] = len(tokens)
|
|
out['non_zero_tokens'] = len(non_zero)
|
|
except Exception as e:
|
|
out['token_error'] = str(e)
|
|
try:
|
|
# Last 5 signatures
|
|
r = req.post(rpc, json={
|
|
'jsonrpc': '2.0', 'id': 3, 'method': 'getSignaturesForAddress',
|
|
'params': [address, {'limit': 5}]
|
|
}, timeout=10)
|
|
sigs = r.json().get('result', []) or []
|
|
out['recent_txs'] = [{
|
|
'signature': s.get('signature'),
|
|
'slot': s.get('slot'),
|
|
'block_time': s.get('blockTime'),
|
|
'err': bool(s.get('err')),
|
|
} for s in sigs]
|
|
except Exception as e:
|
|
out['tx_error'] = str(e)
|
|
out['solscan_url'] = f'https://solscan.io/account/{address}'
|
|
return out
|
|
|
|
|
|
def tool_get_changelog(args: dict) -> dict:
|
|
limit = int(args.get('limit') or 5)
|
|
limit = max(1, min(limit, 25))
|
|
data = _load_json('changelog.json') or {}
|
|
entries = data.get('entries', []) if isinstance(data, dict) else (data if isinstance(data, list) else [])
|
|
out = []
|
|
for e in entries[:limit]:
|
|
out.append({
|
|
'version': e.get('version'),
|
|
'date': e.get('date'),
|
|
'category': e.get('category'),
|
|
'title': e.get('title'),
|
|
'changes': (e.get('changes') or [])[:6],
|
|
})
|
|
return {'entries': out, 'count': len(out), 'total_available': len(entries)}
|
|
|
|
|
|
VALID_EFFECTS = {
|
|
'crt', 'vhs', 'glitch', 'redalert', 'red', 'invert', 'blueprint', 'typewriter',
|
|
'gravity', 'earthquake', 'lowgravity', 'melt', 'shuffle',
|
|
'rain', 'snow', 'fog', 'night', 'underwater',
|
|
'dimensions', 'portal', 'retro',
|
|
'partymode', 'ghostmode', 'quantum', 'sneak', 'hacker',
|
|
'matrix', 'cmatrix',
|
|
}
|
|
|
|
|
|
def tool_trigger_effect(args: dict) -> dict:
|
|
name = (args.get('name') or '').strip().lower()
|
|
if name not in VALID_EFFECTS:
|
|
return {'error': f'unknown effect. Valid: {sorted(VALID_EFFECTS)}', 'effect': None}
|
|
return {
|
|
'action': 'trigger_effect',
|
|
'effect': name,
|
|
'message': f'Activating {name} effect on your browser…',
|
|
}
|
|
|
|
|
|
# ── Registration ─────────────────────────────────────────────────────────
|
|
|
|
register(Tool(
|
|
name='search_site',
|
|
description='Full-text search across jaeswift.xyz content (changelog, posts, navigation, homepage). Returns top 5 matches with snippets.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'query': {'type': 'string', 'description': 'search query'}},
|
|
'required': ['query'],
|
|
},
|
|
handler=tool_search_site, tier='anonymous', rate_limit='20/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='get_sol_price',
|
|
description='Get live Solana (SOL) price and 24h change from Binance.',
|
|
params={'type': 'object', 'properties': {}, 'required': []},
|
|
handler=tool_get_sol_price, tier='anonymous', rate_limit='60/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='get_crypto_price',
|
|
description='Get live USD price and 24h change for any crypto symbol (BTC, ETH, etc) from Binance.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'symbol': {'type': 'string', 'description': 'ticker symbol e.g. BTC, ETH, DOGE'}},
|
|
'required': ['symbol'],
|
|
},
|
|
handler=tool_get_crypto_price, tier='anonymous', rate_limit='60/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='search_contraband',
|
|
description='Search the CONTRABAND archive of 16k+ curated resources across 24 categories (software, privacy, movies, books, games, AI, etc.). Top 5 matches.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {
|
|
'query': {'type': 'string', 'description': 'search query'},
|
|
'category': {'type': 'string', 'description': 'optional category filter (e.g. privacy, software, ai)'},
|
|
},
|
|
'required': ['query'],
|
|
},
|
|
handler=tool_search_contraband, tier='anonymous', rate_limit='20/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='search_awesomelist',
|
|
description='Search 28 curated awesome-list sectors with 135k+ entries (development, datascience, security, etc.). Top 5 matches.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'query': {'type': 'string', 'description': 'search query'}},
|
|
'required': ['query'],
|
|
},
|
|
handler=tool_search_awesomelist, tier='anonymous', rate_limit='20/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='get_sitrep',
|
|
description='Get the latest (or a specific date) SITREP daily AI intelligence briefing covering tech, cybersecurity, and crypto.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'date': {'type': 'string', 'description': 'date (YYYY-MM-DD) or "latest"', 'default': 'latest'}},
|
|
'required': [],
|
|
},
|
|
handler=tool_get_sitrep, tier='anonymous', rate_limit='20/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='lookup_sol_domain',
|
|
description='Check if a .sol domain is registered or available (via Bonfida SNS). Returns owner if registered.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'name': {'type': 'string', 'description': 'domain name without .sol suffix'}},
|
|
'required': ['name'],
|
|
},
|
|
handler=tool_lookup_sol_domain, tier='anonymous', rate_limit='30/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='wallet_xray',
|
|
description='Scan any Solana wallet address: SOL balance, token account count, recent transactions.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'address': {'type': 'string', 'description': 'Solana wallet pubkey (base58)'}},
|
|
'required': ['address'],
|
|
},
|
|
handler=tool_wallet_xray, tier='anonymous', rate_limit='10/min', timeout=20,
|
|
))
|
|
|
|
register(Tool(
|
|
name='get_changelog',
|
|
description='Get the last N changelog entries from jaeswift.xyz (what has been built/fixed recently).',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'limit': {'type': 'integer', 'description': 'number of entries (1-25)', 'default': 5}},
|
|
'required': [],
|
|
},
|
|
handler=tool_get_changelog, tier='anonymous', rate_limit='30/min',
|
|
))
|
|
|
|
register(Tool(
|
|
name='trigger_effect',
|
|
description='Activate a sitewide visual effect on the user\'s browser (crt, vhs, glitch, redalert, matrix, rain, snow, partymode, etc.). Returns an action payload the frontend will execute.',
|
|
params={
|
|
'type': 'object',
|
|
'properties': {'name': {'type': 'string', 'description': 'effect name (e.g. crt, rain, partymode)'}},
|
|
'required': ['name'],
|
|
},
|
|
handler=tool_trigger_effect, tier='anonymous', rate_limit='30/min',
|
|
))
|
|
|
|
|
|
# ── Public API ───────────────────────────────────────────────────────────
|
|
|
|
def get_allowed_tools(user_tier: str) -> list[Tool]:
|
|
from agent_tiers import tier_allows
|
|
return [t for t in TOOLS.values() if tier_allows(user_tier, t.tier)]
|
|
|
|
|
|
def openai_tool_schemas(tools: list[Tool]) -> list[dict]:
|
|
return [{
|
|
'type': 'function',
|
|
'function': {
|
|
'name': t.name,
|
|
'description': t.description,
|
|
'parameters': t.params,
|
|
},
|
|
} for t in tools]
|