"""REST API server — allows external tools (like Claude) to control the MITM tool""" import json import threading from http.server import HTTPServer, BaseHTTPRequestHandler from utils.log import log, log_lines, C_SUCCESS, C_ERROR, C_INFO class MITMApiHandler(BaseHTTPRequestHandler): controller = None # Set by start_server def log_message(self, format, *args): pass # Suppress default HTTP logging def _send_json(self, data, code=200): body = json.dumps(data).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", len(body)) self.end_headers() self.wfile.write(body) def _read_body(self): cl = int(self.headers.get("Content-Length", 0)) if cl > 0: return json.loads(self.rfile.read(cl).decode("utf-8")) return {} def do_GET(self): path = self.path.rstrip("/") if path == "/status": self._send_json({ "services_running": self.controller.services_running, "flags": dict(self.controller.flags), "config": self.controller.cfg.safe_dict(), }) elif path == "/logs": count = 100 if "?" in self.path: for param in self.path.split("?")[1].split("&"): if param.startswith("count="): count = int(param.split("=")[1]) with log_lines._mutex if hasattr(log_lines, '_mutex') else threading.Lock(): lines = [(l, c) for l, c in list(log_lines)[-count:]] self._send_json({"logs": [l for l, _ in lines]}) elif path == "/devices": self._send_json({"devices": self.controller.get_devices()}) elif path == "/config": self._send_json(self.controller.cfg.safe_dict()) elif path == "/fuzz/results": if self.controller.fuzzer: self._send_json({"results": self.controller.fuzzer.results}) else: self._send_json({"results": []}) else: self._send_json({"error": "not found", "endpoints": [ "GET /status", "GET /logs?count=N", "GET /devices", "GET /config", "GET /fuzz/results", "POST /start", "POST /stop", "POST /config", "POST /command", "POST /api", "POST /fuzz/endpoints", "POST /fuzz/params", "POST /fuzz/auth", "POST /fuzz/stop", "POST /inject", ]}, 404) def do_POST(self): path = self.path.rstrip("/") body = self._read_body() if path == "/start": threading.Thread(target=self.controller.start_services, daemon=True).start() self._send_json({"status": "starting"}) elif path == "/stop": threading.Thread(target=self.controller.stop_services, daemon=True).start() self._send_json({"status": "stopping"}) elif path == "/config": for k, v in body.items(): if k in self.controller.cfg.keys(): self.controller.cfg[k] = v self.controller.cfg.save() self._send_json({"status": "updated", "config": self.controller.cfg.safe_dict()}) elif path == "/command": cmd = body.get("cmd", "") if cmd: self.controller.process_command(cmd) self._send_json({"status": "executed", "cmd": cmd}) else: self._send_json({"error": "provide 'cmd' field"}, 400) elif path == "/api": endpoint = body.get("endpoint", "") data = body.get("data", {}) if endpoint: from api import ubox_client result = ubox_client.api_post( self.controller.cfg["api_base"], endpoint, data, self.controller.cfg["api_token"]) self._send_json({"result": result}) else: self._send_json({"error": "provide 'endpoint' field"}, 400) elif path == "/fuzz/endpoints": threading.Thread(target=self.controller.run_fuzz_endpoints, daemon=True).start() self._send_json({"status": "started"}) elif path == "/fuzz/params": endpoint = body.get("endpoint", "user/device_list") threading.Thread(target=self.controller.run_fuzz_params, args=(endpoint,), daemon=True).start() self._send_json({"status": "started", "endpoint": endpoint}) elif path == "/fuzz/auth": threading.Thread(target=self.controller.run_fuzz_auth, daemon=True).start() self._send_json({"status": "started"}) elif path == "/fuzz/stop": if self.controller.fuzzer: self.controller.fuzzer.stop() self._send_json({"status": "stopped"}) elif path == "/inject": result = self.controller.inject_packet(body) self._send_json(result) else: self._send_json({"error": "not found"}, 404) def start_server(controller, port=9090): MITMApiHandler.controller = controller server = HTTPServer(("0.0.0.0", port), MITMApiHandler) server.timeout = 1 log(f"REST API: listening on :{port}", C_SUCCESS) while controller.running: server.handle_request() server.server_close() log("REST API: stopped", C_INFO)