596 lines
25 KiB
Python
596 lines
25 KiB
Python
|
|
"""AUTARCH Forensics Toolkit
|
||
|
|
|
||
|
|
Disk imaging, file carving, metadata extraction, timeline building,
|
||
|
|
hash verification, and chain of custody logging for digital forensics.
|
||
|
|
"""
|
||
|
|
|
||
|
|
# Module registration metadata consumed by the host framework's plugin loader.
DESCRIPTION = "Digital forensics & evidence analysis"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "analyze"  # menu grouping key — presumably matched by the launcher; confirm
|
||
|
|
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import json
|
||
|
|
import time
|
||
|
|
import hashlib
|
||
|
|
import struct
|
||
|
|
import shutil
|
||
|
|
import subprocess
|
||
|
|
from pathlib import Path
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from typing import Dict, List, Optional, Any, Tuple
|
||
|
|
|
||
|
|
# Prefer the project's shared path helpers; fall back to minimal stand-ins so
# this module still imports and runs outside the full application.
try:
    from core.paths import find_tool, get_data_dir
except ImportError:
    def find_tool(name):
        # Fallback: resolve an executable by searching PATH.
        return shutil.which(name)

    def get_data_dir():
        # Fallback: a 'data' directory two levels up from this file.
        return str(Path(__file__).parent.parent / 'data')
|
||
|
|
|
||
|
|
# Optional imports
try:
    # Pillow enables image dimensions + EXIF/GPS extraction in
    # ForensicsEngine.extract_metadata(); everything else works without it.
    from PIL import Image as PILImage
    from PIL.ExifTags import TAGS, GPSTAGS
    HAS_PIL = True
except ImportError:
    HAS_PIL = False
|
||
|
|
|
||
|
|
|
||
|
|
# ── File Signatures for Carving ──────────────────────────────────────────────

# Magic-byte signatures used by ForensicsEngine.carve_files(). Per entry:
#   magic    — header bytes scanned for in the raw source
#   footer   — trailing bytes that mark end-of-file; None means there is no
#              reliable footer, so the carve is bounded by max_size instead
#   max_size — upper limit on bytes extracted for a single hit
# NOTE(review): DOCX shares the ZIP magic (PK\x03\x04), so one archive can be
# carved under both names. The MP4 magic only matches files whose first box
# length is 0x18 — other valid MP4s will be missed; confirm acceptable.
FILE_SIGNATURES = [
    {'name': 'JPEG', 'ext': '.jpg', 'magic': b'\xFF\xD8\xFF', 'footer': b'\xFF\xD9', 'max_size': 50*1024*1024},
    {'name': 'PNG', 'ext': '.png', 'magic': b'\x89PNG\r\n\x1a\n', 'footer': b'IEND\xAE\x42\x60\x82', 'max_size': 50*1024*1024},
    {'name': 'GIF', 'ext': '.gif', 'magic': b'GIF8', 'footer': b'\x00\x3B', 'max_size': 20*1024*1024},
    {'name': 'PDF', 'ext': '.pdf', 'magic': b'%PDF', 'footer': b'%%EOF', 'max_size': 100*1024*1024},
    {'name': 'ZIP', 'ext': '.zip', 'magic': b'PK\x03\x04', 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'RAR', 'ext': '.rar', 'magic': b'Rar!\x1a\x07', 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'ELF', 'ext': '.elf', 'magic': b'\x7fELF', 'footer': None, 'max_size': 100*1024*1024},
    {'name': 'PE/EXE', 'ext': '.exe', 'magic': b'MZ', 'footer': None, 'max_size': 100*1024*1024},
    {'name': 'SQLite', 'ext': '.sqlite', 'magic': b'SQLite format 3\x00', 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'DOCX', 'ext': '.docx', 'magic': b'PK\x03\x04', 'footer': None, 'max_size': 100*1024*1024},
    {'name': '7z', 'ext': '.7z', 'magic': b"7z\xBC\xAF'\x1C", 'footer': None, 'max_size': 500*1024*1024},
    {'name': 'BMP', 'ext': '.bmp', 'magic': b'BM', 'footer': None, 'max_size': 50*1024*1024},
    {'name': 'MP3', 'ext': '.mp3', 'magic': b'\xFF\xFB', 'footer': None, 'max_size': 50*1024*1024},
    {'name': 'MP4', 'ext': '.mp4', 'magic': b'\x00\x00\x00\x18ftyp', 'footer': None, 'max_size': 1024*1024*1024},
    {'name': 'AVI', 'ext': '.avi', 'magic': b'RIFF', 'footer': None, 'max_size': 1024*1024*1024},
]
|
||
|
|
|
||
|
|
|
||
|
|
# ── Chain of Custody Logger ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class CustodyLog:
    """Append-only chain-of-custody journal persisted as pretty-printed JSON.

    Every forensic action is recorded with a sequential id, a UTC timestamp,
    and the acting OS user, then flushed to disk immediately.
    """

    def __init__(self, data_dir: str):
        # The journal file lives alongside the rest of the forensics data.
        self.log_file = os.path.join(data_dir, 'custody_log.json')
        self.entries: List[Dict] = []
        self._load()

    def _load(self):
        """Best-effort reload of a previously saved journal."""
        if not os.path.exists(self.log_file):
            return
        try:
            with open(self.log_file) as fh:
                self.entries = json.load(fh)
        except Exception:
            # A corrupt or unreadable journal simply starts a fresh log.
            pass

    def _save(self):
        """Flush the whole journal back to disk."""
        with open(self.log_file, 'w') as fh:
            json.dump(self.entries, fh, indent=2)

    def log(self, action: str, target: str, details: str = "",
            evidence_hash: str = "") -> Dict:
        """Record one forensic action, persist it, and return the entry."""
        record = {
            'id': len(self.entries) + 1,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'action': action,
            'target': target,
            'details': details,
            'evidence_hash': evidence_hash,
            'user': os.getenv('USER', os.getenv('USERNAME', 'unknown')),
        }
        self.entries.append(record)
        self._save()
        return record

    def get_log(self) -> List[Dict]:
        """Return every journal entry, oldest first."""
        return self.entries
|
||
|
|
|
||
|
|
|
||
|
|
# ── Forensics Engine ─────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class ForensicsEngine:
    """Digital forensics toolkit.

    Provides hashing, dd-based disk imaging, signature-based file carving,
    metadata extraction, filesystem timelines, and evidence listing. Every
    operation is recorded in a chain-of-custody journal (CustodyLog).
    All public methods return a dict with an 'ok' boolean rather than
    raising — failures carry an 'error' string.
    """

    def __init__(self):
        # Working tree under <data>/forensics; directories created eagerly.
        self.data_dir = os.path.join(get_data_dir(), 'forensics')
        os.makedirs(self.data_dir, exist_ok=True)
        self.evidence_dir = os.path.join(self.data_dir, 'evidence')  # disk images
        os.makedirs(self.evidence_dir, exist_ok=True)
        self.carved_dir = os.path.join(self.data_dir, 'carved')      # carved files
        os.makedirs(self.carved_dir, exist_ok=True)
        self.custody = CustodyLog(self.data_dir)
        # Absolute path of the dd binary, or None if unavailable; find_tool
        # may itself be the shutil.which fallback shim.
        self.dd = find_tool('dd') or shutil.which('dd')

    # ── Hash Verification ────────────────────────────────────────────────

    def hash_file(self, filepath: str, algorithms: List[str] = None) -> Dict:
        """Calculate file hashes for evidence integrity.

        algorithms defaults to ['md5', 'sha1', 'sha256']; any name accepted
        by hashlib.new() works. The file is streamed once in 8 KiB chunks
        and each hasher is fed every chunk. Returns
        {'ok': True, 'file', 'size', 'hashes'} or {'ok': False, 'error'}.
        """
        algorithms = algorithms or ['md5', 'sha1', 'sha256']

        if not os.path.exists(filepath):
            return {'ok': False, 'error': 'File not found'}

        try:
            hashers = {alg: hashlib.new(alg) for alg in algorithms}
            file_size = os.path.getsize(filepath)

            with open(filepath, 'rb') as f:
                while True:
                    chunk = f.read(8192)
                    if not chunk:
                        break
                    for h in hashers.values():
                        h.update(chunk)

            hashes = {alg: h.hexdigest() for alg, h in hashers.items()}

            # Custody entry stores truncated digests in the details plus the
            # full SHA-256 (empty string if sha256 wasn't requested).
            self.custody.log('hash_verify', filepath,
                             f'Hashes: {", ".join(f"{k}={v[:16]}..." for k, v in hashes.items())}',
                             hashes.get('sha256', ''))

            return {
                'ok': True, 'file': filepath,
                'size': file_size, 'hashes': hashes
            }

        except Exception as e:
            # e.g. unknown algorithm name, permission error mid-read
            return {'ok': False, 'error': str(e)}

    def verify_hash(self, filepath: str, expected_hash: str,
                    algorithm: str = None) -> Dict:
        """Verify file against expected hash.

        When algorithm is None it is inferred from the hex-digest length
        (32→md5, 40→sha1, 64→sha256, 128→sha512). Comparison is
        case-insensitive.
        """
        # Auto-detect algorithm from hash length
        if not algorithm:
            hash_len = len(expected_hash)
            algorithm = {32: 'md5', 40: 'sha1', 64: 'sha256', 128: 'sha512'}.get(hash_len)
            if not algorithm:
                return {'ok': False, 'error': f'Cannot detect algorithm for hash length {hash_len}'}

        result = self.hash_file(filepath, [algorithm])
        if not result['ok']:
            # Propagate the hashing failure unchanged.
            return result

        actual = result['hashes'][algorithm]
        match = actual.lower() == expected_hash.lower()

        self.custody.log('hash_verify', filepath,
                         f'Expected: {expected_hash[:16]}... Match: {match}')

        return {
            'ok': True, 'match': match,
            'algorithm': algorithm,
            'expected': expected_hash,
            'actual': actual,
            'file': filepath
        }

    # ── Disk Imaging ─────────────────────────────────────────────────────

    def create_image(self, source: str, output: str = None,
                     block_size: int = 4096) -> Dict:
        """Create forensic disk image using dd.

        conv=noerror,sync keeps imaging going past read errors (bad blocks
        are zero-padded to the block size); the run is capped at one hour.
        On success the finished image is hashed (MD5 + SHA-256) and both
        the start and completion are custody-logged. output defaults to
        an auto-named .img file inside the evidence directory.
        """
        if not self.dd:
            return {'ok': False, 'error': 'dd not found'}

        if not output:
            # Auto-name: <basename>_<epoch>.img in the evidence dir.
            name = Path(source).name.replace('/', '_')
            output = os.path.join(self.evidence_dir, f'{name}_{int(time.time())}.img')

        self.custody.log('disk_image', source, f'Creating image: {output}')

        try:
            result = subprocess.run(
                [self.dd, f'if={source}', f'of={output}', f'bs={block_size}',
                 'conv=noerror,sync', 'status=progress'],
                capture_output=True, text=True, timeout=3600
            )

            # Success is judged by the output file existing, not dd's exit
            # code (dd may partially succeed with noerror).
            if os.path.exists(output):
                # Hash the image
                hashes = self.hash_file(output, ['md5', 'sha256'])

                self.custody.log('disk_image_complete', output,
                                 f'Image created, SHA256: {hashes.get("hashes", {}).get("sha256", "?")}')

                return {
                    'ok': True, 'source': source, 'output': output,
                    'size': os.path.getsize(output),
                    'hashes': hashes.get('hashes', {}),
                    # dd writes its progress/statistics to stderr.
                    'dd_output': result.stderr
                }
            return {'ok': False, 'error': 'Image file not created', 'stderr': result.stderr}

        except subprocess.TimeoutExpired:
            return {'ok': False, 'error': 'Imaging timed out (1hr limit)'}
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    # ── File Carving ─────────────────────────────────────────────────────

    def carve_files(self, source: str, file_types: List[str] = None,
                    max_files: int = 100) -> Dict:
        """Recover files from raw data by magic byte signatures.

        file_types filters FILE_SIGNATURES by name or extension
        (case-insensitive); None scans for every known type. At most
        max_files hits are extracted; each is written into self.carved_dir
        and described by {name, path, type, offset, size, md5}.
        """
        if not os.path.exists(source):
            return {'ok': False, 'error': 'Source file not found'}

        self.custody.log('file_carving', source, f'Starting carve, types={file_types}')

        # Filter signatures
        sigs = FILE_SIGNATURES
        if file_types:
            type_set = {t.lower() for t in file_types}
            sigs = [s for s in sigs if s['name'].lower() in type_set or
                    s['ext'].lstrip('.').lower() in type_set]

        carved = []
        file_size = os.path.getsize(source)
        chunk_size = 1024 * 1024  # 1MB chunks

        try:
            with open(source, 'rb') as f:
                offset = 0
                while offset < file_size and len(carved) < max_files:
                    f.seek(offset)
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break

                    # Scan the chunk once per signature.
                    for sig in sigs:
                        pos = 0
                        while pos < len(chunk) and len(carved) < max_files:
                            idx = chunk.find(sig['magic'], pos)
                            if idx == -1:
                                break

                            abs_offset = offset + idx
                            # Try to find file end
                            file_end = abs_offset + sig['max_size']
                            if sig['footer']:
                                # Bounded forward search for the footer,
                                # skipping past the magic itself.
                                f.seek(abs_offset)
                                search_data = f.read(min(sig['max_size'], file_size - abs_offset))
                                footer_pos = search_data.find(sig['footer'], len(sig['magic']))
                                if footer_pos != -1:
                                    file_end = abs_offset + footer_pos + len(sig['footer'])

                            # Extract file (f.read stops at EOF, so the cap
                            # is safe near the end of the source)
                            extract_size = min(file_end - abs_offset, sig['max_size'])
                            f.seek(abs_offset)
                            file_data = f.read(extract_size)

                            # Save carved file
                            carved_name = f'carved_{len(carved):04d}_{sig["name"]}{sig["ext"]}'
                            carved_path = os.path.join(self.carved_dir, carved_name)
                            with open(carved_path, 'wb') as cf:
                                cf.write(file_data)

                            file_hash = hashlib.md5(file_data).hexdigest()
                            carved.append({
                                'name': carved_name,
                                'path': carved_path,
                                'type': sig['name'],
                                'offset': abs_offset,
                                'size': len(file_data),
                                'md5': file_hash
                            })

                            pos = idx + len(sig['magic'])

                    # Advance slightly less than a full chunk so a magic
                    # sequence straddling the chunk boundary is still seen
                    # on the next pass. NOTE(review): the overlap region is
                    # scanned twice, so boundary hits can be carved twice;
                    # also raises ValueError (caught below) if the type
                    # filter matched no signatures — confirm acceptable.
                    offset += chunk_size - max(len(s['magic']) for s in sigs)

            self.custody.log('file_carving_complete', source,
                             f'Carved {len(carved)} files')

            return {
                'ok': True, 'source': source,
                'carved': carved, 'count': len(carved),
                'output_dir': self.carved_dir
            }

        except Exception as e:
            return {'ok': False, 'error': str(e)}

    # ── Metadata Extraction ──────────────────────────────────────────────

    def extract_metadata(self, filepath: str) -> Dict:
        """Extract metadata from files (EXIF, PDF, Office, etc.).

        Always returns name/size and filesystem timestamps plus the first
        16 header bytes; adds 'image'/'exif'/'gps' for images (when Pillow
        is available) and 'pdf' for info-dictionary fields found in the
        first 4 KiB of a PDF. All sub-extractors are best-effort.
        """
        if not os.path.exists(filepath):
            return {'ok': False, 'error': 'File not found'}

        ext = Path(filepath).suffix.lower()
        metadata = {
            'file': filepath,
            'name': Path(filepath).name,
            'size': os.path.getsize(filepath),
            # NOTE(review): getctime is inode-change time on Linux and
            # creation time only on Windows/macOS — confirm which the
            # 'created' label is meant to convey.
            'created': datetime.fromtimestamp(os.path.getctime(filepath), timezone.utc).isoformat(),
            'modified': datetime.fromtimestamp(os.path.getmtime(filepath), timezone.utc).isoformat(),
            'accessed': datetime.fromtimestamp(os.path.getatime(filepath), timezone.utc).isoformat(),
        }

        # EXIF for images
        if ext in ('.jpg', '.jpeg', '.tiff', '.tif', '.png') and HAS_PIL:
            try:
                img = PILImage.open(filepath)
                metadata['image'] = {
                    'width': img.size[0], 'height': img.size[1],
                    'format': img.format, 'mode': img.mode
                }
                exif = img._getexif()
                if exif:
                    exif_data = {}
                    gps_data = {}
                    for tag_id, value in exif.items():
                        # Map numeric EXIF tag ids to readable names.
                        tag = TAGS.get(tag_id, tag_id)
                        if tag == 'GPSInfo':
                            for gps_id, gps_val in value.items():
                                gps_tag = GPSTAGS.get(gps_id, gps_id)
                                gps_data[str(gps_tag)] = str(gps_val)
                        else:
                            # Convert bytes to string for JSON serialization
                            if isinstance(value, bytes):
                                try:
                                    value = value.decode('utf-8', errors='replace')
                                except Exception:
                                    value = value.hex()
                            exif_data[str(tag)] = str(value)
                    metadata['exif'] = exif_data
                    if gps_data:
                        metadata['gps'] = gps_data
            except Exception:
                # Unreadable/odd images just omit the image-specific fields.
                pass

        # PDF metadata
        elif ext == '.pdf':
            try:
                with open(filepath, 'rb') as f:
                    content = f.read(4096)
                    # Extract info dict
                    for key in [b'/Title', b'/Author', b'/Subject', b'/Creator',
                                b'/Producer', b'/CreationDate', b'/ModDate']:
                        # Matches literal-string values of the form /Key (value).
                        pattern = key + rb'\s*\(([^)]*)\)'
                        m = re.search(pattern, content)
                        if m:
                            k = key.decode().lstrip('/')
                            metadata.setdefault('pdf', {})[k] = m.group(1).decode('utf-8', errors='replace')
            except Exception:
                pass

        # Generic file header
        try:
            with open(filepath, 'rb') as f:
                header = f.read(16)
                metadata['magic_bytes'] = header.hex()
                # First matching signature wins (list order matters).
                for sig in FILE_SIGNATURES:
                    if header.startswith(sig['magic']):
                        metadata['detected_type'] = sig['name']
                        break
        except Exception:
            pass

        self.custody.log('metadata_extract', filepath, f'Type: {metadata.get("detected_type", "unknown")}')

        return {'ok': True, **metadata}

    # ── Timeline Builder ─────────────────────────────────────────────────

    def build_timeline(self, directory: str, recursive: bool = True,
                       max_entries: int = 10000) -> Dict:
        """Build filesystem timeline from directory metadata.

        Emits three events per file (modified/created/accessed) sorted
        chronologically. max_entries caps the number of FILES examined,
        so the event list holds up to 3 × max_entries entries.
        """
        if not os.path.exists(directory):
            return {'ok': False, 'error': 'Directory not found'}

        events = []
        count = 0

        # NOTE(review): the non-recursive branch lists with os.listdir, so
        # subdirectory names are stat'ed and reported as files too —
        # confirm intended.
        walk_fn = os.walk if recursive else lambda d: [(d, [], os.listdir(d))]
        for root, dirs, files in walk_fn(directory):
            for name in files:
                if count >= max_entries:
                    break
                filepath = os.path.join(root, name)
                try:
                    stat = os.stat(filepath)
                    events.append({
                        'type': 'modified',
                        'timestamp': datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat(),
                        'epoch': stat.st_mtime,
                        'file': filepath,
                        'size': stat.st_size
                    })
                    events.append({
                        'type': 'created',
                        'timestamp': datetime.fromtimestamp(stat.st_ctime, timezone.utc).isoformat(),
                        'epoch': stat.st_ctime,
                        'file': filepath,
                        'size': stat.st_size
                    })
                    events.append({
                        'type': 'accessed',
                        'timestamp': datetime.fromtimestamp(stat.st_atime, timezone.utc).isoformat(),
                        'epoch': stat.st_atime,
                        'file': filepath,
                        'size': stat.st_size
                    })
                    count += 1
                except (OSError, PermissionError):
                    # Unreadable entries are skipped silently.
                    pass

        # Sort by timestamp
        events.sort(key=lambda e: e['epoch'])

        self.custody.log('timeline_build', directory,
                         f'{count} files, {len(events)} events')

        return {
            'ok': True, 'directory': directory,
            'events': events, 'event_count': len(events),
            'file_count': count
        }

    # ── Evidence Management ──────────────────────────────────────────────

    def list_evidence(self) -> List[Dict]:
        """List evidence files (disk images) in the evidence directory."""
        evidence = []
        edir = Path(self.evidence_dir)
        for f in sorted(edir.iterdir()):
            if f.is_file():
                evidence.append({
                    'name': f.name,
                    'path': str(f),
                    'size': f.stat().st_size,
                    'modified': datetime.fromtimestamp(f.stat().st_mtime, timezone.utc).isoformat()
                })
        return evidence

    def list_carved(self) -> List[Dict]:
        """List carved files accumulated in the carving output directory."""
        carved = []
        cdir = Path(self.carved_dir)
        for f in sorted(cdir.iterdir()):
            if f.is_file():
                carved.append({
                    'name': f.name,
                    'path': str(f),
                    'size': f.stat().st_size
                })
        return carved

    def get_custody_log(self) -> List[Dict]:
        """Get chain of custody log (all entries, oldest first)."""
        return self.custody.get_log()
|
||
|
|
|
||
|
|
|
||
|
|
# ── Singleton ────────────────────────────────────────────────────────────────

_instance = None


def get_forensics() -> ForensicsEngine:
    """Return the process-wide ForensicsEngine, creating it on first call."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = ForensicsEngine()
    return _instance
|
||
|
|
|
||
|
|
|
||
|
|
# ── CLI Interface ────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def run():
    """CLI entry point for Forensics module.

    Interactive menu loop: each option maps to one ForensicsEngine
    operation and prints a short summary of the result. Returns when the
    user picks 0. Long result lists are truncated to the first 10 items.
    """
    engine = get_forensics()

    while True:
        print(f"\n{'='*60}")
        print(f" Digital Forensics Toolkit")
        print(f"{'='*60}")
        print()
        print(" 1 — Hash File (integrity verification)")
        print(" 2 — Verify Hash")
        print(" 3 — Create Disk Image")
        print(" 4 — Carve Files (recover deleted)")
        print(" 5 — Extract Metadata (EXIF/PDF/headers)")
        print(" 6 — Build Timeline")
        print(" 7 — List Evidence")
        print(" 8 — List Carved Files")
        print(" 9 — Chain of Custody Log")
        print(" 0 — Back")
        print()

        choice = input(" > ").strip()

        if choice == '0':
            break
        elif choice == '1':
            # Hash one file with the default algorithm set (md5/sha1/sha256).
            filepath = input(" File path: ").strip()
            if filepath:
                result = engine.hash_file(filepath)
                if result['ok']:
                    print(f" Size: {result['size']} bytes")
                    for alg, h in result['hashes'].items():
                        print(f" {alg.upper()}: {h}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '2':
            # Compare a file against an expected digest (algorithm inferred
            # from the digest length).
            filepath = input(" File path: ").strip()
            expected = input(" Expected hash: ").strip()
            if filepath and expected:
                result = engine.verify_hash(filepath, expected)
                if result['ok']:
                    status = 'MATCH' if result['match'] else 'MISMATCH'
                    print(f" {status} ({result['algorithm'].upper()})")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '3':
            # dd-image a device or file into the evidence directory.
            source = input(" Source device/file: ").strip()
            output = input(" Output path (blank=auto): ").strip() or None
            if source:
                result = engine.create_image(source, output)
                if result['ok']:
                    mb = result['size'] / (1024*1024)
                    print(f" Image created: {result['output']} ({mb:.1f} MB)")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '4':
            # Signature-based carving; blank type filter scans every type.
            source = input(" Source file/image: ").strip()
            types = input(" File types (blank=all, comma-sep): ").strip()
            if source:
                file_types = [t.strip() for t in types.split(',')] if types else None
                result = engine.carve_files(source, file_types)
                if result['ok']:
                    print(f" Carved {result['count']} files to {result['output_dir']}")
                    for c in result['carved'][:10]:
                        print(f" {c['name']} {c['type']} {c['size']} bytes offset={c['offset']}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '5':
            # Metadata extraction: summary plus first few EXIF tags and any
            # GPS data found.
            filepath = input(" File path: ").strip()
            if filepath:
                result = engine.extract_metadata(filepath)
                if result['ok']:
                    print(f" Name: {result['name']}")
                    print(f" Size: {result['size']}")
                    print(f" Type: {result.get('detected_type', 'unknown')}")
                    if 'exif' in result:
                        print(f" EXIF entries: {len(result['exif'])}")
                        for k, v in list(result['exif'].items())[:5]:
                            print(f" {k}: {v[:50]}")
                    if 'gps' in result:
                        print(f" GPS data: {result['gps']}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '6':
            # Recursive mtime/ctime/atime timeline for a directory tree.
            directory = input(" Directory path: ").strip()
            if directory:
                result = engine.build_timeline(directory)
                if result['ok']:
                    print(f" {result['file_count']} files, {result['event_count']} events")
                    for e in result['events'][:10]:
                        print(f" {e['timestamp']} {e['type']:<10} {Path(e['file']).name}")
                else:
                    print(f" Error: {result['error']}")
        elif choice == '7':
            for e in engine.list_evidence():
                mb = e['size'] / (1024*1024)
                print(f" {e['name']} ({mb:.1f} MB)")
        elif choice == '8':
            for c in engine.list_carved():
                print(f" {c['name']} ({c['size']} bytes)")
        elif choice == '9':
            # Show the tail (last 10 entries) of the custody journal.
            log = engine.get_custody_log()
            print(f" {len(log)} entries:")
            for entry in log[-10:]:
                print(f" [{entry['timestamp'][:19]}] {entry['action']}: {entry['target']}")