Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.
Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.
See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
sudo /usr/bin/python3 gui.py
Co-authored by Setec Labs.
148 lines
5.4 KiB
Python
148 lines
5.4 KiB
Python
"""REST API server — allows external tools (like Claude) to control the MITM tool"""
|
|
|
|
import json
|
|
import threading
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from utils.log import log, log_lines, C_SUCCESS, C_ERROR, C_INFO
|
|
|
|
|
|
class MITMApiHandler(BaseHTTPRequestHandler):
|
|
controller = None # Set by start_server
|
|
|
|
def log_message(self, format, *args):
|
|
pass # Suppress default HTTP logging
|
|
|
|
def _send_json(self, data, code=200):
|
|
body = json.dumps(data).encode("utf-8")
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.send_header("Content-Length", len(body))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def _read_body(self):
|
|
cl = int(self.headers.get("Content-Length", 0))
|
|
if cl > 0:
|
|
return json.loads(self.rfile.read(cl).decode("utf-8"))
|
|
return {}
|
|
|
|
def do_GET(self):
|
|
path = self.path.rstrip("/")
|
|
|
|
if path == "/status":
|
|
self._send_json({
|
|
"services_running": self.controller.services_running,
|
|
"flags": dict(self.controller.flags),
|
|
"config": self.controller.cfg.safe_dict(),
|
|
})
|
|
|
|
elif path == "/logs":
|
|
count = 100
|
|
if "?" in self.path:
|
|
for param in self.path.split("?")[1].split("&"):
|
|
if param.startswith("count="):
|
|
count = int(param.split("=")[1])
|
|
with log_lines._mutex if hasattr(log_lines, '_mutex') else threading.Lock():
|
|
lines = [(l, c) for l, c in list(log_lines)[-count:]]
|
|
self._send_json({"logs": [l for l, _ in lines]})
|
|
|
|
elif path == "/devices":
|
|
self._send_json({"devices": self.controller.get_devices()})
|
|
|
|
elif path == "/config":
|
|
self._send_json(self.controller.cfg.safe_dict())
|
|
|
|
elif path == "/fuzz/results":
|
|
if self.controller.fuzzer:
|
|
self._send_json({"results": self.controller.fuzzer.results})
|
|
else:
|
|
self._send_json({"results": []})
|
|
|
|
else:
|
|
self._send_json({"error": "not found", "endpoints": [
|
|
"GET /status", "GET /logs?count=N", "GET /devices",
|
|
"GET /config", "GET /fuzz/results",
|
|
"POST /start", "POST /stop", "POST /config",
|
|
"POST /command", "POST /api", "POST /fuzz/endpoints",
|
|
"POST /fuzz/params", "POST /fuzz/auth", "POST /fuzz/stop",
|
|
"POST /inject",
|
|
]}, 404)
|
|
|
|
def do_POST(self):
|
|
path = self.path.rstrip("/")
|
|
body = self._read_body()
|
|
|
|
if path == "/start":
|
|
threading.Thread(target=self.controller.start_services, daemon=True).start()
|
|
self._send_json({"status": "starting"})
|
|
|
|
elif path == "/stop":
|
|
threading.Thread(target=self.controller.stop_services, daemon=True).start()
|
|
self._send_json({"status": "stopping"})
|
|
|
|
elif path == "/config":
|
|
for k, v in body.items():
|
|
if k in self.controller.cfg.keys():
|
|
self.controller.cfg[k] = v
|
|
self.controller.cfg.save()
|
|
self._send_json({"status": "updated", "config": self.controller.cfg.safe_dict()})
|
|
|
|
elif path == "/command":
|
|
cmd = body.get("cmd", "")
|
|
if cmd:
|
|
self.controller.process_command(cmd)
|
|
self._send_json({"status": "executed", "cmd": cmd})
|
|
else:
|
|
self._send_json({"error": "provide 'cmd' field"}, 400)
|
|
|
|
elif path == "/api":
|
|
endpoint = body.get("endpoint", "")
|
|
data = body.get("data", {})
|
|
if endpoint:
|
|
from api import ubox_client
|
|
result = ubox_client.api_post(
|
|
self.controller.cfg["api_base"], endpoint,
|
|
data, self.controller.cfg["api_token"])
|
|
self._send_json({"result": result})
|
|
else:
|
|
self._send_json({"error": "provide 'endpoint' field"}, 400)
|
|
|
|
elif path == "/fuzz/endpoints":
|
|
threading.Thread(target=self.controller.run_fuzz_endpoints, daemon=True).start()
|
|
self._send_json({"status": "started"})
|
|
|
|
elif path == "/fuzz/params":
|
|
endpoint = body.get("endpoint", "user/device_list")
|
|
threading.Thread(target=self.controller.run_fuzz_params,
|
|
args=(endpoint,), daemon=True).start()
|
|
self._send_json({"status": "started", "endpoint": endpoint})
|
|
|
|
elif path == "/fuzz/auth":
|
|
threading.Thread(target=self.controller.run_fuzz_auth, daemon=True).start()
|
|
self._send_json({"status": "started"})
|
|
|
|
elif path == "/fuzz/stop":
|
|
if self.controller.fuzzer:
|
|
self.controller.fuzzer.stop()
|
|
self._send_json({"status": "stopped"})
|
|
|
|
elif path == "/inject":
|
|
result = self.controller.inject_packet(body)
|
|
self._send_json(result)
|
|
|
|
else:
|
|
self._send_json({"error": "not found"}, 404)
|
|
|
|
|
|
def start_server(controller, port=9090):
|
|
MITMApiHandler.controller = controller
|
|
server = HTTPServer(("0.0.0.0", port), MITMApiHandler)
|
|
server.timeout = 1
|
|
log(f"REST API: listening on :{port}", C_SUCCESS)
|
|
|
|
while controller.running:
|
|
server.handle_request()
|
|
|
|
server.server_close()
|
|
log("REST API: stopped", C_INFO)
|