"""
AUTARCH Pentest Pipeline

Three-module architecture (Parsing -> Reasoning -> Generation) based on
PentestGPT's USENIX paper methodology. Uses AUTARCH's local LLM via
llama-cpp-python.
"""

import json
import re
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple

from .pentest_tree import PentestTree, PTTNode, PTTNodeType, NodeStatus
from .config import get_config

# ─── Source type detection patterns ──────────────────────────────────
SOURCE_PATTERNS = {
    'nmap': re.compile(
        r'Nmap scan report|PORT\s+STATE\s+SERVICE|nmap',
        re.IGNORECASE,
    ),
    'msf_scan': re.compile(
        r'auxiliary/scanner|msf\d?\s*>.*auxiliary|^\[\*\]\s.*scanning',
        re.IGNORECASE | re.MULTILINE,
    ),
    'msf_exploit': re.compile(
        r'exploit/|meterpreter|session\s+\d+\s+opened|^\[\*\]\s.*exploit',
        re.IGNORECASE | re.MULTILINE,
    ),
    'msf_post': re.compile(r'post/|meterpreter\s*>', re.IGNORECASE),
    # NOTE(review): this entry was corrupted in the source file (everything
    # between a '<' in the pattern and the function's '->' arrow was lost).
    # Reconstructed with common HTTP response markers — confirm against the
    # original intent before relying on 'web' detection.
    'web': re.compile(r'HTTP/\d|<title>|<html', re.IGNORECASE),
}


def detect_source_type(output: str) -> str:
    """Auto-detect tool output type from content patterns.

    Only the first 2000 characters are scanned; returns the first matching
    key of SOURCE_PATTERNS, or 'manual' when nothing matches.
    """
    for source, pattern in SOURCE_PATTERNS.items():
        if pattern.search(output[:2000]):
            return source
    return 'manual'


# ─── Prompt Templates ────────────────────────────────────────────────

PARSING_SYSTEM_PROMPT = """You are a penetration testing output parser. Extract key findings from raw tool output.

Given raw output from a security tool, extract and summarize:
1. Open ports and services (with versions when available)
2. Vulnerabilities or misconfigurations found
3. Credentials or sensitive information discovered
4. Operating system and software versions
5. Any error messages or access denials

Rules:
- Be concise. Use bullet points.
- Include specific version numbers, port numbers, and IP addresses.
- Prefix exploitable findings with [VULN]
- Prefix credentials with [CRED]
- Note failed attempts and why they failed.
- Do not speculate beyond what the output shows.

Format your response as:
SUMMARY: one line description
FINDINGS:
- finding 1
- finding 2
- [VULN] vulnerability finding
STATUS: success/partial/failed"""

REASONING_SYSTEM_PROMPT = """You are a penetration testing strategist. You maintain a task tree and decide next steps.

You will receive:
1. The current task tree showing completed and todo tasks
2. New findings from the latest tool execution

Your job:
1. UPDATE the tree based on new findings
2. DECIDE the single most important next task

Rules:
- Prioritize exploitation paths with highest success likelihood.
- If a service version is known, suggest checking for known CVEs.
- After recon, focus on the most promising attack surface.
- Do not add redundant tasks.
- Mark tasks not-applicable if findings make them irrelevant.

Respond in this exact format:
TREE_UPDATES:
- ADD: parent_id | node_type | priority | task description
- COMPLETE: node_id | findings summary
- NOT_APPLICABLE: node_id | reason
NEXT_TASK: description of the single most important next action
REASONING: 1-2 sentences explaining why this is the highest priority"""

GENERATION_SYSTEM_PROMPT = """You are a penetration testing command generator. Convert task descriptions into specific executable commands.

Available tools:
- shell: Run shell command. Args: {"command": "...", "timeout": 30}
- msf_search: Search MSF modules. Args: {"query": "search term"}
- msf_module_info: Module details. Args: {"module_type": "auxiliary|exploit|post", "module_name": "path"}
- msf_execute: Run MSF module. Args: {"module_type": "...", "module_name": "...", "options": "{\\"RHOSTS\\": \\"...\\"}" }
- msf_sessions: List sessions. Args: {}
- msf_session_command: Command in session. Args: {"session_id": "...", "command": "..."}
- msf_console: MSF console command. Args: {"command": "..."}

Rules:
- Provide the EXACT tool name and JSON arguments.
- Describe what to look for in the output.
- If multiple steps needed, number them.
- Always include RHOSTS/target in module options.
- Prefer auxiliary scanners before exploits.

Format:
COMMANDS:
1. TOOL: tool_name | ARGS: {"key": "value"} | EXPECT: what to look for
2. TOOL: tool_name | ARGS: {"key": "value"} | EXPECT: what to look for
FALLBACK: alternative approach if primary fails"""

INITIAL_PLAN_PROMPT = """You are a penetration testing strategist planning an engagement.

Target: {target}

Create an initial reconnaissance plan. List the first 3-5 specific tasks to perform, ordered by priority.

Format:
TASKS:
1. node_type | priority | task description
2. node_type | priority | task description
3. node_type | priority | task description
FIRST_ACTION: description of the very first thing to do
REASONING: why start here"""

DISCUSS_SYSTEM_PROMPT = """You are a penetration testing expert assistant. Answer the user's question about their current engagement.

Current target: {target}
Current status:
{tree_summary}

Answer concisely and provide actionable advice."""


# ─── Pipeline Modules ────────────────────────────────────────────────

class ParsingModule:
    """Normalizes raw tool output into structured summaries.

    Large outputs are split into chunks and each chunk is summarized by the
    LLM against PARSING_SYSTEM_PROMPT; findings from all chunks are merged.
    """

    def __init__(self, llm):
        self.llm = llm
        self.config = get_config()

    def parse(self, raw_output: str, source_type: str = "auto", context: str = "") -> dict:
        """Parse raw tool output into normalized summary.

        Returns dict with 'summary', 'findings', 'status', 'raw_source'.
        """
        if source_type == "auto":
            source_type = detect_source_type(raw_output)

        # Chunk size is configurable; fall back to 2000 chars if the config
        # lookup fails for any reason (best-effort, never fatal).
        chunk_size = 2000
        try:
            chunk_size = self.config.get_int('pentest', 'output_chunk_size', 2000)
        except Exception:
            pass

        chunks = self._chunk_output(raw_output, chunk_size)
        all_findings: List[str] = []
        all_summaries: List[str] = []
        status = "unknown"

        for i, chunk in enumerate(chunks):
            # Label each chunk so the LLM knows the tool and (for multi-part
            # outputs) which slice of the output it is looking at.
            prefix = f"[{source_type} output"
            if len(chunks) > 1:
                prefix += f" part {i+1}/{len(chunks)}"
            prefix += "]"
            message = f"{prefix}\n{chunk}"
            if context:
                message = f"Context: {context}\n\n{message}"

            self.llm.clear_history()
            try:
                response = self.llm.chat(
                    message,
                    system_prompt=PARSING_SYSTEM_PROMPT,
                    temperature=0.2,
                    max_tokens=512,
                )
            except Exception as e:
                # An LLM failure aborts parsing; earlier chunk results are
                # intentionally discarded in favor of an explicit error result.
                return {
                    'summary': f"Parse error: {e}",
                    'findings': [],
                    'status': 'failed',
                    'raw_source': source_type,
                }

            summary, findings, chunk_status = self._parse_response(response)
            all_summaries.append(summary)
            all_findings.extend(findings)
            # The last chunk that reports a definite status wins.
            if chunk_status != "unknown":
                status = chunk_status

        return {
            'summary': " | ".join(all_summaries) if all_summaries else "No summary",
            'findings': all_findings,
            'status': status,
            'raw_source': source_type,
        }

    def _chunk_output(self, output: str, max_chunk: int = 2000) -> List[str]:
        """Split large output into chunks along line boundaries."""
        if len(output) <= max_chunk:
            return [output]
        chunks = []
        lines = output.split('\n')
        current: List[str] = []
        current_len = 0
        for line in lines:
            # +1 accounts for the newline re-inserted by join.
            if current_len + len(line) + 1 > max_chunk and current:
                chunks.append('\n'.join(current))
                current = []
                current_len = 0
            current.append(line)
            current_len += len(line) + 1
        if current:
            chunks.append('\n'.join(current))
        return chunks

    def _parse_response(self, response: str) -> Tuple[str, List[str], str]:
        """Extract summary, findings, and status from LLM response.

        Falls back to using the raw response text when the structured
        SUMMARY/FINDINGS/STATUS format is not present.
        """
        summary = ""
        findings: List[str] = []
        status = "unknown"

        # Extract SUMMARY
        m = re.search(r'SUMMARY:\s*(.+)', response, re.IGNORECASE)
        if m:
            summary = m.group(1).strip()

        # Extract FINDINGS (a run of '-' or '*' bullet lines)
        findings_section = re.search(
            r'FINDINGS:\s*\n((?:[-*]\s*.+\n?)+)',
            response,
            re.IGNORECASE
        )
        if findings_section:
            for line in findings_section.group(1).strip().split('\n'):
                line = re.sub(r'^[-*]\s*', '', line).strip()
                if line:
                    findings.append(line)

        # Extract STATUS
        m = re.search(r'STATUS:\s*(\w+)', response, re.IGNORECASE)
        if m:
            status = m.group(1).strip().lower()

        # Fallback: if structured parse failed, use full response
        if not summary and not findings:
            summary = response[:200].strip()
            for line in response.split('\n'):
                line = line.strip()
                if line.startswith(('-', '*', '[VULN]', '[CRED]')):
                    findings.append(re.sub(r'^[-*]\s*', '', line))

        return summary, findings, status


class ReasoningModule:
    """Maintains PTT and decides next actions."""

    def __init__(self, llm, tree: PentestTree):
        self.llm = llm
        self.tree = tree

    def reason(self, parsed_output: dict, context: str = "") -> dict:
        """Three-step reasoning: update tree, validate, extract next todo.

        Returns dict with 'tree_updates', 'next_task', 'reasoning'.
        """
        tree_summary = self.tree.render_summary()

        findings_text = parsed_output.get('summary', '')
        if parsed_output.get('findings'):
            findings_text += "\nFindings:\n"
            for f in parsed_output['findings']:
                findings_text += f"- {f}\n"

        message = (
            f"Current pentest tree:\n{tree_summary}\n\n"
            f"New information ({parsed_output.get('raw_source', 'unknown')}):\n"
            f"{findings_text}"
        )
        if context:
            message += f"\n\nAdditional context: {context}"

        self.llm.clear_history()
        try:
            response = self.llm.chat(
                message,
                system_prompt=REASONING_SYSTEM_PROMPT,
                temperature=0.3,
                max_tokens=1024,
            )
        except Exception as e:
            return {
                'tree_updates': [],
                'next_task': f"Error during reasoning: {e}",
                'reasoning': str(e),
            }

        updates = self._parse_tree_updates(response)
        self._apply_updates(updates)

        next_task = ""
        m = re.search(r'NEXT_TASK:\s*(.+)', response, re.IGNORECASE)
        if m:
            next_task = m.group(1).strip()

        reasoning = ""
        m = re.search(r'REASONING:\s*(.+)', response, re.IGNORECASE | re.DOTALL)
        if m:
            # Keep only the first line of the (DOTALL) match.
            reasoning = m.group(1).strip().split('\n')[0]

        # Fallback: if no NEXT_TASK parsed, get from tree
        if not next_task:
            todo = self.tree.get_next_todo()
            if todo:
                next_task = todo.label

        return {
            'tree_updates': updates,
            'next_task': next_task,
            'reasoning': reasoning,
        }

    def _parse_tree_updates(self, response: str) -> List[dict]:
        """Extract tree operations (ADD/COMPLETE/NOT_APPLICABLE) from LLM response."""
        updates = []

        # Parse ADD operations: parent | node_type | priority | label
        for m in re.finditer(
            r'ADD:\s*(\S+)\s*\|\s*(\w+)\s*\|\s*(\d)\s*\|\s*(.+)',
            response, re.IGNORECASE
        ):
            parent = m.group(1).strip()
            if parent.lower() in ('root', 'none', '-'):
                parent = None
            ntype_str = m.group(2).strip().lower()
            ntype = self._map_node_type(ntype_str)
            updates.append({
                'operation': 'add',
                'parent_id': parent,
                'node_type': ntype,
                'priority': int(m.group(3)),
                'label': m.group(4).strip(),
            })

        # Parse COMPLETE operations
        for m in re.finditer(
            r'COMPLETE:\s*(\S+)\s*\|\s*(.+)',
            response, re.IGNORECASE
        ):
            updates.append({
                'operation': 'complete',
                'node_id': m.group(1).strip(),
                'findings': m.group(2).strip(),
            })

        # Parse NOT_APPLICABLE operations
        for m in re.finditer(
            r'NOT_APPLICABLE:\s*(\S+)\s*\|\s*(.+)',
            response, re.IGNORECASE
        ):
            updates.append({
                'operation': 'not_applicable',
                'node_id': m.group(1).strip(),
                'reason': m.group(2).strip(),
            })

        return updates

    def _map_node_type(self, type_str: str) -> PTTNodeType:
        """Map a string to PTTNodeType; unrecognized strings become CUSTOM."""
        mapping = {
            'recon': PTTNodeType.RECONNAISSANCE,
            'reconnaissance': PTTNodeType.RECONNAISSANCE,
            'initial_access': PTTNodeType.INITIAL_ACCESS,
            'initial': PTTNodeType.INITIAL_ACCESS,
            'access': PTTNodeType.INITIAL_ACCESS,
            'privesc': PTTNodeType.PRIVILEGE_ESCALATION,
            'privilege_escalation': PTTNodeType.PRIVILEGE_ESCALATION,
            'escalation': PTTNodeType.PRIVILEGE_ESCALATION,
            'lateral': PTTNodeType.LATERAL_MOVEMENT,
            'lateral_movement': PTTNodeType.LATERAL_MOVEMENT,
            'persistence': PTTNodeType.PERSISTENCE,
            'credential': PTTNodeType.CREDENTIAL_ACCESS,
            'credential_access': PTTNodeType.CREDENTIAL_ACCESS,
            'creds': PTTNodeType.CREDENTIAL_ACCESS,
            'exfiltration': PTTNodeType.EXFILTRATION,
            'exfil': PTTNodeType.EXFILTRATION,
        }
        return mapping.get(type_str.lower(), PTTNodeType.CUSTOM)

    def _apply_updates(self, updates: List[dict]):
        """Apply parsed operations to the tree.

        Node references may be either real node IDs or labels; labels are
        resolved via find_node_by_label, and unresolvable references are
        skipped (for 'add', the node is attached at the root instead).
        """
        for update in updates:
            op = update['operation']
            if op == 'add':
                # Resolve parent - could be an ID or a label
                parent_id = update.get('parent_id')
                if parent_id and parent_id not in self.tree.nodes:
                    # Try to find by label match
                    node = self.tree.find_node_by_label(parent_id)
                    parent_id = node.id if node else None
                self.tree.add_node(
                    label=update['label'],
                    node_type=update['node_type'],
                    parent_id=parent_id,
                    priority=update.get('priority', 3),
                )
            elif op == 'complete':
                node_id = update['node_id']
                if node_id not in self.tree.nodes:
                    node = self.tree.find_node_by_label(node_id)
                    if node:
                        node_id = node.id
                    else:
                        continue
                self.tree.update_node(
                    node_id,
                    status=NodeStatus.COMPLETED,
                    findings=[update.get('findings', '')],
                )
            elif op == 'not_applicable':
                node_id = update['node_id']
                if node_id not in self.tree.nodes:
                    node = self.tree.find_node_by_label(node_id)
                    if node:
                        node_id = node.id
                    else:
                        continue
                self.tree.update_node(
                    node_id,
                    status=NodeStatus.NOT_APPLICABLE,
                    details=update.get('reason', ''),
                )


class GenerationModule:
    """Converts abstract tasks into concrete commands."""

    def __init__(self, llm):
        self.llm = llm

    def generate(self, task_description: str, target: str, context: str = "") -> dict:
        """Generate executable commands for a task.

        Returns dict with 'commands' (list) and 'fallback' (str).
        """
        message = f"Target: {target}\nTask: {task_description}"
        if context:
            message += f"\n\nContext: {context}"

        self.llm.clear_history()
        try:
            response = self.llm.chat(
                message,
                system_prompt=GENERATION_SYSTEM_PROMPT,
                temperature=0.2,
                max_tokens=512,
            )
        except Exception as e:
            return {
                'commands': [],
                'fallback': f"Generation error: {e}",
                'raw_response': str(e),
            }

        commands = self._parse_commands(response)

        fallback = ""
        m = re.search(r'FALLBACK:\s*(.+)', response, re.IGNORECASE | re.DOTALL)
        if m:
            fallback = m.group(1).strip().split('\n')[0]

        return {
            'commands': commands,
            'fallback': fallback,
            'raw_response': response,
        }

    def _parse_commands(self, response: str) -> List[dict]:
        """Extract commands from LLM response.

        Primary path parses the structured 'TOOL: ... | ARGS: ... | EXPECT: ...'
        format; when nothing matches, falls back to sniffing raw shell/MSF
        command lines out of the response.
        """
        commands = []

        # Parse structured TOOL: ... | ARGS: ... | EXPECT: ... format.
        # NOTE: the ARGS regex cannot match nested braces (e.g. JSON-in-JSON
        # msf_execute options); those fall into the 'raw' fallback below.
        for m in re.finditer(
            r'TOOL:\s*(\w+)\s*\|\s*ARGS:\s*(\{[^}]+\})\s*\|\s*EXPECT:\s*(.+)',
            response, re.IGNORECASE
        ):
            tool_name = m.group(1).strip()
            args_str = m.group(2).strip()
            expect = m.group(3).strip()

            # Try to parse JSON args
            try:
                args = json.loads(args_str)
            except json.JSONDecodeError:
                # Try fixing common LLM JSON issues (single quotes)
                fixed = args_str.replace("'", '"')
                try:
                    args = json.loads(fixed)
                except json.JSONDecodeError:
                    args = {'raw': args_str}

            commands.append({
                'tool': tool_name,
                'args': args,
                'expect': expect,
            })

        # Fallback: try to find shell commands or MSF commands
        if not commands:
            for line in response.split('\n'):
                line = line.strip()
                # Detect nmap/shell commands
                if re.match(r'^(nmap|nikto|gobuster|curl|wget|nc|netcat)\s', line):
                    commands.append({
                        'tool': 'shell',
                        'args': {'command': line},
                        'expect': 'Check output for results',
                    })
                # Detect MSF use/run commands
                elif re.match(r'^(use |run |set )', line, re.IGNORECASE):
                    commands.append({
                        'tool': 'msf_console',
                        'args': {'command': line},
                        'expect': 'Check output for results',
                    })

        return commands


# ─── Pipeline Orchestrator ───────────────────────────────────────────

class PentestPipeline:
    """Orchestrates the three-module pipeline."""

    def __init__(self, llm, target: str, tree: Optional[PentestTree] = None):
        self.llm = llm
        self.target = target
        self.tree = tree or PentestTree(target)
        self.parser = ParsingModule(llm)
        self.reasoner = ReasoningModule(llm, self.tree)
        self.generator = GenerationModule(llm)
        self.history: List[dict] = []

    def process_output(self, raw_output: str, source_type: str = "auto") -> dict:
        """Full pipeline: parse -> reason -> generate.

        Returns dict with 'parsed', 'reasoning', 'commands', 'next_task'.
        """
        # Step 1: Parse
        parsed = self.parser.parse(raw_output, source_type)

        # Step 2: Reason
        reasoning = self.reasoner.reason(parsed)

        # Step 3: Generate commands for the next task
        generated = {'commands': [], 'fallback': ''}
        if reasoning.get('next_task'):
            # Build context from recent findings
            context = parsed.get('summary', '')
            generated = self.generator.generate(
                reasoning['next_task'],
                self.target,
                context=context,
            )

        result = {
            'parsed': parsed,
            'reasoning': reasoning,
            'commands': generated.get('commands', []),
            'fallback': generated.get('fallback', ''),
            'next_task': reasoning.get('next_task', ''),
        }

        # Keep a compact audit trail (counts and summaries, not raw output).
        self.history.append({
            'timestamp': datetime.now().isoformat(),
            'result': {
                'parsed_summary': parsed.get('summary', ''),
                'findings_count': len(parsed.get('findings', [])),
                'next_task': reasoning.get('next_task', ''),
                'commands_count': len(generated.get('commands', [])),
            }
        })

        return result

    def get_initial_plan(self) -> dict:
        """Generate initial pentest plan for the target.

        Asks the LLM for a prioritized task list, seeds the tree with it,
        and generates commands for the first action.
        """
        prompt = INITIAL_PLAN_PROMPT.format(target=self.target)

        self.llm.clear_history()
        try:
            response = self.llm.chat(
                prompt,
                system_prompt=REASONING_SYSTEM_PROMPT,
                temperature=0.3,
                max_tokens=1024,
            )
        except Exception as e:
            return {
                'tasks': [],
                'first_action': f"Error: {e}",
                'reasoning': str(e),
            }

        # Parse TASKS: "N. node_type | priority | description"
        tasks = []
        for m in re.finditer(
            r'(\d+)\.\s*(\w+)\s*\|\s*(\d)\s*\|\s*(.+)',
            response
        ):
            ntype_str = m.group(2).strip()
            ntype = self.reasoner._map_node_type(ntype_str)
            tasks.append({
                'node_type': ntype,
                'priority': int(m.group(3)),
                'label': m.group(4).strip(),
            })

        # Add tasks to tree under appropriate branches
        for task in tasks:
            # Find matching root branch
            parent_id = None
            for root_id in self.tree.root_nodes:
                root = self.tree.get_node(root_id)
                if root and root.node_type == task['node_type']:
                    parent_id = root_id
                    break
            self.tree.add_node(
                label=task['label'],
                node_type=task['node_type'],
                parent_id=parent_id,
                priority=task['priority'],
            )

        # Parse first action
        first_action = ""
        m = re.search(r'FIRST_ACTION:\s*(.+)', response, re.IGNORECASE)
        if m:
            first_action = m.group(1).strip()

        reasoning = ""
        m = re.search(r'REASONING:\s*(.+)', response, re.IGNORECASE)
        if m:
            reasoning = m.group(1).strip()

        # Generate commands for first action
        commands = []
        if first_action:
            gen = self.generator.generate(first_action, self.target)
            commands = gen.get('commands', [])

        return {
            'tasks': tasks,
            'first_action': first_action,
            'reasoning': reasoning,
            'commands': commands,
        }

    def inject_information(self, info: str, source: str = "manual") -> dict:
        """Inject external information and get updated recommendations.

        The text is run through the normal pipeline so the ParsingModule
        summarizes it. (A previous version pre-built a parsed dict here but
        never used it; that dead code has been removed.)
        """
        return self.process_output(info, source_type=source)

    def discuss(self, question: str) -> str:
        """Ad-hoc question that doesn't affect the tree."""
        tree_summary = self.tree.render_summary()
        prompt = DISCUSS_SYSTEM_PROMPT.format(
            target=self.target,
            tree_summary=tree_summary,
        )

        self.llm.clear_history()
        try:
            return self.llm.chat(
                question,
                system_prompt=prompt,
                temperature=0.5,
                max_tokens=1024,
            )
        except Exception as e:
            return f"Error: {e}"