- Add Remote Monitoring Station with PIAP device profile system - Add SSH/SSHD manager with fail2ban integration - Add privileged daemon architecture for safe root operations - Add encrypted vault, HAL memory, HAL auto-analyst - Add network security suite, module creator, codex training - Add start.sh launcher script and GTK3 desktop launcher - Remove Output/ build artifacts, installer files, loose docs - Update .gitignore for runtime data and build artifacts - Update README for v1.9 with new launch method, screenshots, and features Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
434 lines
15 KiB
Python
434 lines
15 KiB
Python
"""
|
|
AUTARCH Agent System
|
|
Autonomous agent that uses LLM to accomplish tasks with tools
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
from typing import Optional, List, Dict, Any, Callable
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
|
|
from .llm import get_llm, LLM, LLMError
|
|
from .tools import get_tool_registry, ToolRegistry
|
|
from .banner import Colors
|
|
|
|
|
|
class AgentState(Enum):
    """Execution states the agent moves through while running a task."""

    IDLE = "idle"                  # no task in progress
    THINKING = "thinking"          # waiting on LLM generation
    EXECUTING = "executing"        # running a tool
    WAITING_USER = "waiting_user"  # blocked on an ask_user question
    COMPLETE = "complete"          # task finished
    ERROR = "error"                # load/generation failure or step cap hit
|
|
|
|
|
|
@dataclass
class AgentStep:
    """Record of a single agent step (one THOUGHT/ACTION/result cycle)."""

    # The model's THOUGHT text for this step.
    thought: str
    # ACTION name (a tool, task_complete, or ask_user), if one was chosen.
    tool_name: Optional[str] = None
    # Parsed PARAMS dict passed to the tool.
    tool_args: Optional[Dict[str, Any]] = None
    # Stringified tool output / observation, filled in after execution.
    tool_result: Optional[str] = None
    # Error message if the step failed.
    error: Optional[str] = None
|
|
|
|
|
|
@dataclass
class AgentResult:
    """Outcome of one agent task execution."""

    # True when the task completed; False on model/load errors or step cap.
    success: bool
    # Human-readable description of the outcome.
    summary: str
    # Chronological record of every step taken during the run.
    steps: List[AgentStep] = field(default_factory=list)
    # Error detail when success is False.
    error: Optional[str] = None
|
|
|
|
|
|
class Agent:
    """Autonomous agent that uses LLM and tools to accomplish tasks."""

    # ChatML system prompt. {tools_description} is substituted by
    # _build_system_prompt(); doubled braces are literal JSON braces
    # that must survive str.format().
    SYSTEM_PROMPT = """You are Hal, the autonomous agent for AUTARCH (darkHal Security Group).

RULES:
- No markdown. No ASCII art. Plain text only.
- Detect the OS before running commands. Run "uname -s" first if unsure.
- Only run commands for the detected OS. Never list commands for multiple platforms.
- On Linux: detect the distro first (cat /etc/os-release). Use apt for Debian/Ubuntu, dnf for Fedora, etc.
- Never prefix commands with sudo. The system handles root access automatically via the daemon.
- Run ONE command at a time. Check the result before continuing.
- Keep responses short. No filler.

FORMAT — you MUST use this exact format for every response:

THOUGHT: your reasoning
ACTION: tool_name
PARAMS: {{"param": "value"}}

When done:
THOUGHT: summary
ACTION: task_complete
PARAMS: {{"summary": "what was done"}}

When you need input:
THOUGHT: why you need to ask
ACTION: ask_user
PARAMS: {{"question": "your question"}}

{tools_description}
"""
|
|
|
|
def __init__(
|
|
self,
|
|
llm: LLM = None,
|
|
tools: ToolRegistry = None,
|
|
max_steps: int = 20,
|
|
verbose: bool = True
|
|
):
|
|
"""Initialize the agent.
|
|
|
|
Args:
|
|
llm: LLM instance to use. Uses global if not provided.
|
|
tools: Tool registry to use. Uses global if not provided.
|
|
max_steps: Maximum steps before stopping.
|
|
verbose: Whether to print progress.
|
|
"""
|
|
self.llm = llm or get_llm()
|
|
self.tools = tools or get_tool_registry()
|
|
self.max_steps = max_steps
|
|
self.verbose = verbose
|
|
|
|
self.state = AgentState.IDLE
|
|
self.current_task: Optional[str] = None
|
|
self.steps: List[AgentStep] = []
|
|
self.conversation: List[Dict[str, str]] = []
|
|
|
|
# Callbacks
|
|
self.on_step: Optional[Callable[[AgentStep], None]] = None
|
|
self.on_state_change: Optional[Callable[[AgentState], None]] = None
|
|
|
|
def _set_state(self, state: AgentState):
|
|
"""Update agent state and notify callback."""
|
|
self.state = state
|
|
if self.on_state_change:
|
|
self.on_state_change(state)
|
|
|
|
def _log(self, message: str, level: str = "info"):
|
|
"""Log a message if verbose mode is on."""
|
|
if not self.verbose:
|
|
return
|
|
|
|
colors = {
|
|
"info": Colors.CYAN,
|
|
"success": Colors.GREEN,
|
|
"warning": Colors.YELLOW,
|
|
"error": Colors.RED,
|
|
"thought": Colors.MAGENTA,
|
|
"action": Colors.BLUE,
|
|
"result": Colors.WHITE,
|
|
}
|
|
symbols = {
|
|
"info": "*",
|
|
"success": "+",
|
|
"warning": "!",
|
|
"error": "X",
|
|
"thought": "?",
|
|
"action": ">",
|
|
"result": "<",
|
|
}
|
|
|
|
color = colors.get(level, Colors.WHITE)
|
|
symbol = symbols.get(level, "*")
|
|
print(f"{color}[{symbol}] {message}{Colors.RESET}")
|
|
|
|
def _build_system_prompt(self) -> str:
|
|
"""Build the system prompt with tools description."""
|
|
tools_desc = self.tools.get_tools_prompt()
|
|
return self.SYSTEM_PROMPT.format(tools_description=tools_desc)
|
|
|
|
def _parse_response(self, response: str) -> tuple[str, str, Dict[str, Any]]:
|
|
"""Parse LLM response into thought, action, and params.
|
|
|
|
Args:
|
|
response: The raw LLM response.
|
|
|
|
Returns:
|
|
Tuple of (thought, action_name, params_dict)
|
|
|
|
Raises:
|
|
ValueError: If response cannot be parsed.
|
|
"""
|
|
# Extract THOUGHT
|
|
thought_match = re.search(r'THOUGHT:\s*(.+?)(?=ACTION:|$)', response, re.DOTALL)
|
|
thought = thought_match.group(1).strip() if thought_match else ""
|
|
|
|
# Extract ACTION
|
|
action_match = re.search(r'ACTION:\s*(\w+)', response)
|
|
if not action_match:
|
|
raise ValueError("No ACTION found in response")
|
|
action = action_match.group(1).strip()
|
|
|
|
# Extract PARAMS
|
|
params_match = re.search(r'PARAMS:\s*(\{.*?\})', response, re.DOTALL)
|
|
if params_match:
|
|
try:
|
|
params = json.loads(params_match.group(1))
|
|
except json.JSONDecodeError:
|
|
# Try to fix common JSON issues
|
|
params_str = params_match.group(1)
|
|
# Replace single quotes with double quotes
|
|
params_str = params_str.replace("'", '"')
|
|
try:
|
|
params = json.loads(params_str)
|
|
except json.JSONDecodeError:
|
|
params = {}
|
|
else:
|
|
params = {}
|
|
|
|
return thought, action, params
|
|
|
|
def _execute_tool(self, tool_name: str, params: Dict[str, Any]) -> str:
|
|
"""Execute a tool and return the result.
|
|
|
|
Args:
|
|
tool_name: Name of the tool to execute.
|
|
params: Parameters for the tool.
|
|
|
|
Returns:
|
|
Tool result string.
|
|
"""
|
|
result = self.tools.execute(tool_name, **params)
|
|
|
|
if result["success"]:
|
|
return str(result["result"])
|
|
else:
|
|
return f"[Error]: {result['error']}"
|
|
|
|
    def run(self, task: str, user_input_handler: Callable[[str], str] = None,
            step_callback: Optional[Callable[['AgentStep'], None]] = None) -> AgentResult:
        """Run the agent on a task.

        Drives a THOUGHT/ACTION/PARAMS loop: generate with the LLM, parse the
        response, execute the chosen tool, feed the observation back into the
        conversation, and repeat until task_complete, an error, or max_steps.

        Args:
            task: The task description.
            user_input_handler: Callback for handling ask_user actions.
                If None, uses default input().
            step_callback: Optional per-step callback invoked after each step completes.
                Overrides self.on_step for this run if provided.
                NOTE(review): the override persists after this run returns.

        Returns:
            AgentResult with execution details.
        """
        if step_callback is not None:
            self.on_step = step_callback
        # Reset per-run state.
        self.current_task = task
        self.steps = []
        self.conversation = []

        # Ensure model is loaded
        if not self.llm.is_loaded:
            self._log("Loading model...", "info")
            try:
                self.llm.load_model(verbose=self.verbose)
            except LLMError as e:
                self._set_state(AgentState.ERROR)
                return AgentResult(
                    success=False,
                    summary="Failed to load model",
                    error=str(e)
                )

        self._set_state(AgentState.THINKING)
        self._log(f"Starting task: {task}", "info")

        # Build initial prompt: system prompt (with tool docs) + the task.
        system_prompt = self._build_system_prompt()
        self.conversation.append({"role": "system", "content": system_prompt})
        self.conversation.append({"role": "user", "content": f"Task: {task}"})

        step_count = 0
        parse_failures = 0  # Track consecutive format failures

        while step_count < self.max_steps:
            step_count += 1
            self._log(f"Step {step_count}/{self.max_steps}", "info")

            # Generate response. Stop sequences prevent the model from
            # hallucinating its own observations or user turns.
            self._set_state(AgentState.THINKING)
            try:
                prompt = self._build_prompt()
                response = self.llm.generate(
                    prompt,
                    stop=["OBSERVATION:", "\nUser:", "\nTask:"],
                    temperature=0.3,  # Lower temperature for more focused responses
                )
            except LLMError as e:
                self._set_state(AgentState.ERROR)
                return AgentResult(
                    success=False,
                    summary="LLM generation failed",
                    steps=self.steps,
                    error=str(e)
                )

            # Parse response into (thought, action, params).
            try:
                thought, action, params = self._parse_response(response)
                parse_failures = 0  # Reset on success
            except ValueError as e:
                parse_failures += 1
                self._log(f"Failed to parse response: {e}", "error")
                self._log(f"Raw response: {response[:200]}...", "warning")

                # After 2 consecutive parse failures, the model can't follow
                # the structured format — treat its response as a direct answer
                if parse_failures >= 2:
                    # Clean up the raw response for display
                    answer = response.strip()
                    # Remove ChatML tokens if present
                    for tag in ['<|im_end|>', '<|im_start|>', '<|endoftext|>']:
                        answer = answer.split(tag)[0]
                    answer = answer.strip()
                    if not answer:
                        answer = "I could not process that request in agent mode. Try switching to Chat mode."

                    self._log("Model cannot follow structured format, returning direct answer", "warning")
                    # Record as a synthetic task_complete so the step log stays consistent.
                    step = AgentStep(thought="Direct response (model does not support agent format)", tool_name="task_complete", tool_args={"summary": answer})
                    step.tool_result = answer
                    self.steps.append(step)
                    if self.on_step:
                        self.on_step(step)
                    self._set_state(AgentState.COMPLETE)
                    return AgentResult(success=True, summary=answer, steps=self.steps)

                # First failure — give one retry with format correction
                self.conversation.append({
                    "role": "assistant",
                    "content": response
                })
                self.conversation.append({
                    "role": "user",
                    "content": "Error: Could not parse your response. Please use the exact format:\nTHOUGHT: [reasoning]\nACTION: [tool_name]\nPARAMS: {\"param\": \"value\"}"
                })
                continue

            self._log(f"Thought: {thought[:100]}..." if len(thought) > 100 else f"Thought: {thought}", "thought")
            self._log(f"Action: {action}", "action")

            step = AgentStep(thought=thought, tool_name=action, tool_args=params)

            # Handle task_complete: terminal success path.
            if action == "task_complete":
                summary = params.get("summary", thought)
                step.tool_result = summary
                self.steps.append(step)

                if self.on_step:
                    self.on_step(step)

                self._set_state(AgentState.COMPLETE)
                self._log(f"Task complete: {summary}", "success")

                return AgentResult(
                    success=True,
                    summary=summary,
                    steps=self.steps
                )

            # Handle ask_user: pause the loop for human input, then feed the
            # answer back as an observation.
            if action == "ask_user":
                question = params.get("question", "What should I do?")
                self._set_state(AgentState.WAITING_USER)
                self._log(f"Agent asks: {question}", "info")

                if user_input_handler:
                    user_response = user_input_handler(question)
                else:
                    print(f"\n{Colors.YELLOW}Agent question: {question}{Colors.RESET}")
                    user_response = input(f"{Colors.GREEN}Your answer: {Colors.RESET}").strip()

                step.tool_result = f"User response: {user_response}"
                self.steps.append(step)

                if self.on_step:
                    self.on_step(step)

                # Add to conversation
                self.conversation.append({
                    "role": "assistant",
                    "content": f"THOUGHT: {thought}\nACTION: {action}\nPARAMS: {json.dumps(params)}"
                })
                self.conversation.append({
                    "role": "user",
                    "content": f"OBSERVATION: User responded: {user_response}"
                })
                continue

            # Execute tool via the registry.
            self._set_state(AgentState.EXECUTING)
            self._log(f"Executing: {action}({params})", "action")

            result = self._execute_tool(action, params)
            step.tool_result = result
            self.steps.append(step)

            if self.on_step:
                self.on_step(step)

            # Truncate long results for display
            display_result = result[:200] + "..." if len(result) > 200 else result
            self._log(f"Result: {display_result}", "result")

            # Add to conversation so the next generation sees the observation.
            self.conversation.append({
                "role": "assistant",
                "content": f"THOUGHT: {thought}\nACTION: {action}\nPARAMS: {json.dumps(params)}"
            })
            self.conversation.append({
                "role": "user",
                "content": f"OBSERVATION: {result}"
            })

        # Max steps reached without the model declaring task_complete.
        self._set_state(AgentState.ERROR)
        self._log(f"Max steps ({self.max_steps}) reached", "warning")

        return AgentResult(
            success=False,
            summary="Max steps reached without completing task",
            steps=self.steps,
            error=f"Exceeded maximum of {self.max_steps} steps"
        )
|
|
|
|
def _build_prompt(self) -> str:
|
|
"""Build the full prompt from conversation history."""
|
|
parts = []
|
|
for msg in self.conversation:
|
|
role = msg["role"]
|
|
content = msg["content"]
|
|
|
|
if role == "system":
|
|
parts.append(f"<|im_start|>system\n{content}<|im_end|>")
|
|
elif role == "user":
|
|
parts.append(f"<|im_start|>user\n{content}<|im_end|>")
|
|
elif role == "assistant":
|
|
parts.append(f"<|im_start|>assistant\n{content}<|im_end|>")
|
|
|
|
parts.append("<|im_start|>assistant\n")
|
|
return "\n".join(parts)
|
|
|
|
def get_steps_summary(self) -> str:
|
|
"""Get a formatted summary of all steps taken."""
|
|
if not self.steps:
|
|
return "No steps executed"
|
|
|
|
lines = []
|
|
for i, step in enumerate(self.steps, 1):
|
|
lines.append(f"Step {i}:")
|
|
lines.append(f" Thought: {step.thought[:80]}...")
|
|
if step.tool_name:
|
|
lines.append(f" Action: {step.tool_name}")
|
|
if step.tool_result:
|
|
result_preview = step.tool_result[:80] + "..." if len(step.tool_result) > 80 else step.tool_result
|
|
lines.append(f" Result: {result_preview}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|