Autarch/modules/reverse_eng.py
DigiJ cdde8717d0 v2.3.0 — RCS exploit v2.0, Starlink hack, SMS forge, Archon RCS module
Major RCS/SMS exploitation rewrite (v2.0):
- bugle_db direct extraction (plaintext messages, no decryption needed)
- CVE-2024-0044 run-as privilege escalation (Android 12-13)
- AOSP RCS provider queries (content://rcs/)
- Archon app relay for Shizuku-elevated bugle_db access
- 7-tab web UI: Extract, Database, Forge, Modify, Exploit, Backup, Monitor
- SQL query interface for extracted databases
- Full backup/restore/clone with SMS Backup & Restore XML support
- Known CVE database (CVE-2023-24033, CVE-2024-49415, CVE-2025-48593)
- IMS/RCS diagnostics, Phenotype verbose logging, Pixel tools

New modules: Starlink hack, SMS forge, SDR drone detection
Archon Android app: RCS messaging module with Shizuku integration
Updated manuals to v2.3, 60 web blueprints confirmed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-03 13:50:59 -08:00

1980 lines
76 KiB
Python

"""AUTARCH Reverse Engineering Toolkit
Binary analysis, PE/ELF parsing, disassembly, YARA scanning,
hex viewing, packer detection, and Ghidra headless integration.
"""
DESCRIPTION = "Binary analysis, disassembly & reverse engineering"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "analyze"
import os
import sys
import re
import math
import json
import struct
import hashlib
import subprocess
import tempfile
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from core.paths import get_data_dir, find_tool
except ImportError:
def get_data_dir():
return str(Path(__file__).parent.parent / 'data')
def find_tool(name, extra_paths=None):
import shutil
return shutil.which(name)
try:
from core.banner import Colors, clear_screen, display_banner
except ImportError:
class Colors:
CYAN = BOLD = GREEN = YELLOW = RED = WHITE = DIM = RESET = ""
def clear_screen(): pass
def display_banner(): pass
# Optional: capstone disassembler
try:
import capstone
HAS_CAPSTONE = True
except ImportError:
HAS_CAPSTONE = False
# Optional: yara-python
try:
import yara
HAS_YARA = True
except ImportError:
HAS_YARA = False
# ── Magic Bytes ──────────────────────────────────────────────────────────────
MAGIC_BYTES = {
b'\x4d\x5a': 'PE',
b'\x7fELF': 'ELF',
b'\xfe\xed\xfa\xce': 'Mach-O (32-bit)',
b'\xfe\xed\xfa\xcf': 'Mach-O (64-bit)',
b'\xce\xfa\xed\xfe': 'Mach-O (32-bit, reversed)',
b'\xcf\xfa\xed\xfe': 'Mach-O (64-bit, reversed)',
b'\xca\xfe\xba\xbe': 'Mach-O (Universal)',
b'\x50\x4b\x03\x04': 'ZIP/JAR/APK/DOCX',
b'\x50\x4b\x05\x06': 'ZIP (empty)',
b'\x25\x50\x44\x46': 'PDF',
b'\xd0\xcf\x11\xe0': 'OLE2 (DOC/XLS/PPT)',
b'\x89\x50\x4e\x47': 'PNG',
b'\xff\xd8\xff': 'JPEG',
b'\x47\x49\x46\x38': 'GIF',
b'\x1f\x8b': 'GZIP',
b'\x42\x5a\x68': 'BZIP2',
b'\xfd\x37\x7a\x58': 'XZ',
b'\x37\x7a\xbc\xaf': '7-Zip',
b'\x52\x61\x72\x21': 'RAR',
b'\xca\xfe\xba\xbe': 'Java Class / Mach-O Universal',
b'\x7f\x45\x4c\x46': 'ELF',
b'\x23\x21': 'Script (shebang)',
b'\x00\x61\x73\x6d': 'WebAssembly',
b'\xed\xab\xee\xdb': 'RPM',
b'\x21\x3c\x61\x72': 'Debian/AR archive',
}
# ── Packer Signatures ───────────────────────────────────────────────────────
PACKER_SIGNATURES = {
'UPX': {
'section_names': [b'UPX0', b'UPX1', b'UPX2', b'UPX!'],
'magic': [b'UPX!', b'UPX0', b'\x55\x50\x58'],
'description': 'Ultimate Packer for Executables',
},
'Themida': {
'section_names': [b'.themida', b'.winlice'],
'magic': [],
'description': 'Themida / WinLicense protector',
},
'ASPack': {
'section_names': [b'.aspack', b'.adata'],
'magic': [b'\x60\xe8\x00\x00\x00\x00\x5d\x81\xed'],
'description': 'ASPack packer',
},
'MPRESS': {
'section_names': [b'.MPRESS1', b'.MPRESS2'],
'magic': [],
'description': 'MPRESS packer',
},
'VMProtect': {
'section_names': [b'.vmp0', b'.vmp1', b'.vmp2'],
'magic': [],
'description': 'VMProtect software protection',
},
'PECompact': {
'section_names': [b'PEC2', b'pec1', b'pec2', b'PEC2TO'],
'magic': [],
'description': 'PECompact packer',
},
'Petite': {
'section_names': [b'.petite'],
'magic': [b'\xb8\x00\x00\x00\x00\x66\x9c\x60\x50'],
'description': 'Petite packer',
},
'NSPack': {
'section_names': [b'.nsp0', b'.nsp1', b'.nsp2', b'nsp0', b'nsp1'],
'magic': [],
'description': 'NSPack (North Star) packer',
},
'Enigma': {
'section_names': [b'.enigma1', b'.enigma2'],
'magic': [],
'description': 'Enigma Protector',
},
'MEW': {
'section_names': [b'MEW'],
'magic': [],
'description': 'MEW packer',
},
}
# ── PE Constants ─────────────────────────────────────────────────────────────
PE_MACHINE_TYPES = {
0x0: 'Unknown',
0x14c: 'x86 (i386)',
0x166: 'MIPS R4000',
0x1a2: 'Hitachi SH3',
0x1a6: 'Hitachi SH4',
0x1c0: 'ARM',
0x1c4: 'ARM Thumb-2',
0x200: 'Intel IA-64',
0x8664: 'x86-64 (AMD64)',
0xaa64: 'ARM64 (AArch64)',
0x5032: 'RISC-V 32-bit',
0x5064: 'RISC-V 64-bit',
}
PE_SECTION_FLAGS = {
0x00000020: 'CODE',
0x00000040: 'INITIALIZED_DATA',
0x00000080: 'UNINITIALIZED_DATA',
0x02000000: 'DISCARDABLE',
0x04000000: 'NOT_CACHED',
0x08000000: 'NOT_PAGED',
0x10000000: 'SHARED',
0x20000000: 'EXECUTE',
0x40000000: 'READ',
0x80000000: 'WRITE',
}
# ── ELF Constants ────────────────────────────────────────────────────────────
ELF_MACHINE_TYPES = {
0: 'None',
2: 'SPARC',
3: 'x86',
8: 'MIPS',
20: 'PowerPC',
21: 'PowerPC64',
40: 'ARM',
43: 'SPARC V9',
50: 'IA-64',
62: 'x86-64',
183: 'AArch64 (ARM64)',
243: 'RISC-V',
247: 'eBPF',
}
ELF_TYPES = {0: 'NONE', 1: 'REL', 2: 'EXEC', 3: 'DYN', 4: 'CORE'}
ELF_OSABI = {
0: 'UNIX System V', 1: 'HP-UX', 2: 'NetBSD', 3: 'Linux',
6: 'Solaris', 7: 'AIX', 8: 'IRIX', 9: 'FreeBSD', 12: 'OpenBSD',
}
ELF_SH_TYPES = {
0: 'NULL', 1: 'PROGBITS', 2: 'SYMTAB', 3: 'STRTAB', 4: 'RELA',
5: 'HASH', 6: 'DYNAMIC', 7: 'NOTE', 8: 'NOBITS', 9: 'REL',
11: 'DYNSYM',
}
ELF_PT_TYPES = {
0: 'NULL', 1: 'LOAD', 2: 'DYNAMIC', 3: 'INTERP', 4: 'NOTE',
5: 'SHLIB', 6: 'PHDR', 7: 'TLS',
0x6474e550: 'GNU_EH_FRAME', 0x6474e551: 'GNU_STACK',
0x6474e552: 'GNU_RELRO', 0x6474e553: 'GNU_PROPERTY',
}
# ── ReverseEngineer Class ────────────────────────────────────────────────────
class ReverseEngineer:
"""Comprehensive binary analysis and reverse engineering toolkit."""
_instance = None
def __init__(self):
data_dir = get_data_dir() if callable(get_data_dir) else get_data_dir
self.storage_dir = Path(str(data_dir)) / 'reverse_eng'
self.yara_rules_dir = self.storage_dir / 'yara_rules'
self.cache_dir = self.storage_dir / 'cache'
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.yara_rules_dir.mkdir(parents=True, exist_ok=True)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self._analysis_cache: Dict[str, Any] = {}
# ── File Type Detection ──────────────────────────────────────────────
def get_file_type(self, file_path: str) -> Dict[str, str]:
"""Identify file type from magic bytes."""
p = Path(file_path)
if not p.exists() or not p.is_file():
return {'type': 'unknown', 'error': 'File not found'}
try:
with open(p, 'rb') as f:
header = f.read(16)
except Exception as e:
return {'type': 'unknown', 'error': str(e)}
if len(header) < 2:
return {'type': 'empty', 'description': 'File too small'}
# Check magic bytes, longest match first
for magic, file_type in sorted(MAGIC_BYTES.items(), key=lambda x: -len(x[0])):
if header[:len(magic)] == magic:
return {'type': file_type, 'magic_hex': magic.hex()}
# Heuristic: check if text file
try:
with open(p, 'rb') as f:
sample = f.read(8192)
text_chars = set(range(7, 14)) | set(range(32, 127)) | {0}
non_text = sum(1 for b in sample if b not in text_chars)
if non_text / max(len(sample), 1) < 0.05:
return {'type': 'Text', 'description': 'ASCII/UTF-8 text file'}
except Exception:
pass
return {'type': 'unknown', 'magic_hex': header[:8].hex()}
# ── Entropy Calculation ──────────────────────────────────────────────
def calculate_entropy(self, data: bytes) -> float:
"""Calculate Shannon entropy of byte data. Returns 0.0 to 8.0."""
if not data:
return 0.0
freq = [0] * 256
for b in data:
freq[b] += 1
length = len(data)
entropy = 0.0
for count in freq:
if count > 0:
p = count / length
entropy -= p * math.log2(p)
return round(entropy, 4)
def section_entropy(self, file_path: str) -> List[Dict[str, Any]]:
"""Calculate entropy per section for PE/ELF binaries."""
ft = self.get_file_type(file_path)
file_type = ft.get('type', '')
results = []
if file_type == 'PE':
pe_info = self.parse_pe(file_path)
if 'error' not in pe_info:
with open(file_path, 'rb') as f:
for sec in pe_info.get('sections', []):
offset = sec.get('raw_offset', 0)
size = sec.get('raw_size', 0)
if size > 0 and offset > 0:
f.seek(offset)
data = f.read(size)
ent = self.calculate_entropy(data)
results.append({
'name': sec.get('name', ''),
'offset': offset,
'size': size,
'entropy': ent,
'packed': ent > 7.0,
})
elif file_type == 'ELF':
elf_info = self.parse_elf(file_path)
if 'error' not in elf_info:
with open(file_path, 'rb') as f:
for sec in elf_info.get('sections', []):
offset = sec.get('offset', 0)
size = sec.get('size', 0)
if size > 0 and offset > 0:
f.seek(offset)
data = f.read(size)
ent = self.calculate_entropy(data)
results.append({
'name': sec.get('name', ''),
'offset': offset,
'size': size,
'entropy': ent,
'packed': ent > 7.0,
})
return results
# ── Comprehensive Binary Analysis ────────────────────────────────────
def analyze_binary(self, file_path: str) -> Dict[str, Any]:
"""Comprehensive binary analysis: type, hashes, entropy, strings, architecture."""
p = Path(file_path)
if not p.exists() or not p.is_file():
return {'error': f'File not found: {file_path}'}
stat = p.stat()
# Read file data
try:
with open(p, 'rb') as f:
data = f.read()
except Exception as e:
return {'error': f'Cannot read file: {e}'}
# File type
file_type = self.get_file_type(file_path)
# Hashes
hashes = {
'md5': hashlib.md5(data).hexdigest(),
'sha1': hashlib.sha1(data).hexdigest(),
'sha256': hashlib.sha256(data).hexdigest(),
}
# Overall entropy
overall_entropy = self.calculate_entropy(data)
# Section entropy
sec_entropy = self.section_entropy(file_path)
# Architecture detection
arch = 'unknown'
ftype = file_type.get('type', '')
if ftype == 'PE':
pe = self.parse_pe(file_path)
arch = pe.get('machine_str', 'unknown')
elif ftype == 'ELF':
elf = self.parse_elf(file_path)
arch = elf.get('machine_str', 'unknown')
# Extract strings (limited to first 1MB for speed)
sample = data[:1024 * 1024]
strings = self._extract_strings_from_data(sample, min_length=4)
# Packer detection
packer = self.detect_packer(file_path)
result = {
'file': str(p.absolute()),
'name': p.name,
'size': stat.st_size,
'size_human': self._human_size(stat.st_size),
'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
'file_type': file_type,
'architecture': arch,
'hashes': hashes,
'entropy': overall_entropy,
'entropy_level': 'high' if overall_entropy > 7.0 else ('medium' if overall_entropy > 6.0 else 'low'),
'section_entropy': sec_entropy,
'strings_count': len(strings),
'strings_preview': strings[:100],
'packer': packer,
}
# Add imports/exports if applicable
if ftype == 'PE':
result['imports'] = self.get_imports(file_path)
result['exports'] = self.get_exports(file_path)
elif ftype == 'ELF':
result['imports'] = self.get_imports(file_path)
result['exports'] = self.get_exports(file_path)
# Cache result
self._analysis_cache[file_path] = result
return result
# ── PE Parsing ───────────────────────────────────────────────────────
def parse_pe(self, file_path: str) -> Dict[str, Any]:
"""Parse PE (Portable Executable) headers using struct.unpack."""
p = Path(file_path)
if not p.exists():
return {'error': 'File not found'}
try:
with open(p, 'rb') as f:
data = f.read()
except Exception as e:
return {'error': str(e)}
if len(data) < 64 or data[:2] != b'\x4d\x5a':
return {'error': 'Not a valid PE file (missing MZ header)'}
# DOS Header
e_lfanew = struct.unpack_from('<I', data, 0x3C)[0]
if e_lfanew + 24 > len(data):
return {'error': 'Invalid PE offset'}
# PE Signature
pe_sig = data[e_lfanew:e_lfanew + 4]
if pe_sig != b'PE\x00\x00':
return {'error': 'Invalid PE signature'}
# COFF Header (20 bytes after PE signature)
coff_offset = e_lfanew + 4
if coff_offset + 20 > len(data):
return {'error': 'Truncated COFF header'}
machine, num_sections, time_stamp, sym_table_ptr, num_symbols, \
opt_header_size, characteristics = struct.unpack_from(
'<HHIIIHH', data, coff_offset)
machine_str = PE_MACHINE_TYPES.get(machine, f'Unknown (0x{machine:04x})')
timestamp_str = datetime.utcfromtimestamp(time_stamp).isoformat() if time_stamp else 'N/A'
# Optional Header
opt_offset = coff_offset + 20
if opt_offset + 2 > len(data):
return {'error': 'Truncated optional header'}
opt_magic = struct.unpack_from('<H', data, opt_offset)[0]
is_pe32_plus = (opt_magic == 0x20b) # PE32+ (64-bit)
is_pe32 = (opt_magic == 0x10b) # PE32 (32-bit)
opt_info = {'magic': f'0x{opt_magic:04x}', 'format': 'PE32+' if is_pe32_plus else 'PE32'}
if is_pe32_plus and opt_offset + 112 <= len(data):
_, major_link, minor_link, code_size, init_data, uninit_data, \
entry_point, code_base = struct.unpack_from('<HBBIIIII', data, opt_offset)
image_base = struct.unpack_from('<Q', data, opt_offset + 24)[0]
section_align, file_align = struct.unpack_from('<II', data, opt_offset + 32)[0:2]
opt_info.update({
'linker_version': f'{major_link}.{minor_link}',
'code_size': code_size,
'entry_point': f'0x{entry_point:08x}',
'image_base': f'0x{image_base:016x}',
'section_alignment': section_align,
'file_alignment': file_align,
})
elif is_pe32 and opt_offset + 96 <= len(data):
_, major_link, minor_link, code_size, init_data, uninit_data, \
entry_point, code_base = struct.unpack_from('<HBBIIIII', data, opt_offset)
image_base = struct.unpack_from('<I', data, opt_offset + 28)[0]
section_align, file_align = struct.unpack_from('<II', data, opt_offset + 32)[0:2]
opt_info.update({
'linker_version': f'{major_link}.{minor_link}',
'code_size': code_size,
'entry_point': f'0x{entry_point:08x}',
'image_base': f'0x{image_base:08x}',
'section_alignment': section_align,
'file_alignment': file_align,
})
# Parse Data Directories (for imports/exports)
data_dirs = []
if is_pe32_plus:
dd_offset = opt_offset + 112
num_dd = struct.unpack_from('<I', data, opt_offset + 108)[0] if opt_offset + 112 <= len(data) else 0
elif is_pe32:
dd_offset = opt_offset + 96
num_dd = struct.unpack_from('<I', data, opt_offset + 92)[0] if opt_offset + 96 <= len(data) else 0
else:
dd_offset = 0
num_dd = 0
dd_names = ['Export', 'Import', 'Resource', 'Exception', 'Security',
'BaseReloc', 'Debug', 'Architecture', 'GlobalPtr', 'TLS',
'LoadConfig', 'BoundImport', 'IAT', 'DelayImport', 'CLR', 'Reserved']
for i in range(min(num_dd, 16)):
off = dd_offset + i * 8
if off + 8 <= len(data):
rva, size = struct.unpack_from('<II', data, off)
if rva or size:
data_dirs.append({
'name': dd_names[i] if i < len(dd_names) else f'Dir_{i}',
'rva': f'0x{rva:08x}',
'size': size,
})
# Section Headers
sections = []
sec_offset = opt_offset + opt_header_size
for i in range(num_sections):
off = sec_offset + i * 40
if off + 40 > len(data):
break
name_raw = data[off:off + 8]
name = name_raw.rstrip(b'\x00').decode('ascii', errors='replace')
vsize, vaddr, raw_size, raw_offset, reloc_ptr, linenum_ptr, \
num_relocs, num_linenums, chars = struct.unpack_from(
'<IIIIIIHHI', data, off + 8)
flags = []
for flag_val, flag_name in PE_SECTION_FLAGS.items():
if chars & flag_val:
flags.append(flag_name)
sections.append({
'name': name,
'virtual_size': vsize,
'virtual_address': f'0x{vaddr:08x}',
'raw_size': raw_size,
'raw_offset': raw_offset,
'characteristics': f'0x{chars:08x}',
'flags': flags,
})
result = {
'format': 'PE',
'machine': f'0x{machine:04x}',
'machine_str': machine_str,
'num_sections': num_sections,
'timestamp': timestamp_str,
'timestamp_raw': time_stamp,
'characteristics': f'0x{characteristics:04x}',
'optional_header': opt_info,
'data_directories': data_dirs,
'sections': sections,
}
return result
# ── ELF Parsing ──────────────────────────────────────────────────────
def parse_elf(self, file_path: str) -> Dict[str, Any]:
"""Parse ELF (Executable and Linkable Format) headers using struct.unpack."""
p = Path(file_path)
if not p.exists():
return {'error': 'File not found'}
try:
with open(p, 'rb') as f:
data = f.read()
except Exception as e:
return {'error': str(e)}
if len(data) < 16 or data[:4] != b'\x7fELF':
return {'error': 'Not a valid ELF file'}
# ELF Identification
ei_class = data[4] # 1=32-bit, 2=64-bit
ei_data = data[5] # 1=little-endian, 2=big-endian
ei_version = data[6]
ei_osabi = data[7]
is_64 = (ei_class == 2)
endian = '<' if ei_data == 1 else '>'
bits_str = '64-bit' if is_64 else '32-bit'
endian_str = 'Little Endian' if ei_data == 1 else 'Big Endian'
# ELF Header
if is_64:
if len(data) < 64:
return {'error': 'Truncated ELF64 header'}
e_type, e_machine, e_version, e_entry, e_phoff, e_shoff, \
e_flags, e_ehsize, e_phentsize, e_phnum, e_shentsize, \
e_shnum, e_shstrndx = struct.unpack_from(
f'{endian}HHIQQQIHHHHHH', data, 16)
else:
if len(data) < 52:
return {'error': 'Truncated ELF32 header'}
e_type, e_machine, e_version, e_entry, e_phoff, e_shoff, \
e_flags, e_ehsize, e_phentsize, e_phnum, e_shentsize, \
e_shnum, e_shstrndx = struct.unpack_from(
f'{endian}HHIIIIIHHHHHH', data, 16)
machine_str = ELF_MACHINE_TYPES.get(e_machine, f'Unknown ({e_machine})')
type_str = ELF_TYPES.get(e_type, f'Unknown ({e_type})')
osabi_str = ELF_OSABI.get(ei_osabi, f'Unknown ({ei_osabi})')
# Section Headers
sections = []
shstrtab_data = b''
if e_shstrndx < e_shnum and e_shoff > 0:
strtab_off = e_shoff + e_shstrndx * e_shentsize
if is_64 and strtab_off + 64 <= len(data):
sh_offset = struct.unpack_from(f'{endian}Q', data, strtab_off + 24)[0]
sh_size = struct.unpack_from(f'{endian}Q', data, strtab_off + 32)[0]
elif not is_64 and strtab_off + 40 <= len(data):
sh_offset = struct.unpack_from(f'{endian}I', data, strtab_off + 16)[0]
sh_size = struct.unpack_from(f'{endian}I', data, strtab_off + 20)[0]
else:
sh_offset = 0
sh_size = 0
if sh_offset + sh_size <= len(data):
shstrtab_data = data[sh_offset:sh_offset + sh_size]
for i in range(e_shnum):
off = e_shoff + i * e_shentsize
if is_64:
if off + 64 > len(data):
break
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, \
sh_link, sh_info, sh_addralign, sh_entsize = struct.unpack_from(
f'{endian}IIQQQQIIQQ', data, off)
else:
if off + 40 > len(data):
break
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, \
sh_link, sh_info, sh_addralign, sh_entsize = struct.unpack_from(
f'{endian}IIIIIIIIII', data, off)
# Resolve section name from string table
name = ''
if sh_name < len(shstrtab_data):
end = shstrtab_data.index(b'\x00', sh_name) if b'\x00' in shstrtab_data[sh_name:] else len(shstrtab_data)
name = shstrtab_data[sh_name:end].decode('ascii', errors='replace')
type_name = ELF_SH_TYPES.get(sh_type, f'0x{sh_type:x}')
sections.append({
'name': name,
'type': type_name,
'type_raw': sh_type,
'flags': f'0x{sh_flags:x}',
'address': f'0x{sh_addr:x}',
'offset': sh_offset,
'size': sh_size,
'link': sh_link,
'info': sh_info,
'alignment': sh_addralign,
'entry_size': sh_entsize,
})
# Program Headers
program_headers = []
for i in range(e_phnum):
off = e_phoff + i * e_phentsize
if is_64:
if off + 56 > len(data):
break
p_type, p_flags, p_offset, p_vaddr, p_paddr, p_filesz, \
p_memsz, p_align = struct.unpack_from(
f'{endian}IIQQQQQQ', data, off)
else:
if off + 32 > len(data):
break
p_type, p_offset, p_vaddr, p_paddr, p_filesz, p_memsz, \
p_flags, p_align = struct.unpack_from(
f'{endian}IIIIIIII', data, off)
pt_name = ELF_PT_TYPES.get(p_type, f'0x{p_type:x}')
perm_str = ''
perm_str += 'R' if p_flags & 4 else '-'
perm_str += 'W' if p_flags & 2 else '-'
perm_str += 'X' if p_flags & 1 else '-'
program_headers.append({
'type': pt_name,
'type_raw': p_type,
'flags': perm_str,
'offset': f'0x{p_offset:x}',
'vaddr': f'0x{p_vaddr:x}',
'paddr': f'0x{p_paddr:x}',
'file_size': p_filesz,
'mem_size': p_memsz,
'alignment': p_align,
})
# Dynamic section symbols
dynamic = []
for sec in sections:
if sec['type'] == 'DYNAMIC' and sec['size'] > 0:
dyn_off = sec['offset']
dyn_size = sec['size']
entry_sz = 16 if is_64 else 8
for j in range(0, dyn_size, entry_sz):
off = dyn_off + j
if is_64 and off + 16 <= len(data):
d_tag, d_val = struct.unpack_from(f'{endian}qQ', data, off)
elif not is_64 and off + 8 <= len(data):
d_tag, d_val = struct.unpack_from(f'{endian}iI', data, off)
else:
break
if d_tag == 0:
break
dynamic.append({'tag': d_tag, 'value': f'0x{d_val:x}'})
result = {
'format': 'ELF',
'class': bits_str,
'endianness': endian_str,
'osabi': osabi_str,
'type': type_str,
'type_raw': e_type,
'machine': f'0x{e_machine:x}',
'machine_str': machine_str,
'entry_point': f'0x{e_entry:x}',
'flags': f'0x{e_flags:x}',
'num_sections': e_shnum,
'num_program_headers': e_phnum,
'sections': sections,
'program_headers': program_headers,
'dynamic': dynamic[:50],
}
return result
# ── String Extraction ────────────────────────────────────────────────
def _extract_strings_from_data(self, data: bytes, min_length: int = 4) -> List[Dict[str, Any]]:
"""Extract ASCII and Unicode strings from raw byte data."""
results = []
# ASCII strings
ascii_pattern = re.compile(rb'[\x20-\x7e]{' + str(min_length).encode() + rb',}')
for match in ascii_pattern.finditer(data):
results.append({
'offset': match.start(),
'string': match.group().decode('ascii', errors='replace'),
'encoding': 'ascii',
})
# UTF-16LE strings (common in PE binaries)
i = 0
while i < len(data) - 1:
# Look for sequences of printable chars with null bytes interleaved
chars = []
start = i
while i < len(data) - 1:
lo, hi = data[i], data[i + 1]
if hi == 0 and 0x20 <= lo <= 0x7e:
chars.append(chr(lo))
i += 2
else:
break
if len(chars) >= min_length:
results.append({
'offset': start,
'string': ''.join(chars),
'encoding': 'unicode',
})
else:
i += 1
# Sort by offset and deduplicate
results.sort(key=lambda x: x['offset'])
return results
def extract_strings(self, file_path: str, min_length: int = 4,
encoding: str = 'both') -> List[Dict[str, Any]]:
"""Extract printable strings from a binary file."""
p = Path(file_path)
if not p.exists():
return []
try:
with open(p, 'rb') as f:
data = f.read()
except Exception:
return []
results = self._extract_strings_from_data(data, min_length)
if encoding == 'ascii':
results = [s for s in results if s['encoding'] == 'ascii']
elif encoding == 'unicode':
results = [s for s in results if s['encoding'] == 'unicode']
return results
# ── Disassembly ──────────────────────────────────────────────────────
def disassemble(self, data: bytes, arch: str = 'x64', mode: str = '64',
offset: int = 0, count: int = 0) -> List[Dict[str, Any]]:
"""Disassemble raw bytes. Uses capstone if available, otherwise objdump."""
if HAS_CAPSTONE:
return self._disassemble_capstone(data, arch, mode, offset, count)
return self._disassemble_objdump(data, arch, offset, count)
def _disassemble_capstone(self, data: bytes, arch: str, mode: str,
offset: int, count: int) -> List[Dict[str, Any]]:
"""Disassemble using capstone."""
arch_map = {
'x86': (capstone.CS_ARCH_X86, capstone.CS_MODE_32),
'x64': (capstone.CS_ARCH_X86, capstone.CS_MODE_64),
'arm': (capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM),
'arm64': (capstone.CS_ARCH_ARM64, capstone.CS_MODE_ARM),
'mips': (capstone.CS_ARCH_MIPS, capstone.CS_MODE_MIPS32),
}
cs_arch, cs_mode = arch_map.get(arch.lower(), (capstone.CS_ARCH_X86, capstone.CS_MODE_64))
md = capstone.Cs(cs_arch, cs_mode)
instructions = []
for i, (address, size, mnemonic, op_str) in enumerate(md.disasm_lite(data, offset)):
if count > 0 and i >= count:
break
inst_bytes = data[address - offset:address - offset + size]
instructions.append({
'address': f'0x{address:08x}',
'mnemonic': mnemonic,
'op_str': op_str,
'bytes_hex': inst_bytes.hex(),
'size': size,
})
return instructions
def _disassemble_objdump(self, data: bytes, arch: str,
offset: int, count: int) -> List[Dict[str, Any]]:
"""Disassemble using objdump as fallback."""
objdump = find_tool('objdump')
if not objdump:
return [{'error': 'No disassembler available. Install capstone (pip install capstone) or objdump.'}]
# Write data to temporary file
with tempfile.NamedTemporaryFile(suffix='.bin', delete=False) as tmp:
tmp.write(data)
tmp_path = tmp.name
try:
arch_flag = {
'x86': 'i386', 'x64': 'i386:x86-64',
'arm': 'arm', 'arm64': 'aarch64',
}.get(arch.lower(), 'i386:x86-64')
cmd = [objdump, '-D', '-b', 'binary', '-m', arch_flag, tmp_path]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
instructions = []
for line in result.stdout.splitlines():
match = re.match(r'\s*([0-9a-f]+):\s+([0-9a-f ]+?)\s+(\w+)\s*(.*)', line)
if match:
addr_str, bytes_hex, mnemonic, op_str = match.groups()
instructions.append({
'address': f'0x{int(addr_str, 16) + offset:08x}',
'mnemonic': mnemonic.strip(),
'op_str': op_str.strip(),
'bytes_hex': bytes_hex.replace(' ', ''),
})
if count > 0 and len(instructions) >= count:
break
return instructions
except Exception as e:
return [{'error': f'objdump failed: {e}'}]
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
def disassemble_file(self, file_path: str, section: str = '.text',
offset: int = 0, count: int = 100) -> List[Dict[str, Any]]:
"""Disassemble a specific section of a binary file."""
p = Path(file_path)
if not p.exists():
return [{'error': 'File not found'}]
ft = self.get_file_type(file_path)
ftype = ft.get('type', '')
arch = 'x64'
sec_offset = offset
sec_size = 0
if ftype == 'PE':
pe = self.parse_pe(file_path)
if 'error' in pe:
return [{'error': pe['error']}]
machine = pe.get('machine', '')
if '14c' in machine:
arch = 'x86'
elif 'aa64' in machine:
arch = 'arm64'
elif '1c0' in machine or '1c4' in machine:
arch = 'arm'
for sec in pe.get('sections', []):
if sec['name'].strip('\x00') == section.strip('.'):
sec_offset = sec['raw_offset'] + offset
sec_size = sec['raw_size']
break
elif sec['name'].strip('\x00').lower() == section.lstrip('.').lower():
sec_offset = sec['raw_offset'] + offset
sec_size = sec['raw_size']
break
elif ftype == 'ELF':
elf = self.parse_elf(file_path)
if 'error' in elf:
return [{'error': elf['error']}]
machine_str = elf.get('machine_str', '')
if 'x86-64' in machine_str:
arch = 'x64'
elif 'x86' in machine_str:
arch = 'x86'
elif 'ARM64' in machine_str or 'AArch64' in machine_str:
arch = 'arm64'
elif 'ARM' in machine_str:
arch = 'arm'
for sec in elf.get('sections', []):
if sec['name'] == section:
sec_offset = sec['offset'] + offset
sec_size = sec['size']
break
# Read section data
try:
with open(p, 'rb') as f:
if sec_size > 0:
f.seek(sec_offset)
data = f.read(min(sec_size, 0x10000))
else:
f.seek(sec_offset)
data = f.read(0x10000)
except Exception as e:
return [{'error': f'Cannot read file: {e}'}]
return self.disassemble(data, arch=arch, offset=sec_offset, count=count)
# ── YARA Scanning ────────────────────────────────────────────────────
def yara_scan(self, file_path: str, rules_path: Optional[str] = None,
rules_string: Optional[str] = None) -> Dict[str, Any]:
"""Scan a file with YARA rules."""
p = Path(file_path)
if not p.exists():
return {'error': 'File not found', 'matches': []}
if HAS_YARA:
return self._yara_scan_python(file_path, rules_path, rules_string)
return self._yara_scan_cli(file_path, rules_path, rules_string)
def _yara_scan_python(self, file_path: str, rules_path: Optional[str],
rules_string: Optional[str]) -> Dict[str, Any]:
"""Scan using yara-python library."""
try:
if rules_string:
rules = yara.compile(source=rules_string)
elif rules_path:
rules = yara.compile(filepath=rules_path)
else:
# Use all rules in yara_rules directory
rule_files = list(self.yara_rules_dir.glob('*.yar')) + \
list(self.yara_rules_dir.glob('*.yara'))
if not rule_files:
return {'error': 'No YARA rules found', 'matches': []}
sources = {}
for rf in rule_files:
ns = rf.stem
sources[ns] = str(rf)
rules = yara.compile(filepaths=sources)
matches = rules.match(file_path)
results = []
for match in matches:
strings_found = []
for string_match in match.strings:
for instance in string_match.instances:
strings_found.append({
'offset': instance.offset,
'identifier': string_match.identifier,
'data': instance.matched_data.hex() if len(instance.matched_data) <= 64 else instance.matched_data[:64].hex() + '...',
})
results.append({
'rule': match.rule,
'namespace': match.namespace,
'tags': list(match.tags),
'meta': dict(match.meta) if match.meta else {},
'strings': strings_found,
})
return {'matches': results, 'total': len(results), 'engine': 'yara-python'}
except Exception as e:
return {'error': str(e), 'matches': []}
def _yara_scan_cli(self, file_path: str, rules_path: Optional[str],
rules_string: Optional[str]) -> Dict[str, Any]:
"""Scan using yara CLI tool as fallback."""
yara_bin = find_tool('yara')
if not yara_bin:
return {'error': 'YARA not available. Install yara-python (pip install yara-python) or yara CLI.', 'matches': []}
try:
if rules_string:
with tempfile.NamedTemporaryFile(suffix='.yar', mode='w', delete=False) as tmp:
tmp.write(rules_string)
tmp_rules = tmp.name
rules_file = tmp_rules
elif rules_path:
rules_file = rules_path
tmp_rules = None
else:
rule_files = list(self.yara_rules_dir.glob('*.yar')) + \
list(self.yara_rules_dir.glob('*.yara'))
if not rule_files:
return {'error': 'No YARA rules found', 'matches': []}
rules_file = str(rule_files[0])
tmp_rules = None
cmd = [yara_bin, '-s', rules_file, file_path]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
matches = []
current_rule = None
for line in result.stdout.splitlines():
rule_match = re.match(r'^(\S+)\s+\S+$', line)
if rule_match and ':' not in line:
current_rule = {'rule': rule_match.group(1), 'strings': []}
matches.append(current_rule)
elif current_rule and ':' in line:
parts = line.strip().split(':', 2)
if len(parts) >= 3:
current_rule['strings'].append({
'offset': int(parts[0], 0) if parts[0].strip() else 0,
'identifier': parts[1].strip(),
'data': parts[2].strip(),
})
if tmp_rules:
os.unlink(tmp_rules)
return {'matches': matches, 'total': len(matches), 'engine': 'yara-cli'}
except Exception as e:
return {'error': str(e), 'matches': []}
def list_yara_rules(self) -> List[Dict[str, str]]:
"""List available YARA rule files."""
rules = []
for ext in ('*.yar', '*.yara'):
for f in self.yara_rules_dir.glob(ext):
stat = f.stat()
rules.append({
'name': f.name,
'path': str(f),
'size': stat.st_size,
'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
})
return sorted(rules, key=lambda x: x['name'])
# ── Packer Detection ─────────────────────────────────────────────────
def detect_packer(self, file_path: str) -> Dict[str, Any]:
"""Detect common executable packers."""
p = Path(file_path)
if not p.exists():
return {'detected': False, 'error': 'File not found'}
try:
with open(p, 'rb') as f:
data = f.read()
except Exception as e:
return {'detected': False, 'error': str(e)}
ft = self.get_file_type(file_path)
detections = []
# Check magic byte signatures in file body
for packer, sig_info in PACKER_SIGNATURES.items():
score = 0
evidence = []
# Check for magic byte patterns
for pattern in sig_info.get('magic', []):
idx = data.find(pattern)
if idx != -1:
score += 40
evidence.append(f'Magic pattern at offset 0x{idx:x}')
# Check section names (for PE files)
if ft.get('type') == 'PE':
pe = self.parse_pe(file_path)
if 'error' not in pe:
for sec in pe.get('sections', []):
sec_name = sec['name'].encode('ascii', errors='ignore')
for packer_sec in sig_info.get('section_names', []):
if sec_name.rstrip(b'\x00').startswith(packer_sec.rstrip(b'\x00')):
score += 50
evidence.append(f'Section name: {sec["name"]}')
if score > 0:
detections.append({
'packer': packer,
'confidence': min(score, 100),
'description': sig_info.get('description', ''),
'evidence': evidence,
})
# Heuristic checks
overall_entropy = self.calculate_entropy(data)
if overall_entropy > 7.2:
detections.append({
'packer': 'Unknown (high entropy)',
'confidence': 60,
'description': f'High overall entropy ({overall_entropy:.2f}) suggests packing or encryption',
'evidence': [f'Entropy: {overall_entropy:.4f}'],
})
# Check for small code section with high entropy (common in packed binaries)
if ft.get('type') == 'PE':
sec_ent = self.section_entropy(file_path)
high_ent_sections = [s for s in sec_ent if s.get('entropy', 0) > 7.0]
if high_ent_sections and not detections:
names = ', '.join(s['name'] for s in high_ent_sections)
detections.append({
'packer': 'Unknown (packed sections)',
'confidence': 50,
'description': f'High entropy sections detected: {names}',
'evidence': [f'{s["name"]}: entropy {s["entropy"]:.2f}' for s in high_ent_sections],
})
# Sort by confidence
detections.sort(key=lambda x: -x['confidence'])
return {
'detected': len(detections) > 0,
'detections': detections,
'overall_entropy': overall_entropy if 'overall_entropy' in dir() else self.calculate_entropy(data),
}
# ── Hex Dump ─────────────────────────────────────────────────────────
def hex_dump(self, file_path: str, offset: int = 0, length: int = 256) -> Dict[str, Any]:
"""Generate formatted hex dump of a file region."""
p = Path(file_path)
if not p.exists():
return {'error': 'File not found'}
try:
file_size = p.stat().st_size
with open(p, 'rb') as f:
f.seek(offset)
data = f.read(length)
except Exception as e:
return {'error': str(e)}
lines = []
for i in range(0, len(data), 16):
chunk = data[i:i + 16]
hex_part = ' '.join(f'{b:02x}' for b in chunk)
# Add spacing between 8-byte groups
if len(chunk) > 8:
hex_bytes = [f'{b:02x}' for b in chunk]
hex_part = ' '.join(hex_bytes[:8]) + ' ' + ' '.join(hex_bytes[8:])
ascii_part = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
lines.append({
'offset': f'{offset + i:08x}',
'hex': hex_part,
'ascii': ascii_part,
})
# Also produce a formatted text version
text_lines = []
for line in lines:
text_lines.append(f'{line["offset"]} {line["hex"]:<49} |{line["ascii"]}|')
return {
'offset': offset,
'length': len(data),
'file_size': file_size,
'lines': lines,
'text': '\n'.join(text_lines),
}
def hex_search(self, file_path: str, pattern: str) -> Dict[str, Any]:
"""Search for a hex pattern in a binary file. Pattern: space/dash separated hex bytes."""
p = Path(file_path)
if not p.exists():
return {'error': 'File not found', 'matches': []}
# Parse hex pattern
clean = re.sub(r'[^0-9a-fA-F?]', '', pattern.replace('??', 'FF'))
if len(clean) % 2 != 0:
return {'error': 'Invalid hex pattern (odd number of nibbles)', 'matches': []}
try:
search_bytes = bytes.fromhex(re.sub(r'[^0-9a-fA-F]', '', pattern.replace(' ', '').replace('-', '')))
except ValueError:
return {'error': 'Invalid hex pattern', 'matches': []}
try:
with open(p, 'rb') as f:
data = f.read()
except Exception as e:
return {'error': str(e), 'matches': []}
matches = []
start = 0
while True:
idx = data.find(search_bytes, start)
if idx == -1:
break
context = data[max(0, idx - 8):idx + len(search_bytes) + 8]
matches.append({
'offset': idx,
'offset_hex': f'0x{idx:08x}',
'context': context.hex(),
})
start = idx + 1
if len(matches) >= 1000:
break
return {
'pattern': search_bytes.hex(),
'matches': matches,
'total': len(matches),
'file_size': len(data),
}
# ── Binary Comparison ────────────────────────────────────────────────
def compare_binaries(self, file1: str, file2: str) -> Dict[str, Any]:
"""Compare two binary files: sizes, hashes, section diffs, byte-level changes."""
p1, p2 = Path(file1), Path(file2)
if not p1.exists():
return {'error': f'File not found: {file1}'}
if not p2.exists():
return {'error': f'File not found: {file2}'}
try:
with open(p1, 'rb') as f:
data1 = f.read()
with open(p2, 'rb') as f:
data2 = f.read()
except Exception as e:
return {'error': str(e)}
# Size comparison
size1, size2 = len(data1), len(data2)
# Hashes
hashes1 = {
'md5': hashlib.md5(data1).hexdigest(),
'sha256': hashlib.sha256(data1).hexdigest(),
}
hashes2 = {
'md5': hashlib.md5(data2).hexdigest(),
'sha256': hashlib.sha256(data2).hexdigest(),
}
identical = hashes1['sha256'] == hashes2['sha256']
# Byte-level diff summary
min_len = min(len(data1), len(data2))
diff_count = 0
diff_regions = []
in_diff = False
diff_start = 0
for i in range(min_len):
if data1[i] != data2[i]:
diff_count += 1
if not in_diff:
in_diff = True
diff_start = i
else:
if in_diff:
in_diff = False
diff_regions.append({
'offset': f'0x{diff_start:08x}',
'length': i - diff_start,
})
if in_diff:
diff_regions.append({
'offset': f'0x{diff_start:08x}',
'length': min_len - diff_start,
})
# Add difference for size mismatch
if size1 != size2:
diff_count += abs(size1 - size2)
# Section-level comparison for PE/ELF
section_diffs = []
ft1 = self.get_file_type(file1)
ft2 = self.get_file_type(file2)
if ft1.get('type') == ft2.get('type') and ft1.get('type') in ('PE', 'ELF'):
if ft1['type'] == 'PE':
pe1, pe2 = self.parse_pe(file1), self.parse_pe(file2)
secs1 = {s['name']: s for s in pe1.get('sections', [])}
secs2 = {s['name']: s for s in pe2.get('sections', [])}
else:
elf1, elf2 = self.parse_elf(file1), self.parse_elf(file2)
secs1 = {s['name']: s for s in elf1.get('sections', [])}
secs2 = {s['name']: s for s in elf2.get('sections', [])}
all_names = sorted(set(list(secs1.keys()) + list(secs2.keys())))
for name in all_names:
s1 = secs1.get(name)
s2 = secs2.get(name)
if s1 and s2:
size_key = 'raw_size' if ft1['type'] == 'PE' else 'size'
section_diffs.append({
'name': name,
'status': 'modified' if s1.get(size_key) != s2.get(size_key) else 'unchanged',
'size_file1': s1.get(size_key, 0),
'size_file2': s2.get(size_key, 0),
})
elif s1:
section_diffs.append({'name': name, 'status': 'removed'})
else:
section_diffs.append({'name': name, 'status': 'added'})
# Entropy comparison
ent1 = self.calculate_entropy(data1)
ent2 = self.calculate_entropy(data2)
return {
'file1': {'name': p1.name, 'size': size1, 'hashes': hashes1, 'entropy': ent1},
'file2': {'name': p2.name, 'size': size2, 'hashes': hashes2, 'entropy': ent2},
'identical': identical,
'diff_bytes': diff_count,
'diff_percentage': round((diff_count / max(max(size1, size2), 1)) * 100, 2),
'diff_regions': diff_regions[:100],
'diff_regions_total': len(diff_regions),
'section_diffs': section_diffs,
}
# ── Ghidra Integration ───────────────────────────────────────────────
def ghidra_decompile(self, file_path: str, function: Optional[str] = None) -> Dict[str, Any]:
"""Run Ghidra headless analysis and return decompiled output."""
p = Path(file_path)
if not p.exists():
return {'error': 'File not found'}
analyze_headless = find_tool('analyzeHeadless')
if not analyze_headless:
# Try common Ghidra install locations
ghidra_paths = []
if os.name == 'nt':
for drive in ['C', 'D']:
ghidra_paths.extend([
Path(f'{drive}:/ghidra/support/analyzeHeadless.bat'),
Path(f'{drive}:/Program Files/ghidra/support/analyzeHeadless.bat'),
])
else:
ghidra_paths.extend([
Path('/opt/ghidra/support/analyzeHeadless'),
Path('/usr/local/ghidra/support/analyzeHeadless'),
Path.home() / 'ghidra' / 'support' / 'analyzeHeadless',
])
for gp in ghidra_paths:
if gp.exists():
analyze_headless = str(gp)
break
if not analyze_headless:
return {'error': 'Ghidra not found. Install Ghidra and ensure analyzeHeadless is in PATH.'}
# Create temporary project directory
with tempfile.TemporaryDirectory(prefix='autarch_ghidra_') as tmp_dir:
project_name = 'autarch_analysis'
cmd = [
analyze_headless,
tmp_dir, project_name,
'-import', str(p),
'-postScript', 'DecompileHeadless.java',
'-scriptlog', os.path.join(tmp_dir, 'script.log'),
'-deleteProject',
]
if function:
cmd.extend(['-scriptArgs', function])
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=300,
cwd=tmp_dir)
output = result.stdout + '\n' + result.stderr
# Try to read script output
log_path = os.path.join(tmp_dir, 'script.log')
script_output = ''
if os.path.exists(log_path):
with open(log_path, 'r') as f:
script_output = f.read()
return {
'output': output,
'script_output': script_output,
'return_code': result.returncode,
'function': function,
}
except subprocess.TimeoutExpired:
return {'error': 'Ghidra analysis timed out (300s limit)'}
except Exception as e:
return {'error': f'Ghidra execution failed: {e}'}
# ── Import / Export Extraction ───────────────────────────────────────
def get_imports(self, file_path: str) -> List[Dict[str, Any]]:
"""Extract imported functions from PE or ELF binary."""
ft = self.get_file_type(file_path)
ftype = ft.get('type', '')
if ftype == 'PE':
return self._get_pe_imports(file_path)
elif ftype == 'ELF':
return self._get_elf_imports(file_path)
return []
def _get_pe_imports(self, file_path: str) -> List[Dict[str, Any]]:
"""Parse PE import directory table."""
pe = self.parse_pe(file_path)
if 'error' in pe:
return []
try:
with open(file_path, 'rb') as f:
data = f.read()
except Exception:
return []
# Find import directory RVA
import_dir = None
for dd in pe.get('data_directories', []):
if dd['name'] == 'Import':
import_dir = dd
break
if not import_dir:
return []
import_rva = int(import_dir['rva'], 16)
if import_rva == 0:
return []
# Convert RVA to file offset using section mapping
sections = pe.get('sections', [])
def rva_to_offset(rva):
for sec in sections:
sec_va = int(sec['virtual_address'], 16)
sec_raw = sec['raw_offset']
sec_vs = sec['virtual_size']
if sec_va <= rva < sec_va + sec_vs:
return sec_raw + (rva - sec_va)
return rva
imports = []
offset = rva_to_offset(import_rva)
# Read Import Directory entries (20 bytes each)
while offset + 20 <= len(data):
ilt_rva, timestamp, forwarder, name_rva, iat_rva = struct.unpack_from('<IIIII', data, offset)
if ilt_rva == 0 and name_rva == 0:
break # End of import directory
# Read DLL name
name_off = rva_to_offset(name_rva)
dll_name = ''
if name_off < len(data):
end = data.find(b'\x00', name_off)
if end != -1:
dll_name = data[name_off:end].decode('ascii', errors='replace')
# Read import names from ILT (or IAT if ILT is 0)
lookup_rva = ilt_rva if ilt_rva else iat_rva
func_offset = rva_to_offset(lookup_rva)
functions = []
is_64 = pe.get('optional_header', {}).get('format') == 'PE32+'
entry_size = 8 if is_64 else 4
func_count = 0
while func_offset + entry_size <= len(data) and func_count < 500:
if is_64:
entry = struct.unpack_from('<Q', data, func_offset)[0]
ordinal_flag = 1 << 63
else:
entry = struct.unpack_from('<I', data, func_offset)[0]
ordinal_flag = 1 << 31
if entry == 0:
break
if entry & ordinal_flag:
ordinal = entry & 0xFFFF
functions.append({'name': f'Ordinal_{ordinal}', 'ordinal': ordinal})
else:
hint_off = rva_to_offset(entry & 0x7FFFFFFF)
if hint_off + 2 < len(data):
hint = struct.unpack_from('<H', data, hint_off)[0]
name_end = data.find(b'\x00', hint_off + 2)
if name_end != -1 and name_end - (hint_off + 2) < 256:
func_name = data[hint_off + 2:name_end].decode('ascii', errors='replace')
functions.append({'name': func_name, 'hint': hint})
func_offset += entry_size
func_count += 1
imports.append({
'library': dll_name,
'functions': functions,
'count': len(functions),
})
offset += 20
return imports
def _get_elf_imports(self, file_path: str) -> List[Dict[str, Any]]:
"""Extract imported symbols from ELF dynamic symbol table."""
elf = self.parse_elf(file_path)
if 'error' in elf:
return []
try:
with open(file_path, 'rb') as f:
data = f.read()
except Exception:
return []
is_64 = '64-bit' in elf.get('class', '')
endian = '<' if 'Little' in elf.get('endianness', 'Little') else '>'
# Find .dynsym and .dynstr sections
dynsym_sec = None
dynstr_sec = None
for sec in elf.get('sections', []):
if sec['name'] == '.dynsym':
dynsym_sec = sec
elif sec['name'] == '.dynstr':
dynstr_sec = sec
if not dynsym_sec or not dynstr_sec:
return []
# Read string table
str_off = dynstr_sec['offset']
str_size = dynstr_sec['size']
if str_off + str_size > len(data):
return []
strtab = data[str_off:str_off + str_size]
# Read symbol table
sym_off = dynsym_sec['offset']
sym_size = dynsym_sec['size']
entry_size = 24 if is_64 else 16
imports = []
for i in range(0, sym_size, entry_size):
off = sym_off + i
if is_64 and off + 24 <= len(data):
st_name, st_info, st_other, st_shndx, st_value, st_size = struct.unpack_from(
f'{endian}IBBHQQ', data, off)
elif not is_64 and off + 16 <= len(data):
st_name, st_value, st_size, st_info, st_other, st_shndx = struct.unpack_from(
f'{endian}IIIBBH', data, off)
else:
break
# Undefined symbols (imports) have shndx == 0
if st_shndx == 0 and st_name > 0 and st_name < len(strtab):
end = strtab.find(b'\x00', st_name)
if end != -1:
sym_name = strtab[st_name:end].decode('ascii', errors='replace')
if sym_name:
bind = (st_info >> 4) & 0xf
sym_type = st_info & 0xf
bind_str = {0: 'LOCAL', 1: 'GLOBAL', 2: 'WEAK'}.get(bind, str(bind))
type_str = {0: 'NOTYPE', 1: 'OBJECT', 2: 'FUNC'}.get(sym_type, str(sym_type))
imports.append({
'name': sym_name,
'bind': bind_str,
'type': type_str,
})
# Group by library if possible (from NEEDED entries in dynamic section)
needed_libs = []
for dyn in elf.get('dynamic', []):
if dyn['tag'] == 1: # DT_NEEDED
val = int(dyn['value'], 16)
if val < len(strtab):
end = strtab.find(b'\x00', val)
if end != -1:
needed_libs.append(strtab[val:end].decode('ascii', errors='replace'))
result = [{'library': lib, 'functions': [], 'count': 0} for lib in needed_libs]
if imports:
ungrouped = {'library': '(dynamic imports)', 'functions': imports, 'count': len(imports)}
result.append(ungrouped)
return result
def get_exports(self, file_path: str) -> List[Dict[str, Any]]:
"""Extract exported functions from PE or ELF binary."""
ft = self.get_file_type(file_path)
ftype = ft.get('type', '')
if ftype == 'PE':
return self._get_pe_exports(file_path)
elif ftype == 'ELF':
return self._get_elf_exports(file_path)
return []
def _get_pe_exports(self, file_path: str) -> List[Dict[str, Any]]:
"""Parse PE export directory table."""
pe = self.parse_pe(file_path)
if 'error' in pe:
return []
try:
with open(file_path, 'rb') as f:
data = f.read()
except Exception:
return []
export_dir = None
for dd in pe.get('data_directories', []):
if dd['name'] == 'Export':
export_dir = dd
break
if not export_dir:
return []
export_rva = int(export_dir['rva'], 16)
if export_rva == 0:
return []
sections = pe.get('sections', [])
def rva_to_offset(rva):
for sec in sections:
sec_va = int(sec['virtual_address'], 16)
sec_raw = sec['raw_offset']
sec_vs = sec['virtual_size']
if sec_va <= rva < sec_va + sec_vs:
return sec_raw + (rva - sec_va)
return rva
offset = rva_to_offset(export_rva)
if offset + 40 > len(data):
return []
_, timestamp, major_ver, minor_ver, name_rva, ordinal_base, \
num_functions, num_names, addr_functions_rva, addr_names_rva, \
addr_ordinals_rva = struct.unpack_from('<IIHHIIIIIII', data, offset)
exports = []
names_offset = rva_to_offset(addr_names_rva)
ordinals_offset = rva_to_offset(addr_ordinals_rva)
functions_offset = rva_to_offset(addr_functions_rva)
for i in range(min(num_names, 2000)):
if names_offset + (i + 1) * 4 > len(data):
break
name_rva = struct.unpack_from('<I', data, names_offset + i * 4)[0]
name_off = rva_to_offset(name_rva)
if name_off < len(data):
end = data.find(b'\x00', name_off)
if end != -1 and end - name_off < 256:
func_name = data[name_off:end].decode('ascii', errors='replace')
else:
func_name = f'<unknown_{i}>'
else:
func_name = f'<unknown_{i}>'
ordinal = 0
if ordinals_offset + (i + 1) * 2 <= len(data):
ordinal = struct.unpack_from('<H', data, ordinals_offset + i * 2)[0]
func_rva = 0
func_idx = ordinal
if functions_offset + (func_idx + 1) * 4 <= len(data):
func_rva = struct.unpack_from('<I', data, functions_offset + func_idx * 4)[0]
exports.append({
'name': func_name,
'ordinal': ordinal + ordinal_base,
'address': f'0x{func_rva:08x}',
})
return exports
def _get_elf_exports(self, file_path: str) -> List[Dict[str, Any]]:
"""Extract exported (defined GLOBAL/WEAK) symbols from ELF."""
elf = self.parse_elf(file_path)
if 'error' in elf:
return []
try:
with open(file_path, 'rb') as f:
data = f.read()
except Exception:
return []
is_64 = '64-bit' in elf.get('class', '')
endian = '<' if 'Little' in elf.get('endianness', 'Little') else '>'
# Find .dynsym and .dynstr
dynsym_sec = None
dynstr_sec = None
for sec in elf.get('sections', []):
if sec['name'] == '.dynsym':
dynsym_sec = sec
elif sec['name'] == '.dynstr':
dynstr_sec = sec
if not dynsym_sec or not dynstr_sec:
return []
str_off = dynstr_sec['offset']
str_size = dynstr_sec['size']
if str_off + str_size > len(data):
return []
strtab = data[str_off:str_off + str_size]
sym_off = dynsym_sec['offset']
sym_size = dynsym_sec['size']
entry_size = 24 if is_64 else 16
exports = []
for i in range(0, sym_size, entry_size):
off = sym_off + i
if is_64 and off + 24 <= len(data):
st_name, st_info, st_other, st_shndx, st_value, st_size = struct.unpack_from(
f'{endian}IBBHQQ', data, off)
elif not is_64 and off + 16 <= len(data):
st_name, st_value, st_size, st_info, st_other, st_shndx = struct.unpack_from(
f'{endian}IIIBBH', data, off)
else:
break
# Exported = defined (shndx != 0) and GLOBAL or WEAK binding
bind = (st_info >> 4) & 0xf
if st_shndx != 0 and bind in (1, 2) and st_name > 0 and st_name < len(strtab):
end = strtab.find(b'\x00', st_name)
if end != -1:
sym_name = strtab[st_name:end].decode('ascii', errors='replace')
if sym_name:
sym_type = st_info & 0xf
type_str = {0: 'NOTYPE', 1: 'OBJECT', 2: 'FUNC'}.get(sym_type, str(sym_type))
exports.append({
'name': sym_name,
'address': f'0x{st_value:x}',
'type': type_str,
'size': st_size,
})
return exports
# ── Utility Methods ──────────────────────────────────────────────────
@staticmethod
def _human_size(size: int) -> str:
"""Convert bytes to human-readable string."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f'{size:.1f} {unit}' if unit != 'B' else f'{size} {unit}'
size /= 1024
return f'{size:.1f} PB'
def print_status(self, message: str, status: str = "info"):
colors = {"info": Colors.CYAN, "success": Colors.GREEN,
"warning": Colors.YELLOW, "error": Colors.RED}
symbols = {"info": "*", "success": "+", "warning": "!", "error": "X"}
print(f"{colors.get(status, Colors.WHITE)}[{symbols.get(status, '*')}] {message}{Colors.RESET}")
# ── CLI Interface ────────────────────────────────────────────────────
def show_menu(self):
clear_screen()
display_banner()
print(f"{Colors.CYAN}{Colors.BOLD} Reverse Engineering Toolkit{Colors.RESET}")
print(f"{Colors.DIM} Binary analysis, disassembly & YARA scanning{Colors.RESET}")
print(f"{Colors.DIM} {'=' * 50}{Colors.RESET}")
print()
print(f" {Colors.CYAN}[1]{Colors.RESET} Analyze Binary")
print(f" {Colors.CYAN}[2]{Colors.RESET} Disassemble")
print(f" {Colors.CYAN}[3]{Colors.RESET} YARA Scan")
print(f" {Colors.CYAN}[4]{Colors.RESET} Hex Dump")
print(f" {Colors.CYAN}[5]{Colors.RESET} Detect Packer")
print(f" {Colors.CYAN}[6]{Colors.RESET} Compare Binaries")
print()
print(f" {Colors.DIM}[0]{Colors.RESET} Back")
print()
def cli_analyze(self):
filepath = input(f"{Colors.WHITE} Enter file path: {Colors.RESET}").strip()
if not filepath:
return
result = self.analyze_binary(filepath)
if 'error' in result:
self.print_status(result['error'], 'error')
return
print(f"\n{Colors.CYAN}{'=' * 60}{Colors.RESET}")
print(f" {Colors.BOLD}{result['name']}{Colors.RESET}")
print(f" Type: {result['file_type'].get('type', 'unknown')} | "
f"Arch: {result['architecture']} | Size: {result['size_human']}")
print(f"\n {Colors.CYAN}Hashes:{Colors.RESET}")
for algo, val in result['hashes'].items():
print(f" {algo.upper():8} {val}")
print(f"\n {Colors.CYAN}Entropy:{Colors.RESET} {result['entropy']} ({result['entropy_level']})")
if result['section_entropy']:
for s in result['section_entropy']:
bar = '#' * int(s['entropy'] * 3)
color = Colors.RED if s['entropy'] > 7.0 else (Colors.YELLOW if s['entropy'] > 6.0 else Colors.GREEN)
print(f" {s['name']:12} {color}{s['entropy']:.2f}{Colors.RESET} {bar}")
print(f"\n {Colors.CYAN}Strings:{Colors.RESET} {result['strings_count']} found")
if result['packer']['detected']:
print(f"\n {Colors.RED}Packer Detected:{Colors.RESET}")
for d in result['packer']['detections']:
print(f" {d['packer']} (confidence: {d['confidence']}%)")
def cli_disassemble(self):
filepath = input(f"{Colors.WHITE} Enter file path: {Colors.RESET}").strip()
if not filepath:
return
section = input(f"{Colors.WHITE} Section [.text]: {Colors.RESET}").strip() or '.text'
count = input(f"{Colors.WHITE} Instruction count [50]: {Colors.RESET}").strip() or '50'
try:
count = int(count)
except ValueError:
count = 50
results = self.disassemble_file(filepath, section=section, count=count)
if results and 'error' in results[0]:
self.print_status(results[0]['error'], 'error')
return
print(f"\n{Colors.CYAN}{'Address':<14} {'Bytes':<24} {'Mnemonic':<10} {'Operands'}{Colors.RESET}")
print(f"{'-' * 70}")
for inst in results:
print(f" {inst['address']:<12} {inst.get('bytes_hex', ''):<22} "
f"{Colors.CYAN}{inst['mnemonic']:<10}{Colors.RESET} {inst.get('op_str', '')}")
def cli_yara_scan(self):
filepath = input(f"{Colors.WHITE} Enter file path to scan: {Colors.RESET}").strip()
if not filepath:
return
rules_path = input(f"{Colors.WHITE} YARA rules file (or Enter for all): {Colors.RESET}").strip() or None
result = self.yara_scan(filepath, rules_path=rules_path)
if 'error' in result and result['error']:
self.print_status(result['error'], 'error')
if result.get('matches'):
print(f"\n {Colors.RED}Matches: {result['total']}{Colors.RESET}")
for m in result['matches']:
print(f" Rule: {m['rule']}")
for s in m.get('strings', [])[:5]:
print(f" 0x{s.get('offset', 0):08x}: {s.get('identifier', '')} = {s.get('data', '')}")
else:
self.print_status("No matches found", "info")
def cli_hex_dump(self):
filepath = input(f"{Colors.WHITE} Enter file path: {Colors.RESET}").strip()
if not filepath:
return
offset = input(f"{Colors.WHITE} Offset [0]: {Colors.RESET}").strip() or '0'
length = input(f"{Colors.WHITE} Length [256]: {Colors.RESET}").strip() or '256'
try:
offset = int(offset, 0)
length = int(length, 0)
except ValueError:
self.print_status("Invalid offset or length", "error")
return
result = self.hex_dump(filepath, offset, length)
if 'error' in result:
self.print_status(result['error'], 'error')
return
print(f"\n{Colors.CYAN}{result['text']}{Colors.RESET}")
def cli_detect_packer(self):
filepath = input(f"{Colors.WHITE} Enter file path: {Colors.RESET}").strip()
if not filepath:
return
result = self.detect_packer(filepath)
if 'error' in result:
self.print_status(result['error'], 'error')
return
if result['detected']:
print(f"\n {Colors.RED}Packer(s) Detected:{Colors.RESET}")
for d in result['detections']:
print(f" {d['packer']} — confidence {d['confidence']}%")
print(f" {d['description']}")
for e in d.get('evidence', []):
print(f" {e}")
else:
self.print_status("No packer detected", "success")
print(f" Entropy: {result.get('overall_entropy', 0):.4f}")
def cli_compare(self):
file1 = input(f"{Colors.WHITE} First file: {Colors.RESET}").strip()
file2 = input(f"{Colors.WHITE} Second file: {Colors.RESET}").strip()
if not file1 or not file2:
return
result = self.compare_binaries(file1, file2)
if 'error' in result:
self.print_status(result['error'], 'error')
return
f1, f2 = result['file1'], result['file2']
print(f"\n{Colors.CYAN}{'=' * 60}{Colors.RESET}")
print(f" File 1: {f1['name']} ({f1['size']:,} bytes, entropy {f1['entropy']:.2f})")
print(f" File 2: {f2['name']} ({f2['size']:,} bytes, entropy {f2['entropy']:.2f})")
if result['identical']:
self.print_status("Files are identical", "success")
else:
print(f"\n {Colors.YELLOW}Different bytes: {result['diff_bytes']:,} ({result['diff_percentage']}%){Colors.RESET}")
print(f" Diff regions: {result['diff_regions_total']}")
for sd in result.get('section_diffs', []):
status_color = Colors.RED if sd['status'] != 'unchanged' else Colors.GREEN
print(f" {sd['name']:16} {status_color}{sd['status']}{Colors.RESET}")
def run(self):
while True:
self.show_menu()
try:
choice = input(f"{Colors.WHITE} Select: {Colors.RESET}").strip()
if choice == "0":
break
elif choice == "1":
self.cli_analyze()
elif choice == "2":
self.cli_disassemble()
elif choice == "3":
self.cli_yara_scan()
elif choice == "4":
self.cli_hex_dump()
elif choice == "5":
self.cli_detect_packer()
elif choice == "6":
self.cli_compare()
if choice in ["1", "2", "3", "4", "5", "6"]:
input(f"\n{Colors.WHITE} Press Enter to continue...{Colors.RESET}")
except (EOFError, KeyboardInterrupt):
break
# ── Singleton ────────────────────────────────────────────────────────────────
_instance = None
def get_reverse_eng() -> ReverseEngineer:
global _instance
if _instance is None:
_instance = ReverseEngineer()
return _instance
def run():
get_reverse_eng().run()
if __name__ == "__main__":
run()