"""Agentic chat endpoint for JAE-AI. POST /api/agent/chat {messages: [{role, content}], session_id?} Runs an agent loop calling Venice API with tool-calling support. Tier-routed model selection, per-(identity, tool) rate limiting, monthly output-token budget cap. """ import json import os import time import calendar import datetime import threading from pathlib import Path from flask import Blueprint, request, jsonify import requests as req from auth_routes import read_session, fetch_sol_balance, _load_apikeys from agent_tiers import compute_tier, pick_model, tier_allows from agent_tools import ( TOOLS, get_allowed_tools, openai_tool_schemas, check_rate, ) agent_bp = Blueprint('agent_bp', __name__, url_prefix='/api/agent') DATA_DIR = Path(__file__).parent / 'data' BUDGET_FILE = DATA_DIR / 'agent_budget.json' # Venice API VENICE_URL = 'https://api.venice.ai/api/v1/chat/completions' # Model pricing ($ per 1M tokens) — input, output MODEL_PRICING = { 'zai-org-glm-4.7-flash': (0.125, 0.50), 'kimi-k2-thinking': (0.75, 3.20), } MONTHLY_BUDGET_USD = 30.0 MAX_ITERATIONS = 8 MAX_MESSAGES = 40 # cap sent to venice _budget_lock = threading.Lock() SYSTEM_PROMPT = ( 'You are JAE-AI, the agentic AI operator embedded at jaeswift.xyz — a military / cyberpunk ' 'themed command centre. You speak like a tactical operator: terse, confident, a little cheeky. ' 'You have access to tools to search the site, fetch live crypto prices, scan Solana wallets, ' 'look up .sol domains, fetch SITREP briefings, and trigger sitewide visual effects on the ' "user's browser. Use tools proactively when helpful — don't just describe what you could do, " 'do it. After a tool returns, briefly explain the result in plain English. Keep replies under ' '200 words unless the user asks for detail. Never invent tool results. British English, occasional ' 'profanity is fine, never censor words. Current date: {date}.' ) def _load_budget() -> dict: try: with open(BUDGET_FILE) as f: return json.load(f) except Exception: return {} def _save_budget(b: dict): try: BUDGET_FILE.parent.mkdir(parents=True, exist_ok=True) with open(BUDGET_FILE, 'w') as f: json.dump(b, f, indent=2) except Exception as e: print(f'[agent] budget save failed: {e}') def _current_month_key() -> str: return datetime.datetime.utcnow().strftime('%Y-%m') def _estimate_cost(model: str, in_tokens: int, out_tokens: int) -> float: in_p, out_p = MODEL_PRICING.get(model, (0.5, 1.0)) return (in_tokens * in_p + out_tokens * out_p) / 1_000_000 def _record_usage(model: str, in_tokens: int, out_tokens: int) -> tuple[float, float]: """Update budget. Returns (month_total_usd, remaining_usd).""" with _budget_lock: b = _load_budget() key = _current_month_key() month = b.setdefault(key, {'total_usd': 0.0, 'calls': 0, 'by_model': {}}) cost = _estimate_cost(model, in_tokens, out_tokens) month['total_usd'] = round(month['total_usd'] + cost, 6) month['calls'] = month['calls'] + 1 mm = month['by_model'].setdefault(model, {'calls': 0, 'in_tokens': 0, 'out_tokens': 0, 'usd': 0.0}) mm['calls'] += 1 mm['in_tokens'] += in_tokens mm['out_tokens'] += out_tokens mm['usd'] = round(mm['usd'] + cost, 6) _save_budget(b) return month['total_usd'], max(0.0, MONTHLY_BUDGET_USD - month['total_usd']) def _budget_exceeded() -> bool: b = _load_budget() month = b.get(_current_month_key(), {}) return month.get('total_usd', 0) >= MONTHLY_BUDGET_USD def _identity_key() -> str: sess = read_session() if sess and sess.get('address'): return f"wallet:{sess['address']}" return f"ip:{request.headers.get('X-Forwarded-For', request.remote_addr or 'unknown').split(',')[0].strip()}" def _execute_tool(name: str, args: dict, identity: str) -> dict: tool = TOOLS.get(name) if not tool: return {'error': f'unknown tool: {name}'} # Rate limit check rate_key = (identity, name) if not check_rate(rate_key, tool.rate_limit): return {'error': f'rate limit exceeded for {name} ({tool.rate_limit})'} try: return tool.handler(args or {}) except Exception as e: return {'error': f'tool {name} failed: {e}'} def _call_venice(model: str, messages: list, tools_schema: list, api_key: str) -> dict: resp = req.post( VENICE_URL, headers={'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json'}, json={ 'model': model, 'messages': messages, 'tools': tools_schema if tools_schema else None, 'tool_choice': 'auto' if tools_schema else None, 'temperature': 0.7, 'max_tokens': 1500, }, timeout=90, ) if resp.status_code != 200: raise RuntimeError(f'Venice HTTP {resp.status_code}: {resp.text[:400]}') return resp.json() @agent_bp.route('/chat', methods=['POST']) def agent_chat(): # ── Identity / tier ────────────────────────────────────────────────── sess = read_session() if sess and sess.get('address'): address = sess['address'] balance = fetch_sol_balance(address) tier = compute_tier(address, balance) else: address = None tier = 'anonymous' # ── Budget guard ──────────────────────────────────────────────────── if _budget_exceeded(): return jsonify({ 'error': 'Monthly agent budget exceeded — please try again next month', 'budget_usd': MONTHLY_BUDGET_USD, }), 429 # ── Input ─────────────────────────────────────────────────────────── data = request.get_json(silent=True) or {} messages_in = data.get('messages') or [] if not isinstance(messages_in, list) or not messages_in: return jsonify({'error': 'messages array required'}), 400 memory_context = (data.get('memory_context') or '').strip() # Sanitise + cap cleaned = [] for m in messages_in[-MAX_MESSAGES:]: role = m.get('role') if role not in ('user', 'assistant', 'system', 'tool'): continue cleaned.append({k: v for k, v in m.items() if k in ('role', 'content', 'tool_calls', 'tool_call_id', 'name')}) if not cleaned: return jsonify({'error': 'no valid messages'}), 400 # ── Build system prompt ───────────────────────────────────────────── sys_prompt = SYSTEM_PROMPT.format(date=datetime.date.today().isoformat()) if memory_context and tier != 'anonymous': sys_prompt = memory_context + '\n\n' + sys_prompt if tier == 'elite': sys_prompt += '\n\n[TIER: ELITE — $JAE holder. Greet them accordingly.]' elif tier == 'admin': sys_prompt += '\n\n[TIER: ADMIN — this is Jae, the site operator.]' elif tier == 'operator': sys_prompt += '\n\n[TIER: OPERATOR — wallet authenticated.]' messages = [{'role': 'system', 'content': sys_prompt}] + cleaned # ── Model + tools ─────────────────────────────────────────────────── model = pick_model(tier) allowed = get_allowed_tools(tier) tools_schema = openai_tool_schemas(allowed) # ── API key ───────────────────────────────────────────────────────── keys = _load_apikeys() api_key = (keys.get('venice') or {}).get('api_key', '') if not api_key: return jsonify({'error': 'Venice API key not configured'}), 500 identity = _identity_key() tool_trace = [] total_in = 0 total_out = 0 final_content = '' frontend_actions = [] # effects etc. to execute client-side try: for iteration in range(MAX_ITERATIONS): resp_json = _call_venice(model, messages, tools_schema, api_key) usage = resp_json.get('usage') or {} total_in += int(usage.get('prompt_tokens', 0) or 0) total_out += int(usage.get('completion_tokens', 0) or 0) choice = (resp_json.get('choices') or [{}])[0] msg = choice.get('message') or {} tool_calls = msg.get('tool_calls') or [] # Append assistant turn assistant_turn = {'role': 'assistant', 'content': msg.get('content') or ''} if tool_calls: assistant_turn['tool_calls'] = tool_calls messages.append(assistant_turn) if not tool_calls: final_content = msg.get('content') or msg.get('reasoning_content') or '' break # Execute tool calls for tc in tool_calls: fn = (tc.get('function') or {}) name = fn.get('name', '') raw_args = fn.get('arguments') or '{}' try: args = json.loads(raw_args) if isinstance(raw_args, str) else (raw_args or {}) except Exception: args = {} result = _execute_tool(name, args, identity) tool_trace.append({ 'name': name, 'args': args, 'result': result, 'iteration': iteration, }) # Capture frontend trigger_effect action if isinstance(result, dict) and result.get('action') == 'trigger_effect': frontend_actions.append({ 'action': 'trigger_effect', 'effect': result.get('effect'), }) messages.append({ 'role': 'tool', 'tool_call_id': tc.get('id'), 'name': name, 'content': json.dumps(result, default=str)[:4000], }) else: final_content = final_content or '(agent reached iteration limit)' except Exception as e: return jsonify({ 'error': f'agent failure: {e}', 'tool_calls': tool_trace, 'tier': tier, 'model_used': model, }), 502 # ── Record usage ──────────────────────────────────────────────────── month_total, remaining = _record_usage(model, total_in, total_out) return jsonify({ 'content': final_content, 'tool_calls': tool_trace, 'frontend_actions': frontend_actions, 'model_used': model, 'tier': tier, 'authenticated': bool(address), 'address': address, 'tokens': {'in': total_in, 'out': total_out}, 'budget': { 'month_usd': round(month_total, 4), 'remaining_usd': round(remaining, 4), 'cap_usd': MONTHLY_BUDGET_USD, }, }) @agent_bp.route('/tools', methods=['GET']) def list_tools(): """Debug: list tools available to current tier.""" sess = read_session() address = sess.get('address') if sess else None balance = fetch_sol_balance(address) if address else 0.0 tier = compute_tier(address, balance) if address else 'anonymous' allowed = get_allowed_tools(tier) return jsonify({ 'tier': tier, 'model': pick_model(tier), 'tools': [{'name': t.name, 'tier': t.tier, 'rate_limit': t.rate_limit, 'description': t.description} for t in allowed], }) @agent_bp.route('/budget', methods=['GET']) def budget_status(): b = _load_budget() month = b.get(_current_month_key(), {'total_usd': 0.0, 'calls': 0, 'by_model': {}}) return jsonify({ 'month': _current_month_key(), 'cap_usd': MONTHLY_BUDGET_USD, 'total_usd': round(month.get('total_usd', 0), 4), 'remaining_usd': round(max(0.0, MONTHLY_BUDGET_USD - month.get('total_usd', 0)), 4), 'calls': month.get('calls', 0), 'by_model': month.get('by_model', {}), })