Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.
Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.
See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
sudo /usr/bin/python3 gui.py
Co-authored by Setec Labs.
107 lines
3.4 KiB
Python
107 lines
3.4 KiB
Python
"""Raw packet sniffer — catches all camera traffic headed to us on any port"""
|
|
|
|
import socket
|
|
import struct
|
|
import subprocess
|
|
from utils.log import log, hexdump, save_raw, C_SUCCESS, C_ERROR, C_TRAFFIC, C_IMPORTANT
|
|
from utils import proto as proto_id
|
|
|
|
|
|
_orig_dst_cache = {}
|
|
|
|
|
|
def _lookup_orig_dst(src_ip, src_port, proto):
|
|
key = (src_ip, src_port, proto)
|
|
if key in _orig_dst_cache:
|
|
return _orig_dst_cache[key]
|
|
result = None
|
|
try:
|
|
out = subprocess.run(
|
|
["conntrack", "-L", "-s", src_ip, "-p", proto, "--sport", str(src_port)],
|
|
capture_output=True, text=True, timeout=2,
|
|
).stdout
|
|
for line in out.splitlines():
|
|
parts = line.split()
|
|
d_ip = None
|
|
d_port = None
|
|
for p in parts:
|
|
if p.startswith("dst=") and d_ip is None:
|
|
d_ip = p[4:]
|
|
elif p.startswith("dport=") and d_port is None:
|
|
d_port = p[6:]
|
|
if d_ip and d_port:
|
|
break
|
|
if d_ip and d_port:
|
|
result = f"{d_ip}:{d_port}"
|
|
break
|
|
except Exception:
|
|
result = None
|
|
_orig_dst_cache[key] = result
|
|
return result
|
|
|
|
|
|
def run(cfg, flags, running_check):
|
|
try:
|
|
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003))
|
|
sock.bind((cfg["iface"], 0))
|
|
sock.settimeout(1)
|
|
except:
|
|
log("Sniffer: cannot open raw socket", C_ERROR)
|
|
return
|
|
|
|
flags["sniffer"] = True
|
|
log("Sniffer: watching all camera packets", C_SUCCESS)
|
|
seen = set()
|
|
|
|
while running_check():
|
|
try:
|
|
pkt, _ = sock.recvfrom(65535)
|
|
except socket.timeout:
|
|
continue
|
|
except:
|
|
break
|
|
if len(pkt) < 34:
|
|
continue
|
|
|
|
eth_proto = struct.unpack("!H", pkt[12:14])[0]
|
|
if eth_proto != 0x0800:
|
|
continue
|
|
|
|
ip_hdr = pkt[14:34]
|
|
ihl = (ip_hdr[0] & 0x0F) * 4
|
|
proto = ip_hdr[9]
|
|
src_ip = socket.inet_ntoa(ip_hdr[12:16])
|
|
dst_ip = socket.inet_ntoa(ip_hdr[16:20])
|
|
|
|
if src_ip != cfg["camera_ip"] or dst_ip != cfg["our_ip"]:
|
|
continue
|
|
|
|
t_start = 14 + ihl
|
|
|
|
if proto == 17 and len(pkt) >= t_start + 8:
|
|
sp, dp = struct.unpack("!HH", pkt[t_start:t_start + 4])
|
|
if dp == 53:
|
|
continue
|
|
payload = pkt[t_start + 8:]
|
|
key = f"udp:{dp}"
|
|
if key not in seen:
|
|
seen.add(key)
|
|
log(f"SNIFF: new UDP port {sp}->{dp}", C_IMPORTANT)
|
|
orig = _lookup_orig_dst(src_ip, sp, "udp") or "?"
|
|
pname = proto_id.detect(payload)
|
|
proto_id.record(pname)
|
|
log(f"SNIFF: UDP {cfg['camera_ip']}:{sp} -> {dst_ip}:{dp} (orig={orig}) [{pname} {payload[:6].hex()}] ({len(payload)}B)", C_TRAFFIC)
|
|
log(hexdump(payload), 0)
|
|
save_raw(cfg["log_dir"], f"sniff_udp{dp}_{sp}", payload)
|
|
|
|
elif proto == 6 and len(pkt) >= t_start + 4:
|
|
sp, dp = struct.unpack("!HH", pkt[t_start:t_start + 4])
|
|
key = f"tcp:{dp}"
|
|
if key not in seen:
|
|
seen.add(key)
|
|
orig = _lookup_orig_dst(src_ip, sp, "tcp") or "?"
|
|
log(f"SNIFF: new TCP {cfg['camera_ip']}:{sp} -> {dst_ip}:{dp} (orig={orig})", C_IMPORTANT)
|
|
|
|
sock.close()
|
|
flags["sniffer"] = False
|