Templated from cam-mitm. The camera-specific code (UBox cloud client, CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception with peek-before-wrap, raw sniffer with conntrack-based original-dst lookup, protocol fingerprinting, intruder detection, packet injection, log rotation, PyQt6 GUI on top of a service Controller. All 'camera' references renamed to 'target' throughout. Configuration moved into ~/.config/setec-mitm/config.json with the Settings tab as the primary editor. Plugin system at targets/<name>/plugin.py for vendor-specific code. See README.md for full setup, plugin authoring, and troubleshooting. Co-authored by Setec Labs.
180 lines
6.0 KiB
Python
180 lines
6.0 KiB
Python
"""HTTP and HTTPS MITM servers — intercept target cloud traffic"""
|
|
|
|
import socket
|
|
import ssl
|
|
import os
|
|
import json
|
|
import threading
|
|
from utils.log import log, hexdump, save_raw, C_SUCCESS, C_ERROR, C_TRAFFIC, C_IMPORTANT
|
|
from utils import proto as proto_id
|
|
|
|
|
|
def _handle_http(conn, addr, cfg):
|
|
try:
|
|
conn.settimeout(5)
|
|
data = conn.recv(8192)
|
|
if data:
|
|
text = data.decode("utf-8", errors="replace")
|
|
lines = text.split("\r\n")
|
|
log(f"HTTP {addr[0]}: {lines[0]}", C_TRAFFIC)
|
|
for l in lines[1:6]:
|
|
if l:
|
|
log(f" {l}", 0)
|
|
save_raw(cfg["log_dir"], f"http_{addr[0]}", data)
|
|
conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
|
|
except:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _handle_https(conn, addr, cfg):
|
|
try:
|
|
conn.settimeout(5)
|
|
data = b""
|
|
while True:
|
|
chunk = conn.recv(4096)
|
|
if not chunk:
|
|
break
|
|
data += chunk
|
|
if b"\r\n\r\n" in data:
|
|
# Check Content-Length for body
|
|
cl = 0
|
|
for line in data.split(b"\r\n"):
|
|
if line.lower().startswith(b"content-length:"):
|
|
cl = int(line.split(b":")[1].strip())
|
|
break
|
|
hdr_end = data.index(b"\r\n\r\n") + 4
|
|
if len(data) >= hdr_end + cl:
|
|
break
|
|
|
|
if data:
|
|
try:
|
|
hdr_end = data.index(b"\r\n\r\n")
|
|
headers = data[:hdr_end].decode("utf-8", errors="replace")
|
|
body = data[hdr_end + 4:]
|
|
lines = headers.split("\r\n")
|
|
log(f"HTTPS {addr[0]}: {lines[0]}", C_TRAFFIC)
|
|
for l in lines[1:8]:
|
|
if l:
|
|
log(f" {l}", 0)
|
|
if body:
|
|
try:
|
|
parsed = json.loads(body)
|
|
log(f" BODY: {json.dumps(parsed)}", C_IMPORTANT)
|
|
except:
|
|
log(f" BODY ({len(body)}B):", 0)
|
|
log(hexdump(body), 0)
|
|
except:
|
|
log(f"HTTPS raw {addr[0]}: {len(data)}B", C_TRAFFIC)
|
|
log(hexdump(data), 0)
|
|
|
|
save_raw(cfg["log_dir"], f"https_{addr[0]}", data)
|
|
conn.sendall(b'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n'
|
|
b'Content-Length: 27\r\n\r\n{"code":0,"msg":"success"}')
|
|
except:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _generate_cert(log_dir):
|
|
cert = f"{log_dir}/mitm_cert.pem"
|
|
key = f"{log_dir}/mitm_key.pem"
|
|
if not os.path.exists(cert):
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
os.system(f'openssl req -x509 -newkey rsa:2048 -keyout {key} '
|
|
f'-out {cert} -days 365 -nodes '
|
|
f'-subj "/CN=portal.ubianet.com" 2>/dev/null')
|
|
return cert, key
|
|
|
|
|
|
def run_http(cfg, flags, running_check):
|
|
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
srv.settimeout(1)
|
|
try:
|
|
srv.bind(("0.0.0.0", 80))
|
|
except OSError as e:
|
|
log(f"HTTP: bind :80 failed: {e}", C_ERROR)
|
|
return
|
|
srv.listen(5)
|
|
flags["http"] = True
|
|
log("HTTP: listening on :80", C_SUCCESS)
|
|
|
|
while running_check():
|
|
try:
|
|
conn, addr = srv.accept()
|
|
threading.Thread(target=_handle_http, args=(conn, addr, cfg), daemon=True).start()
|
|
except socket.timeout:
|
|
continue
|
|
except:
|
|
break
|
|
srv.close()
|
|
flags["http"] = False
|
|
|
|
|
|
def run_https(cfg, flags, running_check):
|
|
cert, key = _generate_cert(cfg["log_dir"])
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
ctx.load_cert_chain(cert, key)
|
|
|
|
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
srv.settimeout(1)
|
|
try:
|
|
srv.bind(("0.0.0.0", 443))
|
|
except OSError as e:
|
|
log(f"HTTPS: bind :443 failed: {e}", C_ERROR)
|
|
return
|
|
srv.listen(5)
|
|
flags["https"] = True
|
|
log("HTTPS: listening on :443", C_SUCCESS)
|
|
|
|
while running_check():
|
|
try:
|
|
conn, addr = srv.accept()
|
|
# Peek at first bytes to detect TLS vs raw protocol
|
|
try:
|
|
conn.settimeout(3)
|
|
peek = conn.recv(8, socket.MSG_PEEK)
|
|
except Exception as e:
|
|
log(f"443 peek fail {addr[0]}: {e}", C_ERROR)
|
|
conn.close()
|
|
continue
|
|
conn.settimeout(None)
|
|
|
|
# TLS ClientHello starts with 0x16 0x03 0x0[0-4]
|
|
is_tls = len(peek) >= 3 and peek[0] == 0x16 and peek[1] == 0x03
|
|
|
|
if is_tls:
|
|
try:
|
|
ssl_conn = ctx.wrap_socket(conn, server_side=True)
|
|
threading.Thread(target=_handle_https, args=(ssl_conn, addr, cfg),
|
|
daemon=True).start()
|
|
except ssl.SSLError as e:
|
|
log(f"SSL fail {addr[0]}: {e} (first8={peek.hex()})", C_ERROR)
|
|
save_raw(cfg["log_dir"], f"raw_tls_fail_{addr[0]}", peek)
|
|
conn.close()
|
|
else:
|
|
# Non-TLS protocol on :443 — capture raw
|
|
pname = proto_id.detect(peek)
|
|
proto_id.record(pname)
|
|
log(f"NON-TLS on :443 from {addr[0]} proto={pname} first8={peek.hex()}", C_IMPORTANT)
|
|
try:
|
|
conn.settimeout(2)
|
|
full = conn.recv(4096)
|
|
if full:
|
|
log(f" Raw ({len(full)}B):", 0)
|
|
log(hexdump(full[:256]), 0)
|
|
save_raw(cfg["log_dir"], f"raw_443_{addr[0]}", full)
|
|
except Exception as e:
|
|
log(f" recv fail: {e}", C_ERROR)
|
|
conn.close()
|
|
except socket.timeout:
|
|
continue
|
|
except:
|
|
break
|
|
srv.close()
|
|
flags["https"] = False
|