Templated from cam-mitm. The camera-specific code (UBox cloud client, CVE verifiers, OAM HMAC signing, fuzzer wordlists) is removed; what's left is the generic core: ARP spoof, DNS spoof, HTTP/HTTPS interception with peek-before-wrap, raw sniffer with conntrack-based original-dst lookup, protocol fingerprinting, intruder detection, packet injection, log rotation, PyQt6 GUI on top of a service Controller. All 'camera' references renamed to 'target' throughout. Configuration moved into ~/.config/setec-mitm/config.json with the Settings tab as the primary editor. Plugin system at targets/<name>/plugin.py for vendor-specific code. See README.md for full setup, plugin authoring, and troubleshooting. Co-authored by Setec Labs.
179 lines
5.9 KiB
Python
179 lines
5.9 KiB
Python
"""Packet injection — craft and send raw packets to the target or network"""
|
|
|
|
import socket
|
|
import struct
|
|
import os
|
|
from utils.log import log, C_SUCCESS, C_ERROR, C_INFO, C_IMPORTANT
|
|
|
|
|
|
def _checksum(data):
|
|
"""Calculate IP/TCP/UDP checksum"""
|
|
if len(data) % 2:
|
|
data += b"\x00"
|
|
s = sum(struct.unpack("!%dH" % (len(data) // 2), data))
|
|
s = (s >> 16) + (s & 0xFFFF)
|
|
s += s >> 16
|
|
return ~s & 0xFFFF
|
|
|
|
|
|
def build_ethernet(src_mac, dst_mac, ethertype=0x0800):
    """Assemble an Ethernet header: destination, source, then EtherType.

    Args:
        src_mac: Source MAC as a colon-separated hex string.
        dst_mac: Destination MAC as a colon-separated hex string.
        ethertype: 16-bit EtherType value (defaults to 0x0800).

    Returns:
        The header bytes, destination address first.
    """
    source, destination = (
        bytes.fromhex(mac.replace(":", "")) for mac in (src_mac, dst_mac)
    )
    return b"".join((destination, source, struct.pack("!H", ethertype)))
|
|
|
|
|
|
def build_ip(src_ip, dst_ip, proto, payload_len):
    """Build a 20-byte IPv4 header (no options) with its checksum filled in.

    Args:
        src_ip: Source address in dotted-quad form.
        dst_ip: Destination address in dotted-quad form.
        proto: IP protocol number placed in the header.
        payload_len: Number of bytes that will follow the header; used only
            to compute the total-length field.

    Returns:
        The header bytes with the checksum written at offsets 10-11.
    """
    fields = (
        0x45,                    # version 4, header length 5 words
        0,                       # type of service
        20 + payload_len,        # total length = header + payload
        os.getpid() & 0xFFFF,    # identification taken from the process id
        0x4000,                  # Don't Fragment
        64,                      # TTL
        proto,
        0,                       # checksum placeholder
        socket.inet_aton(src_ip),
        socket.inet_aton(dst_ip),
    )
    unsummed = struct.pack("!BBHHHBBH4s4s", *fields)
    # The checksum is computed over the header with a zeroed checksum field,
    # then spliced into that field.
    checksum_bytes = struct.pack("!H", _checksum(unsummed))
    return b"".join((unsummed[:10], checksum_bytes, unsummed[12:]))
|
|
|
|
|
|
def build_udp(src_port, dst_port, payload):
    """Prefix *payload* with an 8-byte UDP header.

    The length field covers header plus payload; the checksum field is
    written as zero.

    Args:
        src_port: Source port number.
        dst_port: Destination port number.
        payload: Datagram body as bytes.

    Returns:
        Header and payload concatenated.
    """
    datagram_len = len(payload) + 8
    return struct.pack("!HHHH", src_port, dst_port, datagram_len, 0) + payload
|
|
|
|
|
|
def build_tcp_syn(src_port, dst_port, seq=1000):
    """Build a 20-byte TCP header with only the SYN flag set.

    The acknowledgement number, checksum and urgent pointer are packed as
    zero; the window is 65535 and the data offset is 5 words.

    Args:
        src_port: Source port number.
        dst_port: Destination port number.
        seq: Initial sequence number.

    Returns:
        The packed TCP header bytes.
    """
    tcp_fields = (
        src_port,
        dst_port,
        seq,
        0,          # acknowledgement number
        5 << 4,     # data offset in the high nibble
        0x02,       # SYN
        65535,      # window
        0,          # checksum
        0,          # urgent pointer
    )
    return struct.pack("!HHIIBBHHH", *tcp_fields)
|
|
|
|
|
|
def build_arp_request(src_mac, src_ip, target_ip):
    """Build a broadcast Ethernet frame carrying an ARP request.

    Args:
        src_mac: Sender MAC as a colon-separated hex string.
        src_ip: Sender IPv4 address in dotted-quad form.
        target_ip: IPv4 address being queried.

    Returns:
        The complete frame: Ethernet header followed by the ARP body.
    """
    sender_hw = bytes.fromhex(src_mac.replace(":", ""))
    frame_parts = (
        b"\xff" * 6,                                # broadcast destination
        sender_hw,
        b"\x08\x06",                                # EtherType field
        struct.pack("!HHBBH", 1, 0x0800, 6, 4, 1),  # opcode 1 = request
        sender_hw,
        socket.inet_aton(src_ip),
        bytes(6),                                   # target MAC left as zeros
        socket.inet_aton(target_ip),
    )
    return b"".join(frame_parts)
|
|
|
|
|
|
def build_arp_reply(src_mac, dst_mac, src_ip, dst_ip):
    """Build a unicast Ethernet frame carrying an ARP reply.

    Args:
        src_mac: Sender MAC as a colon-separated hex string.
        dst_mac: Recipient MAC as a colon-separated hex string.
        src_ip: IPv4 address announced as belonging to *src_mac*.
        dst_ip: IPv4 address of the recipient.

    Returns:
        The complete frame: Ethernet header followed by the ARP body.
    """
    sender_hw = bytes.fromhex(src_mac.replace(":", ""))
    recipient_hw = bytes.fromhex(dst_mac.replace(":", ""))
    frame_parts = (
        recipient_hw,
        sender_hw,
        b"\x08\x06",                                # EtherType field
        struct.pack("!HHBBH", 1, 0x0800, 6, 4, 2),  # opcode 2 = reply
        sender_hw,
        socket.inet_aton(src_ip),
        recipient_hw,
        socket.inet_aton(dst_ip),
    )
    return b"".join(frame_parts)
|
|
|
|
|
|
def build_dns_query(domain, src_port=12345):
    """Build a DNS query packet payload asking for the A record of *domain*.

    Empty labels are skipped, so a fully-qualified name with a trailing dot
    ("example.com.") encodes the same as "example.com", and an empty string
    encodes as the root name. Previously such input put a zero-length label
    in front of the terminator and produced a malformed question.

    Args:
        domain: Host name to look up, labels separated by dots.
        src_port: Unused by this function; kept so existing callers that
            pass it keep working.

    Returns:
        The query bytes: 2-byte transaction id, flags, section counts,
        encoded name, then query type and class.

    Raises:
        ValueError: If any label is longer than 63 bytes, which cannot be
            represented by a DNS length byte.
    """
    # Transaction id is derived from the process id, so it is the same for
    # every query sent by one process.
    txid = struct.pack("!H", os.getpid() & 0xFFFF)
    flags = b"\x01\x00"  # standard query
    counts = struct.pack("!HHHH", 1, 0, 0, 0)  # one question, nothing else

    encoded_labels = []
    for label in domain.encode().split(b"."):
        if not label:
            # A zero-length label would terminate the name early.
            continue
        if len(label) > 63:
            raise ValueError(
                f"DNS label too long ({len(label)} bytes, max 63): {label!r}"
            )
        encoded_labels.append(bytes([len(label)]) + label)
    qname = b"".join(encoded_labels) + b"\x00"

    qtype = struct.pack("!HH", 1, 1)  # A record, IN class
    return txid + flags + counts + qname + qtype
|
|
|
|
|
|
def send_raw(iface, packet):
    """Send a raw Ethernet frame on *iface*.

    Args:
        iface: Name of the network interface to bind the packet socket to.
        packet: Complete frame bytes, starting with the Ethernet header.

    Returns:
        True when the frame was handed to the socket, False when any step
        failed (the error is logged, not raised).
    """
    try:
        # The context manager closes the socket even if bind() or send()
        # raises; previously a failure there leaked the descriptor.
        with socket.socket(
            socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003)
        ) as sock:
            sock.bind((iface, 0))
            sock.send(packet)
        return True
    except Exception as e:
        log(f"INJECT: send failed: {e}", C_ERROR)
        return False
|
|
|
|
|
|
def send_udp(dst_ip, dst_port, payload, src_port=0):
    """Send a UDP datagram using a normal (non-raw) socket.

    Args:
        dst_ip: Destination IPv4 address or host name.
        dst_port: Destination UDP port.
        payload: Datagram body as bytes.
        src_port: Local port to bind before sending; a falsy value (the
            default 0) skips the bind and lets the OS pick a port.

    Returns:
        True when the datagram was sent, False when any step failed (the
        error is logged, not raised).
    """
    try:
        # The context manager closes the socket even if bind() or sendto()
        # raises; previously a failure there leaked the descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            if src_port:
                sock.bind(("", src_port))
            sock.sendto(payload, (dst_ip, dst_port))
        log(f"INJECT: UDP sent to {dst_ip}:{dst_port} ({len(payload)}B)", C_SUCCESS)
        return True
    except Exception as e:
        log(f"INJECT: UDP failed: {e}", C_ERROR)
        return False
|
|
|
|
|
|
def _inject_iface_mac(iface):
    """Return the MAC address string that sysfs reports for *iface*."""
    with open(f"/sys/class/net/{iface}/address") as fh:
        return fh.read().strip()


def _inject_payload(params):
    """Return params["payload"] as bytes, hex-decoded if payload_hex is truthy."""
    payload = params.get("payload", "")
    if params.get("payload_hex"):
        return bytes.fromhex(payload)
    return payload.encode()


def _inject_param(params, key, cfg, cfg_key):
    """Return params[key] when present, else cfg[cfg_key] (looked up lazily)."""
    if key in params:
        return params[key]
    return cfg[cfg_key]


def inject(cfg, params):
    """
    Inject a packet based on params dict.

    params: {
        "type": "udp"|"arp_request"|"arp_reply"|"dns_query"|"raw",
        "dst_ip": "...",
        "dst_port": 1234,
        "src_port": 5678,
        "payload": "hex string or ascii",
        "payload_hex": true/false,
        "domain": "for dns_query",
        "src_mac": "...", "dst_mac": "...",
        "src_ip": "...",
    }

    Defaults taken from cfg are looked up only when the corresponding key is
    missing from params, so a caller that supplies every value does not need
    the cfg entry (or, for ARP replies, a readable sysfs address file).

    Per-type behaviour:
        udp:         payload to dst_ip (default cfg["target_ip"]) on dst_port
                     (default 10240), optionally from src_port.
        arp_request: who-has for dst_ip (default cfg["target_ip"]) sent from
                     the interface's own MAC and cfg["our_ip"].
        arp_reply:   src_ip (default cfg["router_ip"]) is-at src_mac (default
                     the interface's MAC), sent to dst_mac / dst_ip (defaults
                     cfg["target_mac"] / cfg["target_ip"]).
        dns_query:   A query for domain (default "portal.ubianet.com") sent to
                     dst_ip (default cfg["router_ip"]) on port 53.
        raw:         payload sent as-is; it must already be a full Ethernet
                     frame.

    Returns:
        {"ok": bool} for known types; {"ok": False, "error": "..."} for an
        unknown type.

    Raises:
        KeyError: If a required cfg entry is missing.
        ValueError: If payload_hex is set and payload is not valid hex, or a
            port value cannot be converted to int.
        OSError: If the interface's sysfs address file cannot be read.
    """
    ptype = params.get("type", "udp")
    iface = cfg["iface"]

    if ptype == "udp":
        dst_ip = _inject_param(params, "dst_ip", cfg, "target_ip")
        dst_port = int(params.get("dst_port", 10240))
        src_port = int(params.get("src_port", 0))
        payload = _inject_payload(params)
        return {"ok": send_udp(dst_ip, dst_port, payload, src_port)}

    elif ptype == "arp_request":
        our_mac = _inject_iface_mac(iface)
        target_ip = _inject_param(params, "dst_ip", cfg, "target_ip")
        pkt = build_arp_request(our_mac, cfg["our_ip"], target_ip)
        return {"ok": send_raw(iface, pkt)}

    elif ptype == "arp_reply":
        # Only touch sysfs when the caller did not supply a source MAC.
        if "src_mac" in params:
            src_mac = params["src_mac"]
        else:
            src_mac = _inject_iface_mac(iface)
        dst_mac = _inject_param(params, "dst_mac", cfg, "target_mac")
        src_ip = _inject_param(params, "src_ip", cfg, "router_ip")
        dst_ip = _inject_param(params, "dst_ip", cfg, "target_ip")
        pkt = build_arp_reply(src_mac, dst_mac, src_ip, dst_ip)
        log(f"INJECT: ARP reply {src_ip} is-at {src_mac} -> {dst_ip}", C_IMPORTANT)
        return {"ok": send_raw(iface, pkt)}

    elif ptype == "dns_query":
        domain = params.get("domain", "portal.ubianet.com")
        payload = build_dns_query(domain)
        dst_ip = _inject_param(params, "dst_ip", cfg, "router_ip")
        return {"ok": send_udp(dst_ip, 53, payload)}

    elif ptype == "raw":
        # Need full ethernet frame for raw
        payload = _inject_payload(params)
        return {"ok": send_raw(iface, payload)}

    else:
        log(f"INJECT: unknown type '{ptype}'", C_ERROR)
        return {"ok": False, "error": f"unknown type: {ptype}"}
|