"""Raw packet sniffer — catches all camera traffic headed to us on any port""" import socket import struct import subprocess from utils.log import log, hexdump, save_raw, C_SUCCESS, C_ERROR, C_TRAFFIC, C_IMPORTANT from utils import proto as proto_id _orig_dst_cache = {} def _lookup_orig_dst(src_ip, src_port, proto): key = (src_ip, src_port, proto) if key in _orig_dst_cache: return _orig_dst_cache[key] result = None try: out = subprocess.run( ["conntrack", "-L", "-s", src_ip, "-p", proto, "--sport", str(src_port)], capture_output=True, text=True, timeout=2, ).stdout for line in out.splitlines(): parts = line.split() d_ip = None d_port = None for p in parts: if p.startswith("dst=") and d_ip is None: d_ip = p[4:] elif p.startswith("dport=") and d_port is None: d_port = p[6:] if d_ip and d_port: break if d_ip and d_port: result = f"{d_ip}:{d_port}" break except Exception: result = None _orig_dst_cache[key] = result return result def run(cfg, flags, running_check): try: sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003)) sock.bind((cfg["iface"], 0)) sock.settimeout(1) except: log("Sniffer: cannot open raw socket", C_ERROR) return flags["sniffer"] = True log("Sniffer: watching all camera packets", C_SUCCESS) seen = set() while running_check(): try: pkt, _ = sock.recvfrom(65535) except socket.timeout: continue except: break if len(pkt) < 34: continue eth_proto = struct.unpack("!H", pkt[12:14])[0] if eth_proto != 0x0800: continue ip_hdr = pkt[14:34] ihl = (ip_hdr[0] & 0x0F) * 4 proto = ip_hdr[9] src_ip = socket.inet_ntoa(ip_hdr[12:16]) dst_ip = socket.inet_ntoa(ip_hdr[16:20]) if src_ip != cfg["camera_ip"] or dst_ip != cfg["our_ip"]: continue t_start = 14 + ihl if proto == 17 and len(pkt) >= t_start + 8: sp, dp = struct.unpack("!HH", pkt[t_start:t_start + 4]) if dp == 53: continue payload = pkt[t_start + 8:] key = f"udp:{dp}" if key not in seen: seen.add(key) log(f"SNIFF: new UDP port {sp}->{dp}", C_IMPORTANT) orig = _lookup_orig_dst(src_ip, sp, "udp") or "?" pname = proto_id.detect(payload) proto_id.record(pname) log(f"SNIFF: UDP {cfg['camera_ip']}:{sp} -> {dst_ip}:{dp} (orig={orig}) [{pname} {payload[:6].hex()}] ({len(payload)}B)", C_TRAFFIC) log(hexdump(payload), 0) save_raw(cfg["log_dir"], f"sniff_udp{dp}_{sp}", payload) elif proto == 6 and len(pkt) >= t_start + 4: sp, dp = struct.unpack("!HH", pkt[t_start:t_start + 4]) key = f"tcp:{dp}" if key not in seen: seen.add(key) orig = _lookup_orig_dst(src_ip, sp, "tcp") or "?" log(f"SNIFF: new TCP {cfg['camera_ip']}:{sp} -> {dst_ip}:{dp} (orig={orig})", C_IMPORTANT) sock.close() flags["sniffer"] = False