"""DNS interception — spoofs cloud domains to point at us""" import socket import struct from utils.log import log, C_SUCCESS, C_IMPORTANT, C_ERROR SPOOF_DOMAINS = [b"ubianet.com", b"aliyuncs.com", b"amazonaws.com", b"myqcloud.com"] def parse_dns_name(data, offset): labels = [] while offset < len(data): length = data[offset] if length == 0: offset += 1 break if (length & 0xC0) == 0xC0: ptr = struct.unpack("!H", data[offset:offset + 2])[0] & 0x3FFF labels.append(parse_dns_name(data, ptr)[0]) offset += 2 break offset += 1 labels.append(data[offset:offset + length]) offset += length return b".".join(labels), offset def build_dns_response(query, ip): resp = bytearray(query[:2]) resp += b"\x81\x80" resp += query[4:6] resp += b"\x00\x01\x00\x00\x00\x00" resp += query[12:] resp += b"\xc0\x0c\x00\x01\x00\x01" resp += struct.pack("!I", 60) resp += b"\x00\x04" resp += socket.inet_aton(ip) return bytes(resp) def run(cfg, flags, running_check): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(1) try: sock.bind(("0.0.0.0", 53)) except OSError as e: log(f"DNS: bind :53 failed: {e}", C_ERROR) return flags["dns"] = True log("DNS: listening on :53", C_SUCCESS) while running_check(): try: data, addr = sock.recvfrom(1024) except socket.timeout: continue except: break if len(data) < 12: continue name, _ = parse_dns_name(data, 12) name_str = name.decode("utf-8", errors="replace") should_spoof = (addr[0] == cfg["camera_ip"] and any(d in name.lower() for d in SPOOF_DOMAINS)) if should_spoof: resp = build_dns_response(data, cfg["our_ip"]) sock.sendto(resp, addr) log(f"DNS: {name_str} -> SPOOFED", C_IMPORTANT) else: fwd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) fwd.settimeout(3) try: fwd.sendto(data, (cfg["router_ip"], 53)) resp, _ = fwd.recvfrom(4096) sock.sendto(resp, addr) except: pass fwd.close() sock.close() flags["dns"] = False