Autarch/core/rsf.py

451 lines
14 KiB
Python
Raw Permalink Normal View History

"""
AUTARCH RouterSploit Framework Wrapper
Low-level interface for RouterSploit module discovery, import, and execution.
Direct Python import -- no RPC layer needed since RSF is pure Python.
"""
import sys
import os
import re
import threading
import importlib
from io import StringIO
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Tuple, Any
from contextlib import contextmanager
from .config import get_config
class RSFError(Exception):
    """Error raised when a RouterSploit operation cannot be completed.

    Used by RSFManager for every failure it reports to callers:
    RouterSploit not being importable, module indexing or loading
    failures, and rejected option values.
    """
@dataclass
class RSFModuleInfo:
    """Descriptive metadata for a single RouterSploit module.

    Every field has an empty default, so a bare ``RSFModuleInfo()`` is
    valid and can be filled in incrementally.

    Attributes:
        name: Human-readable module name.
        path: Slash-separated module path.
        description: Free-text description of the module.
        authors: Module authors.
        devices: Devices the module applies to.
        references: Reference entries associated with the module.
        options: One dict per configurable option.
        module_type: Module category (see the comment on the field).
    """

    name: str = ""
    path: str = ""
    description: str = ""
    authors: Tuple[str, ...] = ()
    devices: Tuple[str, ...] = ()
    references: Tuple[str, ...] = ()
    # default_factory gives every instance its own list.
    options: List[Dict[str, Any]] = field(default_factory=list)
    # exploits, creds, scanners, payloads, encoders, generic
    module_type: str = ""
class RSFManager:
    """Manager for RouterSploit framework operations.

    Handles sys.path setup, module discovery, dynamic import,
    option introspection, stdout capture, and execution.

    Availability and the module index are cached on the instance; call
    reset_cache() to force them to be recomputed.
    """

    # Package prefixes removed from indexed module paths before display.
    _INDEX_PREFIXES = ('routersploit.modules.', 'modules.')

    def __init__(self):
        self._available = None      # None = not probed yet, else bool
        self._module_index = None   # cached list of slash paths, or None
        self._path_added = False    # True once _ensure_path() has run

    def _ensure_path(self):
        """Add RSF install path to sys.path if not already present.

        The configured ('rsf', 'install_path') value is read once; later
        calls are no-ops until reset_cache() is called.
        """
        if self._path_added:
            return
        install_path = get_config().get('rsf', 'install_path', '')
        if install_path and install_path not in sys.path:
            sys.path.insert(0, install_path)
        self._path_added = True

    @property
    def is_available(self) -> bool:
        """Check if RouterSploit is importable. Caches result."""
        if self._available is None:
            try:
                self._ensure_path()
                import routersploit  # noqa: F401 -- imported only as a probe
                self._available = True
            except ImportError:
                self._available = False
        return self._available

    def reset_cache(self):
        """Reset cached state (availability, module index, path setup)."""
        self._available = None
        self._module_index = None
        self._path_added = False

    @classmethod
    def _clean_index_path(cls, mod_path: str) -> str:
        """Turn an indexed dotted module path into a slash path for display."""
        clean = mod_path
        for prefix in cls._INDEX_PREFIXES:
            if clean.startswith(prefix):
                clean = clean[len(prefix):]
        return clean.replace('.', '/')

    def index_all_modules(self) -> List[str]:
        """Discover all RSF modules and return them as slash paths.

        Uses routersploit.core.exploit.utils.index_modules() internally.
        Results are cached after the first successful call.

        Returns:
            List of module paths like 'exploits/routers/dlink/some_module'

        Raises:
            RSFError: If RouterSploit is unavailable or indexing fails
        """
        if self._module_index is not None:
            return self._module_index
        if not self.is_available:
            raise RSFError("RouterSploit is not available")
        try:
            self._ensure_path()
            from routersploit.core.exploit import utils

            modules_dir = os.path.normpath(os.path.join(
                os.path.dirname(utils.__file__), '..', '..', 'modules'
            ))
            if not os.path.isdir(modules_dir):
                # Try from config path
                install_path = get_config().get('rsf', 'install_path', '')
                modules_dir = os.path.join(install_path, 'routersploit', 'modules')

            index = [
                self._clean_index_path(mod_path)
                for mod_path in utils.index_modules(modules_dir)
            ]
        except Exception as e:
            raise RSFError(f"Failed to index modules: {e}") from e
        # Cache only a fully built index so a failure midway can never
        # leave a partial list behind for later calls.
        self._module_index = index
        return index

    def get_module_count(self) -> int:
        """Get total number of indexed modules (0 if indexing fails)."""
        try:
            return len(self.index_all_modules())
        except RSFError:
            return 0

    def get_modules_by_type(self, module_type: str) -> List[str]:
        """Filter modules by type.

        Args:
            module_type: One of 'exploits', 'creds', 'scanners',
                'payloads', 'encoders', 'generic'

        Returns:
            List of module paths whose first segment is module_type
        """
        prefix = module_type + '/'
        return [m for m in self.index_all_modules() if m.startswith(prefix)]

    def search_modules(self, query: str) -> List[str]:
        """Search modules by substring match on path.

        Args:
            query: Search string (case-insensitive)

        Returns:
            List of matching module paths
        """
        query_lower = query.lower()
        return [m for m in self.index_all_modules() if query_lower in m.lower()]

    def _dotted_path(self, slash_path: str) -> str:
        """Convert slash path to dotted import path.

        Args:
            slash_path: e.g. 'exploits/routers/dlink/some_module'

        Returns:
            Dotted path like
            'routersploit.modules.exploits.routers.dlink.some_module'
        """
        dotted = slash_path.strip('/').replace('/', '.')
        return f"routersploit.modules.{dotted}"

    @staticmethod
    def _find_module_info(instance) -> Dict[str, Any]:
        """Locate a module's __info__ dict; returns {} when none is found.

        Only non-empty dicts are accepted, so an unrelated attribute that
        merely ends in '__info__' can never be mistaken for metadata.
        """
        # RSF metaclass renames __info__ to _ClassName__info__
        for attr in dir(instance):
            if not attr.endswith('__info__'):
                continue
            try:
                candidate = getattr(instance, attr)
            except AttributeError:
                continue
            if isinstance(candidate, dict) and candidate:
                return candidate
        # If not found on the instance, try the class hierarchy
        for klass in type(instance).__mro__:
            candidate = getattr(klass, f"_{klass.__name__}__info__", None)
            if isinstance(candidate, dict) and candidate:
                return candidate
        return {}

    def load_module(self, path: str) -> Tuple[Any, "RSFModuleInfo"]:
        """Load a RouterSploit module by path.

        Converts slash path to dotted import path, imports using
        import_exploit(), instantiates, and extracts metadata.

        Args:
            path: Module path like 'exploits/routers/dlink/some_module'

        Returns:
            Tuple of (module_instance, RSFModuleInfo)

        Raises:
            RSFError: If module cannot be loaded
        """
        if not self.is_available:
            raise RSFError("RouterSploit is not available")
        try:
            self._ensure_path()
            from routersploit.core.exploit.utils import import_exploit

            module_class = import_exploit(self._dotted_path(path))
            instance = module_class()
            info_dict = self._find_module_info(instance)

            # Strip slashes the same way _dotted_path() does so the type and
            # fallback name are right for '/exploits/...' style input too.
            parts = path.strip('/').split('/')
            module_info = RSFModuleInfo(
                name=info_dict.get('name', parts[-1]),
                path=path,
                description=info_dict.get('description', ''),
                authors=info_dict.get('authors', ()),
                devices=info_dict.get('devices', ()),
                references=info_dict.get('references', ()),
                options=self.get_module_options(instance),
                module_type=parts[0],
            )
            return instance, module_info
        except Exception as e:
            raise RSFError(f"Failed to load module '{path}': {e}") from e

    @staticmethod
    def _describe_option(instance, name: str, attr_info) -> Dict[str, Any]:
        """Build the option dict for one exploit_attributes entry."""
        # attr_info is [display_value, description, advanced]
        display_value = attr_info[0] if len(attr_info) > 0 else ""
        description = attr_info[1] if len(attr_info) > 1 else ""
        advanced = attr_info[2] if len(attr_info) > 2 else False

        # Reading the current value can raise; fall back to the default.
        try:
            current = getattr(instance, name, display_value)
        except Exception:
            current = display_value

        # Determine option type from the class of the attribute as it is
        # stored on the defining class (read via vars() to bypass __get__).
        opt_type = "string"
        for klass in type(instance).__mro__:
            if name in vars(klass):
                opt_type = type(vars(klass)[name]).__name__.lower()
                # Clean up: optip -> ip, optport -> port, etc. Only the
                # leading 'opt' is removed, never one inside the name.
                if opt_type.startswith('opt'):
                    opt_type = opt_type[len('opt'):]
                break

        return {
            'name': name,
            'type': opt_type,
            'default': display_value,
            'description': description,
            'current': str(current) if current is not None else "",
            'advanced': advanced,
        }

    def get_module_options(self, instance) -> List[Dict[str, Any]]:
        """Introspect options on a module instance.

        Uses the class-level exploit_attributes mapping when present,
        otherwise falls back to the names listed in instance.options.

        Args:
            instance: Instantiated RSF module

        Returns:
            List of dicts with keys: name, type, default, description,
            current, advanced
        """
        # Try exploit_attributes first (set by metaclass)
        exploit_attrs = getattr(type(instance), 'exploit_attributes', {})
        if exploit_attrs:
            return [
                self._describe_option(instance, name, attr_info)
                for name, attr_info in exploit_attrs.items()
            ]

        # Fallback: inspect instance options property
        options = []
        for name in getattr(instance, 'options', []):
            try:
                current = getattr(instance, name, "")
                options.append({
                    'name': name,
                    'type': 'string',
                    'default': str(current),
                    'description': '',
                    'current': str(current) if current is not None else "",
                    'advanced': False,
                })
            except Exception:
                # Best effort: skip options whose value cannot be read.
                continue
        return options

    def set_module_option(self, instance, name: str, value: str) -> bool:
        """Set an option on a module instance.

        Args:
            instance: RSF module instance
            name: Option name
            value: Value to set (string, will be validated by descriptor)

        Returns:
            True if set successfully

        Raises:
            RSFError: If option cannot be set
        """
        try:
            setattr(instance, name, value)
        except Exception as e:
            raise RSFError(f"Failed to set option '{name}': {e}") from e
        return True

    @contextmanager
    def capture_output(self):
        """Context manager to capture stdout/stderr from RSF modules.

        Replaces sys.stdout and sys.stderr with one shared StringIO for
        the duration of the block and restores the previous streams on
        exit. The redirect is process-wide, not per-thread.

        Yields:
            StringIO object containing captured output
        """
        captured = StringIO()
        old_stdout, old_stderr = sys.stdout, sys.stderr
        sys.stdout = sys.stderr = captured
        try:
            yield captured
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

    def _execute_with_timeout(self, func, timeout) -> Tuple[bool, Any, Optional[Exception], str]:
        """Run func() in a daemon thread while capturing its output.

        The redirect is owned by the calling thread, so stdout/stderr are
        restored as soon as the wait ends, even if func() is still running.
        Anything printed before a timeout is still returned.

        Returns:
            Tuple of (finished, value, error, output)
        """
        outcome = {'value': None, 'error': None}

        def _worker():
            try:
                outcome['value'] = func()
            except Exception as e:
                outcome['error'] = e

        thread = threading.Thread(target=_worker, daemon=True)
        with self.capture_output() as captured:
            thread.start()
            thread.join(timeout=timeout)
            finished = not thread.is_alive()
            output = captured.getvalue()
        return finished, outcome['value'], outcome['error'], output

    def execute_check(self, instance, timeout: int = 60) -> Tuple[Optional[bool], str]:
        """Run check() on a module with stdout capture and timeout.

        Args:
            instance: RSF module instance (already configured)
            timeout: Timeout in seconds

        Returns:
            Tuple of (result, output). result is whatever check() returned,
            or None on timeout or error; output then ends with a
            '[!]' / '[-]' status line.
        """
        # The lambda defers the attribute lookup into the worker so a
        # missing check() is reported as an error, not raised to the caller.
        finished, value, error, output = self._execute_with_timeout(
            lambda: instance.check(), timeout
        )
        if not finished:
            return None, output + "\n[!] Module execution timed out"
        if error is not None:
            return None, output + f"\n[-] Error: {error}"
        return value, output

    def execute_run(self, instance, timeout: int = 120) -> Tuple[bool, str]:
        """Run run() on a module with stdout capture and timeout.

        Args:
            instance: RSF module instance (already configured)
            timeout: Timeout in seconds

        Returns:
            Tuple of (completed, output) where completed is True only if
            run() returned without raising before the timeout
        """
        finished, _, error, output = self._execute_with_timeout(
            lambda: instance.run(), timeout
        )
        if not finished:
            return False, output + "\n[!] Module execution timed out"
        if error is not None:
            return False, output + f"\n[-] Error: {error}"
        return True, output
# Process-wide RSFManager, created lazily on first request.
_rsf_manager = None


def get_rsf_manager() -> RSFManager:
    """Return the shared RSFManager, creating it on the first call.

    Every call returns the same instance.
    """
    global _rsf_manager
    if _rsf_manager is not None:
        return _rsf_manager
    _rsf_manager = RSFManager()
    return _rsf_manager