Autarch/modules/steganography.py

770 lines
28 KiB
Python
Raw Permalink Normal View History

"""AUTARCH Steganography
Image/audio/document steganography — hide data in carrier files using LSB
encoding, DCT domain embedding, and whitespace encoding. Includes detection
via statistical analysis and optional AES-256 encryption.
"""
# Module metadata constants.
# NOTE(review): presumably read by the AUTARCH module loader/menu — confirm.
DESCRIPTION = "Steganography — hide & extract data in files"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "counter"
import os
import io
import re
import json
import struct
import hashlib
import secrets
from pathlib import Path
from typing import Dict, List, Optional, Tuple
try:
    from core.paths import get_data_dir
except ImportError:
    def get_data_dir():
        """Fallback used when ``core.paths`` cannot be imported.

        Returns ``<parent of this file's directory>/data`` as a string.
        """
        package_root = Path(__file__).parent.parent
        return str(package_root / 'data')
# Optional imports
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
try:
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
HAS_CRYPTO = True
except ImportError:
try:
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
try:
import wave
HAS_WAVE = True
except ImportError:
HAS_WAVE = False
# ── Encryption Layer ─────────────────────────────────────────────────────────
def _derive_key(password: str) -> bytes:
"""Derive 256-bit key from password."""
return hashlib.sha256(password.encode()).digest()
def _encrypt_data(data: bytes, password: str) -> bytes:
    """Encrypt ``data`` with AES-256-CBC and return ``iv + ciphertext``.

    A fresh random 16-byte IV is generated on every call and prepended to
    the padded ciphertext.

    NOTE: when no AES backend could be imported (``HAS_CRYPTO`` is false)
    the input is returned unchanged, i.e. nothing is encrypted.
    """
    if HAS_CRYPTO:
        aes_key = _derive_key(password)
        iv = secrets.token_bytes(16)
        encryptor = AES.new(aes_key, AES.MODE_CBC, iv)
        padded = pad(data, AES.block_size)
        return iv + encryptor.encrypt(padded)
    return data
def _decrypt_data(data: bytes, password: str) -> bytes:
    """Decrypt an ``iv + ciphertext`` blob produced by ``_encrypt_data``.

    The first 16 bytes are taken as the IV, the remainder as AES-256-CBC
    ciphertext; padding is removed from the result.

    NOTE: when no AES backend could be imported (``HAS_CRYPTO`` is false)
    the input is returned unchanged.
    """
    if HAS_CRYPTO:
        aes_key = _derive_key(password)
        iv, ciphertext = data[:16], data[16:]
        decryptor = AES.new(aes_key, AES.MODE_CBC, iv)
        padded_plaintext = decryptor.decrypt(ciphertext)
        return unpad(padded_plaintext, AES.block_size)
    return data
# ── LSB Image Steganography ──────────────────────────────────────────────────
class ImageStego:
    """LSB steganography for PNG/BMP images.

    Embedded layout, most significant bit first: 4-byte ``MAGIC``, 4-byte
    big-endian payload length, then the payload. The bit stream is written
    into the low ``bits_per_channel`` bits of successive R, G, B channel
    values of the carrier, which is always converted to RGB first.
    """

    MAGIC = b'ASTS'  # AUTARCH Stego Signature
    HEADER_SIZE = 8  # magic (4 bytes) + payload length (4 bytes)
    CARRIER_CHANNELS = 3  # hide()/extract() always work on an RGB conversion

    @staticmethod
    def _depth_ok(bits_per_channel) -> bool:
        """Return True when ``bits_per_channel`` is an int in the range 1..8."""
        return isinstance(bits_per_channel, int) and 1 <= bits_per_channel <= 8

    @staticmethod
    def _embed(channels: bytearray, blob: bytes, bits_per_channel: int) -> None:
        """Write the bits of ``blob`` into the low bits of ``channels`` in place."""
        bits = [(byte >> shift) & 1 for byte in blob for shift in range(7, -1, -1)]
        # Zero-pad so the last channel touched receives a complete bit group.
        bits.extend([0] * (-len(bits) % bits_per_channel))
        keep_mask = 0xFF & ~((1 << bits_per_channel) - 1)
        for index, start in enumerate(range(0, len(bits), bits_per_channel)):
            group = 0
            for bit in bits[start:start + bits_per_channel]:
                group = (group << 1) | bit
            channels[index] = (channels[index] & keep_mask) | group

    @staticmethod
    def _recover(channels: bytes, count: int, bits_per_channel: int) -> bytes:
        """Rebuild the first ``count`` bytes stored in the low bits of ``channels``.

        The caller must have checked that ``channels`` holds enough bits.
        """
        needed_bits = count * 8
        needed_channels = -(-needed_bits // bits_per_channel)  # ceiling division
        low_mask = (1 << bits_per_channel) - 1
        bits = []
        for value in channels[:needed_channels]:
            group = value & low_mask
            for shift in range(bits_per_channel - 1, -1, -1):
                bits.append((group >> shift) & 1)
        out = bytearray()
        for start in range(0, needed_bits, 8):
            byte = 0
            for bit in bits[start:start + 8]:
                byte = (byte << 1) | bit
            out.append(byte)
        return bytes(out)

    @staticmethod
    def capacity(image_path: str) -> Dict:
        """Calculate maximum payload capacity in bytes at 1 bit per channel.

        Capacity is computed for the three RGB channels that ``hide`` really
        uses, not for the band count of the source image; ``channels`` in the
        result still reports the source image's own band count.

        Returns a dict with ``ok`` plus either the capacity/image details or
        an ``error`` string.
        """
        if not HAS_PIL:
            return {'ok': False, 'error': 'Pillow (PIL) not installed'}
        try:
            with Image.open(image_path) as img:
                w, h = img.size
                channels = len(img.getbands())
                image_format = img.format
            total_bits = w * h * ImageStego.CARRIER_CHANNELS
            # Subtract the header (magic + length) from the usable bytes.
            total_bytes = total_bits // 8 - ImageStego.HEADER_SIZE
            return {
                'ok': True, 'capacity_bytes': max(0, total_bytes),
                'width': w, 'height': h, 'channels': channels,
                'format': image_format
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    @staticmethod
    def hide(image_path: str, data: bytes, output_path: str,
             password: Optional[str] = None, bits_per_channel: int = 1) -> Dict:
        """Hide data in image using LSB encoding.

        Args:
            image_path: carrier image; it is converted to RGB before embedding.
            data: bytes to hide.
            output_path: destination file; it is always written as PNG data,
                whatever extension the path carries.
            password: when truthy, the payload is passed through
                ``_encrypt_data`` before embedding.
            bits_per_channel: number of low bits (1..8) used per channel
                value; ``extract`` must be called with the same value.

        Returns a dict with ``ok`` plus either result details or ``error``.
        """
        if not HAS_PIL:
            return {'ok': False, 'error': 'Pillow (PIL) not installed'}
        try:
            if not ImageStego._depth_ok(bits_per_channel):
                return {'ok': False, 'error': 'bits_per_channel must be between 1 and 8'}

            with Image.open(image_path) as source:
                img = source.convert('RGB')
            # Flat R, G, B, R, G, B ... channel values of the carrier.
            channels = bytearray(img.tobytes())

            payload = _encrypt_data(data, password) if password else data
            # Header: magic(4) + length(4), followed by the payload.
            full_data = ImageStego.MAGIC + struct.pack('>I', len(payload)) + payload

            max_bits = len(channels) * bits_per_channel
            if len(full_data) * 8 > max_bits:
                return {'ok': False, 'error': f'Data too large ({len(full_data)} bytes). '
                        f'Max capacity: {max_bits // 8} bytes'}

            ImageStego._embed(channels, full_data, bits_per_channel)

            stego_img = Image.frombytes('RGB', img.size, bytes(channels))
            stego_img.save(output_path, 'PNG')
            return {
                'ok': True,
                'output': output_path,
                'hidden_bytes': len(payload),
                # Only report encryption when it really happened.
                'encrypted': bool(password) and HAS_CRYPTO,
                'message': f'Hidden {len(payload)} bytes in {output_path}'
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    @staticmethod
    def extract(image_path: str, password: Optional[str] = None,
                bits_per_channel: int = 1) -> Dict:
        """Extract hidden data from image.

        Args:
            image_path: image produced by ``hide``.
            password: when truthy, the payload is passed through
                ``_decrypt_data`` after extraction.
            bits_per_channel: the value that was used when hiding (1..8).

        Returns a dict with ``ok`` plus either ``data``/``size`` details or
        an ``error`` string.
        """
        if not HAS_PIL:
            return {'ok': False, 'error': 'Pillow (PIL) not installed'}
        try:
            if not ImageStego._depth_ok(bits_per_channel):
                return {'ok': False, 'error': 'bits_per_channel must be between 1 and 8'}

            with Image.open(image_path) as source:
                channels = source.convert('RGB').tobytes()

            header_size = ImageStego.HEADER_SIZE
            available_bits = len(channels) * bits_per_channel
            if available_bits < header_size * 8:
                return {'ok': False, 'error': 'No hidden data found (magic mismatch)'}

            # Decode only the header first; the payload length tells us how
            # much more of the image has to be read.
            header = ImageStego._recover(channels, header_size, bits_per_channel)
            if header[:4] != ImageStego.MAGIC:
                return {'ok': False, 'error': 'No hidden data found (magic mismatch)'}

            payload_len = struct.unpack('>I', header[4:8])[0]
            if (header_size + payload_len) * 8 > available_bits:
                return {'ok': False, 'error': 'Corrupted data (length exceeds image capacity)'}

            payload = ImageStego._recover(
                channels, header_size + payload_len, bits_per_channel)[header_size:]

            if password:
                try:
                    payload = _decrypt_data(payload, password)
                except Exception:
                    return {'ok': False, 'error': 'Decryption failed (wrong password?)'}

            return {
                'ok': True,
                'data': payload,
                'size': len(payload),
                'encrypted': bool(password) and HAS_CRYPTO,
                'message': f'Extracted {len(payload)} bytes'
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}
# ── Audio Steganography ──────────────────────────────────────────────────────
class AudioStego:
    """LSB steganography for WAV audio files.

    Embedded layout, most significant bit first: 4-byte ``MAGIC``, 4-byte
    big-endian payload length, then the payload. One bit is stored in the
    least significant bit of every raw frame byte.

    NOTE: bits go into every raw byte, not every sample, so for sample
    widths above one byte the higher-order bytes of a sample are altered
    too. The layout is kept as-is so existing stego files stay readable.
    """

    MAGIC = b'ASTS'
    HEADER_SIZE = 8  # magic (4 bytes) + payload length (4 bytes)

    @staticmethod
    def _recover(carrier: bytes, count: int) -> bytes:
        """Rebuild the first ``count`` bytes from the LSBs of ``carrier``.

        The caller must have checked that ``carrier`` has ``count * 8`` bytes.
        """
        out = bytearray()
        for start in range(0, count * 8, 8):
            byte = 0
            for value in carrier[start:start + 8]:
                byte = (byte << 1) | (value & 1)
            out.append(byte)
        return bytes(out)

    @staticmethod
    def capacity(audio_path: str) -> Dict:
        """Calculate maximum payload capacity in bytes.

        Matches ``hide``: one bit per raw frame byte
        (frames * channels * sample width), minus the 8-byte header.
        """
        if not HAS_WAVE:
            return {'ok': False, 'error': 'wave module not available'}
        try:
            with wave.open(audio_path, 'rb') as w:
                frames = w.getnframes()
                channels = w.getnchannels()
                sample_width = w.getsampwidth()
                framerate = w.getframerate()
            carrier_bytes = frames * channels * sample_width
            total_bytes = carrier_bytes // 8 - AudioStego.HEADER_SIZE
            return {
                'ok': True, 'capacity_bytes': max(0, total_bytes),
                'frames': frames, 'channels': channels,
                'sample_width': sample_width,
                'framerate': framerate
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    @staticmethod
    def hide(audio_path: str, data: bytes, output_path: str,
             password: Optional[str] = None) -> Dict:
        """Hide data in WAV audio using the LSB of each raw frame byte.

        Args:
            audio_path: carrier WAV file.
            data: bytes to hide.
            output_path: destination WAV file, written with the carrier's
                parameters.
            password: when truthy, the payload is passed through
                ``_encrypt_data`` before embedding.

        Returns a dict with ``ok`` plus either result details or ``error``.
        """
        if not HAS_WAVE:
            return {'ok': False, 'error': 'wave module not available'}
        try:
            with wave.open(audio_path, 'rb') as w:
                params = w.getparams()
                carrier = bytearray(w.readframes(w.getnframes()))

            payload = _encrypt_data(data, password) if password else data
            full_data = AudioStego.MAGIC + struct.pack('>I', len(payload)) + payload

            if len(full_data) * 8 > len(carrier):
                return {'ok': False, 'error': f'Data too large. Max: {len(carrier) // 8} bytes'}

            position = 0
            for byte in full_data:
                for shift in range(7, -1, -1):
                    carrier[position] = (carrier[position] & 0xFE) | ((byte >> shift) & 1)
                    position += 1

            with wave.open(output_path, 'wb') as w:
                w.setparams(params)
                w.writeframes(bytes(carrier))
            return {
                'ok': True, 'output': output_path,
                'hidden_bytes': len(payload),
                # Only report encryption when it really happened.
                'encrypted': bool(password) and HAS_CRYPTO
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    @staticmethod
    def extract(audio_path: str, password: Optional[str] = None) -> Dict:
        """Extract hidden data from WAV audio.

        Returns a dict with ``ok`` plus either ``data``/``size`` or an
        ``error`` string. A declared payload length that does not fit in the
        audio is reported as corruption rather than silently truncated.
        """
        if not HAS_WAVE:
            return {'ok': False, 'error': 'wave module not available'}
        try:
            with wave.open(audio_path, 'rb') as w:
                carrier = w.readframes(w.getnframes())

            header_size = AudioStego.HEADER_SIZE
            if len(carrier) < header_size * 8:
                return {'ok': False, 'error': 'No hidden data found'}

            header = AudioStego._recover(carrier, header_size)
            if header[:4] != AudioStego.MAGIC:
                return {'ok': False, 'error': 'No hidden data found'}

            payload_len = struct.unpack('>I', header[4:8])[0]
            if (header_size + payload_len) * 8 > len(carrier):
                return {'ok': False, 'error': 'Corrupted data (length exceeds audio capacity)'}

            payload = AudioStego._recover(carrier, header_size + payload_len)[header_size:]

            if password:
                try:
                    payload = _decrypt_data(payload, password)
                except Exception:
                    return {'ok': False, 'error': 'Decryption failed'}
            return {
                'ok': True, 'data': payload, 'size': len(payload),
                'encrypted': bool(password) and HAS_CRYPTO
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}
# ── Document Steganography ───────────────────────────────────────────────────
class DocumentStego:
    """Zero-width-character steganography for text.

    Hidden block layout: 32 length symbols, one separator, then 8 symbols
    per payload byte, inserted at the end of the cover text's first line.
    """

    ZWS = '\u200b'   # zero-width space      -> bit 0
    ZWNJ = '\u200c'  # zero-width non-joiner -> bit 1
    ZWJ = '\u200d'   # zero-width joiner     -> separator between length and data
    LENGTH_BITS = 32

    @staticmethod
    def hide_whitespace(text: str, data: bytes, password: Optional[str] = None) -> Dict:
        """Hide data using zero-width characters in text.

        Args:
            text: cover text; the hidden block is appended to its first line.
            data: bytes to hide.
            password: when truthy, the payload is passed through
                ``_encrypt_data`` first.

        Returns a dict with ``ok``, the resulting ``text``, ``hidden_bytes``
        and ``encrypted``.
        """
        payload = _encrypt_data(data, password) if password else data

        zero, one = DocumentStego.ZWS, DocumentStego.ZWNJ

        def encode(bit_string: str) -> str:
            return ''.join(one if bit == '1' else zero for bit in bit_string)

        length_encoded = encode(f'{len(payload):032b}')
        encoded = encode(''.join(f'{byte:08b}' for byte in payload))
        hidden = length_encoded + DocumentStego.ZWJ + encoded

        # Insert after the first line, or at the end of single-line text.
        first_line, newline, remainder = text.partition('\n')
        result = first_line + hidden + newline + remainder

        return {
            'ok': True, 'text': result,
            'hidden_bytes': len(payload),
            # Only report encryption when it really happened.
            'encrypted': bool(password) and HAS_CRYPTO
        }

    @staticmethod
    def extract_whitespace(text: str, password: Optional[str] = None) -> Dict:
        """Extract data hidden in zero-width characters.

        Zero-width characters that already exist in the cover text before
        the hidden block (for example joiners inside emoji sequences) are
        skipped: the separator is taken to be the first joiner preceded by
        at least 32 length symbols, and the length is read from the 32
        symbols directly before it.

        Returns a dict with ``ok`` plus either ``data``/``size`` or an
        ``error`` string.
        """
        zws, zwnj, zwj = DocumentStego.ZWS, DocumentStego.ZWNJ, DocumentStego.ZWJ
        prefix_len = DocumentStego.LENGTH_BITS

        zw_chars = ''.join(c for c in text if c in (zws, zwnj, zwj))
        if zwj not in zw_chars:
            return {'ok': False, 'error': 'No hidden data found'}

        segments = zw_chars.split(zwj)
        # Every segment except the last one is followed by a joiner.
        for index, segment in enumerate(segments[:-1]):
            if len(segment) >= prefix_len:
                length_part = segment[-prefix_len:]
                data_part = zwj.join(segments[index + 1:])
                break
        else:
            return {'ok': False, 'error': 'Corrupted header'}

        payload_len = int(''.join('1' if c == zwnj else '0' for c in length_part), 2)

        data_bits = ''.join('1' if c == zwnj else '0' for c in data_part)
        usable = min(len(data_bits), payload_len * 8)
        usable -= usable % 8  # decode whole bytes only
        result_data = bytes(int(data_bits[i:i + 8], 2) for i in range(0, usable, 8))

        if password:
            try:
                result_data = _decrypt_data(result_data, password)
            except Exception:
                return {'ok': False, 'error': 'Decryption failed'}
        return {'ok': True, 'data': result_data, 'size': len(result_data)}
# ── Detection / Analysis ────────────────────────────────────────────────────
class StegoDetector:
    """Statistical analysis to detect hidden data in files."""

    @staticmethod
    def _leading_bytes(bits: List[int], count: int = 4) -> bytes:
        """Pack up to ``count`` whole bytes from the start of ``bits``.

        Trailing bits that do not fill a byte are ignored, so very small
        carriers cannot cause an out-of-range read.
        """
        usable = min(count * 8, len(bits) - len(bits) % 8)
        out = bytearray()
        for start in range(0, usable, 8):
            byte = 0
            for bit in bits[start:start + 8]:
                byte = (byte << 1) | bit
            out.append(byte)
        return bytes(out)

    @staticmethod
    def _verdict(score: int) -> str:
        """Map a numeric score to ``clean`` / ``suspicious`` / ``likely_stego``."""
        if score >= 50:
            return 'likely_stego'
        if score >= 25:
            return 'suspicious'
        return 'clean'

    @staticmethod
    def analyze_image(image_path: str) -> Dict:
        """Analyze image for signs of steganography.

        Combines a chi-square test on red-channel LSBs, a pairwise
        LSB-flip smoothness ratio, a check for the AUTARCH signature in the
        leading LSBs, and (for PNG only) a file-size heuristic.

        Returns a dict with ``ok`` plus either the verdict details or an
        ``error`` string.
        """
        if not HAS_PIL:
            return {'ok': False, 'error': 'Pillow (PIL) not installed'}
        try:
            with Image.open(image_path) as source:
                # The format is only known on the opened image, not on the
                # RGB conversion.
                source_format = source.format
                img = source.convert('RGB')
            pixels = list(img.getdata())
            w, h = img.size
            if not pixels:
                return {'ok': False, 'error': 'Image contains no pixels'}

            # Chi-square analysis on the red channel LSBs.
            ones = sum(pixel[0] & 1 for pixel in pixels)
            observed = [len(pixels) - ones, ones]
            expected = len(pixels) / 2
            chi_sq = sum((o - expected) ** 2 / expected for o in observed)

            # RS-style analysis: count pixel pairs where flipping the LSB of
            # the first value increases (regular) or decreases (singular)
            # the difference to the second.
            regular = 0
            singular = 0
            for i in range(0, len(pixels) - 1, 2):
                p1, p2 = pixels[i][0], pixels[i + 1][0]
                diff_orig = abs(p1 - p2)
                diff_flip = abs((p1 ^ 1) - p2)
                if diff_flip > diff_orig:
                    regular += 1
                elif diff_flip < diff_orig:
                    singular += 1
            total_pairs = regular + singular
            rs_ratio = regular / total_pairs if total_pairs > 0 else 0.5

            # Check for the ASTS magic in the leading LSBs.
            lead_bits = [c & 1 for pixel in pixels[:100] for c in pixel]
            has_asts_magic = StegoDetector._leading_bytes(lead_bits) == ImageStego.MAGIC

            score = 0
            indicators = []
            if chi_sq < 1.0:
                score += 30
                indicators.append(f'LSB distribution very uniform (chi²={chi_sq:.2f})')
            elif chi_sq < 3.84:
                score += 15
                indicators.append(f'LSB distribution slightly uniform (chi²={chi_sq:.2f})')

            if rs_ratio > 0.6:
                score += 25
                indicators.append(f'RS analysis suggests embedding (R/S={rs_ratio:.3f})')

            if has_asts_magic:
                score += 50
                indicators.append('AUTARCH stego signature detected in LSB')

            # Size heuristic: only meaningful for PNG, which is compressed.
            # Uncompressed formats such as BMP are always close to
            # w * h * 3 bytes and would be flagged every time.
            file_size = os.path.getsize(image_path)
            expected_size = w * h * 3  # rough uncompressed estimate
            if source_format == 'PNG' and file_size > expected_size * 0.9:
                score += 10
                indicators.append('File larger than expected for format')

            return {
                'ok': True,
                'verdict': StegoDetector._verdict(score),
                'confidence_score': min(100, score),
                'chi_square': round(chi_sq, 4),
                'rs_ratio': round(rs_ratio, 4),
                'has_magic': has_asts_magic,
                'indicators': indicators,
                'image_info': {'width': w, 'height': h, 'size': file_size}
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}

    @staticmethod
    def analyze_audio(audio_path: str) -> Dict:
        """Analyze audio file for signs of steganography.

        Reads at most the first 100000 frames, runs a chi-square test on the
        LSBs of the raw frame bytes and checks the leading LSBs for the
        AUTARCH signature.

        Returns a dict with ``ok`` plus either the verdict details or an
        ``error`` string.
        """
        if not HAS_WAVE:
            return {'ok': False, 'error': 'wave module not available'}
        try:
            with wave.open(audio_path, 'rb') as w:
                params = w.getparams()
                samples = w.readframes(min(w.getnframes(), 100000))
            if not samples:
                return {'ok': False, 'error': 'Audio contains no frames'}

            ones = sum(value & 1 for value in samples)
            observed = [len(samples) - ones, ones]
            expected = len(samples) / 2
            chi_sq = sum((o - expected) ** 2 / expected for o in observed)

            # Check for the magic in the leading LSBs.
            lead_bits = [value & 1 for value in samples[:100]]
            has_magic = StegoDetector._leading_bytes(lead_bits) == AudioStego.MAGIC

            score = 0
            indicators = []
            if chi_sq < 1.0:
                score += 30
                indicators.append(f'LSB distribution uniform (chi²={chi_sq:.2f})')
            if has_magic:
                score += 50
                indicators.append('AUTARCH stego signature detected')

            return {
                'ok': True, 'verdict': StegoDetector._verdict(score),
                'confidence_score': min(100, score),
                'chi_square': round(chi_sq, 4),
                'has_magic': has_magic,
                'indicators': indicators,
                'audio_info': {
                    'channels': params.nchannels,
                    'framerate': params.framerate,
                    'frames': params.nframes
                }
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}
# ── Steganography Manager ───────────────────────────────────────────────────
class StegoManager:
    """Unified interface for all steganography operations."""

    IMAGE_EXTENSIONS = ('.png', '.bmp', '.tiff', '.tif')
    AUDIO_EXTENSIONS = ('.wav', '.wave')
    # Detection additionally accepts JPEG files.
    DETECT_IMAGE_EXTENSIONS = IMAGE_EXTENSIONS + ('.jpg', '.jpeg')

    def __init__(self):
        """Create the ``stego`` data directory and the per-carrier helpers."""
        self.data_dir = os.path.join(get_data_dir(), 'stego')
        os.makedirs(self.data_dir, exist_ok=True)
        self.image = ImageStego()
        self.audio = AudioStego()
        self.document = DocumentStego()
        self.detector = StegoDetector()

    @classmethod
    def _carrier_type(cls, path: str) -> Optional[str]:
        """Return ``'image'``, ``'audio'`` or ``None`` from the file extension."""
        ext = Path(path).suffix.lower()
        if ext in cls.IMAGE_EXTENSIONS:
            return 'image'
        if ext in cls.AUDIO_EXTENSIONS:
            return 'audio'
        return None

    def get_capabilities(self) -> Dict:
        """Check available steganography capabilities."""
        return {
            'image': HAS_PIL,
            'audio': HAS_WAVE,
            'document': True,
            'encryption': HAS_CRYPTO,
            'detection': HAS_PIL or HAS_WAVE
        }

    def hide(self, carrier_path: str, data: bytes, output_path: Optional[str] = None,
             password: Optional[str] = None, carrier_type: Optional[str] = None) -> Dict:
        """Hide data in a carrier file (auto-detect type).

        Args:
            carrier_path: image or WAV carrier.
            data: bytes to hide.
            output_path: destination; when omitted, ``<stem>_stego`` next to
                the carrier is used, with a ``.png`` extension for images.
            password: optional encryption password.
            carrier_type: ``'image'`` or ``'audio'``; detected from the
                extension when omitted.

        Returns the result dict of the selected helper, or an error dict.
        """
        if not carrier_type:
            carrier_type = self._carrier_type(carrier_path)
            if carrier_type is None:
                ext = Path(carrier_path).suffix.lower()
                return {'ok': False, 'error': f'Unsupported carrier format: {ext}'}

        if not output_path:
            p = Path(carrier_path)
            # The image helper always writes PNG data, so the generated
            # name gets a matching extension instead of the carrier's.
            suffix = '.png' if carrier_type == 'image' else p.suffix
            output_path = str(p.parent / f'{p.stem}_stego{suffix}')

        if carrier_type == 'image':
            return self.image.hide(carrier_path, data, output_path, password)
        if carrier_type == 'audio':
            return self.audio.hide(carrier_path, data, output_path, password)
        return {'ok': False, 'error': f'Unsupported type: {carrier_type}'}

    def extract(self, carrier_path: str, password: Optional[str] = None,
                carrier_type: Optional[str] = None) -> Dict:
        """Extract hidden data from carrier file.

        The carrier type is detected from the extension when
        ``carrier_type`` is omitted; an unknown extension is reported with
        the same message ``hide`` uses.
        """
        if not carrier_type:
            carrier_type = self._carrier_type(carrier_path)
            if carrier_type is None:
                ext = Path(carrier_path).suffix.lower()
                return {'ok': False, 'error': f'Unsupported carrier format: {ext}'}

        if carrier_type == 'image':
            return self.image.extract(carrier_path, password)
        if carrier_type == 'audio':
            return self.audio.extract(carrier_path, password)
        return {'ok': False, 'error': f'Unsupported type: {carrier_type}'}

    def detect(self, file_path: str) -> Dict:
        """Analyze file for steganographic content."""
        ext = Path(file_path).suffix.lower()
        if ext in self.DETECT_IMAGE_EXTENSIONS:
            return self.detector.analyze_image(file_path)
        if ext in self.AUDIO_EXTENSIONS:
            return self.detector.analyze_audio(file_path)
        return {'ok': False, 'error': f'Unsupported format for detection: {ext}'}

    def capacity(self, file_path: str) -> Dict:
        """Check capacity of a carrier file."""
        ext = Path(file_path).suffix.lower()
        if ext in self.IMAGE_EXTENSIONS:
            return self.image.capacity(file_path)
        if ext in self.AUDIO_EXTENSIONS:
            return self.audio.capacity(file_path)
        return {'ok': False, 'error': f'Unsupported format: {ext}'}
# ── Singleton ────────────────────────────────────────────────────────────────
_instance = None


def get_stego_manager() -> StegoManager:
    """Return the module-wide ``StegoManager``, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = StegoManager()
    return _instance
# ── CLI Interface ────────────────────────────────────────────────────────────
def run():
    """CLI entry point for Steganography module.

    Shows the capability status and the menu in a loop until the user
    enters ``0``. Each recognised choice prompts for its inputs, calls the
    shared manager and prints the outcome; any other input just shows the
    menu again.
    """
    mgr = get_stego_manager()

    def ask(prompt):
        return input(prompt).strip()

    def ask_optional(prompt):
        return ask(prompt) or None

    def show_error(result):
        print(f" Error: {result['error']}")

    def hide_in_file():
        carrier = ask(" Carrier file path: ")
        message = ask(" Message to hide: ")
        output = ask_optional(" Output file path (blank=auto): ")
        password = ask_optional(" Encryption password (blank=none): ")
        if not (carrier and message):
            return
        result = mgr.hide(carrier, message.encode(), output, password)
        if not result['ok']:
            show_error(result)
            return
        print(f" Success: {result.get('message', result.get('output'))}")

    def extract_from_file():
        carrier = ask(" Stego file path: ")
        password = ask_optional(" Password (blank=none): ")
        if not carrier:
            return
        result = mgr.extract(carrier, password)
        if not result['ok']:
            show_error(result)
            return
        try:
            text = result['data'].decode('utf-8')
        except UnicodeDecodeError:
            print(f" Extracted {result['size']} bytes (binary data)")
        else:
            print(f" Extracted ({result['size']} bytes): {text}")

    def detect_stego():
        filepath = ask(" File to analyze: ")
        if not filepath:
            return
        result = mgr.detect(filepath)
        if not result['ok']:
            show_error(result)
            return
        print(f" Verdict: {result['verdict']} (score: {result['confidence_score']})")
        for ind in result.get('indicators', []):
            print(f" - {ind}")

    def check_capacity():
        filepath = ask(" Carrier file: ")
        if not filepath:
            return
        result = mgr.capacity(filepath)
        if not result['ok']:
            show_error(result)
            return
        kb = result['capacity_bytes'] / 1024
        print(f" Capacity: {result['capacity_bytes']} bytes ({kb:.1f} KB)")

    def hide_in_text():
        text = ask(" Cover text: ")
        message = ask(" Hidden message: ")
        password = ask_optional(" Password (blank=none): ")
        if not (text and message):
            return
        result = mgr.document.hide_whitespace(text, message.encode(), password)
        if not result['ok']:
            show_error(result)
            return
        print(f" Output text (copy this):")
        print(f" {result['text']}")

    def extract_from_text():
        text = ask(" Text with hidden data: ")
        password = ask_optional(" Password (blank=none): ")
        if not text:
            return
        result = mgr.document.extract_whitespace(text, password)
        if not result['ok']:
            show_error(result)
            return
        print(f" Hidden message: {result['data'].decode('utf-8', errors='replace')}")

    actions = {
        '1': hide_in_file,
        '2': extract_from_file,
        '3': detect_stego,
        '4': check_capacity,
        '5': hide_in_text,
        '6': extract_from_text,
    }
    menu_lines = (
        " 1 — Hide Data in File",
        " 2 — Extract Data from File",
        " 3 — Detect Steganography",
        " 4 — Check Carrier Capacity",
        " 5 — Hide Text in Document (whitespace)",
        " 6 — Extract Text from Document",
        " 0 — Back",
    )
    rule = '=' * 60

    while True:
        caps = mgr.get_capabilities()
        image_status = 'OK' if caps['image'] else 'MISSING (pip install Pillow)'
        audio_status = 'OK' if caps['audio'] else 'MISSING'
        crypto_status = 'OK' if caps['encryption'] else 'MISSING (pip install pycryptodome)'

        print(f"\n{rule}")
        print(f" Steganography")
        print(f"{rule}")
        print(f" Image: {image_status}")
        print(f" Audio: {audio_status}")
        print(f" Encryption: {crypto_status}")
        print()
        for line in menu_lines:
            print(line)
        print()

        choice = ask(" > ")
        if choice == '0':
            break
        action = actions.get(choice)
        if action is not None:
            action()