# Autarch/modules/malware_sandbox.py
"""AUTARCH Malware Sandbox
Isolated sample detonation (Docker-based), behavior logging, API call tracing,
network activity monitoring, and file system change tracking.
"""
DESCRIPTION = "Malware detonation sandbox & analysis"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "analyze"
import os
import re
import json
import time
import shutil
import hashlib
import subprocess
import threading
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
try:
    from core.paths import find_tool, get_data_dir
except ImportError:
    # Standalone fallbacks so the module still works outside the
    # Autarch package (e.g. when imported directly for testing).
    def find_tool(name):
        """Locate *name* on PATH; returns None when absent."""
        return shutil.which(name)

    def get_data_dir():
        """Default data directory two levels above this file."""
        return str(Path(__file__).parent.parent / 'data')
# ── YARA Rules (basic) ──────────────────────────────────────────────────────
# Byte patterns grouped by behavior category; each hit contributes to the
# heuristic risk score computed in MalwareSandbox.static_analysis().
BASIC_YARA_INDICATORS = {
    # Classic process-injection / dynamic-resolution APIs.
    'suspicious_imports': [
        b'CreateRemoteThread',
        b'VirtualAllocEx',
        b'WriteProcessMemory',
        b'NtQueryInformationProcess',
        b'IsDebuggerPresent',
        b'GetProcAddress',
        b'LoadLibraryA',
        b'ShellExecuteA',
    ],
    # Encryption APIs / key material (ransomware, packed payloads).
    'crypto_indicators': [
        b'CryptEncrypt',
        b'CryptDecrypt',
        b'BCryptEncrypt',
        b'AES',
        b'RSA',
        b'BEGIN PUBLIC KEY',
    ],
    # Network / download capability.
    'network_indicators': [
        b'InternetOpenA',
        b'HttpOpenRequestA',
        b'URLDownloadToFile',
        b'WSAStartup',
        b'connect',
        b'send',
        b'recv',
        b'http://',
        b'https://',
        b'ftp://',
    ],
    # Windows registry / task-scheduler and Unix cron persistence.
    'persistence_indicators': [
        b'CurrentVersion\\Run',
        b'SOFTWARE\\Microsoft\\Windows\\CurrentVersion',
        b'schtasks',
        b'at.exe',
        b'HKEY_LOCAL_MACHINE',
        b'HKEY_CURRENT_USER',
        b'crontab',
        b'/etc/cron',
    ],
    # Anti-debug / anti-VM / sandbox-detection strings.
    'evasion_indicators': [
        b'IsDebuggerPresent',
        b'CheckRemoteDebuggerPresent',
        b'NtSetInformationThread',
        b'vmware',
        b'virtualbox',
        b'vbox',
        b'sandbox',
        b'SbieDll.dll',
    ],
}
# ── Sandbox Engine ───────────────────────────────────────────────────────────
class MalwareSandbox:
    """Isolated malware analysis environment.

    Capabilities:
      * sample intake with MD5/SHA1/SHA256 hashing into a quarantine dir
      * static analysis: file type, strings, byte-pattern indicator
        matching, basic PE/ELF header parsing, heuristic risk scoring
      * dynamic analysis inside a locked-down Docker container
        (no network, memory/CPU caps, read-only rootfs)
      * JSON report generation and listing

    External tools (docker/strace/file/strings) are optional; every
    feature degrades gracefully when its tool is missing.
    """

    def __init__(self):
        # Working tree: <data>/sandbox/{samples,reports}
        self.data_dir = os.path.join(get_data_dir(), 'sandbox')
        os.makedirs(self.data_dir, exist_ok=True)
        self.samples_dir = os.path.join(self.data_dir, 'samples')
        os.makedirs(self.samples_dir, exist_ok=True)
        self.reports_dir = os.path.join(self.data_dir, 'reports')
        os.makedirs(self.reports_dir, exist_ok=True)
        # Tool discovery; any of these may be None when not installed.
        self.docker = find_tool('docker') or shutil.which('docker')
        self.strace = shutil.which('strace')
        self.ltrace = shutil.which('ltrace')
        self.file_cmd = shutil.which('file')
        self.strings_cmd = find_tool('strings') or shutil.which('strings')
        # In-memory history of generated reports for this process.
        self.analyses: List[Dict] = []
        # Async dynamic-analysis jobs keyed by job_id.
        self._jobs: Dict[str, Dict] = {}

    def get_status(self) -> Dict:
        """Get sandbox capabilities and counters.

        Returns a dict of booleans per tool plus sample/analysis counts.
        Docker is probed with `docker info` (5 s timeout) since the binary
        existing does not mean the daemon is reachable.
        """
        docker_ok = False
        if self.docker:
            try:
                result = subprocess.run([self.docker, 'info'],
                                        capture_output=True, timeout=5)
                docker_ok = result.returncode == 0
            except Exception:
                pass  # daemon unreachable or probe timed out -> unavailable
        return {
            'docker': docker_ok,
            'strace': self.strace is not None,
            'ltrace': self.ltrace is not None,
            'file': self.file_cmd is not None,
            'strings': self.strings_cmd is not None,
            # FIX: count only regular files -- stray sub-directories are not
            # samples, and list_samples() also skips them.
            'samples': sum(1 for p in Path(self.samples_dir).iterdir() if p.is_file()),
            'analyses': len(self.analyses)
        }

    # ── Sample Management ────────────────────────────────────────────────
    def submit_sample(self, filepath: str, name: Optional[str] = None) -> Dict:
        """Copy a sample into the quarantine store.

        Args:
            filepath: path to the file to submit.
            name: optional display name (defaults to the file's basename).

        Returns:
            {'ok': True, 'sample': {...}} with hashes and stored path,
            or {'ok': False, 'error': ...} when the file does not exist.
        """
        if not os.path.exists(filepath):
            return {'ok': False, 'error': 'File not found'}
        # Hash the sample (whole-file read; samples are expected to be small).
        hashes = {}
        with open(filepath, 'rb') as f:
            data = f.read()
        hashes['md5'] = hashlib.md5(data).hexdigest()
        hashes['sha1'] = hashlib.sha1(data).hexdigest()
        hashes['sha256'] = hashlib.sha256(data).hexdigest()
        # Copy to samples dir under a filesystem-safe, hash-prefixed name so
        # identical names from different sources cannot collide.
        sample_name = name or Path(filepath).name
        safe_name = re.sub(r'[^\w.\-]', '_', sample_name)
        dest = os.path.join(self.samples_dir, f'{hashes["sha256"][:16]}_{safe_name}')
        shutil.copy2(filepath, dest)
        sample = {
            'name': sample_name,
            'path': dest,
            'size': os.path.getsize(dest),
            'hashes': hashes,
            'submitted': datetime.now(timezone.utc).isoformat()
        }
        return {'ok': True, 'sample': sample}

    def list_samples(self) -> List[Dict]:
        """List submitted samples (regular files in the samples dir)."""
        samples = []
        for f in Path(self.samples_dir).iterdir():
            if f.is_file():
                samples.append({
                    'name': f.name,
                    'path': str(f),
                    'size': f.stat().st_size,
                    'modified': datetime.fromtimestamp(f.stat().st_mtime, timezone.utc).isoformat()
                })
        return samples

    # ── Static Analysis ──────────────────────────────────────────────────
    def static_analysis(self, filepath: str) -> Dict:
        """Perform static analysis on a sample.

        Runs file-type identification, hashing, strings extraction,
        byte-pattern indicator matching, PE/ELF header parsing and a
        heuristic risk score. Each stage is best-effort: a missing tool
        or a stage failure just omits that key from the result.
        """
        if not os.path.exists(filepath):
            return {'ok': False, 'error': 'File not found'}
        result = {
            'ok': True,
            'file': filepath,
            'name': Path(filepath).name,
            'size': os.path.getsize(filepath)
        }
        # File type identification via file(1); output is "path: type".
        if self.file_cmd:
            try:
                out = subprocess.check_output([self.file_cmd, filepath],
                                              text=True, timeout=10)
                result['file_type'] = out.split(':', 1)[-1].strip()
            except Exception:
                pass  # best-effort: skip file_type on failure
        # Hashes
        with open(filepath, 'rb') as f:
            data = f.read()
        result['hashes'] = {
            'md5': hashlib.md5(data).hexdigest(),
            'sha1': hashlib.sha1(data).hexdigest(),
            'sha256': hashlib.sha256(data).hexdigest()
        }
        # Strings extraction (min length 6) and IOC harvesting.
        if self.strings_cmd:
            try:
                out = subprocess.check_output(
                    [self.strings_cmd, '-n', '6', filepath],
                    text=True, timeout=30, stderr=subprocess.DEVNULL
                )
                strings = out.strip().split('\n')
                result['strings_count'] = len(strings)
                # Extract interesting strings (matched at string start only).
                urls = [s for s in strings if re.match(r'https?://', s)]
                ips = [s for s in strings if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', s)]
                emails = [s for s in strings if re.match(r'[^@]+@[^@]+\.[^@]+', s)]
                paths = [s for s in strings if s.startswith('/') or '\\' in s]
                result['interesting_strings'] = {
                    'urls': urls[:20],
                    'ips': list(set(ips))[:20],
                    'emails': list(set(emails))[:10],
                    'paths': paths[:20]
                }
            except Exception:
                pass  # best-effort: skip strings section on failure
        # YARA-like signature matching against the raw bytes.
        indicators = {}
        for category, patterns in BASIC_YARA_INDICATORS.items():
            matches = [p.decode('utf-8', errors='replace') for p in patterns if p in data]
            if matches:
                indicators[category] = matches
        result['indicators'] = indicators
        result['indicator_count'] = sum(len(v) for v in indicators.values())
        # PE header analysis (DOS "MZ" magic).
        if data[:2] == b'MZ':
            result['pe_info'] = self._parse_pe_header(data)
        # ELF header analysis (\x7fELF magic).
        if data[:4] == b'\x7fELF':
            result['elf_info'] = self._parse_elf_header(data)
        # Heuristic risk score: weighted sum of indicator categories,
        # capped at 100 (weights sum to exactly 100).
        score = 0
        if indicators.get('evasion_indicators'):
            score += 30
        if indicators.get('persistence_indicators'):
            score += 25
        if indicators.get('suspicious_imports'):
            score += 20
        if indicators.get('network_indicators'):
            score += 15
        if indicators.get('crypto_indicators'):
            score += 10
        result['risk_score'] = min(100, score)
        result['risk_level'] = (
            'critical' if score >= 70 else
            'high' if score >= 50 else
            'medium' if score >= 30 else
            'low' if score >= 10 else
            'clean'
        )
        return result

    def _parse_pe_header(self, data: bytes) -> Dict:
        """Basic PE/COFF header parsing (machine, section count, timestamp).

        Caller has already checked the b'MZ' DOS magic; any truncated or
        malformed header is swallowed and yields a partial dict.
        """
        info = {'format': 'PE'}
        try:
            import struct
            # e_lfanew at DOS-header offset 0x3C points at the PE signature.
            e_lfanew = struct.unpack_from('<I', data, 0x3C)[0]
            if data[e_lfanew:e_lfanew + 4] == b'PE\x00\x00':
                machine = struct.unpack_from('<H', data, e_lfanew + 4)[0]
                info['machine'] = {0x14c: 'i386', 0x8664: 'x86_64', 0x1c0: 'ARM'}.get(machine, hex(machine))
                num_sections = struct.unpack_from('<H', data, e_lfanew + 6)[0]
                info['sections'] = num_sections
                timestamp = struct.unpack_from('<I', data, e_lfanew + 8)[0]
                info['compile_time'] = datetime.fromtimestamp(timestamp, timezone.utc).isoformat()
        except Exception:
            pass  # malformed/truncated header -> return what we have
        return info

    def _parse_elf_header(self, data: bytes) -> Dict:
        """Basic ELF header parsing (bitness, endianness, object type).

        Caller has already checked the b'\\x7fELF' magic.
        """
        info = {'format': 'ELF'}
        try:
            import struct
            ei_class = data[4]           # EI_CLASS: 1 = 32-bit, 2 = 64-bit
            info['bits'] = {1: 32, 2: 64}.get(ei_class, 0)
            ei_data = data[5]            # EI_DATA: 1 = LSB, 2 = MSB
            info['endian'] = {1: 'little', 2: 'big'}.get(ei_data, 'unknown')
            # FIX: e_type is stored in the file's own byte order; the old
            # code always decoded little-endian, mislabeling MSB binaries.
            endian_fmt = '>' if ei_data == 2 else '<'
            e_type = struct.unpack_from(endian_fmt + 'H', data, 16)[0]
            info['type'] = {1: 'relocatable', 2: 'executable', 3: 'shared', 4: 'core'}.get(e_type, str(e_type))
        except Exception:
            pass  # malformed/truncated header -> return what we have
        return info

    # ── Dynamic Analysis (Docker) ────────────────────────────────────────
    def dynamic_analysis(self, filepath: str, timeout: int = 60) -> str:
        """Run sample in a locked-down Docker container. Returns job_id.

        The container has no network, 256 MB RAM, 1 CPU, a read-only
        rootfs and a small tmpfs. Returns '' when Docker is unavailable;
        otherwise the analysis runs on a daemon thread and the result is
        retrieved via get_job(job_id).

        NOTE(review): stock ubuntu:22.04 does not ship strace, so the
        trace may come back empty -- confirm the image, or bake strace
        into a dedicated analysis image.
        """
        if not self.docker:
            return ''
        job_id = f'sandbox_{int(time.time())}'
        self._jobs[job_id] = {
            'type': 'dynamic', 'status': 'running',
            'result': None, 'started': time.time()
        }

        def _run():
            try:
                container_name = f'autarch_sandbox_{job_id}'
                sample_name = Path(filepath).name
                # Run in isolated container; sample is bind-mounted read-only.
                cmd = [
                    self.docker, 'run', '--rm',
                    '--name', container_name,
                    '--network', 'none',       # No network
                    '--memory', '256m',        # Memory limit
                    '--cpus', '1',             # CPU limit
                    '--read-only',             # Read-only root
                    '--tmpfs', '/tmp:size=64m',
                    '-v', f'{os.path.abspath(filepath)}:/sample/{sample_name}:ro',
                    'ubuntu:22.04',
                    'bash', '-c', f'''
cp /sample/{sample_name} /tmp/test_sample
chmod +x /tmp/test_sample 2>/dev/null
timeout {timeout} strace -f -o /tmp/trace.log /tmp/test_sample 2>/tmp/stderr.log || true
cat /tmp/trace.log 2>/dev/null | head -1000
echo "---STDERR---"
cat /tmp/stderr.log 2>/dev/null | head -100
'''
                ]
                # Outer timeout gives docker pull/startup 30 s of headroom.
                result = subprocess.run(cmd, capture_output=True, text=True,
                                        timeout=timeout + 30)
                # Parse strace output: count syscalls, collect file and
                # network activity.
                syscalls = {}
                files_accessed = []
                network_calls = []
                for line in result.stdout.split('\n'):
                    sc_match = re.match(r'.*?(\w+)\(', line)
                    if sc_match:
                        sc = sc_match.group(1)
                        syscalls[sc] = syscalls.get(sc, 0) + 1
                    if 'open(' in line or 'openat(' in line:
                        f_match = re.search(r'"([^"]+)"', line)
                        if f_match:
                            files_accessed.append(f_match.group(1))
                    if 'connect(' in line or 'socket(' in line:
                        network_calls.append(line.strip()[:100])
                self._jobs[job_id]['status'] = 'complete'
                self._jobs[job_id]['result'] = {
                    'ok': True,
                    'syscalls': syscalls,
                    'syscall_count': sum(syscalls.values()),
                    'files_accessed': list(set(files_accessed))[:50],
                    'network_calls': network_calls[:20],
                    'exit_code': result.returncode,
                    'stderr': result.stderr[:500] if result.stderr else ''
                }
            except subprocess.TimeoutExpired:
                # Outer timeout tripped: kill the container so it cannot
                # keep running after we stop watching it.
                subprocess.run([self.docker, 'kill', container_name],
                               capture_output=True)
                self._jobs[job_id]['status'] = 'complete'
                self._jobs[job_id]['result'] = {
                    'ok': True, 'timeout': True,
                    'message': 'Analysis timed out (sample may be long-running)'
                }
            except Exception as e:
                self._jobs[job_id]['status'] = 'error'
                self._jobs[job_id]['result'] = {'ok': False, 'error': str(e)}

        threading.Thread(target=_run, daemon=True).start()
        return job_id

    # ── Report Generation ────────────────────────────────────────────────
    def generate_report(self, filepath: str, include_dynamic: bool = False) -> Dict:
        """Generate a comprehensive analysis report and persist it as JSON.

        Args:
            filepath: sample to analyze.
            include_dynamic: reserved for future dynamic-result inclusion
                (currently unused; kept for interface stability).

        Returns:
            {'ok': True, ...report fields...} on success, or
            {'ok': False, 'error': ...} when static analysis fails.
        """
        static = self.static_analysis(filepath)
        # FIX: previously a failed static analysis (e.g. missing file) was
        # ignored and a junk "report_unknown.json" was written to disk.
        if not static.get('ok'):
            return {'ok': False, 'error': static.get('error', 'static analysis failed')}
        report = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'sample': {
                'name': Path(filepath).name,
                'path': filepath,
                'size': static.get('size', 0),
                'hashes': static.get('hashes', {})
            },
            'static_analysis': static,
            'risk_score': static.get('risk_score', 0),
            'risk_level': static.get('risk_level', 'unknown')
        }
        # Save report keyed by the sample's SHA256 prefix.
        report_name = f'report_{static.get("hashes", {}).get("sha256", "unknown")[:16]}.json'
        report_path = os.path.join(self.reports_dir, report_name)
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=2)
        report['report_path'] = report_path
        self.analyses.append({
            'name': Path(filepath).name,
            'report': report_path,
            'risk': report['risk_level'],
            'timestamp': report['timestamp']
        })
        return {'ok': True, **report}

    def list_reports(self) -> List[Dict]:
        """List analysis reports found on disk (unreadable files skipped)."""
        reports = []
        for f in Path(self.reports_dir).glob('*.json'):
            try:
                with open(f) as fh:
                    data = json.load(fh)
                reports.append({
                    'name': f.name,
                    'path': str(f),
                    'sample': data.get('sample', {}).get('name', ''),
                    'risk': data.get('risk_level', 'unknown'),
                    'timestamp': data.get('timestamp', '')
                })
            except Exception:
                pass  # corrupt/foreign JSON: skip rather than fail the listing
        return reports

    # ── Job Management ───────────────────────────────────────────────────
    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the job record for job_id, or None if unknown."""
        return self._jobs.get(job_id)
# ── Singleton ────────────────────────────────────────────────────────────────
# Lazily-created module-level instance shared by every caller.
_instance = None


def get_sandbox() -> MalwareSandbox:
    """Return the shared MalwareSandbox, creating it on first use."""
    global _instance
    if _instance is None:
        _instance = MalwareSandbox()
    return _instance
# ── CLI Interface ────────────────────────────────────────────────────────────
def run():
"""CLI entry point for Malware Sandbox module."""
sandbox = get_sandbox()
while True:
status = sandbox.get_status()
print(f"\n{'='*60}")
print(f" Malware Sandbox")
print(f"{'='*60}")
print(f" Docker: {'OK' if status['docker'] else 'NOT AVAILABLE'}")
print(f" Samples: {status['samples']} Analyses: {status['analyses']}")
print()
print(" 1 — Submit Sample")
print(" 2 — Static Analysis")
print(" 3 — Dynamic Analysis (Docker)")
print(" 4 — Full Report")
print(" 5 — List Samples")
print(" 6 — List Reports")
print(" 0 — Back")
print()
choice = input(" > ").strip()
if choice == '0':
break
elif choice == '1':
path = input(" File path: ").strip()
if path:
result = sandbox.submit_sample(path)
if result['ok']:
s = result['sample']
print(f" Submitted: {s['name']} ({s['size']} bytes)")
print(f" SHA256: {s['hashes']['sha256']}")
else:
print(f" Error: {result['error']}")
elif choice == '2':
path = input(" Sample path: ").strip()
if path:
result = sandbox.static_analysis(path)
if result['ok']:
print(f" Type: {result.get('file_type', 'unknown')}")
print(f" Risk: {result['risk_level']} ({result['risk_score']}/100)")
print(f" Strings: {result.get('strings_count', 0)}")
for cat, matches in result.get('indicators', {}).items():
print(f" {cat}: {', '.join(matches[:5])}")
else:
print(f" Error: {result['error']}")
elif choice == '3':
if not status['docker']:
print(" Docker not available")
continue
path = input(" Sample path: ").strip()
if path:
job_id = sandbox.dynamic_analysis(path)
print(f" Running in sandbox (job: {job_id})...")
while True:
job = sandbox.get_job(job_id)
if job['status'] != 'running':
r = job['result']
if r.get('ok'):
print(f" Syscalls: {r.get('syscall_count', 0)}")
print(f" Files: {len(r.get('files_accessed', []))}")
print(f" Network: {len(r.get('network_calls', []))}")
else:
print(f" Error: {r.get('error', 'Unknown')}")
break
time.sleep(2)
elif choice == '4':
path = input(" Sample path: ").strip()
if path:
result = sandbox.generate_report(path)
if result['ok']:
print(f" Report: {result['report_path']}")
print(f" Risk: {result['risk_level']} ({result['risk_score']}/100)")
elif choice == '5':
for s in sandbox.list_samples():
print(f" {s['name']} ({s['size']} bytes)")
elif choice == '6':
for r in sandbox.list_reports():
print(f" [{r['risk']}] {r['sample']} {r['timestamp'][:19]}")