Files
cam-mitm/services/http_server.py

180 lines
6.0 KiB
Python
Raw Permalink Normal View History

"""HTTP and HTTPS MITM servers — intercept camera cloud traffic"""
import socket
import ssl
import os
import json
import threading
from utils.log import log, hexdump, save_raw, C_SUCCESS, C_ERROR, C_TRAFFIC, C_IMPORTANT
from utils import proto as proto_id
def _handle_http(conn, addr, cfg):
try:
conn.settimeout(5)
data = conn.recv(8192)
if data:
text = data.decode("utf-8", errors="replace")
lines = text.split("\r\n")
log(f"HTTP {addr[0]}: {lines[0]}", C_TRAFFIC)
for l in lines[1:6]:
if l:
log(f" {l}", 0)
save_raw(cfg["log_dir"], f"http_{addr[0]}", data)
conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
except:
pass
finally:
conn.close()
def _handle_https(conn, addr, cfg):
try:
conn.settimeout(5)
data = b""
while True:
chunk = conn.recv(4096)
if not chunk:
break
data += chunk
if b"\r\n\r\n" in data:
# Check Content-Length for body
cl = 0
for line in data.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
cl = int(line.split(b":")[1].strip())
break
hdr_end = data.index(b"\r\n\r\n") + 4
if len(data) >= hdr_end + cl:
break
if data:
try:
hdr_end = data.index(b"\r\n\r\n")
headers = data[:hdr_end].decode("utf-8", errors="replace")
body = data[hdr_end + 4:]
lines = headers.split("\r\n")
log(f"HTTPS {addr[0]}: {lines[0]}", C_TRAFFIC)
for l in lines[1:8]:
if l:
log(f" {l}", 0)
if body:
try:
parsed = json.loads(body)
log(f" BODY: {json.dumps(parsed)}", C_IMPORTANT)
except:
log(f" BODY ({len(body)}B):", 0)
log(hexdump(body), 0)
except:
log(f"HTTPS raw {addr[0]}: {len(data)}B", C_TRAFFIC)
log(hexdump(data), 0)
save_raw(cfg["log_dir"], f"https_{addr[0]}", data)
conn.sendall(b'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n'
b'Content-Length: 27\r\n\r\n{"code":0,"msg":"success"}')
except:
pass
finally:
conn.close()
def _generate_cert(log_dir):
cert = f"{log_dir}/mitm_cert.pem"
key = f"{log_dir}/mitm_key.pem"
if not os.path.exists(cert):
os.makedirs(log_dir, exist_ok=True)
os.system(f'openssl req -x509 -newkey rsa:2048 -keyout {key} '
f'-out {cert} -days 365 -nodes '
f'-subj "/CN=portal.ubianet.com" 2>/dev/null')
return cert, key
def run_http(cfg, flags, running_check):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.settimeout(1)
try:
srv.bind(("0.0.0.0", 80))
except OSError as e:
log(f"HTTP: bind :80 failed: {e}", C_ERROR)
return
srv.listen(5)
flags["http"] = True
log("HTTP: listening on :80", C_SUCCESS)
while running_check():
try:
conn, addr = srv.accept()
threading.Thread(target=_handle_http, args=(conn, addr, cfg), daemon=True).start()
except socket.timeout:
continue
except:
break
srv.close()
flags["http"] = False
def run_https(cfg, flags, running_check):
cert, key = _generate_cert(cfg["log_dir"])
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(cert, key)
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.settimeout(1)
try:
srv.bind(("0.0.0.0", 443))
except OSError as e:
log(f"HTTPS: bind :443 failed: {e}", C_ERROR)
return
srv.listen(5)
flags["https"] = True
log("HTTPS: listening on :443", C_SUCCESS)
while running_check():
try:
conn, addr = srv.accept()
# Peek at first bytes to detect TLS vs raw protocol
try:
conn.settimeout(3)
peek = conn.recv(8, socket.MSG_PEEK)
except Exception as e:
log(f"443 peek fail {addr[0]}: {e}", C_ERROR)
conn.close()
continue
conn.settimeout(None)
# TLS ClientHello starts with 0x16 0x03 0x0[0-4]
is_tls = len(peek) >= 3 and peek[0] == 0x16 and peek[1] == 0x03
if is_tls:
try:
ssl_conn = ctx.wrap_socket(conn, server_side=True)
threading.Thread(target=_handle_https, args=(ssl_conn, addr, cfg),
daemon=True).start()
except ssl.SSLError as e:
log(f"SSL fail {addr[0]}: {e} (first8={peek.hex()})", C_ERROR)
save_raw(cfg["log_dir"], f"raw_tls_fail_{addr[0]}", peek)
conn.close()
else:
# Non-TLS protocol on :443 — capture raw
pname = proto_id.detect(peek)
proto_id.record(pname)
log(f"NON-TLS on :443 from {addr[0]} proto={pname} first8={peek.hex()}", C_IMPORTANT)
try:
conn.settimeout(2)
full = conn.recv(4096)
if full:
log(f" Raw ({len(full)}B):", 0)
log(hexdump(full[:256]), 0)
save_raw(cfg["log_dir"], f"raw_443_{addr[0]}", full)
except Exception as e:
log(f" recv fail: {e}", C_ERROR)
conn.close()
except socket.timeout:
continue
except:
break
srv.close()
flags["https"] = False