Templated from cam-mitm. The camera-specific code (UBox cloud client, CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception with peek-before-wrap, raw sniffer with conntrack-based original-dst lookup, protocol fingerprinting, intruder detection, packet injection, log rotation, PyQt6 GUI on top of a service Controller. All 'camera' references renamed to 'target' throughout. Configuration moved into ~/.config/setec-mitm/config.json with the Settings tab as the primary editor. Plugin system at targets/<name>/plugin.py for vendor-specific code. See README.md for full setup, plugin authoring, and troubleshooting. Co-authored by Setec Labs.
188 lines
6.2 KiB
Python
188 lines
6.2 KiB
Python
"""
|
|
Intruder watch — detects unauthorized parties interacting with the target.
|
|
|
|
Watches the raw socket for:
|
|
1. Any LAN host that isn't us, the router, or the target, exchanging traffic
|
|
with the target.
|
|
2. ARP replies for the target's IP coming from a MAC that isn't the target —
|
|
i.e. someone else is ARP-spoofing.
|
|
3. Outbound packets from the target to destinations not on the known cloud
|
|
whitelist (suggests new C2 / unknown firmware behavior).
|
|
4. New (destination IP, protocol, destination port) combinations the target
   initiates to hosts outside that whitelist.
|
|
|
|
Findings are pushed to utils.log AND to a shared `intruders` deque the GUI
|
|
reads from for the Intruders tab.
|
|
"""
|
|
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from datetime import datetime
|
|
|
|
from utils.log import log, C_ERROR, C_SUCCESS, C_IMPORTANT, C_TRAFFIC
|
|
|
|
# State shared with the GUI (Intruders tab).
# `_intruder_lock` is held by the accessor functions in this module whenever
# they touch `intruders`; the deque is bounded, so once 500 findings are
# stored each new append discards the oldest one.
_intruder_lock = threading.Lock()
intruders = deque(maxlen=500)
|
|
|
|
# Whitelist of (network base address, prefix length) pairs the target is
# *expected* to reach (taken from findings.md). Outbound traffic to any
# internet host outside these networks is treated as suspicious.
KNOWN_CLOUD_NETS = [
    # --- Tencent Cloud: P2P relay, COS ---
    ("43.0.0.0", 8),
    ("119.28.0.0", 14),
    ("129.226.0.0", 15),
    ("150.109.0.0", 16),

    # --- Alibaba Cloud: OSS, OTA ---
    ("8.208.0.0", 12),
    ("47.74.0.0", 15),
    ("47.88.0.0", 13),
    ("118.178.0.0", 15),

    # --- AWS: NTP buckets ---
    ("3.64.0.0", 12),
    ("54.93.0.0", 16),

    # --- Akamai: connectivity checks (microsoft etc.) ---
    ("23.0.0.0", 8),
    ("104.64.0.0", 10),

    # --- Vendor connectivity checks ---
    ("17.0.0.0", 8),      # Apple
    ("13.64.0.0", 11),    # Microsoft
    ("52.0.0.0", 8),      # Amazon

    # --- qq.com: Tencent connectivity probe ---
    ("182.254.0.0", 16),
]
|
|
|
|
|
|
def _ip_to_int(ip):
|
|
return struct.unpack("!I", socket.inet_aton(ip))[0]
|
|
|
|
|
|
def _in_net(ip, base, prefix):
    """Return True when *ip* falls inside the IPv4 network *base*/*prefix*.

    Both addresses are dotted strings; *prefix* is the number of leading
    network bits. Two addresses are in the same network when they do not
    differ in any bit covered by the netmask.
    """
    addr_bits = _ip_to_int(ip)
    net_bits = _ip_to_int(base)
    netmask = (0xFFFFFFFF << (32 - prefix)) & 0xFFFFFFFF
    differing = (addr_bits ^ net_bits) & netmask
    return differing == 0
|
|
|
|
|
|
def _is_known_cloud(ip):
    """Return True if *ip* belongs to any network listed in KNOWN_CLOUD_NETS."""
    return any(
        _in_net(ip, net_base, net_prefix)
        for net_base, net_prefix in KNOWN_CLOUD_NETS
    )
|
|
|
|
|
|
def _is_lan(ip):
|
|
return ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.")
|
|
|
|
|
|
def _record(kind, src, dst, detail):
    """Store one finding in the shared deque and write it to the log.

    The entry is a dict with the keys ``ts`` (local wall-clock time formatted
    as HH:MM:SS), ``kind``, ``src``, ``dst`` and ``detail``. The append is
    done while holding ``_intruder_lock``.
    """
    finding = {
        "ts": datetime.now().strftime("%H:%M:%S"),
        "kind": kind,
        "src": src,
        "dst": dst,
        "detail": detail,
    }
    with _intruder_lock:
        intruders.append(finding)
    log(f"INTRUDER [{kind}] {src} -> {dst} {detail}", C_IMPORTANT)
|
|
|
|
|
|
def get_intruders():
    """Return a list copy of the recorded findings, taken under the lock."""
    with _intruder_lock:
        snapshot = list(intruders)
    return snapshot
|
|
|
|
|
|
def clear_intruders():
    """Remove every recorded finding from the shared deque, under the lock."""
    _intruder_lock.acquire()
    try:
        intruders.clear()
    finally:
        _intruder_lock.release()
|
|
|
|
|
|
def run(cfg, flags, running_check):
    """Sniff the configured interface and record suspicious activity around the target.

    Opens an AF_PACKET raw socket bound to ``cfg["iface"]`` and, for every
    frame received, applies these checks:

    * ARP reply whose sender IP is the target's but whose sender MAC is not
      the target's MAC -> ``ARP_SPOOF`` (once per offending MAC).
    * IPv4 traffic between the target and a LAN address other than us, the
      router or the target -> ``LAN_PEER`` (once per peer IP).
    * IPv4 traffic from the target to a non-LAN address outside
      ``KNOWN_CLOUD_NETS`` -> ``UNKNOWN_DST`` (once per
      (peer IP, protocol, destination port)).

    Args:
        cfg: mapping that provides ``iface``, ``target_ip``, ``target_mac``,
            ``our_ip`` and ``router_ip``.
        flags: mutable mapping; ``flags["intruder"]`` is set to True while the
            watcher is armed and reset to False when it stops.
        running_check: zero-argument callable polled before each receive;
            the loop ends as soon as it returns a falsy value.

    Returns:
        None. If the socket cannot be opened the error is logged and the
        function returns without arming.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003))
        sock.bind((cfg["iface"], 0))
        sock.settimeout(1)
    except Exception as e:
        # bind()/settimeout() can fail after the socket exists; do not leak it.
        if sock is not None:
            sock.close()
        log(f"IntruderWatch: cannot open raw socket: {e}", C_ERROR)
        return

    # Packet type the kernel reports for frames sent by this host
    # (PACKET_OUTGOING in packet(7)); fall back to its numeric value when the
    # constant is not exposed by the socket module.
    pkt_outgoing = getattr(socket, "PACKET_OUTGOING", 4)

    flags["intruder"] = True
    log("IntruderWatch: armed", C_SUCCESS)

    # try/finally guarantees the socket is closed and the flag is reset even
    # if a config key is missing or a packet triggers an unexpected error.
    try:
        tgt_ip = cfg["target_ip"]
        tgt_mac = cfg["target_mac"].lower()
        our_ip = cfg["our_ip"]
        router_ip = cfg["router_ip"]

        seen_lan_peers = set()   # other LAN hosts that contacted the target
        seen_outbound = set()    # (dst_ip, proto, port) tuples
        seen_arp_macs = set()    # MACs claiming to be the target

        while running_check():
            try:
                pkt, addr = sock.recvfrom(65535)
            except socket.timeout:
                # 1 s timeout exists only so running_check() is polled.
                continue
            except Exception as e:
                log(f"IntruderWatch: receive failed: {e}", C_ERROR)
                break

            if len(pkt) < 14:
                continue

            eth_proto = struct.unpack_from("!H", pkt, 12)[0]

            # ── ARP (0x0806) ────────────────────────────────────────────
            if eth_proto == 0x0806:
                # Frames this host itself transmitted are not "someone else";
                # skip them so our own ARP traffic is never reported.
                from_us = len(addr) > 2 and addr[2] == pkt_outgoing
                if len(pkt) >= 42 and not from_us:
                    arp = pkt[14:42]
                    opcode = struct.unpack_from("!H", arp, 6)[0]
                    sender_mac = ":".join(f"{b:02x}" for b in arp[8:14])
                    sender_ip = socket.inet_ntoa(arp[14:18])
                    if opcode == 2 and sender_ip == tgt_ip and sender_mac != tgt_mac:
                        if sender_mac not in seen_arp_macs:
                            seen_arp_macs.add(sender_mac)
                            _record("ARP_SPOOF", sender_mac, tgt_ip,
                                    f"someone else claims to be target (real={tgt_mac})")
                continue

            # ── IPv4 (0x0800) ───────────────────────────────────────────
            if eth_proto != 0x0800 or len(pkt) < 34:
                continue

            ip_hdr = pkt[14:34]
            ihl = (ip_hdr[0] & 0x0F) * 4
            if ihl < 20:
                # Header length below the IPv4 minimum: malformed, ignore.
                continue
            proto = ip_hdr[9]
            src_ip = socket.inet_ntoa(ip_hdr[12:16])
            dst_ip = socket.inet_ntoa(ip_hdr[16:20])

            # Target is involved?
            if tgt_ip not in (src_ip, dst_ip):
                continue

            peer_ip = dst_ip if src_ip == tgt_ip else src_ip

            t_start = 14 + ihl
            sp = dp = 0
            # Only the first fragment carries the TCP/UDP header; for later
            # fragments the bytes at t_start are payload, not ports.
            frag_offset = struct.unpack_from("!H", ip_hdr, 6)[0] & 0x1FFF
            if proto in (6, 17) and frag_offset == 0 and len(pkt) >= t_start + 4:
                sp, dp = struct.unpack_from("!HH", pkt, t_start)

            # ── Rule 1: LAN peer that isn't us/router/target ────────────
            if _is_lan(peer_ip) and peer_ip not in (our_ip, router_ip, tgt_ip):
                if peer_ip not in seen_lan_peers:
                    seen_lan_peers.add(peer_ip)
                    _record("LAN_PEER", peer_ip, tgt_ip,
                            f"unknown LAN host talking to target (proto={proto} port={dp or sp})")

            # ── Rule 2: outbound to non-whitelisted internet ────────────
            if src_ip == tgt_ip and not _is_lan(peer_ip):
                if not _is_known_cloud(peer_ip):
                    key = (peer_ip, proto, dp)
                    if key not in seen_outbound:
                        seen_outbound.add(key)
                        _record("UNKNOWN_DST", tgt_ip, peer_ip,
                                f"target contacting unlisted host (proto={proto} dport={dp})")
    finally:
        sock.close()
        flags["intruder"] = False
        log("IntruderWatch: stopped", C_SUCCESS)
|