Autarch/modules/forensics.py
DigiJ 2322f69516 v2.2.0 — Full arsenal expansion: 16 new security modules
Add WiFi Audit, API Fuzzer, Cloud Scanner, Threat Intel, Log Correlator,
Steganography, Anti-Forensics, BLE Scanner, Forensics, RFID/NFC, Malware
Sandbox, Password Toolkit, Web Scanner, Report Engine, Net Mapper, and
C2 Framework. Each module includes CLI interface, Flask routes, and web
UI template. Also includes Go DNS server source + binary, IP Capture
service, SYN Flood, Gone Fishing mail server, and hack hijack modules
from v2.0 work.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 05:20:39 -08:00

596 lines
25 KiB
Python

"""AUTARCH Forensics Toolkit
Disk imaging, file carving, metadata extraction, timeline building,
hash verification, and chain of custody logging for digital forensics.
"""
DESCRIPTION = "Digital forensics & evidence analysis"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "analyze"
import os
import re
import json
import time
import hashlib
import struct
import shutil
import subprocess
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Tuple
# Prefer the project's path helpers; fall back to stdlib-only equivalents so
# this module still imports when run outside the AUTARCH tree.
try:
    from core.paths import find_tool, get_data_dir
except ImportError:
    def find_tool(name):
        # Fallback: resolve an executable on PATH (assumed to match the
        # contract of core.paths.find_tool — TODO confirm).
        return shutil.which(name)

    def get_data_dir():
        # Fallback: "<repo root>/data", derived from this file's location.
        return str(Path(__file__).parent.parent / 'data')
# Optional imports — Pillow enables EXIF/GPS extraction in extract_metadata();
# everything else works without it.
try:
    from PIL import Image as PILImage
    from PIL.ExifTags import TAGS, GPSTAGS
    HAS_PIL = True
except ImportError:
    HAS_PIL = False  # extract_metadata() silently skips EXIF when missing
# ── File Signatures for Carving ──────────────────────────────────────────────
# Each entry: 'magic' = header bytes searched for in raw data, 'footer' =
# optional trailer bytes used to find the file end (None = carve up to
# 'max_size'), 'max_size' = hard cap on bytes extracted per hit.
# NOTE: ZIP and DOCX share the same magic (DOCX is a ZIP container), so a
# DOCX will also match the ZIP signature; MP4's magic assumes an 0x18-byte
# ftyp box and misses other box sizes; AVI's 'RIFF' also matches WAV files.
FILE_SIGNATURES = [
    {'name': 'JPEG', 'ext': '.jpg', 'magic': b'\xFF\xD8\xFF', 'footer': b'\xFF\xD9', 'max_size': 50*1024*1024},
    {'name': 'PNG', 'ext': '.png', 'magic': b'\x89PNG\r\n\x1a\n', 'footer': b'IEND\xAE\x42\x60\x82', 'max_size': 50*1024*1024},
    {'name': 'GIF', 'ext': '.gif', 'magic': b'GIF8', 'footer': b'\x00\x3B', 'max_size': 20*1024*1024},
    {'name': 'PDF', 'ext': '.pdf', 'magic': b'%PDF', 'footer': b'%%EOF', 'max_size': 100*1024*1024},
    {'name': 'ZIP', 'ext': '.zip', 'magic': b'PK\x03\x04', 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'RAR', 'ext': '.rar', 'magic': b'Rar!\x1a\x07', 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'ELF', 'ext': '.elf', 'magic': b'\x7fELF', 'footer': None, 'max_size': 100*1024*1024},
    {'name': 'PE/EXE', 'ext': '.exe', 'magic': b'MZ', 'footer': None, 'max_size': 100*1024*1024},
    {'name': 'SQLite', 'ext': '.sqlite', 'magic': b'SQLite format 3\x00', 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'DOCX', 'ext': '.docx', 'magic': b'PK\x03\x04', 'footer': None, 'max_size': 100*1024*1024},
    {'name': '7z', 'ext': '.7z', 'magic': b"7z\xBC\xAF'\x1C", 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'BMP', 'ext': '.bmp', 'magic': b'BM', 'footer': None, 'max_size': 50*1024*1024},
    {'name': 'MP3', 'ext': '.mp3', 'magic': b'\xFF\xFB', 'footer': None, 'max_size': 50*1024*1024},
    {'name': 'MP4', 'ext': '.mp4', 'magic': b'\x00\x00\x00\x18ftyp', 'footer': None, 'max_size': 1024*1024*1024},
    {'name': 'AVI', 'ext': '.avi', 'magic': b'RIFF', 'footer': None, 'max_size': 1024*1024*1024},
]
# ── Chain of Custody Logger ──────────────────────────────────────────────────
class CustodyLog:
    """Chain of custody logging for forensic evidence.

    Entries are persisted as a JSON list in ``<data_dir>/custody_log.json``
    and reloaded on construction, so the log survives process restarts.
    """

    def __init__(self, data_dir: str):
        self.log_file = os.path.join(data_dir, 'custody_log.json')
        self.entries: List[Dict] = []
        self._load()

    def _load(self):
        """Best-effort load of the persisted log; corruption starts fresh."""
        if os.path.exists(self.log_file):
            try:
                with open(self.log_file) as f:
                    data = json.load(f)
                # Guard against a corrupted or hand-edited file holding
                # something other than a list — previously a JSON dict/str
                # would be assigned and break log()/append later.
                if isinstance(data, list):
                    self.entries = data
            except (OSError, ValueError):
                # Unreadable file or invalid JSON: deliberately start with
                # an empty log instead of crashing the engine at startup.
                pass

    def _save(self):
        """Persist entries atomically (temp file + rename).

        Writing in place risked truncating the custody log if the process
        died mid-dump; os.replace swaps the finished file in one step.
        """
        tmp = self.log_file + '.tmp'
        with open(tmp, 'w') as f:
            json.dump(self.entries, f, indent=2)
        os.replace(tmp, self.log_file)

    def log(self, action: str, target: str, details: str = "",
            evidence_hash: str = "") -> Dict:
        """Log a forensic action.

        Args:
            action: short action tag (e.g. 'hash_verify', 'disk_image').
            target: file/device the action was performed on.
            details: free-form description.
            evidence_hash: hash of the evidence, when applicable.

        Returns:
            The entry dict that was appended and persisted.
        """
        entry = {
            'id': len(self.entries) + 1,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'action': action,
            'target': target,
            'details': details,
            'evidence_hash': evidence_hash,
            # USER on Unix, USERNAME on Windows; 'unknown' if neither is set.
            'user': os.getenv('USER', os.getenv('USERNAME', 'unknown'))
        }
        self.entries.append(entry)
        self._save()
        return entry

    def get_log(self) -> List[Dict]:
        """Return all custody entries (oldest first)."""
        return self.entries
# ── Forensics Engine ─────────────────────────────────────────────────────────
class ForensicsEngine:
    """Digital forensics toolkit.

    Provides evidence hashing/verification, dd-based disk imaging,
    signature-based file carving, file metadata extraction, filesystem
    timelines, and evidence listing.  Every action is recorded in the
    chain-of-custody log (``self.custody``).
    """

    def __init__(self):
        # Working tree lives under <data_dir>/forensics.
        self.data_dir = os.path.join(get_data_dir(), 'forensics')
        os.makedirs(self.data_dir, exist_ok=True)
        self.evidence_dir = os.path.join(self.data_dir, 'evidence')
        os.makedirs(self.evidence_dir, exist_ok=True)
        self.carved_dir = os.path.join(self.data_dir, 'carved')
        os.makedirs(self.carved_dir, exist_ok=True)
        self.custody = CustodyLog(self.data_dir)
        # dd is only needed for disk imaging; None disables that feature.
        self.dd = find_tool('dd') or shutil.which('dd')

    # ── Hash Verification ────────────────────────────────────────────────
    def hash_file(self, filepath: str, algorithms: Optional[List[str]] = None) -> Dict:
        """Calculate file hashes for evidence integrity.

        Args:
            filepath: file to hash.
            algorithms: hashlib algorithm names; defaults to md5/sha1/sha256.

        Returns:
            {'ok': True, 'file', 'size', 'hashes': {alg: hexdigest}} on
            success, {'ok': False, 'error': str} otherwise.
        """
        algorithms = algorithms or ['md5', 'sha1', 'sha256']
        if not os.path.exists(filepath):
            return {'ok': False, 'error': 'File not found'}
        try:
            hashers = {alg: hashlib.new(alg) for alg in algorithms}
            file_size = os.path.getsize(filepath)
            with open(filepath, 'rb') as f:
                # Stream in 8 KiB chunks so huge evidence files never have
                # to fit in memory; all digests are updated in one pass.
                while True:
                    chunk = f.read(8192)
                    if not chunk:
                        break
                    for h in hashers.values():
                        h.update(chunk)
            hashes = {alg: h.hexdigest() for alg, h in hashers.items()}
            self.custody.log('hash_verify', filepath,
                             f'Hashes: {", ".join(f"{k}={v[:16]}..." for k, v in hashes.items())}',
                             hashes.get('sha256', ''))
            return {
                'ok': True, 'file': filepath,
                'size': file_size, 'hashes': hashes
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    def verify_hash(self, filepath: str, expected_hash: str,
                    algorithm: Optional[str] = None) -> Dict:
        """Verify file against expected hash.

        When *algorithm* is omitted it is inferred from the hex digest
        length (32=md5, 40=sha1, 64=sha256, 128=sha512).
        """
        if not algorithm:
            hash_len = len(expected_hash)
            algorithm = {32: 'md5', 40: 'sha1', 64: 'sha256', 128: 'sha512'}.get(hash_len)
            if not algorithm:
                return {'ok': False, 'error': f'Cannot detect algorithm for hash length {hash_len}'}
        result = self.hash_file(filepath, [algorithm])
        if not result['ok']:
            return result
        actual = result['hashes'][algorithm]
        # Case-insensitive compare: hex digests are conventionally lowercase
        # but evidence sheets often list them uppercase.
        match = actual.lower() == expected_hash.lower()
        self.custody.log('hash_verify', filepath,
                         f'Expected: {expected_hash[:16]}... Match: {match}')
        return {
            'ok': True, 'match': match,
            'algorithm': algorithm,
            'expected': expected_hash,
            'actual': actual,
            'file': filepath
        }

    # ── Disk Imaging ─────────────────────────────────────────────────────
    def create_image(self, source: str, output: Optional[str] = None,
                     block_size: int = 4096) -> Dict:
        """Create forensic disk image using dd.

        conv=noerror,sync keeps imaging past unreadable sectors (zero-filled)
        so a damaged drive still yields a full-size image.  The finished
        image is hashed (md5/sha256) and logged for chain of custody.
        """
        if not self.dd:
            return {'ok': False, 'error': 'dd not found'}
        if not output:
            name = Path(source).name.replace('/', '_')
            output = os.path.join(self.evidence_dir, f'{name}_{int(time.time())}.img')
        self.custody.log('disk_image', source, f'Creating image: {output}')
        try:
            result = subprocess.run(
                [self.dd, f'if={source}', f'of={output}', f'bs={block_size}',
                 'conv=noerror,sync', 'status=progress'],
                capture_output=True, text=True, timeout=3600
            )
            if os.path.exists(output):
                # Hash the image immediately so the acquisition hash is on
                # record before anything else touches the file.
                hashes = self.hash_file(output, ['md5', 'sha256'])
                self.custody.log('disk_image_complete', output,
                                 f'Image created, SHA256: {hashes.get("hashes", {}).get("sha256", "?")}')
                return {
                    'ok': True, 'source': source, 'output': output,
                    'size': os.path.getsize(output),
                    'hashes': hashes.get('hashes', {}),
                    'dd_output': result.stderr,
                    # Surface dd's exit status: nonzero means read errors
                    # occurred even though an image file was produced.
                    'returncode': result.returncode
                }
            return {'ok': False, 'error': 'Image file not created', 'stderr': result.stderr}
        except subprocess.TimeoutExpired:
            return {'ok': False, 'error': 'Imaging timed out (1hr limit)'}
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    # ── File Carving ─────────────────────────────────────────────────────
    def carve_files(self, source: str, file_types: Optional[List[str]] = None,
                    max_files: int = 100) -> Dict:
        """Recover files from raw data by magic byte signatures.

        Args:
            source: raw image/file to scan.
            file_types: optional filter by signature name or extension
                (case-insensitive); None scans for every known type.
            max_files: hard cap on the number of files carved.
        """
        if not os.path.exists(source):
            return {'ok': False, 'error': 'Source file not found'}
        self.custody.log('file_carving', source, f'Starting carve, types={file_types}')
        # Filter signatures
        sigs = FILE_SIGNATURES
        if file_types:
            type_set = {t.lower() for t in file_types}
            sigs = [s for s in sigs if s['name'].lower() in type_set or
                    s['ext'].lstrip('.').lower() in type_set]
        if not sigs:
            # BUG FIX: previously fell through and crashed on
            # max() over an empty sequence, yielding a cryptic error.
            return {'ok': False, 'error': f'No known signatures match types: {file_types}'}
        carved = []
        # Chunks are re-read with an overlap so a magic split across a chunk
        # boundary is still found; dedupe hits by (offset, type) so the
        # overlap region is never carved twice (previous version could
        # produce duplicates).
        seen = set()
        file_size = os.path.getsize(source)
        chunk_size = 1024 * 1024  # 1MB chunks
        overlap = max(len(s['magic']) for s in sigs)
        try:
            with open(source, 'rb') as f:
                offset = 0
                while offset < file_size and len(carved) < max_files:
                    f.seek(offset)
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    for sig in sigs:
                        pos = 0
                        while pos < len(chunk) and len(carved) < max_files:
                            idx = chunk.find(sig['magic'], pos)
                            if idx == -1:
                                break
                            abs_offset = offset + idx
                            pos = idx + len(sig['magic'])
                            if (abs_offset, sig['name']) in seen:
                                continue  # already carved via overlap
                            seen.add((abs_offset, sig['name']))
                            # Try to find file end via footer; otherwise the
                            # per-type max_size caps the extraction.
                            file_end = abs_offset + sig['max_size']
                            if sig['footer']:
                                f.seek(abs_offset)
                                search_data = f.read(min(sig['max_size'], file_size - abs_offset))
                                footer_pos = search_data.find(sig['footer'], len(sig['magic']))
                                if footer_pos != -1:
                                    file_end = abs_offset + footer_pos + len(sig['footer'])
                            # Extract candidate file data
                            extract_size = min(file_end - abs_offset, sig['max_size'])
                            f.seek(abs_offset)
                            file_data = f.read(extract_size)
                            # BUG FIX: 'PE/EXE' contains a path separator —
                            # sanitize so the name never creates a (missing)
                            # subdirectory under carved_dir.
                            safe_type = sig['name'].replace('/', '_')
                            carved_name = f'carved_{len(carved):04d}_{safe_type}{sig["ext"]}'
                            carved_path = os.path.join(self.carved_dir, carved_name)
                            with open(carved_path, 'wb') as cf:
                                cf.write(file_data)
                            file_hash = hashlib.md5(file_data).hexdigest()
                            carved.append({
                                'name': carved_name,
                                'path': carved_path,
                                'type': sig['name'],
                                'offset': abs_offset,
                                'size': len(file_data),
                                'md5': file_hash
                            })
                    offset += chunk_size - overlap
            self.custody.log('file_carving_complete', source,
                             f'Carved {len(carved)} files')
            return {
                'ok': True, 'source': source,
                'carved': carved, 'count': len(carved),
                'output_dir': self.carved_dir
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    # ── Metadata Extraction ──────────────────────────────────────────────
    def extract_metadata(self, filepath: str) -> Dict:
        """Extract metadata from files (EXIF, PDF, Office, etc.).

        Always records filesystem timestamps and magic-byte type detection;
        adds EXIF/GPS for images (when Pillow is installed) and the PDF
        info dictionary for PDFs.
        """
        if not os.path.exists(filepath):
            return {'ok': False, 'error': 'File not found'}
        ext = Path(filepath).suffix.lower()
        metadata = {
            'file': filepath,
            'name': Path(filepath).name,
            'size': os.path.getsize(filepath),
            # NOTE: on Unix getctime is inode-change time, not creation time.
            'created': datetime.fromtimestamp(os.path.getctime(filepath), timezone.utc).isoformat(),
            'modified': datetime.fromtimestamp(os.path.getmtime(filepath), timezone.utc).isoformat(),
            'accessed': datetime.fromtimestamp(os.path.getatime(filepath), timezone.utc).isoformat(),
        }
        # EXIF for images
        if ext in ('.jpg', '.jpeg', '.tiff', '.tif', '.png') and HAS_PIL:
            try:
                # BUG FIX: context manager closes the image file handle
                # (the previous version leaked it).
                with PILImage.open(filepath) as img:
                    metadata['image'] = {
                        'width': img.size[0], 'height': img.size[1],
                        'format': img.format, 'mode': img.mode
                    }
                    # _getexif() is a private Pillow API, but it returns the
                    # flat tag dict this parser expects; the public getexif()
                    # represents GPSInfo differently.
                    exif = img._getexif()
                if exif:
                    exif_data = {}
                    gps_data = {}
                    for tag_id, value in exif.items():
                        tag = TAGS.get(tag_id, tag_id)
                        if tag == 'GPSInfo':
                            for gps_id, gps_val in value.items():
                                gps_tag = GPSTAGS.get(gps_id, gps_id)
                                gps_data[str(gps_tag)] = str(gps_val)
                        else:
                            # Convert bytes to string for JSON serialization
                            if isinstance(value, bytes):
                                try:
                                    value = value.decode('utf-8', errors='replace')
                                except Exception:
                                    value = value.hex()
                            exif_data[str(tag)] = str(value)
                    metadata['exif'] = exif_data
                    if gps_data:
                        metadata['gps'] = gps_data
            except Exception:
                # Corrupt/unsupported image: keep the filesystem metadata.
                pass
        # PDF metadata
        elif ext == '.pdf':
            try:
                with open(filepath, 'rb') as f:
                    content = f.read(4096)
                # Extract literal-string values from the info dict; only the
                # first 4 KiB is scanned, so a trailing info dict is missed.
                for key in [b'/Title', b'/Author', b'/Subject', b'/Creator',
                            b'/Producer', b'/CreationDate', b'/ModDate']:
                    pattern = key + rb'\s*\(([^)]*)\)'
                    m = re.search(pattern, content)
                    if m:
                        k = key.decode().lstrip('/')
                        metadata.setdefault('pdf', {})[k] = m.group(1).decode('utf-8', errors='replace')
            except Exception:
                pass
        # Generic file header: record magic bytes and match against the
        # carving signature table for type detection.
        try:
            with open(filepath, 'rb') as f:
                header = f.read(16)
            metadata['magic_bytes'] = header.hex()
            for sig in FILE_SIGNATURES:
                if header.startswith(sig['magic']):
                    metadata['detected_type'] = sig['name']
                    break
        except Exception:
            pass
        self.custody.log('metadata_extract', filepath, f'Type: {metadata.get("detected_type", "unknown")}')
        return {'ok': True, **metadata}

    # ── Timeline Builder ─────────────────────────────────────────────────
    def build_timeline(self, directory: str, recursive: bool = True,
                       max_entries: int = 10000) -> Dict:
        """Build filesystem timeline from directory metadata.

        Emits one modified/created/accessed event per file, sorted by
        epoch.  'created' uses st_ctime (inode change time on Unix).
        """
        if not os.path.exists(directory):
            return {'ok': False, 'error': 'Directory not found'}
        events = []
        count = 0
        if recursive:
            walker = os.walk(directory)
        else:
            # BUG FIX: only regular files — bare os.listdir also returned
            # subdirectories, which were then stat'ed and counted as files.
            names = [n for n in os.listdir(directory)
                     if os.path.isfile(os.path.join(directory, n))]
            walker = [(directory, [], names)]
        for root, dirs, files in walker:
            for name in files:
                if count >= max_entries:
                    break
                filepath = os.path.join(root, name)
                try:
                    st = os.stat(filepath)
                    for event_type, ts in (('modified', st.st_mtime),
                                           ('created', st.st_ctime),
                                           ('accessed', st.st_atime)):
                        events.append({
                            'type': event_type,
                            'timestamp': datetime.fromtimestamp(ts, timezone.utc).isoformat(),
                            'epoch': ts,
                            'file': filepath,
                            'size': st.st_size
                        })
                    count += 1
                except (OSError, PermissionError):
                    # Unreadable entry: skip, keep building the timeline.
                    pass
            if count >= max_entries:
                # BUG FIX: the inner break only ended the current directory;
                # the walk previously continued over the entire tree.
                break
        # Sort by timestamp
        events.sort(key=lambda e: e['epoch'])
        self.custody.log('timeline_build', directory,
                         f'{count} files, {len(events)} events')
        return {
            'ok': True, 'directory': directory,
            'events': events, 'event_count': len(events),
            'file_count': count
        }

    # ── Evidence Management ──────────────────────────────────────────────
    def list_evidence(self) -> List[Dict]:
        """List evidence files (disk images etc.) in evidence_dir."""
        evidence = []
        edir = Path(self.evidence_dir)
        for f in sorted(edir.iterdir()):
            if f.is_file():
                evidence.append({
                    'name': f.name,
                    'path': str(f),
                    'size': f.stat().st_size,
                    'modified': datetime.fromtimestamp(f.stat().st_mtime, timezone.utc).isoformat()
                })
        return evidence

    def list_carved(self) -> List[Dict]:
        """List files recovered by carve_files()."""
        carved = []
        cdir = Path(self.carved_dir)
        for f in sorted(cdir.iterdir()):
            if f.is_file():
                carved.append({
                    'name': f.name,
                    'path': str(f),
                    'size': f.stat().st_size
                })
        return carved

    def get_custody_log(self) -> List[Dict]:
        """Get chain of custody log."""
        return self.custody.get_log()
# ── Singleton ────────────────────────────────────────────────────────────────
# Lazily-created process-wide ForensicsEngine shared by CLI and web routes.
_instance = None

def get_forensics() -> ForensicsEngine:
    """Return the singleton ForensicsEngine, constructing it on first use."""
    global _instance
    if _instance is None:
        _instance = ForensicsEngine()
    return _instance
# ── CLI Interface ────────────────────────────────────────────────────────────
def run():
    """CLI entry point for Forensics module.

    Interactive menu loop: prints the menu, reads a numbered choice,
    dispatches to the shared ForensicsEngine, summarises each result,
    and returns when the user picks 0.
    """
    engine = get_forensics()
    while True:
        print(f"\n{'='*60}")
        print(f" Digital Forensics Toolkit")
        print(f"{'='*60}")
        print()
        print(" 1 — Hash File (integrity verification)")
        print(" 2 — Verify Hash")
        print(" 3 — Create Disk Image")
        print(" 4 — Carve Files (recover deleted)")
        print(" 5 — Extract Metadata (EXIF/PDF/headers)")
        print(" 6 — Build Timeline")
        print(" 7 — List Evidence")
        print(" 8 — List Carved Files")
        print(" 9 — Chain of Custody Log")
        print(" 0 — Back")
        print()
        choice = input(" > ").strip()
        if choice == '0':
            # Return to the caller's menu.
            break
        elif choice == '1':
            # Hash a file with the default algorithm set (md5/sha1/sha256).
            filepath = input(" File path: ").strip()
            if filepath:
                result = engine.hash_file(filepath)
                if result['ok']:
                    print(f" Size: {result['size']} bytes")
                    for alg, h in result['hashes'].items():
                        print(f" {alg.upper()}: {h}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '2':
            # Verify a file against an expected digest; the algorithm is
            # auto-detected from the digest length.
            filepath = input(" File path: ").strip()
            expected = input(" Expected hash: ").strip()
            if filepath and expected:
                result = engine.verify_hash(filepath, expected)
                if result['ok']:
                    status = 'MATCH' if result['match'] else 'MISMATCH'
                    print(f" {status} ({result['algorithm'].upper()})")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '3':
            # dd-based forensic image; blank output auto-names the file
            # under the evidence directory.
            source = input(" Source device/file: ").strip()
            output = input(" Output path (blank=auto): ").strip() or None
            if source:
                result = engine.create_image(source, output)
                if result['ok']:
                    mb = result['size'] / (1024*1024)
                    print(f" Image created: {result['output']} ({mb:.1f} MB)")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '4':
            # Signature-based carving; only the first 10 hits are echoed.
            source = input(" Source file/image: ").strip()
            types = input(" File types (blank=all, comma-sep): ").strip()
            if source:
                file_types = [t.strip() for t in types.split(',')] if types else None
                result = engine.carve_files(source, file_types)
                if result['ok']:
                    print(f" Carved {result['count']} files to {result['output_dir']}")
                    for c in result['carved'][:10]:
                        print(f" {c['name']} {c['type']} {c['size']} bytes offset={c['offset']}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '5':
            # Metadata dump; only the first 5 EXIF tags are shown, values
            # truncated to 50 characters.
            filepath = input(" File path: ").strip()
            if filepath:
                result = engine.extract_metadata(filepath)
                if result['ok']:
                    print(f" Name: {result['name']}")
                    print(f" Size: {result['size']}")
                    print(f" Type: {result.get('detected_type', 'unknown')}")
                    if 'exif' in result:
                        print(f" EXIF entries: {len(result['exif'])}")
                        for k, v in list(result['exif'].items())[:5]:
                            print(f" {k}: {v[:50]}")
                    if 'gps' in result:
                        print(f" GPS data: {result['gps']}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '6':
            # Recursive timeline; only the 10 oldest events are echoed.
            directory = input(" Directory path: ").strip()
            if directory:
                result = engine.build_timeline(directory)
                if result['ok']:
                    print(f" {result['file_count']} files, {result['event_count']} events")
                    for e in result['events'][:10]:
                        print(f" {e['timestamp']} {e['type']:<10} {Path(e['file']).name}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '7':
            for e in engine.list_evidence():
                mb = e['size'] / (1024*1024)
                print(f" {e['name']} ({mb:.1f} MB)")
        elif choice == '8':
            for c in engine.list_carved():
                print(f" {c['name']} ({c['size']} bytes)")
        elif choice == '9':
            # Show the 10 most recent custody entries.
            log = engine.get_custody_log()
            print(f" {len(log)} entries:")
            for entry in log[-10:]:
                print(f" [{entry['timestamp'][:19]}] {entry['action']}: {entry['target']}")