Files

77 lines
2.7 KiB
Python
Raw Permalink Normal View History

"""E2E SSH Encryption Protocol — encrypt commands before SSH, decrypt responses after.
Even if SSH transport is compromised (MITM, key theft, rogue server),
command payloads and responses remain encrypted with the tunnel key.
Protocol:
Client -> VPS: base64( nonce[12] + AES-256-GCM(tunnel_key, command) )
VPS -> Client: base64( nonce[12] + AES-256-GCM(tunnel_key, response_json) )
The tunnel key is a random 32-byte key shared between client and VPS agent.
The Go agent on the VPS holds the same key at /etc/setec/tunnel.key.
"""
import os
import json
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import config
# Executable that wrap_command_for_agent pipes the encrypted payload into.
AGENT_PATH = "/usr/local/bin/setec-agent"
# File that generate_deploy_commands writes the hex-encoded tunnel key to.
KEY_PATH = "/etc/setec/tunnel.key"
def encrypt_command(tunnel_key: bytes, command: str) -> str:
    """Seal a shell command with AES-GCM and return it as base64 text.

    The returned string is ``base64(nonce + sealed)``, where ``nonce`` is 12
    bytes from ``os.urandom`` and ``sealed`` is what ``AESGCM.encrypt``
    produces for the UTF-8 encoded command with no associated data.

    Args:
        tunnel_key: Key handed to ``AESGCM``.
        command: Shell command text to protect.

    Returns:
        ASCII base64 string carrying the nonce followed by the sealed data.
    """
    iv = os.urandom(12)
    cipher = AESGCM(tunnel_key)
    sealed = cipher.encrypt(iv, command.encode("utf-8"), None)
    # The nonce travels in front of the ciphertext so the receiver can split it off.
    packet = iv + sealed
    return base64.b64encode(packet).decode("ascii")
def decrypt_response(tunnel_key: bytes, b64data: str) -> dict:
    """Open an encrypted agent response and return the parsed JSON payload.

    The input is expected to be ``base64(nonce + sealed)``: the first 12
    decoded bytes are used as the nonce, the remainder is passed to
    ``AESGCM.decrypt`` with no associated data, and the resulting UTF-8
    text is parsed as JSON.

    Args:
        tunnel_key: Key handed to ``AESGCM``.
        b64data: Base64 text from the agent; surrounding whitespace is
            stripped before decoding.

    Returns:
        The value produced by ``json.loads`` on the decrypted text.

    Raises:
        ValueError: If the decoded data is shorter than 28 bytes
            (12-byte nonce plus 16-byte tag).

    Errors raised by base64 decoding, AES-GCM decryption, or JSON parsing
    propagate unchanged.
    """
    nonce_len = 12
    tag_len = 16
    blob = base64.b64decode(b64data.strip())
    if len(blob) < nonce_len + tag_len:
        raise ValueError("Response too short to be valid ciphertext")
    iv, sealed = blob[:nonce_len], blob[nonce_len:]
    cleartext = AESGCM(tunnel_key).decrypt(iv, sealed, None)
    return json.loads(cleartext.decode("utf-8"))
def is_e2e_enabled() -> bool:
    """Report whether E2E encryption is both switched on and deployed.

    Loads the configuration with ``config.load()`` and requires the
    ``e2e_enabled`` and ``e2e_tunnel_key_deployed`` entries to both be
    truthy. A missing entry counts as False.

    Returns:
        True only when both entries are truthy, otherwise False. The result
        is always a real ``bool``, whatever types the config holds.
    """
    cfg = config.load()
    enabled = cfg.get("e2e_enabled", False)
    deployed = cfg.get("e2e_tunnel_key_deployed", False)
    # `and` returns one of its operands (e.g. None, "yes", 1), not a bool;
    # coerce so the declared return type actually holds.
    return bool(enabled and deployed)
def get_tunnel_key() -> bytes:
    """Return the tunnel key stored hex-encoded in the local config.

    Reads the ``tunnel_key`` entry from ``config.load()`` and decodes it
    with ``bytes.fromhex``.

    Returns:
        The decoded key bytes.

    Raises:
        RuntimeError: If the ``tunnel_key`` entry is missing or empty.

    Any error raised by ``bytes.fromhex`` for malformed hex propagates
    unchanged.
    """
    hex_key = config.load().get("tunnel_key", "")
    if hex_key:
        return bytes.fromhex(hex_key)
    raise RuntimeError("No tunnel key configured. Run E2E setup first.")
def wrap_command_for_agent(tunnel_key: bytes, command: str) -> str:
    """Build the shell pipeline that feeds an encrypted command to the agent.

    The command is encrypted with ``encrypt_command`` and the resulting
    base64 text is echoed into the executable at ``AGENT_PATH``.

    Args:
        tunnel_key: Key used to encrypt the command.
        command: Shell command to run through the agent.

    Returns:
        A string of the form ``echo "<base64>" | <AGENT_PATH>``.
    """
    payload = encrypt_command(tunnel_key, command)
    echo_stage = f'echo "{payload}"'
    return f"{echo_stage} | {AGENT_PATH}"
def generate_deploy_commands(tunnel_key: bytes) -> list:
    """Produce the shell steps that install the tunnel key on the VPS.

    The key is written hex-encoded to ``KEY_PATH``. This function only
    builds the command strings; it does not run them.

    Args:
        tunnel_key: Raw key bytes to deploy.

    Returns:
        A list of ``(description, command)`` tuples, in execution order:
        create the directory, write the key file, set its ownership.
    """
    hex_key = tunnel_key.hex()
    make_dir = "mkdir -p /etc/setec && chmod 700 /etc/setec"
    write_key = f"echo '{hex_key}' > {KEY_PATH} && chmod 600 {KEY_PATH}"
    set_owner = f"chown root:root {KEY_PATH}"
    return [
        ("Create setec directory", make_dir),
        ("Write tunnel key", write_key),
        ("Set key ownership", set_owner),
    ]