86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
|
|
"""DNS interception — spoofs cloud domains to point at us"""
|
||
|
|
|
||
|
|
import socket
|
||
|
|
import struct
|
||
|
|
from utils.log import log, C_SUCCESS, C_IMPORTANT, C_ERROR
|
||
|
|
|
||
|
|
SPOOF_DOMAINS = [b"ubianet.com", b"aliyuncs.com", b"amazonaws.com", b"myqcloud.com"]
|
||
|
|
|
||
|
|
|
||
|
|
def parse_dns_name(data, offset):
|
||
|
|
labels = []
|
||
|
|
while offset < len(data):
|
||
|
|
length = data[offset]
|
||
|
|
if length == 0:
|
||
|
|
offset += 1
|
||
|
|
break
|
||
|
|
if (length & 0xC0) == 0xC0:
|
||
|
|
ptr = struct.unpack("!H", data[offset:offset + 2])[0] & 0x3FFF
|
||
|
|
labels.append(parse_dns_name(data, ptr)[0])
|
||
|
|
offset += 2
|
||
|
|
break
|
||
|
|
offset += 1
|
||
|
|
labels.append(data[offset:offset + length])
|
||
|
|
offset += length
|
||
|
|
return b".".join(labels), offset
|
||
|
|
|
||
|
|
|
||
|
|
def build_dns_response(query, ip):
|
||
|
|
resp = bytearray(query[:2])
|
||
|
|
resp += b"\x81\x80"
|
||
|
|
resp += query[4:6]
|
||
|
|
resp += b"\x00\x01\x00\x00\x00\x00"
|
||
|
|
resp += query[12:]
|
||
|
|
resp += b"\xc0\x0c\x00\x01\x00\x01"
|
||
|
|
resp += struct.pack("!I", 60)
|
||
|
|
resp += b"\x00\x04"
|
||
|
|
resp += socket.inet_aton(ip)
|
||
|
|
return bytes(resp)
|
||
|
|
|
||
|
|
|
||
|
|
def run(cfg, flags, running_check):
|
||
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
|
|
sock.settimeout(1)
|
||
|
|
try:
|
||
|
|
sock.bind(("0.0.0.0", 53))
|
||
|
|
except OSError as e:
|
||
|
|
log(f"DNS: bind :53 failed: {e}", C_ERROR)
|
||
|
|
return
|
||
|
|
|
||
|
|
flags["dns"] = True
|
||
|
|
log("DNS: listening on :53", C_SUCCESS)
|
||
|
|
|
||
|
|
while running_check():
|
||
|
|
try:
|
||
|
|
data, addr = sock.recvfrom(1024)
|
||
|
|
except socket.timeout:
|
||
|
|
continue
|
||
|
|
except:
|
||
|
|
break
|
||
|
|
if len(data) < 12:
|
||
|
|
continue
|
||
|
|
|
||
|
|
name, _ = parse_dns_name(data, 12)
|
||
|
|
name_str = name.decode("utf-8", errors="replace")
|
||
|
|
should_spoof = (addr[0] == cfg["camera_ip"] and
|
||
|
|
any(d in name.lower() for d in SPOOF_DOMAINS))
|
||
|
|
|
||
|
|
if should_spoof:
|
||
|
|
resp = build_dns_response(data, cfg["our_ip"])
|
||
|
|
sock.sendto(resp, addr)
|
||
|
|
log(f"DNS: {name_str} -> SPOOFED", C_IMPORTANT)
|
||
|
|
else:
|
||
|
|
fwd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
|
fwd.settimeout(3)
|
||
|
|
try:
|
||
|
|
fwd.sendto(data, (cfg["router_ip"], 53))
|
||
|
|
resp, _ = fwd.recvfrom(4096)
|
||
|
|
sock.sendto(resp, addr)
|
||
|
|
except:
|
||
|
|
pass
|
||
|
|
fwd.close()
|
||
|
|
|
||
|
|
sock.close()
|
||
|
|
flags["dns"] = False
|