Files
cam-mitm/api/ota_bucket_probe.py
sssnake 800052acc2 Initial commit — SetecSuite Camera MITM Framework
Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.

Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.

See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
    sudo /usr/bin/python3 gui.py

Co-authored by Setec Labs.
2026-04-09 08:14:18 -07:00

155 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OTA bucket enumerator.
The UBIA OTA buckets are Tencent COS:
ubiaota-us-1312441409.cos.na-siliconvalley.myqcloud.com
ubiaota-cn-1312441409.cos.ap-guangzhou.myqcloud.com
Anonymous LIST is denied, but individual objects can be public-read (we
confirmed this with the dev_add_doc/1159_video/* demo video). This module
guesses common firmware paths and reports any that return non-403/404.
"""
import json
import os
import urllib.request
import urllib.error
from datetime import datetime
from utils.log import log, C_INFO, C_SUCCESS, C_ERROR, C_IMPORTANT, C_TRAFFIC
BUCKETS = [
"http://ubiaota-us-1312441409.cos.na-siliconvalley.myqcloud.com",
"http://ubiaota-cn-1312441409.cos.ap-guangzhou.myqcloud.com",
]
# Path templates — {pid} = product id (1619 for our camera),
# {model} = model number (2604), {ver} = a version string
PATH_TEMPLATES = [
"/dev_add_doc/{pid}/firmware.bin",
"/dev_add_doc/{pid}/host.bin",
"/dev_add_doc/{pid}/wifi.bin",
"/dev_add_doc/{pid}/{model}.bin",
"/dev_add_doc/{pid}/{ver}.bin",
"/dev_add_doc/{pid}/{ver}/host.bin",
"/dev_add_doc/{pid}/{ver}/firmware.bin",
"/dev_add_doc/{pid}/{model}.{ver}.bin",
"/dev_add_doc/{pid}_video/",
"/firmware/{pid}/host.bin",
"/firmware/{pid}/{ver}.bin",
"/firmware/{model}/{ver}.bin",
"/ota/{pid}/host.bin",
"/ota/{pid}/{ver}.bin",
"/ota/{model}/{ver}.bin",
"/ota/{model}.{ver}.bin",
"/{pid}/firmware.bin",
"/{pid}/host.bin",
"/{pid}/{ver}.bin",
"/{model}/{ver}.bin",
"/host/{model}/{ver}.bin",
"/wifi/{model}/{ver}.bin",
"/upgrade/{pid}/{ver}.bin",
"/upgrade/{model}/{ver}.bin",
"/dev_fw/{pid}/{ver}.bin",
"/dev_fw/{model}/{ver}.bin",
"/{model}.{ver}.bin",
"/{ver}.bin",
]
# Versions to try — descending from current downward
VERSIONS_TO_TRY = [
"2604.1.2.69",
"2604.1.2.68",
"2604.1.2.0",
"2604.0.29.8",
"2604.0.29.7",
"2604.0.29.0",
"2604.0.0.0",
"2604",
]
PRODUCT_IDS = ["1619"]
MODELS = ["2604"]
def _head(url, timeout=8):
req = urllib.request.Request(url, method="HEAD", headers={
"User-Agent": "okhttp/4.9.1",
})
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, dict(r.headers), None
except urllib.error.HTTPError as e:
return e.code, dict(e.headers) if e.headers else {}, None
except urllib.error.URLError as e:
return None, {}, str(e.reason)
except Exception as e:
return None, {}, str(e)
def _build_paths():
paths = set()
for pid in PRODUCT_IDS:
for model in MODELS:
for ver in VERSIONS_TO_TRY:
for tmpl in PATH_TEMPLATES:
paths.add(tmpl.format(pid=pid, model=model, ver=ver))
return sorted(paths)
def probe(cfg=None):
"""
Try every path×bucket combo. Report any HEAD that returns 200 or
that has Content-Length > 0. Also report 403 (auth required —
means the object exists), separately from 404.
Returns:
{
"ok": bool,
"found": [{"url","status","size","content_type"}],
"exists_403": [...],
"tried": int,
}
"""
paths = _build_paths()
log(f"OTA-PROBE: trying {len(paths)} paths × {len(BUCKETS)} buckets = "
f"{len(paths) * len(BUCKETS)} requests…", C_INFO)
found = []
exists_403 = []
tried = 0
for bucket in BUCKETS:
for p in paths:
url = bucket + p
tried += 1
status, headers, err = _head(url)
if err:
continue
ct = headers.get("Content-Type", "?")
cl = headers.get("Content-Length", "0")
try:
cl_i = int(cl)
except (TypeError, ValueError):
cl_i = 0
if status == 200:
log(f" ✓ HIT 200 {url} size={cl_i} ct={ct}", C_IMPORTANT)
found.append({"url": url, "status": 200,
"size": cl_i, "content_type": ct})
elif status == 403:
# Tencent COS returns 403 for "exists but no access" AND for
# "doesn't exist" — but the response body differs. We log
# them anyway since some might be real.
exists_403.append({"url": url, "status": 403, "ct": ct})
elif status and status not in (404,):
log(f" ? {status} {url}", C_TRAFFIC)
log(f"OTA-PROBE: done. {len(found)} hits, {len(exists_403)} 403s, "
f"{tried - len(found) - len(exists_403)} misses", C_SUCCESS)
return {
"ok": bool(found),
"found": found,
"exists_403_count": len(exists_403),
"tried": tried,
}