Files
cam-mitm/api/server.py
sssnake 800052acc2 Initial commit — SetecSuite Camera MITM Framework
Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.

Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.

See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
    sudo /usr/bin/python3 gui.py

Co-authored by Setec Labs.
2026-04-09 08:14:18 -07:00

148 lines
5.4 KiB
Python

"""REST API server — allows external tools (like Claude) to control the MITM tool"""
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs

from utils.log import log, log_lines, C_SUCCESS, C_ERROR, C_INFO
class MITMApiHandler(BaseHTTPRequestHandler):
    """JSON-over-HTTP handler that forwards requests to the MITM controller.

    Every response is a JSON document written by ``_send_json``. Routes are
    matched on the URL path only (query string and trailing slashes are
    ignored for matching), so ``/logs?count=5`` reaches the ``/logs`` route.

    NOTE(security): nothing in this handler authenticates the caller, and
    ``POST /command`` / ``POST /inject`` hand caller-supplied data straight
    to the controller. Restrict who can reach the listening socket.
    """

    controller = None  # Set by start_server

    # Advertised in the body of the GET 404 response.
    _ENDPOINTS = [
        "GET /status", "GET /logs?count=N", "GET /devices",
        "GET /config", "GET /fuzz/results",
        "POST /start", "POST /stop", "POST /config",
        "POST /command", "POST /api", "POST /fuzz/endpoints",
        "POST /fuzz/params", "POST /fuzz/auth", "POST /fuzz/stop",
        "POST /inject",
    ]

    def log_message(self, format, *args):
        """Discard the per-request log line the base class would emit."""
        pass  # Suppress default HTTP logging

    def _send_json(self, data, code=200):
        """Serialize *data* as JSON and send it with HTTP status *code*."""
        body = json.dumps(data).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _read_body(self):
        """Read and decode the JSON request body.

        Returns:
            The decoded JSON value, or ``{}`` when Content-Length is missing
            or not positive.

        Raises:
            ValueError: if Content-Length is not an integer, the body is not
                valid UTF-8, or the body is not valid JSON (the decode and
                JSON errors are both ``ValueError`` subclasses).
        """
        cl = int(self.headers.get("Content-Length", 0))
        if cl > 0:
            return json.loads(self.rfile.read(cl).decode("utf-8"))
        return {}

    def _split_path(self):
        """Return ``(route, query)`` for the current request.

        ``route`` is the path without its query string and without trailing
        slashes; ``query`` is the raw text after the first ``?`` (or ``""``).
        """
        raw_path, _, query = self.path.partition("?")
        return raw_path.rstrip("/"), query

    @staticmethod
    def _parse_count(query, default=100):
        """Extract the ``count`` parameter from a raw query string.

        Returns *default* when ``count`` is absent or blank. When it is
        repeated the last occurrence wins. Negative values are clamped to 0.

        Raises:
            ValueError: if the value is not an integer.
        """
        values = parse_qs(query).get("count")
        if not values:
            return default
        return max(int(values[-1]), 0)

    def do_GET(self):
        """Serve the read-only routes; unknown paths get a 404 with a route list."""
        path, query = self._split_path()
        ctl = self.controller
        if path == "/status":
            self._send_json({
                "services_running": ctl.services_running,
                "flags": dict(ctl.flags),
                "config": ctl.cfg.safe_dict(),
            })
        elif path == "/logs":
            try:
                count = self._parse_count(query)
            except ValueError:
                self._send_json({"error": "'count' must be an integer"}, 400)
                return
            # Use the log buffer's own lock when it exposes one; otherwise a
            # throwaway lock just keeps the ``with`` statement uniform.
            lock = getattr(log_lines, "_mutex", None)
            if lock is None:
                lock = threading.Lock()
            with lock:
                snapshot = list(log_lines)
            # ``snapshot[-0:]`` would be the whole list, so 0 is special-cased.
            tail = snapshot[-count:] if count > 0 else []
            # Entries are (text, color) pairs; only the text is returned.
            self._send_json({"logs": [line for line, _ in tail]})
        elif path == "/devices":
            self._send_json({"devices": ctl.get_devices()})
        elif path == "/config":
            self._send_json(ctl.cfg.safe_dict())
        elif path == "/fuzz/results":
            fuzzer = ctl.fuzzer
            self._send_json({"results": fuzzer.results if fuzzer else []})
        else:
            self._send_json(
                {"error": "not found", "endpoints": list(self._ENDPOINTS)}, 404)

    def do_POST(self):
        """Serve the mutating routes.

        Responds 400 when the body cannot be decoded as JSON, or when a route
        that reads fields from the body receives something other than a JSON
        object. Unknown paths get a 404.
        """
        path, _ = self._split_path()
        try:
            body = self._read_body()
        except ValueError as exc:
            self._send_json({"error": f"invalid JSON body: {exc}"}, 400)
            return
        # /inject forwards the payload verbatim, so its shape is left to the
        # controller; every other route reads keys from a JSON object.
        if path != "/inject" and not isinstance(body, dict):
            self._send_json({"error": "JSON body must be an object"}, 400)
            return

        ctl = self.controller
        if path == "/start":
            threading.Thread(target=ctl.start_services, daemon=True).start()
            self._send_json({"status": "starting"})
        elif path == "/stop":
            threading.Thread(target=ctl.stop_services, daemon=True).start()
            self._send_json({"status": "stopping"})
        elif path == "/config":
            # Only keys the config already knows are accepted; others are
            # silently ignored.
            for k, v in body.items():
                if k in ctl.cfg.keys():
                    ctl.cfg[k] = v
            ctl.cfg.save()
            self._send_json({"status": "updated", "config": ctl.cfg.safe_dict()})
        elif path == "/command":
            cmd = body.get("cmd", "")
            if cmd:
                ctl.process_command(cmd)
                self._send_json({"status": "executed", "cmd": cmd})
            else:
                self._send_json({"error": "provide 'cmd' field"}, 400)
        elif path == "/api":
            endpoint = body.get("endpoint", "")
            data = body.get("data", {})
            if endpoint:
                from api import ubox_client
                result = ubox_client.api_post(
                    ctl.cfg["api_base"], endpoint,
                    data, ctl.cfg["api_token"])
                self._send_json({"result": result})
            else:
                self._send_json({"error": "provide 'endpoint' field"}, 400)
        elif path == "/fuzz/endpoints":
            threading.Thread(target=ctl.run_fuzz_endpoints, daemon=True).start()
            self._send_json({"status": "started"})
        elif path == "/fuzz/params":
            endpoint = body.get("endpoint", "user/device_list")
            threading.Thread(target=ctl.run_fuzz_params,
                             args=(endpoint,), daemon=True).start()
            self._send_json({"status": "started", "endpoint": endpoint})
        elif path == "/fuzz/auth":
            threading.Thread(target=ctl.run_fuzz_auth, daemon=True).start()
            self._send_json({"status": "started"})
        elif path == "/fuzz/stop":
            if ctl.fuzzer:
                ctl.fuzzer.stop()
            self._send_json({"status": "stopped"})
        elif path == "/inject":
            result = ctl.inject_packet(body)
            self._send_json(result)
        else:
            self._send_json({"error": "not found"}, 404)
def start_server(controller, port=9090, host="0.0.0.0"):
    """Run the REST API in the calling thread until ``controller.running`` is falsy.

    Args:
        controller: Object the handler forwards requests to; its ``running``
            attribute is polled between requests to decide when to stop.
        port: TCP port to listen on.
        host: Address to bind. The default listens on every interface; pass
            ``"127.0.0.1"`` to keep the API (which the handler does not
            authenticate) reachable from the local machine only.
    """
    MITMApiHandler.controller = controller
    server = HTTPServer((host, port), MITMApiHandler)
    # handle_request() gives up after this many seconds without a request,
    # so the loop re-checks controller.running about once per second.
    server.timeout = 1
    try:
        log(f"REST API: listening on :{port}", C_SUCCESS)
        while controller.running:
            server.handle_request()
    finally:
        # Release the listening socket even if the loop exits via an exception.
        server.server_close()
    log("REST API: stopped", C_INFO)