Files
autarch/web/routes/chat.py
SsSnake da53899f66 AUTARCH v1.9 — remote monitoring, SSH manager, daemon, vault, cleanup
- Add Remote Monitoring Station with PIAP device profile system
- Add SSH/SSHD manager with fail2ban integration
- Add privileged daemon architecture for safe root operations
- Add encrypted vault, HAL memory, HAL auto-analyst
- Add network security suite, module creator, codex training
- Add start.sh launcher script and GTK3 desktop launcher
- Remove Output/ build artifacts, installer files, loose docs
- Update .gitignore for runtime data and build artifacts
- Update README for v1.9 with new launch method, screenshots, and features

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 06:59:06 -07:00

470 lines
16 KiB
Python

"""Chat and Agent API routes — Hal chat with Agent system for module creation."""
import json
import os
import threading
import time
import uuid
from pathlib import Path
from flask import Blueprint, request, jsonify, Response
from web.auth import login_required
# Blueprint for the chat / HAL / agent endpoints; every route is mounted under /api.
chat_bp = Blueprint(name='chat', import_name=__name__, url_prefix='/api')

# Registry of agent runs keyed by run_id (a uuid4 string). Each value holds at
# least {'steps': list of step dicts, 'done': bool, 'stop': threading.Event}.
_agent_runs: dict = dict()

# HAL system prompt text, filled in lazily by _get_system_prompt() on first use.
_system_prompt = None
def _get_system_prompt():
    """Return the HAL system prompt, loading and caching it on first call.

    The prompt is read from ``data/hal_system_prompt.txt`` under
    ``Path(__file__).parent.parent.parent`` (presumably the application root,
    given this module's web/routes/ location). If that file is missing,
    unreadable, or not valid UTF-8, a short built-in prompt is used instead.
    A bad prompt file therefore degrades answers instead of breaking every
    chat request.

    The value is cached in the module-level ``_system_prompt``, so edits to
    the file take effect only after a restart.
    """
    global _system_prompt
    if _system_prompt is not None:
        return _system_prompt

    prompt_path = Path(__file__).parent.parent.parent / 'data' / 'hal_system_prompt.txt'
    try:
        _system_prompt = prompt_path.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError):
        # Covers a missing file (FileNotFoundError) as well as permission /
        # is-a-directory / decode failures, which previously propagated.
        _system_prompt = (
            "You are Hal, the AI agent for AUTARCH. You can create new modules, "
            "run shell commands, read and write files. When asked to create a module, "
            "use the create_module tool."
        )
    return _system_prompt
def _ensure_model_loaded():
    """Make sure the shared LLM instance has its model loaded.

    Returns:
        ``(llm, None)`` when the model is already loaded or loads successfully;
        ``(None, message)`` when ``load_model`` raises ``LLMError``.
    """
    from core.llm import get_llm, LLMError

    llm = get_llm()
    if llm.is_loaded:
        return llm, None
    try:
        llm.load_model(verbose=False)
    except LLMError as exc:
        return None, str(exc)
    return llm, None
@chat_bp.route('/chat', methods=['POST'])
@login_required
def chat():
    """Handle a chat message and stream the reply as Server-Sent Events.

    Request JSON: ``{message: str, mode?: 'chat' | 'agent'}``.

    ``mode == 'agent'`` routes to the tool-using Agent. Any other value
    (default ``'chat'``) streams tokens straight from the LLM. A missing,
    empty, or non-string ``message`` returns ``{'error': 'No message provided'}``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body would otherwise crash on .get() with a 500.
        data = {}
    message = data.get('message')
    message = message.strip() if isinstance(message, str) else ''
    mode = data.get('mode', 'chat')  # 'chat' (default) or 'agent'
    if not message:
        return jsonify({'error': 'No message provided'})

    # Record the user's turn in HAL's memory store. Best-effort: a memory
    # failure must never block the chat itself.
    try:
        from core.hal_memory import get_hal_memory
        get_hal_memory().add('user', message, metadata={'mode': mode})
    except Exception:
        pass

    if mode == 'agent':
        return _handle_agent_chat(message)
    return _handle_direct_chat(message)
def _handle_direct_chat(message):
    """Stream an LLM reply to *message* as SSE, bypassing the Agent/tool system.

    Events (each one a ``data: <json>`` frame):
      * ``{'type': 'status', 'content': 'Loading model...'}`` if the model
        has to be loaded first;
      * ``{'token': str}`` for each generated token;
      * ``{'type': 'error', 'content': str}`` on failure;
      * a final ``{'done': True}``, which is always sent so the client can
        stop waiting.

    On success the full reply is appended to HAL memory (best-effort).
    """
    def sse(payload):
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        # Imported outside the try: the except clause below needs LLMError bound.
        from core.llm import get_llm, LLMError

        full_response = []
        try:
            llm = get_llm()
            if not llm.is_loaded:
                yield sse({'type': 'status', 'content': 'Loading model...'})
                try:
                    llm.load_model(verbose=False)
                except LLMError as e:
                    yield sse({'type': 'error', 'content': f'Failed to load model: {e}'})
                    yield sse({'done': True})
                    return

            token_gen = llm.chat(message, system_prompt=_get_system_prompt(), stream=True)
            for token in token_gen:
                full_response.append(token)
                yield sse({'token': token})
        except LLMError as e:
            yield sse({'type': 'error', 'content': str(e)})
        except Exception as e:
            # Any other failure would otherwise end the stream with no error
            # and no 'done' event, leaving the client waiting forever.
            # (GeneratorExit from a client disconnect is not an Exception and
            # still propagates.)
            yield sse({'type': 'error', 'content': f'Chat failed: {e}'})
        else:
            # Store HAL's complete response in memory (best-effort).
            try:
                from core.hal_memory import get_hal_memory
                get_hal_memory().add('hal', ''.join(full_response))
            except Exception:
                pass
        yield sse({'done': True})

    return Response(generate(), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
def _handle_agent_chat(message):
    """Run *message* through the tool-using Agent and stream its steps as SSE.

    A daemon worker thread runs the Agent and appends step dicts to a list
    registered in ``_agent_runs``. Step types are 'status', 'thought',
    'action', 'result', 'answer', and 'error'. The returned SSE response polls
    that list, forwards new entries, and sends ``{'done': True}`` once the
    worker has finished.

    The run_id is never sent to the client, so the registry entry is removed
    when the stream ends. If the client disconnects early, the run's stop
    event is set so the worker can abandon the run at its next step.
    """
    run_id = str(uuid.uuid4())
    stop_event = threading.Event()
    steps = []
    run = {'steps': steps, 'done': False, 'stop': stop_event}
    _agent_runs[run_id] = run

    class _Stopped(Exception):
        """Raised from the step callback to abandon a run whose stop event is set."""

    def worker():
        try:
            from core.agent import Agent
            from core.tools import get_tool_registry
            from core.llm import get_llm, LLMError

            llm = get_llm()
            if not llm.is_loaded:
                steps.append({'type': 'status', 'content': 'Loading model...'})
                try:
                    llm.load_model(verbose=False)
                except LLMError as e:
                    steps.append({'type': 'error', 'content': f'Failed to load model: {e}'})
                    return

            agent = Agent(llm=llm, tools=get_tool_registry(), max_steps=20, verbose=False)
            # Prepend HAL's behavioural rules to the Agent's THOUGHT/ACTION/PARAMS
            # protocol. The doubled braces in the template imply Agent fills in
            # {tools_description} with str.format(), so literal braces in the
            # prompt file are escaped; otherwise they would be parsed as format
            # fields and break every run.
            # NOTE(review): confirm Agent uses str.format on SYSTEM_PROMPT.
            hal_prompt = _get_system_prompt().replace('{', '{{').replace('}', '}}')
            agent.SYSTEM_PROMPT = hal_prompt + """
FORMAT — you MUST use this exact format:
THOUGHT: your reasoning
ACTION: tool_name
PARAMS: {{"param": "value"}}
When done: ACTION: task_complete PARAMS: {{"summary": "what was done"}}
When you need input: ACTION: ask_user PARAMS: {{"question": "your question"}}
{tools_description}
"""

            def on_step(step):
                # Raising is the only way this callback can stop the run. This
                # assumes Agent.run lets callback exceptions propagate (TODO
                # confirm). If it swallows them, the run simply continues.
                if stop_event.is_set():
                    raise _Stopped()
                if step.thought:
                    steps.append({'type': 'thought', 'content': step.thought})
                if step.tool_name and step.tool_name not in ('task_complete', 'ask_user'):
                    args = json.dumps(step.tool_args or {})
                    steps.append({'type': 'action', 'content': f"{step.tool_name}({args})"})
                if step.tool_result:
                    shown = step.tool_result
                    if len(shown) > 800:
                        shown = shown[:800] + '...'
                    steps.append({'type': 'result', 'content': shown})

            result = agent.run(message, step_callback=on_step)
            final_text = result.summary if result.success else (result.error or result.summary)
            steps.append({'type': 'answer' if result.success else 'error', 'content': final_text})

            # Persist the whole agent conversation to HAL memory (best-effort).
            try:
                from core.hal_memory import get_hal_memory
                mem = get_hal_memory()
                for past in result.steps:
                    if past.thought:
                        mem.add('hal_thought', past.thought)
                    if past.tool_name:
                        mem.add('hal_action', f'{past.tool_name}({json.dumps(past.tool_args or {})})')
                    if past.tool_result:
                        mem.add('hal_result', past.tool_result[:2000])
                mem.add('hal', final_text)
                mem.save()
            except Exception:
                pass
        except _Stopped:
            pass
        except Exception as e:
            steps.append({'type': 'error', 'content': str(e)})
        finally:
            # Use the captured dict, not _agent_runs[run_id]: the stream may
            # already have removed the registry entry.
            run['done'] = True

    threading.Thread(target=worker, daemon=True).start()

    def generate():
        sent = 0
        try:
            while True:
                # Sample 'done' BEFORE draining. The worker appends its last
                # step and only then sets 'done', so every step it produced is
                # already in `steps` when done=True is observed. Checking after
                # draining could drop the final answer/error.
                finished = run['done']
                while sent < len(steps):
                    yield f"data: {json.dumps(steps[sent])}\n\n"
                    sent += 1
                if finished:
                    yield f"data: {json.dumps({'done': True})}\n\n"
                    return
                time.sleep(0.15)
        finally:
            # Runs on normal completion and on client disconnect (GeneratorExit).
            stop_event.set()
            _agent_runs.pop(run_id, None)

    return Response(generate(), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@chat_bp.route('/chat/reset', methods=['POST'])
@login_required
def chat_reset():
    """Best-effort reset of the shared LLM's conversation history.

    Tries ``clear_history()``, then ``reset()``, then blanking a
    ``conversation_history`` attribute, using the first one the LLM object
    has. Errors are ignored, and ``{'ok': True}`` is always returned.
    """
    try:
        from core.llm import get_llm
        model = get_llm()
        for method_name in ('clear_history', 'reset'):
            if hasattr(model, method_name):
                getattr(model, method_name)()
                break
        else:
            if hasattr(model, 'conversation_history'):
                model.conversation_history = []
    except Exception:
        pass
    return jsonify({'ok': True})
@chat_bp.route('/hal/analyze', methods=['POST'])
@login_required
def hal_analyze():
    """Ask HAL to analyse a tool's output.

    Request JSON: ``{tool_name, output, context?, category?}``.

    Response JSON: whatever ``core.hal_analyst.analyze_output`` returns
    (intended shape ``{available, analysis, risk_level, has_fixes, tool_name}``).
    An empty ``output`` instead returns
    ``{available: False, analysis: 'No output provided', tool_name}``.
    """
    payload = request.get_json(silent=True) or {}
    tool_name = payload.get('tool_name', 'unknown')
    output = payload.get('output', '')
    if not output:
        return jsonify({'available': False, 'analysis': 'No output provided', 'tool_name': tool_name})

    from core.hal_analyst import analyze_output
    analysis = analyze_output(
        tool_name,
        output,
        context=payload.get('context', ''),
        category=payload.get('category', 'default'),
    )
    return jsonify(analysis)
# Substrings that hal_fix refuses to execute. Matching is case-insensitive,
# with whitespace runs collapsed. This is a tripwire for obviously destructive
# suggestions, NOT a security boundary: variants such as `rm -fr /` or
# `rm -rf /*` are not caught.
_FIX_BLOCKLIST = ('rm -rf /', 'mkfs', 'dd if=', ':(){', 'format c:')

# Executables that run as the normal user via subprocess instead of through
# the privileged daemon's root_exec.
_FIX_USER_COMMANDS = frozenset({'adb', 'fastboot'})


def _blocked_fix_pattern(command):
    """Return the first _FIX_BLOCKLIST entry found in *command*, else None."""
    normalized = ' '.join(command.lower().split())
    return next((pattern for pattern in _FIX_BLOCKLIST if pattern in normalized), None)


def _clean_fix_command(command):
    """Strip redirections HAL tends to append, plus a leading ``sudo ``.

    Most commands are executed from a token list without a shell, where
    ``2>/dev/null`` would be passed through as literal arguments. ``sudo`` is
    dropped because privileged commands are routed through root_exec.
    """
    import re
    for pattern in (r'\s*2>/dev/null\s*', r'\s*>/dev/null\s*', r'\s*2>&1\s*'):
        command = re.sub(pattern, ' ', command)
    command = command.strip()
    if command.startswith('sudo '):
        command = command[5:].strip()
    return command


def _split_fix_command(command):
    """Tokenise *command* shell-style; fall back to whitespace on bad quoting."""
    import shlex
    try:
        return shlex.split(command)
    except ValueError:
        return command.split()


def _is_fix_user_command(cmd_parts):
    """True when the executable (basename of cmd_parts[0]) is in _FIX_USER_COMMANDS."""
    return bool(cmd_parts) and os.path.basename(cmd_parts[0]) in _FIX_USER_COMMANDS


def _run_fix_as_user(cmd_parts, timeout=60):
    """Run *cmd_parts* unprivileged.

    Returns a dict with the keys hal_fix reads from root_exec results:
    ``ok``, ``stdout``, ``stderr``, ``code``. Codes -2, -3 and -4 mean timeout,
    command not found, and any other launch error respectively.
    """
    import subprocess
    try:
        proc = subprocess.run(cmd_parts, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return {'ok': False, 'stdout': '', 'stderr': f'Timeout after {timeout}s', 'code': -2}
    except FileNotFoundError:
        return {'ok': False, 'stdout': '', 'stderr': f'Command not found: {cmd_parts[0]}', 'code': -3}
    except Exception as e:
        return {'ok': False, 'stdout': '', 'stderr': str(e), 'code': -4}
    return {
        'ok': proc.returncode == 0,
        'stdout': proc.stdout,
        'stderr': proc.stderr,
        'code': proc.returncode,
    }


@chat_bp.route('/hal/fix', methods=['POST'])
@login_required
def hal_fix():
    """Execute a fix command suggested by HAL.

    Request JSON: ``{command: str}``.

    Response JSON: ``{ok, output, exit_code}``, where output is stdout
    followed by stderr. Returns ``{ok: False, error}`` when the command is
    empty or blocked.

    Execution routing (after stripping redirections and ``sudo``):
      * commands containing ``|`` run via ``bash -c``. They run as the normal
        user if the first word is an adb/fastboot binary, and through
        root_exec otherwise;
      * ``a && b`` runs each part in turn and stops at the first failure;
      * anything else is tokenised and run directly, without a shell.

    SECURITY NOTE(review): the command text comes from the client and is
    derived from LLM output. That output can be steered by the tool output
    HAL was asked to analyse. The ``|`` branch hands the whole string to a
    root ``bash -c``, so ``;``, ``$(...)`` and backticks are all live there.
    The blocklist is only a tripwire; consider an allow-list or per-command
    confirmation.
    """
    from core.daemon import root_exec

    def respond(r):
        return jsonify({'ok': r['ok'], 'output': r['stdout'] + r['stderr'], 'exit_code': r['code']})

    def execute(cmd_parts):
        # Route to user or root execution based on the executable.
        if _is_fix_user_command(cmd_parts):
            return _run_fix_as_user(cmd_parts, timeout=60)
        return root_exec(cmd_parts, timeout=60)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    command = data.get('command')
    command = command.strip() if isinstance(command, str) else ''
    if not command:
        return jsonify({'ok': False, 'error': 'No command provided'})

    # Check the raw text and again after cleaning. For example,
    # `rm -rf 2>/dev/null /` only reads as `rm -rf /` once the redirection
    # is removed.
    blocked = _blocked_fix_pattern(command)
    if blocked is None:
        command = _clean_fix_command(command)
        blocked = _blocked_fix_pattern(command)
    if blocked is not None:
        return jsonify({'ok': False, 'error': f'Blocked dangerous command: {blocked}'})
    if not command:
        # e.g. the request was nothing but "2>/dev/null"
        return jsonify({'ok': False, 'error': 'No command provided'})

    # Pipes (cmd1 | cmd2) need a shell: run the whole string through bash.
    if '|' in command:
        shell_cmd = ['bash', '-c', command]
        if _is_fix_user_command(command.split()):
            return respond(_run_fix_as_user(shell_cmd, timeout=60))
        return respond(root_exec(shell_cmd, timeout=60))

    # Chained commands (&&): run sequentially, stopping at the first failure.
    if '&&' in command:
        outputs = []
        for part in command.split('&&'):
            cmd_parts = _split_fix_command(_clean_fix_command(part))
            if not cmd_parts:
                continue
            r = execute(cmd_parts)
            outputs.append(r['stdout'] + r['stderr'])
            if not r['ok']:
                return jsonify({'ok': False, 'output': ''.join(outputs), 'exit_code': r['code']})
        return jsonify({'ok': True, 'output': ''.join(outputs), 'exit_code': 0})

    # Single command.
    cmd_parts = _split_fix_command(command)
    if not cmd_parts:
        return jsonify({'ok': False, 'error': 'No command provided'})
    return respond(execute(cmd_parts))
@chat_bp.route('/hal/available')
@login_required
def hal_available():
    """Tell the UI whether HAL analysis can run right now (an LLM is available)."""
    from core.hal_analyst import is_llm_available as llm_available
    return jsonify(dict(available=llm_available()))
@chat_bp.route('/chat/status')
@login_required
def chat_status():
    """Report whether the LLM model is loaded and, if it is, the model's name.

    Any error while querying the LLM is returned as
    ``{'loaded': False, 'error': str}``.
    """
    try:
        from core.llm import get_llm
        model = get_llm()
        loaded_flag = model.is_loaded
        name = model.model_name if model.is_loaded else None
        return jsonify({'loaded': loaded_flag, 'model': name})
    except Exception as exc:
        return jsonify({'loaded': False, 'error': str(exc)})
# How long a finished /agent/run entry stays in _agent_runs before being pruned.
_AGENT_RUN_RETENTION_SECONDS = 60 * 60


@chat_bp.route('/agent/run', methods=['POST'])
@login_required
def agent_run():
    """Start an autonomous Agent run in a background thread.

    Request JSON: ``{task: str}``. Returns ``{run_id}`` immediately, or
    ``{'error': 'No task provided'}`` for a missing/empty/non-string task.

    The worker appends step dicts ('status', 'thought', 'action', 'result')
    to the run's entry in ``_agent_runs``. It finishes with an 'answer' step
    (success) or an 'error' step, then marks the run done.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    task = data.get('task')
    task = task.strip() if isinstance(task, str) else ''
    if not task:
        return jsonify({'error': 'No task provided'})

    # Nothing else removes runs started here, so drop ones that finished long
    # ago; otherwise the registry and its step lists grow without bound.
    # Only entries stamped with 'finished_at' (set below) are ever pruned.
    now = time.time()
    for old_id, old_run in list(_agent_runs.items()):
        finished_at = old_run.get('finished_at')
        if finished_at is not None and now - finished_at > _AGENT_RUN_RETENTION_SECONDS:
            _agent_runs.pop(old_id, None)

    run_id = str(uuid.uuid4())
    stop_event = threading.Event()
    steps = []
    run = {'steps': steps, 'done': False, 'stop': stop_event}
    _agent_runs[run_id] = run

    class _Stopped(Exception):
        """Raised from the step callback to abandon a run whose stop event is set."""

    def worker():
        try:
            from core.agent import Agent
            from core.tools import get_tool_registry
            from core.llm import get_llm, LLMError

            llm = get_llm()
            if not llm.is_loaded:
                steps.append({'type': 'status', 'content': 'Loading model...'})
                try:
                    llm.load_model(verbose=False)
                except LLMError as e:
                    steps.append({'type': 'error', 'content': f'Failed to load model: {e}'})
                    return

            agent = Agent(llm=llm, tools=get_tool_registry(), verbose=False)
            # Escape literal braces in the prompt file so they are not read as
            # format fields when {tools_description} is filled in.
            # NOTE(review): assumes Agent uses str.format on SYSTEM_PROMPT; confirm.
            system_prompt = _get_system_prompt().replace('{', '{{').replace('}', '}}')
            agent.SYSTEM_PROMPT = system_prompt + "\n\n{tools_description}"

            def on_step(step):
                # Honour /agent/stop. This assumes Agent.run lets callback
                # exceptions propagate (TODO confirm). If it swallows them, the
                # run simply continues.
                if stop_event.is_set():
                    raise _Stopped()
                if step.thought:
                    steps.append({'type': 'thought', 'content': step.thought})
                if step.tool_name and step.tool_name not in ('task_complete', 'ask_user'):
                    args = json.dumps(step.tool_args or {})
                    steps.append({'type': 'action', 'content': f"{step.tool_name}({args})"})
                if step.tool_result:
                    shown = step.tool_result
                    if len(shown) > 800:
                        shown = shown[:800] + '...'
                    steps.append({'type': 'result', 'content': shown})

            result = agent.run(task, step_callback=on_step)
            # Report the outcome; previously the result was discarded and the
            # stream never carried the final summary or failure reason.
            if result.success:
                steps.append({'type': 'answer', 'content': result.summary})
            else:
                steps.append({'type': 'error', 'content': result.error or result.summary})
        except _Stopped:
            pass
        except Exception as e:
            steps.append({'type': 'error', 'content': str(e)})
        finally:
            # Use the captured dict so a pruned/removed registry entry can't
            # raise KeyError here.
            run['finished_at'] = time.time()
            run['done'] = True

    threading.Thread(target=worker, daemon=True).start()
    return jsonify({'run_id': run_id})
@chat_bp.route('/agent/stream/<run_id>')
@login_required
def agent_stream(run_id):
    """Stream the steps of agent run *run_id* as Server-Sent Events.

    Each step dict is sent as soon as it appears in the run's step list. A
    final ``{'done': True}`` follows once the run is marked done. An unknown
    run_id yields a single ``{'error': 'Run not found'}`` event.
    """
    def generate():
        run = _agent_runs.get(run_id)
        if not run:
            yield f"data: {json.dumps({'error': 'Run not found'})}\n\n"
            return
        steps = run['steps']
        sent = 0
        while True:
            # Sample 'done' BEFORE draining. The worker appends its last step
            # and only then sets 'done', so once done=True is observed every
            # step is already in the list. Checking after draining could lose
            # a step appended just before 'done' was set.
            finished = run['done']
            while sent < len(steps):
                yield f"data: {json.dumps(steps[sent])}\n\n"
                sent += 1
            if finished:
                yield f"data: {json.dumps({'done': True})}\n\n"
                return
            time.sleep(0.15)

    return Response(generate(), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@chat_bp.route('/agent/stop/<run_id>', methods=['POST'])
@login_required
def agent_stop(run_id):
    """Signal agent run *run_id* to stop and end its event stream.

    Sets the run's stop event and marks it done, so any open
    ``/agent/stream`` connection finishes. Returns ``{stopped: bool}``, which
    is False when the run_id is unknown.
    """
    target = _agent_runs.get(run_id)
    if not target:
        return jsonify({'stopped': False})
    target['stop'].set()
    target['done'] = True
    return jsonify({'stopped': True})