# File: Autarch/core/pentest_session.py (Python, 280 lines, 9.3 KiB)

"""
AUTARCH Pentest Session Manager
Save and resume penetration testing sessions with full state persistence.
"""
import json
import re
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
from .pentest_tree import PentestTree, NodeStatus
class PentestSessionState(Enum):
    """Lifecycle states of a pentest session.

    The string values are what gets written to, and read back from, the
    ``state`` field of a saved session file.
    """

    # Freshly created, not yet started.
    IDLE = "idle"
    # Actively being worked on.
    RUNNING = "running"
    # Suspended; can be resumed later.
    PAUSED = "paused"
    # Finished normally.
    COMPLETED = "completed"
    # Stopped because of a failure.
    ERROR = "error"
@dataclass
class SessionEvent:
    """One timestamped entry in a session's event timeline."""

    timestamp: str   # when the event was recorded (string form)
    event_type: str  # short label for the kind of event
    data: dict       # free-form payload attached to the event

    def to_dict(self) -> dict:
        """Return the event as a plain dict with the same three fields.

        The ``data`` payload is passed through by reference, not copied.
        """
        return dict(
            timestamp=self.timestamp,
            event_type=self.event_type,
            data=self.data,
        )

    @classmethod
    def from_dict(cls, data: dict) -> 'SessionEvent':
        """Build an event from a dict produced by ``to_dict``.

        ``timestamp`` and ``event_type`` are required (``KeyError`` if
        absent); a missing ``data`` payload falls back to an empty dict.
        """
        stamp = data['timestamp']
        kind = data['event_type']
        payload = data.get('data', {})
        return cls(stamp, kind, payload)
class PentestSession:
    """Manages a single penetration testing session.

    A session bundles a target, a ``PentestTree`` task tree, a timeline of
    ``SessionEvent`` entries, a list of findings, the pipeline history and
    free-form notes. ``save()`` persists all of it as one JSON file named
    ``<session_id>.json`` inside the directory returned by ``_get_dir()``;
    ``load_session()`` restores it.
    """

    @classmethod
    def _get_dir(cls):
        """Return the session storage directory, creating it if missing."""
        # Imported lazily inside the method, as in the original code
        # (presumably to avoid an import cycle -- TODO confirm).
        from core.paths import get_data_dir
        d = get_data_dir() / "pentest_sessions"
        d.mkdir(parents=True, exist_ok=True)
        return d

    def __init__(self, target: str, session_id: Optional[str] = None):
        """Create an idle, in-memory session for ``target``.

        Args:
            target: The target of the test; also used to derive the ID.
            session_id: Explicit ID to use. When omitted (or empty) one is
                generated from the target and the current time.
        """
        self.session_id = session_id or self._generate_id(target)
        self.target = target
        self.state = PentestSessionState.IDLE
        self.tree = PentestTree(target)
        self.events: List[SessionEvent] = []
        self.findings: List[Dict[str, Any]] = []
        self.pipeline_history: List[dict] = []
        self.notes: str = ""
        self.step_count: int = 0
        now = datetime.now().isoformat()
        self.created_at = now
        self.updated_at = now

    @staticmethod
    def _generate_id(target: str) -> str:
        """Generate a session ID from target and timestamp.

        Every non-alphanumeric character of the target becomes ``_`` and the
        result is capped at 30 characters, then a ``YYYYmmdd_HHMMSS``
        timestamp is appended.
        """
        safe = re.sub(r'[^a-zA-Z0-9]', '_', target)[:30]
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        return f"{safe}_{ts}"

    def _transition(self, new_state: 'PentestSessionState', **extra: Any) -> None:
        """Switch to ``new_state``, log a ``state_change`` event and save.

        Any keyword arguments are added to the event payload after the
        ``from`` / ``to`` keys.
        """
        prev = self.state.value
        self.state = new_state
        payload = {'from': prev, 'to': new_state.value}
        payload.update(extra)
        self.log_event('state_change', payload)
        self.save()

    def start(self) -> None:
        """Initialize a new session: mark it running and seed the tree."""
        # Record the real previous state instead of assuming 'idle', so the
        # timeline stays truthful if start() is called on a loaded session.
        prev = self.state.value
        self.state = PentestSessionState.RUNNING
        self.tree.initialize_standard_branches()
        self.log_event('state_change', {'from': prev, 'to': 'running'})
        self.save()

    def pause(self) -> None:
        """Pause the session and save state."""
        self._transition(PentestSessionState.PAUSED)

    def resume(self) -> None:
        """Resume a paused session."""
        self._transition(PentestSessionState.RUNNING)

    def complete(self, summary: str = "") -> None:
        """Mark session as completed, storing ``summary`` in the event."""
        self._transition(PentestSessionState.COMPLETED, summary=summary)

    def set_error(self, error_msg: str) -> None:
        """Mark session as errored, storing ``error_msg`` in the event."""
        self._transition(PentestSessionState.ERROR, error=error_msg)

    def log_event(self, event_type: str, data: dict) -> None:
        """Log an event to the session timeline.

        Also moves ``updated_at`` to the event's timestamp. Does not save.
        """
        event = SessionEvent(
            timestamp=datetime.now().isoformat(),
            event_type=event_type,
            data=data,
        )
        self.events.append(event)
        self.updated_at = event.timestamp

    def log_pipeline_result(self, parsed: str, reasoning: str, actions: list) -> None:
        """Log a pipeline execution cycle and advance the step counter.

        The entry records the step number *before* it is incremented.
        """
        self.pipeline_history.append({
            'timestamp': datetime.now().isoformat(),
            'step': self.step_count,
            'parsed_input': parsed,
            'reasoning': reasoning,
            'generated_actions': actions,
        })
        self.step_count += 1

    def add_finding(self, title: str, description: str,
                    severity: str = "medium", node_id: Optional[str] = None) -> None:
        """Add a key finding.

        Args:
            title: Short headline of the finding.
            description: Longer explanation.
            severity: Severity label; defaults to ``"medium"``.
            node_id: Optional ID of a related tree node.
        """
        self.findings.append({
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'title': title,
            'description': description,
            'node_id': node_id,
        })

    def save(self) -> str:
        """Save session to JSON file. Returns filepath.

        Raises:
            TypeError / ValueError: if the session holds data that cannot be
                serialized to JSON (raised before any file is touched).
            OSError: if the file cannot be written.
        """
        # _get_dir() already creates the directory.
        filepath = self._get_dir() / f"{self.session_id}.json"
        data = {
            'session_id': self.session_id,
            'target': self.target,
            'state': self.state.value,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'notes': self.notes,
            'step_count': self.step_count,
            'tree': self.tree.to_dict(),
            'events': [e.to_dict() for e in self.events],
            'findings': self.findings,
            'pipeline_history': self.pipeline_history,
        }
        # Serialize completely first, then write to a sibling temp file and
        # rename it over the target. A failure at any point therefore leaves
        # the previously saved session intact instead of a truncated file.
        # The ".json.tmp" name is not matched by list_sessions()' "*.json".
        payload = json.dumps(data, indent=2)
        tmp_path = filepath.with_name(filepath.name + ".tmp")
        tmp_path.write_text(payload, encoding='utf-8')
        tmp_path.replace(filepath)
        return str(filepath)

    @classmethod
    def load_session(cls, session_id: str) -> 'PentestSession':
        """Load a session from file.

        Raises:
            FileNotFoundError: if no file exists for ``session_id``.
            json.JSONDecodeError / KeyError: if the file is not a valid
                session document.
        """
        # NOTE(review): session_id is interpolated into the filename
        # unchecked; confirm callers never pass untrusted input here.
        filepath = cls._get_dir() / f"{session_id}.json"
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(f"Session not found: {session_id}") from None
        session = cls(target=data['target'], session_id=data['session_id'])
        session.state = PentestSessionState(data['state'])
        session.created_at = data['created_at']
        session.updated_at = data['updated_at']
        session.notes = data.get('notes', '')
        session.step_count = data.get('step_count', 0)
        session.tree = PentestTree.from_dict(data['tree'])
        session.events = [SessionEvent.from_dict(e) for e in data.get('events', [])]
        session.findings = data.get('findings', [])
        session.pipeline_history = data.get('pipeline_history', [])
        return session

    @staticmethod
    def _safe_mtime(path) -> float:
        """Return the file's modification time, or 0.0 if it cannot be read."""
        try:
            return path.stat().st_mtime
        except OSError:
            # The file may vanish between glob() and stat().
            return 0.0

    @staticmethod
    def _summarize(data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the summary entry for one decoded session document.

        Raises ``KeyError`` when a required top-level field is missing.
        """
        stats = {}
        if 'tree' in data and 'nodes' in data['tree']:
            nodes = data['tree']['nodes']
            statuses = [n.get('status') for n in nodes.values()]
            stats = {
                'total': len(nodes),
                'todo': statuses.count('todo'),
                'completed': statuses.count('completed'),
            }
        return {
            'session_id': data['session_id'],
            'target': data['target'],
            'state': data['state'],
            'created': data['created_at'],
            'updated': data['updated_at'],
            'steps': data.get('step_count', 0),
            'findings': len(data.get('findings', [])),
            'tree_stats': stats,
        }

    @classmethod
    def list_sessions(cls) -> List[Dict[str, Any]]:
        """List all saved sessions with summary info.

        Sessions are ordered most recently modified first. Files that cannot
        be read or do not look like a session document are skipped.
        """
        sessions = []
        candidates = sorted(cls._get_dir().glob("*.json"),
                            key=cls._safe_mtime, reverse=True)
        for path in candidates:
            try:
                with open(path, 'r', encoding='utf-8') as fh:
                    data = json.load(fh)
                sessions.append(cls._summarize(data))
            except (OSError, ValueError, KeyError, TypeError, AttributeError):
                # ValueError covers JSON and text-decoding errors;
                # TypeError/AttributeError cover valid JSON of the wrong
                # shape (e.g. a list at top level). One bad file must not
                # break the whole listing.
                continue
        return sessions

    def delete(self) -> bool:
        """Delete this session's file.

        Returns:
            True if a file was removed, False if there was none.
        """
        filepath = self._get_dir() / f"{self.session_id}.json"
        try:
            filepath.unlink()
        except FileNotFoundError:
            return False
        return True

    def export_report(self) -> str:
        """Generate a text summary report of the session.

        The report contains a header, task-tree statistics and rendering,
        then the findings and notes sections when those are non-empty.
        """
        stats = self.tree.get_stats()
        rule = "=" * 60
        lines = [
            rule,
            "AUTARCH Pentest Session Report",
            rule,
            f"Target: {self.target}",
            f"Session: {self.session_id}",
            f"State: {self.state.value}",
            f"Started: {self.created_at}",
            f"Updated: {self.updated_at}",
            f"Steps: {self.step_count}",
            "",
            "--- Task Tree ---",
            f"Total nodes: {stats.get('total', 0)}",
            f" Completed: {stats.get('completed', 0)}",
            f" Todo: {stats.get('todo', 0)}",
            f" Active: {stats.get('in_progress', 0)}",
            f" N/A: {stats.get('not_applicable', 0)}",
            "",
            self.tree.render_text(),
            "",
        ]
        if self.findings:
            lines.append("--- Findings ---")
            for i, finding in enumerate(self.findings, 1):
                # Findings loaded from disk may lack keys or carry a null
                # severity; fall back instead of aborting the report.
                sev = str(finding.get('severity') or 'medium').upper()
                lines.append(f" [{i}] [{sev}] {finding.get('title', '')}")
                lines.append(f" {finding.get('description', '')}")
            lines.append("")
        if self.notes:
            lines.append("--- Notes ---")
            lines.append(self.notes)
            lines.append("")
        lines.append(rule)
        return "\n".join(lines)