"""Packet injection — craft and send raw packets to the camera or network"""

import os
import socket
import struct
from typing import Optional

from utils.log import log, C_SUCCESS, C_ERROR, C_INFO, C_IMPORTANT

_BROADCAST_MAC = b"\xff" * 6
_ZERO_MAC = b"\x00" * 6
_ETHERTYPE_ARP = b"\x08\x06"
_ARP_OP_REQUEST = 1
_ARP_OP_REPLY = 2
_MAX_DNS_LABEL = 63  # longer length bytes collide with DNS compression pointers


def _checksum(data: bytes) -> int:
    """Return the 16-bit ones'-complement Internet checksum of *data*.

    Odd-length input is padded with one zero byte for the computation only.
    """
    # Work on an immutable copy: ``+=`` on a caller's bytearray would pad it
    # in place and corrupt the caller's buffer.
    data = bytes(data)
    if len(data) % 2:
        data += b"\x00"
    total = sum(struct.unpack("!%dH" % (len(data) // 2), data))
    # Fold carries until the sum fits in 16 bits; one fold is not enough
    # once the running sum exceeds 32 bits.
    while total >> 16:
        total = (total >> 16) + (total & 0xFFFF)
    return ~total & 0xFFFF


def _mac_bytes(mac: str) -> bytes:
    """Convert a textual MAC (``aa:bb:..`` or ``aa-bb-..``) to 6 raw bytes.

    Raises:
        ValueError: if *mac* is not valid hex or is not exactly 6 bytes long.
    """
    raw = bytes.fromhex(mac.replace(":", "").replace("-", ""))
    if len(raw) != 6:
        raise ValueError(f"invalid MAC address: {mac!r}")
    return raw


def build_ethernet(src_mac: str, dst_mac: str, ethertype: int = 0x0800) -> bytes:
    """Build a 14-byte Ethernet II header (destination, source, ethertype)."""
    return _mac_bytes(dst_mac) + _mac_bytes(src_mac) + struct.pack("!H", ethertype)


def build_ip(src_ip: str, dst_ip: str, proto: int, payload_len: int) -> bytes:
    """Build a 20-byte IPv4 header (no options) with its checksum filled in.

    Args:
        src_ip: dotted-quad source address.
        dst_ip: dotted-quad destination address.
        proto: IP protocol number placed in the header.
        payload_len: number of bytes that will follow this header.
    """
    ver_ihl = 0x45
    tos = 0
    total_len = 20 + payload_len
    ident = os.getpid() & 0xFFFF
    flags_frag = 0x4000  # Don't Fragment
    ttl = 64
    # Pack with a zero checksum first, then splice the real one in at
    # byte offset 10.
    header = struct.pack(
        "!BBHHHBBH4s4s",
        ver_ihl, tos, total_len, ident, flags_frag, ttl, proto, 0,
        socket.inet_aton(src_ip), socket.inet_aton(dst_ip),
    )
    chk = _checksum(header)
    return header[:10] + struct.pack("!H", chk) + header[12:]


def build_udp(src_port: int, dst_port: int, payload: bytes) -> bytes:
    """Build a UDP datagram (header + payload) with the checksum left at zero."""
    length = 8 + len(payload)
    header = struct.pack("!HHH", src_port, dst_port, length) + b"\x00\x00"
    return header + payload


def build_tcp_syn(
    src_port: int,
    dst_port: int,
    seq: int = 1000,
    src_ip: Optional[str] = None,
    dst_ip: Optional[str] = None,
) -> bytes:
    """Build a 20-byte TCP SYN header (no options, no payload).

    When both *src_ip* and *dst_ip* are supplied, the TCP checksum is computed
    over the IPv4 pseudo-header and written into the segment. Without them the
    checksum field stays zero, exactly as before.
    """
    data_offset = 5 << 4
    flags = 0x02  # SYN
    window = 65535
    header = struct.pack(
        "!HHIIBBHHH",
        src_port, dst_port, seq, 0, data_offset, flags, window, 0, 0,
    )
    if src_ip is None or dst_ip is None:
        return header
    pseudo = (
        socket.inet_aton(src_ip)
        + socket.inet_aton(dst_ip)
        + struct.pack("!BBH", 0, socket.IPPROTO_TCP, len(header))
    )
    chk = _checksum(pseudo + header)
    # The checksum field occupies bytes 16-17 of the TCP header.
    return header[:16] + struct.pack("!H", chk) + header[18:]


def _build_arp_frame(
    opcode: int,
    eth_dst: bytes,
    sender_mac: bytes,
    sender_ip: str,
    target_mac: bytes,
    target_ip: str,
) -> bytes:
    """Assemble an Ethernet frame carrying an IPv4-over-Ethernet ARP packet."""
    eth = eth_dst + sender_mac + _ETHERTYPE_ARP
    # htype=1, ptype=0x0800, hlen=6, plen=4, then the opcode.
    arp = struct.pack("!HHBBH", 1, 0x0800, 6, 4, opcode)
    arp += sender_mac + socket.inet_aton(sender_ip)
    arp += target_mac + socket.inet_aton(target_ip)
    return eth + arp


def build_arp_request(src_mac: str, src_ip: str, target_ip: str) -> bytes:
    """Build a broadcast ARP request frame asking for *target_ip*."""
    return _build_arp_frame(
        _ARP_OP_REQUEST,
        _BROADCAST_MAC,
        _mac_bytes(src_mac),
        src_ip,
        _ZERO_MAC,
        target_ip,
    )


def build_arp_reply(src_mac: str, dst_mac: str, src_ip: str, dst_ip: str) -> bytes:
    """Build a unicast ARP reply frame stating that *src_ip* is at *src_mac*."""
    target = _mac_bytes(dst_mac)
    return _build_arp_frame(
        _ARP_OP_REPLY,
        target,
        _mac_bytes(src_mac),
        src_ip,
        target,
        dst_ip,
    )


def build_dns_query(domain: str, src_port: int = 12345) -> bytes:
    """Build a DNS query packet payload

    Produces a standard recursive query for the A record (class IN) of
    *domain*. Empty labels, such as the one created by a trailing dot in
    ``"example.com."``, are skipped so the name has a single terminator.

    Args:
        domain: name to look up.
        src_port: unused; kept for interface compatibility.

    Raises:
        ValueError: if any label is longer than 63 bytes.
    """
    txid = struct.pack("!H", os.getpid() & 0xFFFF)
    flags = b"\x01\x00"  # standard query
    counts = struct.pack("!HHHH", 1, 0, 0, 0)
    parts = []
    for label in domain.encode().split(b"."):
        if not label:
            continue
        if len(label) > _MAX_DNS_LABEL:
            raise ValueError(f"DNS label too long ({len(label)} bytes): {label!r}")
        parts.append(bytes([len(label)]) + label)
    qname = b"".join(parts) + b"\x00"
    qtype = struct.pack("!HH", 1, 1)  # A record, IN class
    return txid + flags + counts + qname + qtype


def send_raw(iface: str, packet: bytes) -> bool:
    """Send a raw Ethernet frame

    Opens an AF_PACKET raw socket bound to *iface* and sends *packet* as-is.
    Returns True on success; on any failure the error is logged and False is
    returned. The socket is closed on every path.
    """
    try:
        with socket.socket(
            socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003)
        ) as sock:
            sock.bind((iface, 0))
            sock.send(packet)
    except Exception as e:
        log(f"INJECT: send failed: {e}", C_ERROR)
        return False
    return True


def send_udp(dst_ip: str, dst_port: int, payload: bytes, src_port: int = 0) -> bool:
    """Send UDP datagram using normal socket

    A non-zero *src_port* binds the socket to that local port first.
    Returns True on success; on any failure the error is logged and False is
    returned. The socket is closed on every path.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            if src_port:
                sock.bind(("", src_port))
            sock.sendto(payload, (dst_ip, dst_port))
    except Exception as e:
        log(f"INJECT: UDP failed: {e}", C_ERROR)
        return False
    log(f"INJECT: UDP sent to {dst_ip}:{dst_port} ({len(payload)}B)", C_SUCCESS)
    return True


def _iface_mac(iface: str) -> str:
    """Read the MAC address of *iface* from sysfs."""
    with open(f"/sys/class/net/{iface}/address") as fh:
        return fh.read().strip()


def _payload_bytes(params: dict) -> bytes:
    """Decode ``params["payload"]`` as hex if ``payload_hex`` is set, else encode it."""
    payload = params.get("payload", "")
    if params.get("payload_hex"):
        return bytes.fromhex(payload)
    return payload.encode()


def _param_or_cfg(params: dict, key: str, cfg: dict, cfg_key: str):
    """Return ``params[key]`` when present, otherwise ``cfg[cfg_key]``.

    The cfg lookup is lazy, so a missing cfg entry only matters when it is
    actually needed as a fallback.
    """
    return params[key] if key in params else cfg[cfg_key]


def inject(cfg: dict, params: dict) -> dict:
    """
    Inject a packet based on params dict.

    params: {
        "type": "udp"|"arp_request"|"arp_reply"|"dns_query"|"raw",
        "dst_ip": "...",
        "dst_port": 1234,
        "src_port": 5678,
        "payload": "hex string or ascii",
        "payload_hex": true/false,
        "domain": "for dns_query",
        "src_mac": "...",
        "dst_mac": "...",
        "src_ip": "...",
    }

    cfg supplies "iface" (always read) and, as fallbacks for values missing
    from params, "camera_ip", "camera_mac", "router_ip" and "our_ip".

    Returns {"ok": bool}; an unknown type also carries an "error" message.
    Malformed input (bad hex, bad MAC or IP, missing cfg key, unreadable
    interface) raises rather than returning an error dict.
    """
    ptype = params.get("type", "udp")
    iface = cfg["iface"]

    if ptype == "udp":
        dst_ip = _param_or_cfg(params, "dst_ip", cfg, "camera_ip")
        dst_port = int(params.get("dst_port", 10240))
        src_port = int(params.get("src_port", 0))
        payload = _payload_bytes(params)
        return {"ok": send_udp(dst_ip, dst_port, payload, src_port)}

    if ptype == "arp_request":
        our_mac = _iface_mac(iface)
        target_ip = _param_or_cfg(params, "dst_ip", cfg, "camera_ip")
        pkt = build_arp_request(our_mac, cfg["our_ip"], target_ip)
        return {"ok": send_raw(iface, pkt)}

    if ptype == "arp_reply":
        # Only touch sysfs when the caller did not provide a source MAC.
        src_mac = params["src_mac"] if "src_mac" in params else _iface_mac(iface)
        dst_mac = _param_or_cfg(params, "dst_mac", cfg, "camera_mac")
        src_ip = _param_or_cfg(params, "src_ip", cfg, "router_ip")
        dst_ip = _param_or_cfg(params, "dst_ip", cfg, "camera_ip")
        pkt = build_arp_reply(src_mac, dst_mac, src_ip, dst_ip)
        log(f"INJECT: ARP reply {src_ip} is-at {src_mac} -> {dst_ip}", C_IMPORTANT)
        return {"ok": send_raw(iface, pkt)}

    if ptype == "dns_query":
        domain = params.get("domain", "portal.ubianet.com")
        payload = build_dns_query(domain)
        dst_ip = _param_or_cfg(params, "dst_ip", cfg, "router_ip")
        return {"ok": send_udp(dst_ip, 53, payload)}

    if ptype == "raw":
        # Need full ethernet frame for raw
        return {"ok": send_raw(iface, _payload_bytes(params))}

    log(f"INJECT: unknown type '{ptype}'", C_ERROR)
    return {"ok": False, "error": f"unknown type: {ptype}"}