Full security platform with web dashboard, 16 Flask blueprints, 26 modules, autonomous AI agent, WebUSB hardware support, and Archon Android companion app. Includes Hash Toolkit, debug console, anti-stalkerware shield, Metasploit/RouterSploit integration, WireGuard VPN, OSINT reconnaissance, and multi-backend LLM support. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
280 lines
9.3 KiB
Python
280 lines
9.3 KiB
Python
"""
|
|
AUTARCH Pentest Session Manager
|
|
Save and resume penetration testing sessions with full state persistence.
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
from enum import Enum
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
|
|
from .pentest_tree import PentestTree, NodeStatus
|
|
|
|
|
|
class PentestSessionState(Enum):
    """Lifecycle states of a pentest session.

    The member values are the lowercase strings that PentestSession.save()
    writes under the ``state`` key and that load_session() converts back
    into a member.
    """

    # Freshly constructed session; this is what __init__ assigns.
    IDLE = "idle"
    # Set by start() and resume().
    RUNNING = "running"
    # Set by pause().
    PAUSED = "paused"
    # Set by complete().
    COMPLETED = "completed"
    # Set by set_error().
    ERROR = "error"
|
|
|
|
|
|
@dataclass
class SessionEvent:
    """One entry in a session's event timeline.

    Instances convert to and from plain dictionaries so that a timeline can
    be stored as JSON.
    """

    # When the event happened; PentestSession.log_event passes
    # datetime.now().isoformat() here.
    timestamp: str
    # Short category label, e.g. 'state_change'.
    event_type: str
    # Free-form details attached to the event.
    data: dict

    def to_dict(self) -> dict:
        """Return the event as a plain dictionary.

        The ``data`` value is the same dict object held by the event, not a
        copy.
        """
        serialized = dict(
            timestamp=self.timestamp,
            event_type=self.event_type,
            data=self.data,
        )
        return serialized

    @classmethod
    def from_dict(cls, data: dict) -> 'SessionEvent':
        """Build an event from a dictionary produced by to_dict().

        Raises:
            KeyError: if ``timestamp`` or ``event_type`` is missing. A missing
                ``data`` key is tolerated and becomes an empty dict.
        """
        when = data['timestamp']
        kind = data['event_type']
        details = data.get('data', {})
        return cls(when, kind, details)
|
|
|
|
|
|
class PentestSession:
    """Manage a single penetration testing session and its saved state.

    A session holds the target, lifecycle state, task tree, event timeline,
    findings, pipeline history and free-form notes. save() writes all of it
    to ``<session_id>.json`` inside the ``pentest_sessions`` data directory,
    and load_session() rebuilds a session from such a file.
    """

    @classmethod
    def _get_dir(cls):
        """Return the session storage directory, creating it if necessary."""
        # Imported at call time rather than at module import time.
        from core.paths import get_data_dir

        d = get_data_dir() / "pentest_sessions"
        d.mkdir(parents=True, exist_ok=True)
        return d

    @classmethod
    def _session_path(cls, session_id: str):
        """Return the JSON file path used for ``session_id``."""
        return cls._get_dir() / f"{session_id}.json"

    def __init__(self, target: str, session_id: Optional[str] = None):
        """Create an idle in-memory session for ``target``.

        Nothing is written to disk until save() is called, directly or via
        one of the state-changing methods.

        Args:
            target: What is being tested; also passed to PentestTree.
            session_id: Identifier to reuse. When omitted (or empty) an id is
                generated from the target and the current time.
        """
        self.session_id = session_id or self._generate_id(target)
        self.target = target
        self.state = PentestSessionState.IDLE
        self.tree = PentestTree(target)
        self.events: List[SessionEvent] = []
        self.findings: List[Dict[str, Any]] = []
        self.pipeline_history: List[dict] = []
        self.notes: str = ""
        self.step_count: int = 0
        now = datetime.now().isoformat()
        self.created_at = now
        self.updated_at = now

    @staticmethod
    def _generate_id(target: str) -> str:
        """Generate a session ID from target and timestamp.

        Every character of ``target`` outside ``[a-zA-Z0-9]`` becomes ``_``
        and the result is cut to 30 characters, so the id is always usable as
        a file name. The timestamp has one-second resolution.
        """
        safe = re.sub(r'[^a-zA-Z0-9]', '_', target)[:30]
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        return f"{safe}_{ts}"

    def _transition(self, new_state, extra: Optional[Dict[str, Any]] = None):
        """Move to ``new_state``, log a 'state_change' event, and save.

        Args:
            new_state: The PentestSessionState member to switch to.
            extra: Additional key/value pairs appended to the event data after
                the 'from' and 'to' entries.
        """
        details = {'from': self.state.value, 'to': new_state.value}
        if extra:
            details.update(extra)
        self.state = new_state
        self.log_event('state_change', details)
        self.save()

    def start(self):
        """Initialize a new session: mark it running, seed the tree, save."""
        # Record the real previous state instead of assuming 'idle', in line
        # with every other transition.
        prev = self.state.value
        self.state = PentestSessionState.RUNNING
        self.tree.initialize_standard_branches()
        self.log_event('state_change', {'from': prev, 'to': 'running'})
        self.save()

    def pause(self):
        """Pause the session and save state."""
        self._transition(PentestSessionState.PAUSED)

    def resume(self):
        """Resume a paused session."""
        self._transition(PentestSessionState.RUNNING)

    def complete(self, summary: str = ""):
        """Mark session as completed, storing ``summary`` in the event."""
        self._transition(PentestSessionState.COMPLETED, {'summary': summary})

    def set_error(self, error_msg: str):
        """Mark session as errored, storing ``error_msg`` in the event."""
        self._transition(PentestSessionState.ERROR, {'error': error_msg})

    def log_event(self, event_type: str, data: dict):
        """Append an event to the session timeline.

        Also moves ``updated_at`` to the event's timestamp. Does not save.
        """
        event = SessionEvent(
            timestamp=datetime.now().isoformat(),
            event_type=event_type,
            data=data,
        )
        self.events.append(event)
        self.updated_at = event.timestamp

    def log_pipeline_result(self, parsed: str, reasoning: str, actions: list):
        """Record one pipeline execution cycle and advance the step counter.

        The stored 'step' is the counter value before it is incremented, so
        the first recorded cycle is step 0.
        """
        self.pipeline_history.append({
            'timestamp': datetime.now().isoformat(),
            'step': self.step_count,
            'parsed_input': parsed,
            'reasoning': reasoning,
            'generated_actions': actions,
        })
        self.step_count += 1

    def add_finding(self, title: str, description: str,
                    severity: str = "medium", node_id: Optional[str] = None):
        """Add a key finding to the session.

        Args:
            title: Short headline for the finding.
            description: Longer explanation.
            severity: Severity label; stored as given.
            node_id: Optional id of the related tree node.
        """
        self.findings.append({
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'title': title,
            'description': description,
            'node_id': node_id,
        })

    def save(self) -> str:
        """Save session to JSON file. Returns filepath.

        Raises:
            TypeError: if the session contains a value json cannot serialize;
                in that case the previously saved file is left untouched.
            OSError: if the file cannot be written.
        """
        filepath = self._session_path(self.session_id)

        data = {
            'session_id': self.session_id,
            'target': self.target,
            'state': self.state.value,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'notes': self.notes,
            'step_count': self.step_count,
            'tree': self.tree.to_dict(),
            'events': [e.to_dict() for e in self.events],
            'findings': self.findings,
            'pipeline_history': self.pipeline_history,
        }

        # Serialize completely before touching the disk, then write to a
        # sibling temp file and swap it in. A failure at any point therefore
        # cannot truncate the last good copy of the session. The temp name
        # ends in '.tmp', so list_sessions()' "*.json" glob never sees it.
        payload = json.dumps(data, indent=2)
        tmp_path = filepath.with_name(filepath.name + ".tmp")
        tmp_path.write_text(payload, encoding='utf-8')
        tmp_path.replace(filepath)

        return str(filepath)

    @classmethod
    def load_session(cls, session_id: str) -> 'PentestSession':
        """Load a session from file.

        Args:
            session_id: Id of a previously saved session.

        Raises:
            FileNotFoundError: if no such session file exists, or if
                ``session_id`` is not a plain file name (contains a path
                separator or drive prefix).
            KeyError: if the file lacks a required key.
            ValueError: if the file is not valid JSON or names an unknown
                state.
        """
        # The id may originate outside this module; refuse anything that
        # would resolve outside the sessions directory (e.g. "../x").
        filename = f"{session_id}.json"
        if Path(filename).name != filename:
            raise FileNotFoundError(f"Session not found: {session_id}")

        filepath = cls._session_path(session_id)
        if not filepath.exists():
            raise FileNotFoundError(f"Session not found: {session_id}")

        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        session = cls(target=data['target'], session_id=data['session_id'])
        session.state = PentestSessionState(data['state'])
        session.created_at = data['created_at']
        session.updated_at = data['updated_at']
        session.notes = data.get('notes', '')
        session.step_count = data.get('step_count', 0)
        session.tree = PentestTree.from_dict(data['tree'])
        session.events = [SessionEvent.from_dict(e) for e in data.get('events', [])]
        session.findings = data.get('findings', [])
        session.pipeline_history = data.get('pipeline_history', [])
        return session

    @classmethod
    def list_sessions(cls) -> List[Dict[str, Any]]:
        """List all saved sessions with summary info, most recent first.

        Files that cannot be read or do not have the expected structure are
        skipped rather than aborting the whole listing.
        """
        def _mtime(path) -> float:
            # A file can disappear between glob() and stat(); sort it last.
            try:
                return path.stat().st_mtime
            except OSError:
                return 0.0

        sessions = []
        for path in sorted(cls._get_dir().glob("*.json"), key=_mtime, reverse=True):
            try:
                with open(path, 'r', encoding='utf-8') as fh:
                    data = json.load(fh)

                stats = {}
                if 'tree' in data and 'nodes' in data['tree']:
                    nodes = data['tree']['nodes']
                    stats = {
                        'total': len(nodes),
                        'todo': sum(1 for n in nodes.values() if n.get('status') == 'todo'),
                        'completed': sum(1 for n in nodes.values() if n.get('status') == 'completed'),
                    }

                sessions.append({
                    'session_id': data['session_id'],
                    'target': data['target'],
                    'state': data['state'],
                    'created': data['created_at'],
                    'updated': data['updated_at'],
                    'steps': data.get('step_count', 0),
                    'findings': len(data.get('findings', [])),
                    'tree_stats': stats,
                })
            except (OSError, ValueError, KeyError, TypeError, AttributeError):
                # OSError: unreadable or vanished file. ValueError: invalid
                # JSON or undecodable bytes (JSONDecodeError and
                # UnicodeDecodeError are both subclasses). The rest: JSON that
                # parses but is not shaped like a session.
                continue
        return sessions

    def delete(self) -> bool:
        """Delete this session's file.

        Returns:
            True if a file was removed, False if there was none.
        """
        filepath = self._session_path(self.session_id)
        # Attempt the removal directly instead of checking exists() first,
        # which would leave a window for the file to vanish in between.
        try:
            filepath.unlink()
        except FileNotFoundError:
            return False
        return True

    def export_report(self) -> str:
        """Generate a text summary report of the session.

        The report has a header, task-tree statistics followed by the
        rendered tree, then the findings and notes sections when those are
        non-empty.
        """
        stats = self.tree.get_stats()
        rule = "=" * 60
        lines = [
            rule,
            "AUTARCH Pentest Session Report",
            rule,
            f"Target: {self.target}",
            f"Session: {self.session_id}",
            f"State: {self.state.value}",
            f"Started: {self.created_at}",
            f"Updated: {self.updated_at}",
            f"Steps: {self.step_count}",
            "",
            "--- Task Tree ---",
            f"Total nodes: {stats['total']}",
            f" Completed: {stats.get('completed', 0)}",
            f" Todo: {stats.get('todo', 0)}",
            f" Active: {stats.get('in_progress', 0)}",
            f" N/A: {stats.get('not_applicable', 0)}",
            "",
            self.tree.render_text(),
            "",
        ]

        if self.findings:
            lines.append("--- Findings ---")
            for i, finding in enumerate(self.findings, 1):
                sev = finding.get('severity', 'medium').upper()
                lines.append(f" [{i}] [{sev}] {finding['title']}")
                lines.append(f" {finding['description']}")
            lines.append("")

        if self.notes:
            lines.append("--- Notes ---")
            lines.append(self.notes)
            lines.append("")

        lines.append(rule)
        return "\n".join(lines)
|