Files
setec-mitm/utils/proto.py
sssnake 20e7eb343d Initial commit — SetecMITM generic IoT MITM framework
Templated from cam-mitm. The camera-specific code (UBox cloud client,
CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's
left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception
with peek-before-wrap, raw sniffer with conntrack-based original-dst
lookup, protocol fingerprinting, intruder detection, packet injection,
log rotation, PyQt6 GUI on top of a service Controller.

All 'camera' references renamed to 'target' throughout. Configuration
moved into ~/.config/setec-mitm/config.json with the Settings tab as
the primary editor. Plugin system at targets/<name>/plugin.py for
vendor-specific code.

See README.md for full setup, plugin authoring, and troubleshooting.

Co-authored by Setec Labs.
2026-04-09 08:38:59 -07:00

95 lines
2.7 KiB
Python

"""
Protocol fingerprinting from packet payload first bytes.
Returns a short label like 'TLS', 'HTTP', 'IOTC', '?'.
"""
import threading
from collections import Counter
# Tally of protocol labels: incremented by record(), copied out by
# seen_counts(), and emptied by reset().
_seen = Counter()
# Held by record(), seen_counts() and reset() while they touch _seen.
_lock = threading.Lock()
def detect(data: bytes) -> str:
    """Guess the application protocol from the first bytes of a payload.

    The checks are cheap prefix / magic-byte heuristics evaluated in a fixed
    order; the first one that matches wins. Labels ending in ``?`` are weak
    signals.

    Args:
        data: Raw packet payload. May be empty.

    Returns:
        One of ``"TLS"``, ``"TLS-DATA"``, ``"RTSP"``, ``"HTTP"``,
        ``"HTTP-RESP"``, ``"SSH"``, ``"FTP?"``, ``"DNS"``, ``"NTP?"``,
        ``"IOTC"``, ``"STUN"``, ``"SSDP"``, ``"MQTT?"``, or ``"?"`` when
        ``data`` is empty or nothing matches.
    """
    if not data:
        return "?"
    n = len(data)
    b = data

    # TLS record header: content type, then version 0x03 0x0[0-4].
    if n >= 3 and b[1] == 0x03 and b[2] <= 0x04:
        if b[0] == 0x16:  # handshake
            return "TLS"
        if b[0] in (0x14, 0x15, 0x17):  # change_cipher_spec / alert / app data
            return "TLS-DATA"

    # Text protocols are matched on a short prefix. It must be at least as
    # long as the longest pattern below (b"OPTIONS rtsp" is 12 bytes).
    head = bytes(b[:16])

    # RTSP is tested before the HTTP verbs: an RTSP OPTIONS request also
    # begins with "OPTIONS" and would otherwise be reported as HTTP.
    if head.startswith((b"RTSP/", b"OPTIONS rtsp", b"DESCRIBE")):
        return "RTSP"

    # HTTP request methods (ASCII)
    if head.startswith(
        (b"GET ", b"POST ", b"PUT ", b"HEAD ", b"DELETE ", b"OPTIONS", b"PATCH ", b"CONNECT")
    ):
        return "HTTP"
    if head.startswith(b"HTTP/"):
        return "HTTP-RESP"

    # SSH banner
    if head.startswith(b"SSH-"):
        return "SSH"

    # FTP banner (numeric reply codes)
    if head[:3] in (b"220", b"221", b"230"):
        return "FTP?"

    # DNS: 16-bit ID, then flags such as 0x01 0x00 (query) or 0x81 0x80 (resp)
    if n >= 4 and b[2] in (0x01, 0x81) and b[3] in (0x00, 0x80, 0x20, 0xA0):
        return "DNS"

    # NTP: first byte is LI(2)|VN(3)|Mode(3); common values 0x1b (client),
    # 0x24 (server). A basic NTP packet is 48 bytes.
    if n >= 48 and b[0] in (0x1B, 0x23, 0x24, 0xDB, 0xE3):
        return "NTP?"

    # ThroughTek Kalay IOTC/AVAPI: begins with 0xF1 0xD0 or 0xF1 0xE0 family
    if n >= 2 and b[0] == 0xF1 and b[1] in (0xD0, 0xE0, 0xF0, 0xC0, 0xA0, 0x10, 0x20, 0x30):
        return "IOTC"

    # STUN: first byte 0x00 or 0x01, magic cookie 0x2112A442 at offset 4
    if n >= 8 and b[0] in (0x00, 0x01) and b[4:8] == b"\x21\x12\xa4\x42":
        return "STUN"

    # SSDP requests. An SSDP response starts with an ordinary HTTP status
    # line, so it has already been reported as HTTP-RESP above.
    if head.startswith((b"M-SEARCH", b"NOTIFY *")):
        return "SSDP"

    # MQTT: fixed-header byte with a zero flag nibble (0x10 CONNECT,
    # 0x20 CONNACK, 0x30 PUBLISH, ...). Weak signal, so it is accepted only
    # when the remaining-length byte looks sane.
    if n >= 2 and b[0] in (0x10, 0x20, 0x30, 0x40, 0xC0, 0xD0, 0xE0) and 2 <= b[1] <= 200:
        return "MQTT?"

    return "?"
def label_with_hex(data: bytes) -> str:
    """Build a ``PROTO[hex]`` tag for log lines.

    ``PROTO`` is the label returned by ``detect(data)``; ``hex`` is the hex
    encoding of at most the first six payload bytes, or empty when ``data``
    is empty.
    """
    proto = detect(data)
    prefix_hex = "" if not data else data[:6].hex()
    return f"{proto}[{prefix_hex}]"
def record(proto: str):
    """Add one occurrence of ``proto`` to the module-level tally."""
    with _lock:
        # Counter.update() over a one-element tuple increments that key by 1.
        _seen.update((proto,))
def seen_counts():
    """Return a plain-dict copy of the per-protocol tally.

    The copy is taken while holding the lock, so later calls to ``record``
    or ``reset`` do not affect the returned dict.
    """
    with _lock:
        snapshot = dict(_seen)
    return snapshot
def reset():
    """Discard every recorded protocol count."""
    _lock.acquire()
    try:
        _seen.clear()
    finally:
        _lock.release()