Autarch Will Control The Internet
This commit is contained in:
350
core/pentest_tree.py
Normal file
350
core/pentest_tree.py
Normal file
@@ -0,0 +1,350 @@
|
||||
"""
|
||||
AUTARCH Penetration Testing Tree (PTT)
|
||||
Hierarchical task tracker for structured penetration testing workflows.
|
||||
Based on PentestGPT's USENIX paper methodology.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
|
||||
class NodeStatus(Enum):
|
||||
TODO = "todo"
|
||||
IN_PROGRESS = "in_progress"
|
||||
COMPLETED = "completed"
|
||||
NOT_APPLICABLE = "not_applicable"
|
||||
|
||||
|
||||
class PTTNodeType(Enum):
|
||||
RECONNAISSANCE = "reconnaissance"
|
||||
INITIAL_ACCESS = "initial_access"
|
||||
PRIVILEGE_ESCALATION = "privilege_escalation"
|
||||
LATERAL_MOVEMENT = "lateral_movement"
|
||||
PERSISTENCE = "persistence"
|
||||
CREDENTIAL_ACCESS = "credential_access"
|
||||
EXFILTRATION = "exfiltration"
|
||||
CUSTOM = "custom"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PTTNode:
|
||||
"""A single node in the Penetration Testing Tree."""
|
||||
id: str
|
||||
label: str
|
||||
node_type: PTTNodeType
|
||||
status: NodeStatus = NodeStatus.TODO
|
||||
parent_id: Optional[str] = None
|
||||
children: List[str] = field(default_factory=list)
|
||||
details: str = ""
|
||||
tool_output: Optional[str] = None
|
||||
findings: List[str] = field(default_factory=list)
|
||||
priority: int = 3
|
||||
created_at: str = ""
|
||||
updated_at: str = ""
|
||||
|
||||
def __post_init__(self):
|
||||
now = datetime.now().isoformat()
|
||||
if not self.created_at:
|
||||
self.created_at = now
|
||||
if not self.updated_at:
|
||||
self.updated_at = now
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'id': self.id,
|
||||
'label': self.label,
|
||||
'node_type': self.node_type.value,
|
||||
'status': self.status.value,
|
||||
'parent_id': self.parent_id,
|
||||
'children': self.children.copy(),
|
||||
'details': self.details,
|
||||
'tool_output': self.tool_output,
|
||||
'findings': self.findings.copy(),
|
||||
'priority': self.priority,
|
||||
'created_at': self.created_at,
|
||||
'updated_at': self.updated_at,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> 'PTTNode':
|
||||
return cls(
|
||||
id=data['id'],
|
||||
label=data['label'],
|
||||
node_type=PTTNodeType(data['node_type']),
|
||||
status=NodeStatus(data['status']),
|
||||
parent_id=data.get('parent_id'),
|
||||
children=data.get('children', []),
|
||||
details=data.get('details', ''),
|
||||
tool_output=data.get('tool_output'),
|
||||
findings=data.get('findings', []),
|
||||
priority=data.get('priority', 3),
|
||||
created_at=data.get('created_at', ''),
|
||||
updated_at=data.get('updated_at', ''),
|
||||
)
|
||||
|
||||
|
||||
# Status display symbols
|
||||
_STATUS_SYMBOLS = {
|
||||
NodeStatus.TODO: '[ ]',
|
||||
NodeStatus.IN_PROGRESS: '[~]',
|
||||
NodeStatus.COMPLETED: '[x]',
|
||||
NodeStatus.NOT_APPLICABLE: '[-]',
|
||||
}
|
||||
|
||||
|
||||
class PentestTree:
|
||||
"""Penetration Testing Tree - hierarchical task tracker."""
|
||||
|
||||
def __init__(self, target: str):
|
||||
self.target = target
|
||||
self.nodes: Dict[str, PTTNode] = {}
|
||||
self.root_nodes: List[str] = []
|
||||
now = datetime.now().isoformat()
|
||||
self.created_at = now
|
||||
self.updated_at = now
|
||||
|
||||
def add_node(
|
||||
self,
|
||||
label: str,
|
||||
node_type: PTTNodeType,
|
||||
parent_id: Optional[str] = None,
|
||||
details: str = "",
|
||||
priority: int = 3,
|
||||
status: NodeStatus = NodeStatus.TODO,
|
||||
) -> str:
|
||||
"""Add a node to the tree. Returns the new node's ID."""
|
||||
node_id = str(uuid.uuid4())[:8]
|
||||
node = PTTNode(
|
||||
id=node_id,
|
||||
label=label,
|
||||
node_type=node_type,
|
||||
status=status,
|
||||
parent_id=parent_id,
|
||||
details=details,
|
||||
priority=priority,
|
||||
)
|
||||
|
||||
self.nodes[node_id] = node
|
||||
|
||||
if parent_id and parent_id in self.nodes:
|
||||
self.nodes[parent_id].children.append(node_id)
|
||||
elif parent_id is None:
|
||||
self.root_nodes.append(node_id)
|
||||
|
||||
self.updated_at = datetime.now().isoformat()
|
||||
return node_id
|
||||
|
||||
def update_node(
|
||||
self,
|
||||
node_id: str,
|
||||
status: Optional[NodeStatus] = None,
|
||||
details: Optional[str] = None,
|
||||
tool_output: Optional[str] = None,
|
||||
findings: Optional[List[str]] = None,
|
||||
priority: Optional[int] = None,
|
||||
label: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Update a node's properties. Returns True if found and updated."""
|
||||
node = self.nodes.get(node_id)
|
||||
if not node:
|
||||
return False
|
||||
|
||||
if status is not None:
|
||||
node.status = status
|
||||
if details is not None:
|
||||
node.details = details
|
||||
if tool_output is not None:
|
||||
node.tool_output = tool_output
|
||||
if findings is not None:
|
||||
node.findings.extend(findings)
|
||||
if priority is not None:
|
||||
node.priority = priority
|
||||
if label is not None:
|
||||
node.label = label
|
||||
|
||||
node.updated_at = datetime.now().isoformat()
|
||||
self.updated_at = node.updated_at
|
||||
return True
|
||||
|
||||
def delete_node(self, node_id: str) -> bool:
|
||||
"""Delete a node and all its children recursively."""
|
||||
node = self.nodes.get(node_id)
|
||||
if not node:
|
||||
return False
|
||||
|
||||
# Recursively delete children
|
||||
for child_id in node.children.copy():
|
||||
self.delete_node(child_id)
|
||||
|
||||
# Remove from parent's children list
|
||||
if node.parent_id and node.parent_id in self.nodes:
|
||||
parent = self.nodes[node.parent_id]
|
||||
if node_id in parent.children:
|
||||
parent.children.remove(node_id)
|
||||
|
||||
# Remove from root nodes if applicable
|
||||
if node_id in self.root_nodes:
|
||||
self.root_nodes.remove(node_id)
|
||||
|
||||
del self.nodes[node_id]
|
||||
self.updated_at = datetime.now().isoformat()
|
||||
return True
|
||||
|
||||
def get_node(self, node_id: str) -> Optional[PTTNode]:
|
||||
return self.nodes.get(node_id)
|
||||
|
||||
def get_next_todo(self) -> Optional[PTTNode]:
|
||||
"""Get the highest priority TODO node."""
|
||||
todos = [n for n in self.nodes.values() if n.status == NodeStatus.TODO]
|
||||
if not todos:
|
||||
return None
|
||||
return min(todos, key=lambda n: n.priority)
|
||||
|
||||
def get_all_by_status(self, status: NodeStatus) -> List[PTTNode]:
|
||||
return [n for n in self.nodes.values() if n.status == status]
|
||||
|
||||
def get_subtree(self, node_id: str) -> List[PTTNode]:
|
||||
"""Get all nodes in a subtree (including the root)."""
|
||||
node = self.nodes.get(node_id)
|
||||
if not node:
|
||||
return []
|
||||
result = [node]
|
||||
for child_id in node.children:
|
||||
result.extend(self.get_subtree(child_id))
|
||||
return result
|
||||
|
||||
def find_node_by_label(self, label: str) -> Optional[PTTNode]:
|
||||
"""Find a node by label (case-insensitive partial match)."""
|
||||
label_lower = label.lower()
|
||||
for node in self.nodes.values():
|
||||
if label_lower in node.label.lower():
|
||||
return node
|
||||
return None
|
||||
|
||||
def get_stats(self) -> Dict[str, int]:
|
||||
"""Get tree statistics."""
|
||||
stats = {'total': len(self.nodes)}
|
||||
for status in NodeStatus:
|
||||
stats[status.value] = len(self.get_all_by_status(status))
|
||||
return stats
|
||||
|
||||
def render_text(self) -> str:
|
||||
"""Render full tree as indented text for terminal display."""
|
||||
if not self.root_nodes:
|
||||
return " (empty tree)"
|
||||
|
||||
lines = [f"Target: {self.target}"]
|
||||
lines.append("")
|
||||
|
||||
for root_id in self.root_nodes:
|
||||
self._render_node(root_id, lines, indent=0)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _render_node(self, node_id: str, lines: List[str], indent: int):
|
||||
node = self.nodes.get(node_id)
|
||||
if not node:
|
||||
return
|
||||
|
||||
prefix = " " * indent
|
||||
symbol = _STATUS_SYMBOLS.get(node.status, '[ ]')
|
||||
priority_str = f" P{node.priority}" if node.priority != 3 else ""
|
||||
lines.append(f"{prefix}{symbol} {node.label}{priority_str}")
|
||||
|
||||
if node.findings:
|
||||
for finding in node.findings[:3]:
|
||||
lines.append(f"{prefix} -> {finding}")
|
||||
|
||||
for child_id in node.children:
|
||||
self._render_node(child_id, lines, indent + 1)
|
||||
|
||||
def render_summary(self) -> str:
|
||||
"""Render compact summary for LLM context injection.
|
||||
Designed to fit within tight token budgets (4096 ctx).
|
||||
Only shows TODO and IN_PROGRESS nodes with minimal detail.
|
||||
"""
|
||||
stats = self.get_stats()
|
||||
lines = [
|
||||
f"Target: {self.target}",
|
||||
f"Nodes: {stats['total']} total, {stats['todo']} todo, "
|
||||
f"{stats['completed']} done, {stats['in_progress']} active",
|
||||
]
|
||||
|
||||
# Show active and todo nodes only
|
||||
active = self.get_all_by_status(NodeStatus.IN_PROGRESS)
|
||||
todos = sorted(
|
||||
self.get_all_by_status(NodeStatus.TODO),
|
||||
key=lambda n: n.priority
|
||||
)
|
||||
|
||||
if active:
|
||||
lines.append("Active:")
|
||||
for n in active:
|
||||
lines.append(f" [{n.id}] {n.label}")
|
||||
|
||||
if todos:
|
||||
lines.append("Todo:")
|
||||
for n in todos[:5]:
|
||||
lines.append(f" [{n.id}] P{n.priority} {n.label}")
|
||||
if len(todos) > 5:
|
||||
lines.append(f" ... and {len(todos) - 5} more")
|
||||
|
||||
# Show recent findings (last 5)
|
||||
all_findings = []
|
||||
for node in self.nodes.values():
|
||||
if node.findings:
|
||||
for f in node.findings:
|
||||
all_findings.append(f)
|
||||
if all_findings:
|
||||
lines.append("Key findings:")
|
||||
for f in all_findings[-5:]:
|
||||
lines.append(f" - {f}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def initialize_standard_branches(self):
|
||||
"""Create standard MITRE ATT&CK-aligned top-level branches."""
|
||||
branches = [
|
||||
("Reconnaissance", PTTNodeType.RECONNAISSANCE, 1,
|
||||
"Information gathering and target enumeration"),
|
||||
("Initial Access", PTTNodeType.INITIAL_ACCESS, 2,
|
||||
"Gaining initial foothold on target"),
|
||||
("Privilege Escalation", PTTNodeType.PRIVILEGE_ESCALATION, 3,
|
||||
"Escalating from initial access to higher privileges"),
|
||||
("Lateral Movement", PTTNodeType.LATERAL_MOVEMENT, 4,
|
||||
"Moving to other systems in the network"),
|
||||
("Credential Access", PTTNodeType.CREDENTIAL_ACCESS, 3,
|
||||
"Obtaining credentials and secrets"),
|
||||
("Persistence", PTTNodeType.PERSISTENCE, 5,
|
||||
"Maintaining access to compromised systems"),
|
||||
]
|
||||
|
||||
for label, ntype, priority, details in branches:
|
||||
self.add_node(
|
||||
label=label,
|
||||
node_type=ntype,
|
||||
priority=priority,
|
||||
details=details,
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'target': self.target,
|
||||
'created_at': self.created_at,
|
||||
'updated_at': self.updated_at,
|
||||
'root_nodes': self.root_nodes.copy(),
|
||||
'nodes': {nid: n.to_dict() for nid, n in self.nodes.items()},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> 'PentestTree':
|
||||
tree = cls(target=data['target'])
|
||||
tree.created_at = data.get('created_at', '')
|
||||
tree.updated_at = data.get('updated_at', '')
|
||||
tree.root_nodes = data.get('root_nodes', [])
|
||||
for nid, ndata in data.get('nodes', {}).items():
|
||||
tree.nodes[nid] = PTTNode.from_dict(ndata)
|
||||
return tree
|
||||
Reference in New Issue
Block a user