525 lines
20 KiB
Python
525 lines
20 KiB
Python
|
|
"""AUTARCH Malware Sandbox
|
||
|
|
|
||
|
|
Isolated sample detonation (Docker-based), behavior logging, API call tracing,
|
||
|
|
network activity monitoring, and file system change tracking.
|
||
|
|
"""
|
||
|
|
|
||
|
|
DESCRIPTION = "Malware detonation sandbox & analysis"
|
||
|
|
AUTHOR = "darkHal"
|
||
|
|
VERSION = "1.0"
|
||
|
|
CATEGORY = "analyze"
|
||
|
|
|
||
|
|
import hashlib
import json
import os
import re
import shlex
import shutil
import subprocess
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
|
||
|
|
|
||
|
|
try:
    from core.paths import find_tool, get_data_dir
except ImportError:
    # Standalone fallbacks used when the AUTARCH core package is absent.
    def find_tool(name):
        """Locate *name* on PATH; None when not installed."""
        return shutil.which(name)

    def get_data_dir():
        """Default data directory: <repo root>/data relative to this file."""
        return str(Path(__file__).parent.parent / 'data')
|
||
|
|
|
||
|
|
|
||
|
|
# ── YARA Rules (basic) ──────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
# Byte substrings searched for verbatim in sample contents, grouped by the
# behavior they suggest. Category order matters: it is the order results
# are reported in, and the scoring weights in static_analysis mirror it.
BASIC_YARA_INDICATORS = {
    # Process-injection / dynamic-resolution APIs commonly abused by loaders.
    'suspicious_imports': [
        b'CreateRemoteThread',
        b'VirtualAllocEx',
        b'WriteProcessMemory',
        b'NtQueryInformationProcess',
        b'IsDebuggerPresent',
        b'GetProcAddress',
        b'LoadLibraryA',
        b'ShellExecuteA',
    ],
    # Encryption APIs / artifacts (possible ransomware or C2 crypto).
    'crypto_indicators': [
        b'CryptEncrypt',
        b'CryptDecrypt',
        b'BCryptEncrypt',
        b'AES',
        b'RSA',
        b'BEGIN PUBLIC KEY',
    ],
    # Network APIs and protocol scheme prefixes.
    'network_indicators': [
        b'InternetOpenA',
        b'HttpOpenRequestA',
        b'URLDownloadToFile',
        b'WSAStartup',
        b'connect',
        b'send',
        b'recv',
        b'http://',
        b'https://',
        b'ftp://',
    ],
    # Registry run keys, scheduled tasks, and cron (Windows + Unix).
    'persistence_indicators': [
        b'CurrentVersion\\Run',
        b'SOFTWARE\\Microsoft\\Windows\\CurrentVersion',
        b'schtasks',
        b'at.exe',
        b'HKEY_LOCAL_MACHINE',
        b'HKEY_CURRENT_USER',
        b'crontab',
        b'/etc/cron',
    ],
    # Anti-debug / anti-VM / sandbox-detection strings.
    'evasion_indicators': [
        b'IsDebuggerPresent',
        b'CheckRemoteDebuggerPresent',
        b'NtSetInformationThread',
        b'vmware',
        b'virtualbox',
        b'vbox',
        b'sandbox',
        b'SbieDll.dll',
    ],
}
|
||
|
|
|
||
|
|
|
||
|
|
# ── Sandbox Engine ───────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class MalwareSandbox:
|
||
|
|
"""Isolated malware analysis environment."""
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
self.data_dir = os.path.join(get_data_dir(), 'sandbox')
|
||
|
|
os.makedirs(self.data_dir, exist_ok=True)
|
||
|
|
self.samples_dir = os.path.join(self.data_dir, 'samples')
|
||
|
|
os.makedirs(self.samples_dir, exist_ok=True)
|
||
|
|
self.reports_dir = os.path.join(self.data_dir, 'reports')
|
||
|
|
os.makedirs(self.reports_dir, exist_ok=True)
|
||
|
|
|
||
|
|
self.docker = find_tool('docker') or shutil.which('docker')
|
||
|
|
self.strace = shutil.which('strace')
|
||
|
|
self.ltrace = shutil.which('ltrace')
|
||
|
|
self.file_cmd = shutil.which('file')
|
||
|
|
self.strings_cmd = find_tool('strings') or shutil.which('strings')
|
||
|
|
|
||
|
|
self.analyses: List[Dict] = []
|
||
|
|
self._jobs: Dict[str, Dict] = {}
|
||
|
|
|
||
|
|
def get_status(self) -> Dict:
|
||
|
|
"""Get sandbox capabilities."""
|
||
|
|
docker_ok = False
|
||
|
|
if self.docker:
|
||
|
|
try:
|
||
|
|
result = subprocess.run([self.docker, 'info'],
|
||
|
|
capture_output=True, timeout=5)
|
||
|
|
docker_ok = result.returncode == 0
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
return {
|
||
|
|
'docker': docker_ok,
|
||
|
|
'strace': self.strace is not None,
|
||
|
|
'ltrace': self.ltrace is not None,
|
||
|
|
'file': self.file_cmd is not None,
|
||
|
|
'strings': self.strings_cmd is not None,
|
||
|
|
'samples': len(list(Path(self.samples_dir).iterdir())),
|
||
|
|
'analyses': len(self.analyses)
|
||
|
|
}
|
||
|
|
|
||
|
|
# ── Sample Management ────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def submit_sample(self, filepath: str, name: str = None) -> Dict:
|
||
|
|
"""Submit a sample for analysis."""
|
||
|
|
if not os.path.exists(filepath):
|
||
|
|
return {'ok': False, 'error': 'File not found'}
|
||
|
|
|
||
|
|
# Hash the sample
|
||
|
|
hashes = {}
|
||
|
|
with open(filepath, 'rb') as f:
|
||
|
|
data = f.read()
|
||
|
|
hashes['md5'] = hashlib.md5(data).hexdigest()
|
||
|
|
hashes['sha1'] = hashlib.sha1(data).hexdigest()
|
||
|
|
hashes['sha256'] = hashlib.sha256(data).hexdigest()
|
||
|
|
|
||
|
|
# Copy to samples dir
|
||
|
|
sample_name = name or Path(filepath).name
|
||
|
|
safe_name = re.sub(r'[^\w.\-]', '_', sample_name)
|
||
|
|
dest = os.path.join(self.samples_dir, f'{hashes["sha256"][:16]}_{safe_name}')
|
||
|
|
shutil.copy2(filepath, dest)
|
||
|
|
|
||
|
|
sample = {
|
||
|
|
'name': sample_name,
|
||
|
|
'path': dest,
|
||
|
|
'size': os.path.getsize(dest),
|
||
|
|
'hashes': hashes,
|
||
|
|
'submitted': datetime.now(timezone.utc).isoformat()
|
||
|
|
}
|
||
|
|
|
||
|
|
return {'ok': True, 'sample': sample}
|
||
|
|
|
||
|
|
def list_samples(self) -> List[Dict]:
|
||
|
|
"""List submitted samples."""
|
||
|
|
samples = []
|
||
|
|
for f in Path(self.samples_dir).iterdir():
|
||
|
|
if f.is_file():
|
||
|
|
samples.append({
|
||
|
|
'name': f.name,
|
||
|
|
'path': str(f),
|
||
|
|
'size': f.stat().st_size,
|
||
|
|
'modified': datetime.fromtimestamp(f.stat().st_mtime, timezone.utc).isoformat()
|
||
|
|
})
|
||
|
|
return samples
|
||
|
|
|
||
|
|
# ── Static Analysis ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def static_analysis(self, filepath: str) -> Dict:
|
||
|
|
"""Perform static analysis on a sample."""
|
||
|
|
if not os.path.exists(filepath):
|
||
|
|
return {'ok': False, 'error': 'File not found'}
|
||
|
|
|
||
|
|
result = {
|
||
|
|
'ok': True,
|
||
|
|
'file': filepath,
|
||
|
|
'name': Path(filepath).name,
|
||
|
|
'size': os.path.getsize(filepath)
|
||
|
|
}
|
||
|
|
|
||
|
|
# File type identification
|
||
|
|
if self.file_cmd:
|
||
|
|
try:
|
||
|
|
out = subprocess.check_output([self.file_cmd, filepath],
|
||
|
|
text=True, timeout=10)
|
||
|
|
result['file_type'] = out.split(':', 1)[-1].strip()
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# Hashes
|
||
|
|
with open(filepath, 'rb') as f:
|
||
|
|
data = f.read()
|
||
|
|
result['hashes'] = {
|
||
|
|
'md5': hashlib.md5(data).hexdigest(),
|
||
|
|
'sha1': hashlib.sha1(data).hexdigest(),
|
||
|
|
'sha256': hashlib.sha256(data).hexdigest()
|
||
|
|
}
|
||
|
|
|
||
|
|
# Strings extraction
|
||
|
|
if self.strings_cmd:
|
||
|
|
try:
|
||
|
|
out = subprocess.check_output(
|
||
|
|
[self.strings_cmd, '-n', '6', filepath],
|
||
|
|
text=True, timeout=30, stderr=subprocess.DEVNULL
|
||
|
|
)
|
||
|
|
strings = out.strip().split('\n')
|
||
|
|
result['strings_count'] = len(strings)
|
||
|
|
|
||
|
|
# Extract interesting strings
|
||
|
|
urls = [s for s in strings if re.match(r'https?://', s)]
|
||
|
|
ips = [s for s in strings if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', s)]
|
||
|
|
emails = [s for s in strings if re.match(r'[^@]+@[^@]+\.[^@]+', s)]
|
||
|
|
paths = [s for s in strings if s.startswith('/') or '\\' in s]
|
||
|
|
|
||
|
|
result['interesting_strings'] = {
|
||
|
|
'urls': urls[:20],
|
||
|
|
'ips': list(set(ips))[:20],
|
||
|
|
'emails': list(set(emails))[:10],
|
||
|
|
'paths': paths[:20]
|
||
|
|
}
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# YARA-like signature matching
|
||
|
|
indicators = {}
|
||
|
|
for category, patterns in BASIC_YARA_INDICATORS.items():
|
||
|
|
matches = [p.decode('utf-8', errors='replace') for p in patterns if p in data]
|
||
|
|
if matches:
|
||
|
|
indicators[category] = matches
|
||
|
|
|
||
|
|
result['indicators'] = indicators
|
||
|
|
result['indicator_count'] = sum(len(v) for v in indicators.values())
|
||
|
|
|
||
|
|
# PE header analysis
|
||
|
|
if data[:2] == b'MZ':
|
||
|
|
result['pe_info'] = self._parse_pe_header(data)
|
||
|
|
|
||
|
|
# ELF header analysis
|
||
|
|
if data[:4] == b'\x7fELF':
|
||
|
|
result['elf_info'] = self._parse_elf_header(data)
|
||
|
|
|
||
|
|
# Risk score
|
||
|
|
score = 0
|
||
|
|
if indicators.get('evasion_indicators'):
|
||
|
|
score += 30
|
||
|
|
if indicators.get('persistence_indicators'):
|
||
|
|
score += 25
|
||
|
|
if indicators.get('suspicious_imports'):
|
||
|
|
score += 20
|
||
|
|
if indicators.get('network_indicators'):
|
||
|
|
score += 15
|
||
|
|
if indicators.get('crypto_indicators'):
|
||
|
|
score += 10
|
||
|
|
|
||
|
|
result['risk_score'] = min(100, score)
|
||
|
|
result['risk_level'] = (
|
||
|
|
'critical' if score >= 70 else
|
||
|
|
'high' if score >= 50 else
|
||
|
|
'medium' if score >= 30 else
|
||
|
|
'low' if score >= 10 else
|
||
|
|
'clean'
|
||
|
|
)
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
def _parse_pe_header(self, data: bytes) -> Dict:
|
||
|
|
"""Basic PE header parsing."""
|
||
|
|
info = {'format': 'PE'}
|
||
|
|
try:
|
||
|
|
import struct
|
||
|
|
e_lfanew = struct.unpack_from('<I', data, 0x3C)[0]
|
||
|
|
if data[e_lfanew:e_lfanew+4] == b'PE\x00\x00':
|
||
|
|
machine = struct.unpack_from('<H', data, e_lfanew + 4)[0]
|
||
|
|
info['machine'] = {0x14c: 'i386', 0x8664: 'x86_64', 0x1c0: 'ARM'}.get(machine, hex(machine))
|
||
|
|
num_sections = struct.unpack_from('<H', data, e_lfanew + 6)[0]
|
||
|
|
info['sections'] = num_sections
|
||
|
|
timestamp = struct.unpack_from('<I', data, e_lfanew + 8)[0]
|
||
|
|
info['compile_time'] = datetime.fromtimestamp(timestamp, timezone.utc).isoformat()
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
return info
|
||
|
|
|
||
|
|
def _parse_elf_header(self, data: bytes) -> Dict:
|
||
|
|
"""Basic ELF header parsing."""
|
||
|
|
info = {'format': 'ELF'}
|
||
|
|
try:
|
||
|
|
import struct
|
||
|
|
ei_class = data[4]
|
||
|
|
info['bits'] = {1: 32, 2: 64}.get(ei_class, 0)
|
||
|
|
ei_data = data[5]
|
||
|
|
info['endian'] = {1: 'little', 2: 'big'}.get(ei_data, 'unknown')
|
||
|
|
e_type = struct.unpack_from('<H', data, 16)[0]
|
||
|
|
info['type'] = {1: 'relocatable', 2: 'executable', 3: 'shared', 4: 'core'}.get(e_type, str(e_type))
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
return info
|
||
|
|
|
||
|
|
# ── Dynamic Analysis (Docker) ────────────────────────────────────────
|
||
|
|
|
||
|
|
def dynamic_analysis(self, filepath: str, timeout: int = 60) -> str:
|
||
|
|
"""Run sample in Docker sandbox. Returns job_id."""
|
||
|
|
if not self.docker:
|
||
|
|
return ''
|
||
|
|
|
||
|
|
job_id = f'sandbox_{int(time.time())}'
|
||
|
|
self._jobs[job_id] = {
|
||
|
|
'type': 'dynamic', 'status': 'running',
|
||
|
|
'result': None, 'started': time.time()
|
||
|
|
}
|
||
|
|
|
||
|
|
def _run():
|
||
|
|
try:
|
||
|
|
container_name = f'autarch_sandbox_{job_id}'
|
||
|
|
sample_name = Path(filepath).name
|
||
|
|
|
||
|
|
# Run in isolated container
|
||
|
|
cmd = [
|
||
|
|
self.docker, 'run', '--rm',
|
||
|
|
'--name', container_name,
|
||
|
|
'--network', 'none', # No network
|
||
|
|
'--memory', '256m', # Memory limit
|
||
|
|
'--cpus', '1', # CPU limit
|
||
|
|
'--read-only', # Read-only root
|
||
|
|
'--tmpfs', '/tmp:size=64m',
|
||
|
|
'-v', f'{os.path.abspath(filepath)}:/sample/{sample_name}:ro',
|
||
|
|
'ubuntu:22.04',
|
||
|
|
'bash', '-c', f'''
|
||
|
|
# Log file operations
|
||
|
|
cp /sample/{sample_name} /tmp/test_sample
|
||
|
|
chmod +x /tmp/test_sample 2>/dev/null
|
||
|
|
# Try to run with strace if available
|
||
|
|
timeout {timeout} strace -f -o /tmp/trace.log /tmp/test_sample 2>/tmp/stderr.log || true
|
||
|
|
cat /tmp/trace.log 2>/dev/null | head -1000
|
||
|
|
echo "---STDERR---"
|
||
|
|
cat /tmp/stderr.log 2>/dev/null | head -100
|
||
|
|
'''
|
||
|
|
]
|
||
|
|
|
||
|
|
result = subprocess.run(cmd, capture_output=True, text=True,
|
||
|
|
timeout=timeout + 30)
|
||
|
|
|
||
|
|
# Parse strace output
|
||
|
|
syscalls = {}
|
||
|
|
files_accessed = []
|
||
|
|
network_calls = []
|
||
|
|
|
||
|
|
for line in result.stdout.split('\n'):
|
||
|
|
# Count syscalls
|
||
|
|
sc_match = re.match(r'.*?(\w+)\(', line)
|
||
|
|
if sc_match:
|
||
|
|
sc = sc_match.group(1)
|
||
|
|
syscalls[sc] = syscalls.get(sc, 0) + 1
|
||
|
|
|
||
|
|
# File access
|
||
|
|
if 'open(' in line or 'openat(' in line:
|
||
|
|
f_match = re.search(r'"([^"]+)"', line)
|
||
|
|
if f_match:
|
||
|
|
files_accessed.append(f_match.group(1))
|
||
|
|
|
||
|
|
# Network
|
||
|
|
if 'connect(' in line or 'socket(' in line:
|
||
|
|
network_calls.append(line.strip()[:100])
|
||
|
|
|
||
|
|
self._jobs[job_id]['status'] = 'complete'
|
||
|
|
self._jobs[job_id]['result'] = {
|
||
|
|
'ok': True,
|
||
|
|
'syscalls': syscalls,
|
||
|
|
'syscall_count': sum(syscalls.values()),
|
||
|
|
'files_accessed': list(set(files_accessed))[:50],
|
||
|
|
'network_calls': network_calls[:20],
|
||
|
|
'exit_code': result.returncode,
|
||
|
|
'stderr': result.stderr[:500] if result.stderr else ''
|
||
|
|
}
|
||
|
|
|
||
|
|
except subprocess.TimeoutExpired:
|
||
|
|
# Kill container
|
||
|
|
subprocess.run([self.docker, 'kill', container_name],
|
||
|
|
capture_output=True)
|
||
|
|
self._jobs[job_id]['status'] = 'complete'
|
||
|
|
self._jobs[job_id]['result'] = {
|
||
|
|
'ok': True, 'timeout': True,
|
||
|
|
'message': 'Analysis timed out (sample may be long-running)'
|
||
|
|
}
|
||
|
|
except Exception as e:
|
||
|
|
self._jobs[job_id]['status'] = 'error'
|
||
|
|
self._jobs[job_id]['result'] = {'ok': False, 'error': str(e)}
|
||
|
|
|
||
|
|
threading.Thread(target=_run, daemon=True).start()
|
||
|
|
return job_id
|
||
|
|
|
||
|
|
# ── Report Generation ────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def generate_report(self, filepath: str, include_dynamic: bool = False) -> Dict:
|
||
|
|
"""Generate comprehensive analysis report."""
|
||
|
|
static = self.static_analysis(filepath)
|
||
|
|
report = {
|
||
|
|
'timestamp': datetime.now(timezone.utc).isoformat(),
|
||
|
|
'sample': {
|
||
|
|
'name': Path(filepath).name,
|
||
|
|
'path': filepath,
|
||
|
|
'size': static.get('size', 0),
|
||
|
|
'hashes': static.get('hashes', {})
|
||
|
|
},
|
||
|
|
'static_analysis': static,
|
||
|
|
'risk_score': static.get('risk_score', 0),
|
||
|
|
'risk_level': static.get('risk_level', 'unknown')
|
||
|
|
}
|
||
|
|
|
||
|
|
# Save report
|
||
|
|
report_name = f'report_{static.get("hashes", {}).get("sha256", "unknown")[:16]}.json'
|
||
|
|
report_path = os.path.join(self.reports_dir, report_name)
|
||
|
|
with open(report_path, 'w') as f:
|
||
|
|
json.dump(report, f, indent=2)
|
||
|
|
|
||
|
|
report['report_path'] = report_path
|
||
|
|
self.analyses.append({
|
||
|
|
'name': Path(filepath).name,
|
||
|
|
'report': report_path,
|
||
|
|
'risk': report['risk_level'],
|
||
|
|
'timestamp': report['timestamp']
|
||
|
|
})
|
||
|
|
|
||
|
|
return {'ok': True, **report}
|
||
|
|
|
||
|
|
def list_reports(self) -> List[Dict]:
|
||
|
|
"""List analysis reports."""
|
||
|
|
reports = []
|
||
|
|
for f in Path(self.reports_dir).glob('*.json'):
|
||
|
|
try:
|
||
|
|
with open(f) as fh:
|
||
|
|
data = json.load(fh)
|
||
|
|
reports.append({
|
||
|
|
'name': f.name,
|
||
|
|
'path': str(f),
|
||
|
|
'sample': data.get('sample', {}).get('name', ''),
|
||
|
|
'risk': data.get('risk_level', 'unknown'),
|
||
|
|
'timestamp': data.get('timestamp', '')
|
||
|
|
})
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
return reports
|
||
|
|
|
||
|
|
# ── Job Management ───────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def get_job(self, job_id: str) -> Optional[Dict]:
|
||
|
|
return self._jobs.get(job_id)
|
||
|
|
|
||
|
|
|
||
|
|
# ── Singleton ────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
# Lazily-created module-wide sandbox instance.
_instance = None


def get_sandbox() -> MalwareSandbox:
    """Return the shared MalwareSandbox, creating it on first use."""
    global _instance
    if _instance is None:
        _instance = MalwareSandbox()
    return _instance
|
||
|
|
|
||
|
|
|
||
|
|
# ── CLI Interface ────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def run():
    """CLI entry point for Malware Sandbox module.

    Presents a menu loop until the user selects 0; Ctrl-C / Ctrl-D at the
    main prompt also exit cleanly instead of raising a traceback.
    """
    sandbox = get_sandbox()

    while True:
        status = sandbox.get_status()
        print(f"\n{'='*60}")
        print(f" Malware Sandbox")
        print(f"{'='*60}")
        print(f" Docker: {'OK' if status['docker'] else 'NOT AVAILABLE'}")
        print(f" Samples: {status['samples']} Analyses: {status['analyses']}")
        print()
        print(" 1 — Submit Sample")
        print(" 2 — Static Analysis")
        print(" 3 — Dynamic Analysis (Docker)")
        print(" 4 — Full Report")
        print(" 5 — List Samples")
        print(" 6 — List Reports")
        print(" 0 — Back")
        print()

        try:
            choice = input(" > ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat Ctrl-D / Ctrl-C as "Back" rather than crashing.
            break

        if choice == '0':
            break
        elif choice == '1':
            path = input(" File path: ").strip()
            if path:
                result = sandbox.submit_sample(path)
                if result['ok']:
                    s = result['sample']
                    print(f" Submitted: {s['name']} ({s['size']} bytes)")
                    print(f" SHA256: {s['hashes']['sha256']}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '2':
            path = input(" Sample path: ").strip()
            if path:
                result = sandbox.static_analysis(path)
                if result['ok']:
                    print(f" Type: {result.get('file_type', 'unknown')}")
                    print(f" Risk: {result['risk_level']} ({result['risk_score']}/100)")
                    print(f" Strings: {result.get('strings_count', 0)}")
                    for cat, matches in result.get('indicators', {}).items():
                        print(f" {cat}: {', '.join(matches[:5])}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '3':
            if not status['docker']:
                print(" Docker not available")
                continue
            path = input(" Sample path: ").strip()
            if path:
                job_id = sandbox.dynamic_analysis(path)
                if not job_id:
                    # Docker became unavailable after the status check;
                    # without this guard, get_job('') is None and the
                    # polling loop below would raise TypeError.
                    print(" Docker not available")
                    continue
                print(f" Running in sandbox (job: {job_id})...")
                while True:
                    job = sandbox.get_job(job_id)
                    if job['status'] != 'running':
                        r = job['result']
                        if r.get('ok'):
                            print(f" Syscalls: {r.get('syscall_count', 0)}")
                            print(f" Files: {len(r.get('files_accessed', []))}")
                            print(f" Network: {len(r.get('network_calls', []))}")
                        else:
                            print(f" Error: {r.get('error', 'Unknown')}")
                        break
                    time.sleep(2)
        elif choice == '4':
            path = input(" Sample path: ").strip()
            if path:
                result = sandbox.generate_report(path)
                if result['ok']:
                    print(f" Report: {result['report_path']}")
                    print(f" Risk: {result['risk_level']} ({result['risk_score']}/100)")
                else:
                    # Previously this case printed nothing at all.
                    print(f" Error: {result.get('error', 'Unknown')}")
        elif choice == '5':
            for s in sandbox.list_samples():
                print(f" {s['name']} ({s['size']} bytes)")
        elif choice == '6':
            for r in sandbox.list_reports():
                print(f" [{r['risk']}] {r['sample']} {r['timestamp'][:19]}")