# Autarch/core/pentest_pipeline.py

"""
AUTARCH Pentest Pipeline
Three-module architecture (Parsing -> Reasoning -> Generation)
based on PentestGPT's USENIX paper methodology.
Uses AUTARCH's local LLM via llama-cpp-python.
"""
import re
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
from .pentest_tree import PentestTree, PTTNode, PTTNodeType, NodeStatus
from .config import get_config
# ─── Source type detection patterns ──────────────────────────────────
# Maps a tool name to a compiled signature regex; checked in insertion
# order, so earlier entries win when several signatures match.
SOURCE_PATTERNS = {
    'nmap': re.compile(r'Nmap scan report|PORT\s+STATE\s+SERVICE|nmap', re.IGNORECASE),
    'msf_scan': re.compile(r'auxiliary/scanner|msf\d?\s*>.*auxiliary|^\[\*\]\s.*scanning', re.IGNORECASE | re.MULTILINE),
    'msf_exploit': re.compile(r'exploit/|meterpreter|session\s+\d+\s+opened|^\[\*\]\s.*exploit', re.IGNORECASE | re.MULTILINE),
    'msf_post': re.compile(r'post/|meterpreter\s*>', re.IGNORECASE),
    'web': re.compile(r'HTTP/\d|<!DOCTYPE|<html|Content-Type:', re.IGNORECASE),
    'shell': re.compile(r'^\$\s|^root@|^#\s|bash|zsh', re.IGNORECASE | re.MULTILINE),
    'gobuster': re.compile(r'Gobuster|gobuster|Dir found|/\w+\s+\(Status:\s*\d+\)', re.IGNORECASE),
    'nikto': re.compile(r'Nikto|nikto|^\+\s', re.IGNORECASE | re.MULTILINE),
}


def detect_source_type(output: str) -> str:
    """Auto-detect which tool produced *output* from content signatures.

    Only the first 2000 characters are examined; returns the first
    matching key of SOURCE_PATTERNS, or 'manual' when nothing matches.
    """
    sample = output[:2000]
    return next(
        (name for name, pattern in SOURCE_PATTERNS.items() if pattern.search(sample)),
        'manual',
    )
# ─── Prompt Templates ────────────────────────────────────────────────
# These strings are sent verbatim to the local LLM. Their output formats
# (SUMMARY/FINDINGS/STATUS, TREE_UPDATES/NEXT_TASK, TOOL|ARGS|EXPECT, ...)
# are contracts parsed by regexes in the modules below — keep in sync.

# Module 1 (Parsing): normalizes raw tool output into a structured
# SUMMARY / FINDINGS / STATUS reply that ParsingModule._parse_response
# extracts with regexes.
PARSING_SYSTEM_PROMPT = """You are a penetration testing output parser. Extract key findings from raw tool output.
Given raw output from a security tool, extract and summarize:
1. Open ports and services (with versions when available)
2. Vulnerabilities or misconfigurations found
3. Credentials or sensitive information discovered
4. Operating system and software versions
5. Any error messages or access denials
Rules:
- Be concise. Use bullet points.
- Include specific version numbers, port numbers, and IP addresses.
- Prefix exploitable findings with [VULN]
- Prefix credentials with [CRED]
- Note failed attempts and why they failed.
- Do not speculate beyond what the output shows.
Format your response as:
SUMMARY: one line description
FINDINGS:
- finding 1
- finding 2
- [VULN] vulnerability finding
STATUS: success/partial/failed"""

# Module 2 (Reasoning): maintains the pentest task tree; the
# ADD/COMPLETE/NOT_APPLICABLE lines are parsed by
# ReasoningModule._parse_tree_updates.
REASONING_SYSTEM_PROMPT = """You are a penetration testing strategist. You maintain a task tree and decide next steps.
You will receive:
1. The current task tree showing completed and todo tasks
2. New findings from the latest tool execution
Your job:
1. UPDATE the tree based on new findings
2. DECIDE the single most important next task
Rules:
- Prioritize exploitation paths with highest success likelihood.
- If a service version is known, suggest checking for known CVEs.
- After recon, focus on the most promising attack surface.
- Do not add redundant tasks.
- Mark tasks not-applicable if findings make them irrelevant.
Respond in this exact format:
TREE_UPDATES:
- ADD: parent_id | node_type | priority | task description
- COMPLETE: node_id | findings summary
- NOT_APPLICABLE: node_id | reason
NEXT_TASK: description of the single most important next action
REASONING: 1-2 sentences explaining why this is the highest priority"""

# Module 3 (Generation): turns an abstract task into concrete tool
# invocations; TOOL|ARGS|EXPECT lines are parsed by
# GenerationModule._parse_commands.
GENERATION_SYSTEM_PROMPT = """You are a penetration testing command generator. Convert task descriptions into specific executable commands.
Available tools:
- shell: Run shell command. Args: {"command": "...", "timeout": 30}
- msf_search: Search MSF modules. Args: {"query": "search term"}
- msf_module_info: Module details. Args: {"module_type": "auxiliary|exploit|post", "module_name": "path"}
- msf_execute: Run MSF module. Args: {"module_type": "...", "module_name": "...", "options": "{\\"RHOSTS\\": \\"...\\"}" }
- msf_sessions: List sessions. Args: {}
- msf_session_command: Command in session. Args: {"session_id": "...", "command": "..."}
- msf_console: MSF console command. Args: {"command": "..."}
Rules:
- Provide the EXACT tool name and JSON arguments.
- Describe what to look for in the output.
- If multiple steps needed, number them.
- Always include RHOSTS/target in module options.
- Prefer auxiliary scanners before exploits.
Format:
COMMANDS:
1. TOOL: tool_name | ARGS: {"key": "value"} | EXPECT: what to look for
2. TOOL: tool_name | ARGS: {"key": "value"} | EXPECT: what to look for
FALLBACK: alternative approach if primary fails"""

# Bootstrap prompt used once per engagement by
# PentestPipeline.get_initial_plan; {target} is filled via str.format.
INITIAL_PLAN_PROMPT = """You are a penetration testing strategist planning an engagement.
Target: {target}
Create an initial reconnaissance plan. List the first 3-5 specific tasks to perform, ordered by priority.
Format:
TASKS:
1. node_type | priority | task description
2. node_type | priority | task description
3. node_type | priority | task description
FIRST_ACTION: description of the very first thing to do
REASONING: why start here"""

# Ad-hoc Q&A prompt for PentestPipeline.discuss; does not mutate the tree.
# {target} and {tree_summary} are filled via str.format.
DISCUSS_SYSTEM_PROMPT = """You are a penetration testing expert assistant. Answer the user's question about their current engagement.
Current target: {target}
Current status:
{tree_summary}
Answer concisely and provide actionable advice."""
# ─── Pipeline Modules ────────────────────────────────────────────────
class ParsingModule:
    """Normalizes raw tool output into structured summaries (module 1 of 3)."""

    def __init__(self, llm):
        self.llm = llm
        self.config = get_config()

    def parse(self, raw_output: str, source_type: str = "auto",
              context: str = "") -> dict:
        """Parse raw tool output into a normalized summary.

        The output is split into line-aligned chunks, each chunk is sent
        to the LLM with PARSING_SYSTEM_PROMPT, and per-chunk results are
        merged. Returns dict with 'summary', 'findings', 'status',
        'raw_source'.
        """
        if source_type == "auto":
            source_type = detect_source_type(raw_output)
        # Chunk size is configurable; any config error falls back to 2000.
        try:
            chunk_size = self.config.get_int('pentest', 'output_chunk_size', 2000)
        except Exception:
            chunk_size = 2000
        chunks = self._chunk_output(raw_output, chunk_size)
        total = len(chunks)
        summaries: List[str] = []
        findings: List[str] = []
        status = "unknown"
        for idx, chunk in enumerate(chunks, start=1):
            header = (f"[{source_type} output part {idx}/{total}]"
                      if total > 1 else f"[{source_type} output]")
            message = f"{header}\n{chunk}"
            if context:
                message = f"Context: {context}\n\n{message}"
            self.llm.clear_history()
            try:
                response = self.llm.chat(
                    message,
                    system_prompt=PARSING_SYSTEM_PROMPT,
                    temperature=0.2,
                    max_tokens=512,
                )
            except Exception as e:
                # Abort on the first LLM failure; earlier chunk results
                # are discarded in favor of an explicit failure record.
                return {
                    'summary': f"Parse error: {e}",
                    'findings': [],
                    'status': 'failed',
                    'raw_source': source_type,
                }
            chunk_summary, chunk_findings, chunk_status = self._parse_response(response)
            summaries.append(chunk_summary)
            findings.extend(chunk_findings)
            # Last chunk with a definite status wins.
            if chunk_status != "unknown":
                status = chunk_status
        return {
            'summary': " | ".join(summaries) if summaries else "No summary",
            'findings': findings,
            'status': status,
            'raw_source': source_type,
        }

    def _chunk_output(self, output: str, max_chunk: int = 2000) -> List[str]:
        """Split large output into line-aligned chunks of ~max_chunk chars.

        A single line longer than max_chunk becomes its own oversized chunk.
        """
        if len(output) <= max_chunk:
            return [output]
        chunks: List[str] = []
        buf: List[str] = []
        buf_len = 0
        for line in output.split('\n'):
            cost = len(line) + 1  # +1 accounts for the joining newline
            if buf and buf_len + cost > max_chunk:
                chunks.append('\n'.join(buf))
                buf, buf_len = [], 0
            buf.append(line)
            buf_len += cost
        if buf:
            chunks.append('\n'.join(buf))
        return chunks

    def _parse_response(self, response: str) -> Tuple[str, List[str], str]:
        """Extract (summary, findings, status) from a structured LLM reply."""
        summary_match = re.search(r'SUMMARY:\s*(.+)', response, re.IGNORECASE)
        summary = summary_match.group(1).strip() if summary_match else ""

        findings: List[str] = []
        section = re.search(
            r'FINDINGS:\s*\n((?:[-*]\s*.+\n?)+)',
            response, re.IGNORECASE
        )
        if section:
            for raw in section.group(1).strip().split('\n'):
                item = re.sub(r'^[-*]\s*', '', raw).strip()
                if item:
                    findings.append(item)

        status_match = re.search(r'STATUS:\s*(\w+)', response, re.IGNORECASE)
        status = status_match.group(1).strip().lower() if status_match else "unknown"

        # Fallback for unstructured replies: use a snippet as the summary
        # and harvest any bullet-looking lines as findings.
        if not summary and not findings:
            summary = response[:200].strip()
            for raw in response.split('\n'):
                stripped = raw.strip()
                if stripped.startswith(('-', '*', '[VULN]', '[CRED]')):
                    findings.append(re.sub(r'^[-*]\s*', '', stripped))
        return summary, findings, status
class ReasoningModule:
    """Maintains the pentest task tree (PTT) and decides next actions (module 2 of 3)."""

    def __init__(self, llm, tree: PentestTree):
        self.llm = llm
        self.tree = tree

    def reason(self, parsed_output: dict, context: str = "") -> dict:
        """Three-step reasoning: update tree, validate, extract next todo.

        Args:
            parsed_output: result dict from ParsingModule.parse()
                ('summary', 'findings', 'raw_source' keys are read).
            context: optional extra text appended to the LLM message.

        Returns dict with 'tree_updates', 'next_task', 'reasoning'.
        """
        tree_summary = self.tree.render_summary()
        findings_text = parsed_output.get('summary', '')
        if parsed_output.get('findings'):
            findings_text += "\nFindings:\n"
            for f in parsed_output['findings']:
                findings_text += f"- {f}\n"
        message = (
            f"Current pentest tree:\n{tree_summary}\n\n"
            f"New information ({parsed_output.get('raw_source', 'unknown')}):\n"
            f"{findings_text}"
        )
        if context:
            message += f"\n\nAdditional context: {context}"
        self.llm.clear_history()
        try:
            response = self.llm.chat(
                message,
                system_prompt=REASONING_SYSTEM_PROMPT,
                temperature=0.3,
                max_tokens=1024,
            )
        except Exception as e:
            return {
                'tree_updates': [],
                'next_task': f"Error during reasoning: {e}",
                'reasoning': str(e),
            }
        updates = self._parse_tree_updates(response)
        self._apply_updates(updates)
        next_task = ""
        m = re.search(r'NEXT_TASK:\s*(.+)', response, re.IGNORECASE)
        if m:
            next_task = m.group(1).strip()
        reasoning = ""
        m = re.search(r'REASONING:\s*(.+)', response, re.IGNORECASE | re.DOTALL)
        if m:
            # DOTALL can capture trailing rambling; keep the first line only.
            reasoning = m.group(1).strip().split('\n')[0]
        # Fallback: if no NEXT_TASK parsed, take the tree's next todo node.
        if not next_task:
            todo = self.tree.get_next_todo()
            if todo:
                next_task = todo.label
        return {
            'tree_updates': updates,
            'next_task': next_task,
            'reasoning': reasoning,
        }

    def _parse_tree_updates(self, response: str) -> List[dict]:
        """Extract ADD / COMPLETE / NOT_APPLICABLE operations from the reply.

        Returns a list of operation dicts suitable for _apply_updates().
        """
        updates = []
        # ADD: parent_id | node_type | priority | label
        for m in re.finditer(
            r'ADD:\s*(\S+)\s*\|\s*(\w+)\s*\|\s*(\d)\s*\|\s*(.+)',
            response, re.IGNORECASE
        ):
            parent = m.group(1).strip()
            # 'root'/'none'/'-' mean "attach at top level".
            if parent.lower() in ('root', 'none', '-'):
                parent = None
            ntype_str = m.group(2).strip().lower()
            ntype = self._map_node_type(ntype_str)
            updates.append({
                'operation': 'add',
                'parent_id': parent,
                'node_type': ntype,
                'priority': int(m.group(3)),
                'label': m.group(4).strip(),
            })
        # COMPLETE: node_id | findings summary
        for m in re.finditer(
            r'COMPLETE:\s*(\S+)\s*\|\s*(.+)',
            response, re.IGNORECASE
        ):
            updates.append({
                'operation': 'complete',
                'node_id': m.group(1).strip(),
                'findings': m.group(2).strip(),
            })
        # NOT_APPLICABLE: node_id | reason
        for m in re.finditer(
            r'NOT_APPLICABLE:\s*(\S+)\s*\|\s*(.+)',
            response, re.IGNORECASE
        ):
            updates.append({
                'operation': 'not_applicable',
                'node_id': m.group(1).strip(),
                'reason': m.group(2).strip(),
            })
        return updates

    def _map_node_type(self, type_str: str) -> PTTNodeType:
        """Map a free-form LLM type string to a PTTNodeType (CUSTOM if unknown)."""
        mapping = {
            'recon': PTTNodeType.RECONNAISSANCE,
            'reconnaissance': PTTNodeType.RECONNAISSANCE,
            'initial_access': PTTNodeType.INITIAL_ACCESS,
            'initial': PTTNodeType.INITIAL_ACCESS,
            'access': PTTNodeType.INITIAL_ACCESS,
            'privesc': PTTNodeType.PRIVILEGE_ESCALATION,
            'privilege_escalation': PTTNodeType.PRIVILEGE_ESCALATION,
            'escalation': PTTNodeType.PRIVILEGE_ESCALATION,
            'lateral': PTTNodeType.LATERAL_MOVEMENT,
            'lateral_movement': PTTNodeType.LATERAL_MOVEMENT,
            'persistence': PTTNodeType.PERSISTENCE,
            'credential': PTTNodeType.CREDENTIAL_ACCESS,
            'credential_access': PTTNodeType.CREDENTIAL_ACCESS,
            'creds': PTTNodeType.CREDENTIAL_ACCESS,
            'exfiltration': PTTNodeType.EXFILTRATION,
            'exfil': PTTNodeType.EXFILTRATION,
        }
        return mapping.get(type_str.lower(), PTTNodeType.CUSTOM)

    def _resolve_node_id(self, identifier: str) -> Optional[str]:
        """Resolve an LLM-supplied identifier to a real tree node id.

        The model may echo back either a node id or a node label; accept
        both. Returns None when nothing matches.
        """
        if identifier in self.tree.nodes:
            return identifier
        node = self.tree.find_node_by_label(identifier)
        return node.id if node else None

    def _apply_updates(self, updates: List[dict]):
        """Apply parsed operations to the tree; unresolvable node ids are skipped."""
        for update in updates:
            op = update['operation']
            if op == 'add':
                parent_id = update.get('parent_id')
                if parent_id is not None:
                    parent_id = self._resolve_node_id(parent_id)
                self.tree.add_node(
                    label=update['label'],
                    node_type=update['node_type'],
                    parent_id=parent_id,
                    priority=update.get('priority', 3),
                )
            elif op == 'complete':
                node_id = self._resolve_node_id(update['node_id'])
                if node_id is None:
                    continue
                self.tree.update_node(
                    node_id,
                    status=NodeStatus.COMPLETED,
                    findings=[update.get('findings', '')],
                )
            elif op == 'not_applicable':
                node_id = self._resolve_node_id(update['node_id'])
                if node_id is None:
                    continue
                self.tree.update_node(
                    node_id,
                    status=NodeStatus.NOT_APPLICABLE,
                    details=update.get('reason', ''),
                )
class GenerationModule:
    """Converts abstract tasks into concrete tool commands (module 3 of 3)."""

    def __init__(self, llm):
        self.llm = llm

    def generate(self, task_description: str, target: str,
                 context: str = "") -> dict:
        """Generate executable commands for a task.

        Args:
            task_description: abstract next-step text from the reasoner.
            target: engagement target (host/IP) included in the prompt.
            context: optional extra findings text.

        Returns dict with 'commands' (list of {tool, args, expect}),
        'fallback' (str) and 'raw_response' (unparsed LLM reply).
        """
        message = f"Target: {target}\nTask: {task_description}"
        if context:
            message += f"\n\nContext: {context}"
        self.llm.clear_history()
        try:
            response = self.llm.chat(
                message,
                system_prompt=GENERATION_SYSTEM_PROMPT,
                temperature=0.2,
                max_tokens=512,
            )
        except Exception as e:
            return {
                'commands': [],
                'fallback': f"Generation error: {e}",
                'raw_response': str(e),
            }
        commands = self._parse_commands(response)
        fallback = ""
        m = re.search(r'FALLBACK:\s*(.+)', response, re.IGNORECASE | re.DOTALL)
        if m:
            # DOTALL can capture trailing text; keep the first line only.
            fallback = m.group(1).strip().split('\n')[0]
        return {
            'commands': commands,
            'fallback': fallback,
            'raw_response': response,
        }

    def _parse_commands(self, response: str) -> List[dict]:
        """Extract commands from an LLM response.

        Primary path parses the structured 'TOOL: ... | ARGS: ... | EXPECT: ...'
        lines; if none match, falls back to sniffing bare shell or MSF
        console commands line by line.
        """
        import json  # hoisted: was re-imported inside the match loop

        commands = []
        for m in re.finditer(
            r'TOOL:\s*(\w+)\s*\|\s*ARGS:\s*(\{[^}]+\})\s*\|\s*EXPECT:\s*(.+)',
            response, re.IGNORECASE
        ):
            tool_name = m.group(1).strip()
            args_str = m.group(2).strip()
            expect = m.group(3).strip()
            try:
                args = json.loads(args_str)
            except json.JSONDecodeError:
                # Common LLM mistake: single-quoted pseudo-JSON.
                try:
                    args = json.loads(args_str.replace("'", '"'))
                except json.JSONDecodeError:
                    args = {'raw': args_str}  # keep something the caller can inspect
            commands.append({
                'tool': tool_name,
                'args': args,
                'expect': expect,
            })
        # Fallback: try to find shell commands or MSF commands
        if not commands:
            for line in response.split('\n'):
                line = line.strip()
                # Detect common recon/shell tool invocations.
                if re.match(r'^(nmap|nikto|gobuster|curl|wget|nc|netcat)\s', line):
                    commands.append({
                        'tool': 'shell',
                        'args': {'command': line},
                        'expect': 'Check output for results',
                    })
                # Detect MSF console use/run/set commands.
                elif re.match(r'^(use |run |set )', line, re.IGNORECASE):
                    commands.append({
                        'tool': 'msf_console',
                        'args': {'command': line},
                        'expect': 'Check output for results',
                    })
        return commands
# ─── Pipeline Orchestrator ────────────────────────────────────────────
class PentestPipeline:
    """Orchestrates the three-module pipeline (parse -> reason -> generate)."""

    def __init__(self, llm, target: str, tree: Optional[PentestTree] = None):
        self.llm = llm
        self.target = target
        # Reuse a caller-supplied tree (e.g. restored session) or start fresh.
        self.tree = tree or PentestTree(target)
        self.parser = ParsingModule(llm)
        self.reasoner = ReasoningModule(llm, self.tree)
        self.generator = GenerationModule(llm)
        # Compact per-iteration audit trail (see process_output).
        self.history: List[dict] = []

    def process_output(self, raw_output: str,
                       source_type: str = "auto") -> dict:
        """Full pipeline: parse -> reason -> generate.

        Args:
            raw_output: raw tool output text.
            source_type: tool hint; "auto" triggers signature detection.

        Returns dict with 'parsed', 'reasoning', 'commands', 'fallback',
        'next_task'. Also appends a summary entry to self.history.
        """
        # Step 1: Parse raw output into structured findings.
        parsed = self.parser.parse(raw_output, source_type)
        # Step 2: Update the tree and pick the next task.
        reasoning = self.reasoner.reason(parsed)
        # Step 3: Generate concrete commands for the next task (if any).
        generated = {'commands': [], 'fallback': ''}
        if reasoning.get('next_task'):
            generated = self.generator.generate(
                reasoning['next_task'],
                self.target,
                context=parsed.get('summary', ''),
            )
        result = {
            'parsed': parsed,
            'reasoning': reasoning,
            'commands': generated.get('commands', []),
            'fallback': generated.get('fallback', ''),
            'next_task': reasoning.get('next_task', ''),
        }
        self.history.append({
            'timestamp': datetime.now().isoformat(),
            'result': {
                'parsed_summary': parsed.get('summary', ''),
                'findings_count': len(parsed.get('findings', [])),
                'next_task': reasoning.get('next_task', ''),
                'commands_count': len(generated.get('commands', [])),
            }
        })
        return result

    def get_initial_plan(self) -> dict:
        """Generate the initial pentest plan for the target.

        Asks the LLM for 3-5 prioritized tasks, seeds the tree with them,
        and generates commands for the first action.

        Returns dict with 'tasks', 'first_action', 'reasoning', 'commands'.
        """
        prompt = INITIAL_PLAN_PROMPT.format(target=self.target)
        self.llm.clear_history()
        try:
            response = self.llm.chat(
                prompt,
                system_prompt=REASONING_SYSTEM_PROMPT,
                temperature=0.3,
                max_tokens=1024,
            )
        except Exception as e:
            return {
                'tasks': [],
                'first_action': f"Error: {e}",
                'reasoning': str(e),
            }
        # Parse "N. node_type | priority | label" task lines.
        tasks = []
        for m in re.finditer(
            r'(\d+)\.\s*(\w+)\s*\|\s*(\d)\s*\|\s*(.+)',
            response
        ):
            ntype = self.reasoner._map_node_type(m.group(2).strip())
            tasks.append({
                'node_type': ntype,
                'priority': int(m.group(3)),
                'label': m.group(4).strip(),
            })
        # Attach each task under the root branch of the same node type,
        # or at top level when no matching branch exists.
        for task in tasks:
            parent_id = None
            for root_id in self.tree.root_nodes:
                root = self.tree.get_node(root_id)
                if root and root.node_type == task['node_type']:
                    parent_id = root_id
                    break
            self.tree.add_node(
                label=task['label'],
                node_type=task['node_type'],
                parent_id=parent_id,
                priority=task['priority'],
            )
        first_action = ""
        m = re.search(r'FIRST_ACTION:\s*(.+)', response, re.IGNORECASE)
        if m:
            first_action = m.group(1).strip()
        reasoning = ""
        m = re.search(r'REASONING:\s*(.+)', response, re.IGNORECASE)
        if m:
            reasoning = m.group(1).strip()
        # Generate concrete commands for the first action.
        commands = []
        if first_action:
            gen = self.generator.generate(first_action, self.target)
            commands = gen.get('commands', [])
        return {
            'tasks': tasks,
            'first_action': first_action,
            'reasoning': reasoning,
            'commands': commands,
        }

    def inject_information(self, info: str, source: str = "manual") -> dict:
        """Inject external information and get updated recommendations.

        NOTE(review): the text is run through the full pipeline, so the
        parsing LLM re-summarizes it rather than taking it verbatim. (A
        dead pre-built 'parsed' dict suggesting a parser bypass was
        removed; restore that path deliberately if verbatim injection is
        wanted.)
        """
        return self.process_output(info, source_type=source)

    def discuss(self, question: str) -> str:
        """Answer an ad-hoc question about the engagement; does not touch the tree."""
        prompt = DISCUSS_SYSTEM_PROMPT.format(
            target=self.target,
            tree_summary=self.tree.render_summary(),
        )
        self.llm.clear_history()
        try:
            return self.llm.chat(
                question,
                system_prompt=prompt,
                temperature=0.5,
                max_tokens=1024,
            )
        except Exception as e:
            return f"Error: {e}"