""" AUTARCH Pentest Session Manager Save and resume penetration testing sessions with full state persistence. """ import json import re from enum import Enum from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Optional, List, Dict, Any from .pentest_tree import PentestTree, NodeStatus class PentestSessionState(Enum): IDLE = "idle" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" ERROR = "error" @dataclass class SessionEvent: """A single event in the session timeline.""" timestamp: str event_type: str data: dict def to_dict(self) -> dict: return { 'timestamp': self.timestamp, 'event_type': self.event_type, 'data': self.data, } @classmethod def from_dict(cls, data: dict) -> 'SessionEvent': return cls( timestamp=data['timestamp'], event_type=data['event_type'], data=data.get('data', {}), ) class PentestSession: """Manages a single penetration testing session.""" @classmethod def _get_dir(cls): from core.paths import get_data_dir d = get_data_dir() / "pentest_sessions" d.mkdir(parents=True, exist_ok=True) return d def __init__(self, target: str, session_id: str = None): self.session_id = session_id or self._generate_id(target) self.target = target self.state = PentestSessionState.IDLE self.tree = PentestTree(target) self.events: List[SessionEvent] = [] self.findings: List[Dict[str, Any]] = [] self.pipeline_history: List[dict] = [] self.notes: str = "" self.step_count: int = 0 now = datetime.now().isoformat() self.created_at = now self.updated_at = now @staticmethod def _generate_id(target: str) -> str: """Generate a session ID from target and timestamp.""" safe = re.sub(r'[^a-zA-Z0-9]', '_', target)[:30] ts = datetime.now().strftime('%Y%m%d_%H%M%S') return f"{safe}_{ts}" def start(self): """Initialize a new session.""" self.state = PentestSessionState.RUNNING self.tree.initialize_standard_branches() self.log_event('state_change', {'from': 'idle', 'to': 'running'}) self.save() def pause(self): """Pause the session and save state.""" prev = self.state.value self.state = PentestSessionState.PAUSED self.log_event('state_change', {'from': prev, 'to': 'paused'}) self.save() def resume(self): """Resume a paused session.""" prev = self.state.value self.state = PentestSessionState.RUNNING self.log_event('state_change', {'from': prev, 'to': 'running'}) self.save() def complete(self, summary: str = ""): """Mark session as completed.""" prev = self.state.value self.state = PentestSessionState.COMPLETED self.log_event('state_change', { 'from': prev, 'to': 'completed', 'summary': summary, }) self.save() def set_error(self, error_msg: str): """Mark session as errored.""" prev = self.state.value self.state = PentestSessionState.ERROR self.log_event('state_change', { 'from': prev, 'to': 'error', 'error': error_msg, }) self.save() def log_event(self, event_type: str, data: dict): """Log an event to the session timeline.""" event = SessionEvent( timestamp=datetime.now().isoformat(), event_type=event_type, data=data, ) self.events.append(event) self.updated_at = event.timestamp def log_pipeline_result(self, parsed: str, reasoning: str, actions: list): """Log a pipeline execution cycle.""" self.pipeline_history.append({ 'timestamp': datetime.now().isoformat(), 'step': self.step_count, 'parsed_input': parsed, 'reasoning': reasoning, 'generated_actions': actions, }) self.step_count += 1 def add_finding(self, title: str, description: str, severity: str = "medium", node_id: str = None): """Add a key finding.""" self.findings.append({ 'timestamp': datetime.now().isoformat(), 'severity': severity, 'title': title, 'description': description, 'node_id': node_id, }) def save(self) -> str: """Save session to JSON file. Returns filepath.""" self._get_dir().mkdir(parents=True, exist_ok=True) filepath = self._get_dir() / f"{self.session_id}.json" data = { 'session_id': self.session_id, 'target': self.target, 'state': self.state.value, 'created_at': self.created_at, 'updated_at': self.updated_at, 'notes': self.notes, 'step_count': self.step_count, 'tree': self.tree.to_dict(), 'events': [e.to_dict() for e in self.events], 'findings': self.findings, 'pipeline_history': self.pipeline_history, } with open(filepath, 'w') as f: json.dump(data, f, indent=2) return str(filepath) @classmethod def load_session(cls, session_id: str) -> 'PentestSession': """Load a session from file.""" filepath = cls._get_dir() / f"{session_id}.json" if not filepath.exists(): raise FileNotFoundError(f"Session not found: {session_id}") with open(filepath, 'r') as f: data = json.load(f) session = cls(target=data['target'], session_id=data['session_id']) session.state = PentestSessionState(data['state']) session.created_at = data['created_at'] session.updated_at = data['updated_at'] session.notes = data.get('notes', '') session.step_count = data.get('step_count', 0) session.tree = PentestTree.from_dict(data['tree']) session.events = [SessionEvent.from_dict(e) for e in data.get('events', [])] session.findings = data.get('findings', []) session.pipeline_history = data.get('pipeline_history', []) return session @classmethod def list_sessions(cls) -> List[Dict[str, Any]]: """List all saved sessions with summary info.""" cls._get_dir().mkdir(parents=True, exist_ok=True) sessions = [] for f in sorted(cls._get_dir().glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True): try: with open(f, 'r') as fh: data = json.load(fh) stats = {} if 'tree' in data and 'nodes' in data['tree']: nodes = data['tree']['nodes'] stats = { 'total': len(nodes), 'todo': sum(1 for n in nodes.values() if n.get('status') == 'todo'), 'completed': sum(1 for n in nodes.values() if n.get('status') == 'completed'), } sessions.append({ 'session_id': data['session_id'], 'target': data['target'], 'state': data['state'], 'created': data['created_at'], 'updated': data['updated_at'], 'steps': data.get('step_count', 0), 'findings': len(data.get('findings', [])), 'tree_stats': stats, }) except (json.JSONDecodeError, KeyError): continue return sessions def delete(self) -> bool: """Delete this session's file.""" filepath = self._get_dir() / f"{self.session_id}.json" if filepath.exists(): filepath.unlink() return True return False def export_report(self) -> str: """Generate a text summary report of the session.""" stats = self.tree.get_stats() lines = [ "=" * 60, "AUTARCH Pentest Session Report", "=" * 60, f"Target: {self.target}", f"Session: {self.session_id}", f"State: {self.state.value}", f"Started: {self.created_at}", f"Updated: {self.updated_at}", f"Steps: {self.step_count}", "", "--- Task Tree ---", f"Total nodes: {stats['total']}", f" Completed: {stats.get('completed', 0)}", f" Todo: {stats.get('todo', 0)}", f" Active: {stats.get('in_progress', 0)}", f" N/A: {stats.get('not_applicable', 0)}", "", self.tree.render_text(), "", ] if self.findings: lines.append("--- Findings ---") for i, f in enumerate(self.findings, 1): sev = f.get('severity', 'medium').upper() lines.append(f" [{i}] [{sev}] {f['title']}") lines.append(f" {f['description']}") lines.append("") if self.notes: lines.append("--- Notes ---") lines.append(self.notes) lines.append("") lines.append("=" * 60) return "\n".join(lines)