"""E2E SSH Encryption Protocol — encrypt commands before SSH, decrypt responses after. Even if SSH transport is compromised (MITM, key theft, rogue server), command payloads and responses remain encrypted with the tunnel key. Protocol: Client -> VPS: base64( nonce[12] + AES-256-GCM(tunnel_key, command) ) VPS -> Client: base64( nonce[12] + AES-256-GCM(tunnel_key, response_json) ) The tunnel key is a random 32-byte key shared between client and VPS agent. The Go agent on the VPS holds the same key at /etc/setec/tunnel.key. """ import os import json import base64 from cryptography.hazmat.primitives.ciphers.aead import AESGCM import config AGENT_PATH = "/usr/local/bin/setec-agent" KEY_PATH = "/etc/setec/tunnel.key" def encrypt_command(tunnel_key: bytes, command: str) -> str: """Encrypt a shell command. Returns base64 string.""" nonce = os.urandom(12) aesgcm = AESGCM(tunnel_key) ct = aesgcm.encrypt(nonce, command.encode("utf-8"), None) return base64.b64encode(nonce + ct).decode("ascii") def decrypt_response(tunnel_key: bytes, b64data: str) -> dict: """Decrypt an encrypted response from the agent. Returns parsed JSON.""" raw = base64.b64decode(b64data.strip()) if len(raw) < 28: # 12 nonce + 16 tag minimum raise ValueError("Response too short to be valid ciphertext") nonce = raw[:12] ct = raw[12:] aesgcm = AESGCM(tunnel_key) plaintext = aesgcm.decrypt(nonce, ct, None) return json.loads(plaintext.decode("utf-8")) def is_e2e_enabled() -> bool: """Check if E2E encryption is configured and deployed.""" cfg = config.load() return cfg.get("e2e_enabled", False) and cfg.get("e2e_tunnel_key_deployed", False) def get_tunnel_key() -> bytes: """Load the tunnel key from local config (hex-encoded).""" cfg = config.load() key_hex = cfg.get("tunnel_key", "") if not key_hex: raise RuntimeError("No tunnel key configured. Run E2E setup first.") return bytes.fromhex(key_hex) def wrap_command_for_agent(tunnel_key: bytes, command: str) -> str: """Build the full SSH command that pipes encrypted data to the agent.""" encrypted = encrypt_command(tunnel_key, command) return f'echo "{encrypted}" | {AGENT_PATH}' def generate_deploy_commands(tunnel_key: bytes) -> list: """Generate shell commands to deploy the tunnel key on the VPS. Returns list of (description, command) tuples. """ key_hex = tunnel_key.hex() commands = [ ("Create setec directory", "mkdir -p /etc/setec && chmod 700 /etc/setec"), ("Write tunnel key", f"echo '{key_hex}' > {KEY_PATH} && chmod 600 {KEY_PATH}"), ("Set key ownership", f"chown root:root {KEY_PATH}"), ] return commands