Files
cam-mitm/api/firmware_fetch.py
sssnake 800052acc2 Initial commit — SetecSuite Camera MITM Framework
Original tooling from the Camhak research project (camera teardown of a
rebranded UBIA / Javiscam IP camera). PyQt6 GUI on top of a curses TUI on
top of a service controller; per-service start/stop, intruder detection,
protocol fingerprinting, OAM HMAC signing, CVE verifiers, OTA bucket
probe, firmware fetcher, fuzzer, packet injection.

Tabs: Dashboard, Live Log, Intruders, Cloud API, Fuzzer, Inject, CVEs,
Config, Help. Real-time per-packet protocol detection, conntrack-based
original-destination lookup, log rotation at 1 GiB.

See SECURITY_PAPER.md for the full writeup, site/index.html for the
public report, README.md for usage. Run with:
    sudo /usr/bin/python3 gui.py

Co-authored by Setec Labs.
2026-04-09 08:14:18 -07:00

205 lines
6.7 KiB
Python

"""
Firmware download helper.
Calls the UBox cloud `check_version` endpoint, extracts every URL it sees in
the response, downloads each one to ~/dumps/javiscam_fw/, and reports sizes
and sha256 hashes. Real code — no stubs, no placeholders.
"""
import hashlib
import json
import os
import re
import socket
import urllib.request
import urllib.error
from datetime import datetime
from utils.log import log, C_INFO, C_SUCCESS, C_ERROR, C_TRAFFIC, C_IMPORTANT
from api import ubox_client
FW_DIR = os.path.expanduser("~/dumps/javiscam_fw")
URL_RE = re.compile(rb'https?://[^\s"\']+')
def _walk_strings(obj):
"""Yield every string value in a nested dict/list."""
if isinstance(obj, str):
yield obj
elif isinstance(obj, dict):
for v in obj.values():
yield from _walk_strings(v)
elif isinstance(obj, list):
for v in obj:
yield from _walk_strings(v)
def _extract_urls(obj):
urls = set()
for s in _walk_strings(obj):
for m in URL_RE.findall(s.encode("utf-8")):
urls.add(m.decode("utf-8", errors="replace"))
return sorted(urls)
def _safe_filename(url):
name = url.split("?")[0].rsplit("/", 1)[-1] or "firmware.bin"
name = re.sub(r"[^A-Za-z0-9._-]", "_", name)
if not name:
name = "firmware.bin"
return name
def _download(url, dest):
req = urllib.request.Request(url, headers={
"User-Agent": "okhttp/4.9.1",
"X-UbiaAPI-CallContext": "source=app&app=ubox&ver=1.1.360&osver=14",
})
with urllib.request.urlopen(req, timeout=60) as resp:
total = 0
h = hashlib.sha256()
with open(dest, "wb") as f:
while True:
chunk = resp.read(64 * 1024)
if not chunk:
break
f.write(chunk)
h.update(chunk)
total += len(chunk)
return total, h.hexdigest(), resp.headers.get("Content-Type", "?")
def check_and_download(cfg, host_version=None):
"""
Returns dict:
{
"ok": bool,
"url_count": int,
"downloads": [{"url","path","bytes","sha256","content_type"}],
"raw": <response json>,
"error": optional str,
}
"""
if not cfg.get("api_token"):
log("FW: not logged in", C_ERROR)
return {"ok": False, "error": "not logged in"}
if not cfg.get("device_uid"):
log("FW: no device_uid — call /devices first", C_ERROR)
return {"ok": False, "error": "no device_uid"}
os.makedirs(FW_DIR, exist_ok=True)
# Try several version formats — the cloud needs a string the device-side
# comparator considers "older than current" to trigger a download URL.
version_attempts = [host_version] if host_version else [
"2604.0.0.1", # same model prefix, ancient
"1.0.0.0",
"0.0.0.1",
"2604.0.29.7", # one below known shipped version
"0",
"",
]
result = None
used_version = None
attempts_log = [] # list of (version, status, summary)
best_success = None # remember the best msg=success response even without URLs
for v in version_attempts:
log(f"FW: trying check_version with host_version={v!r}", C_INFO)
candidate = ubox_client.api_post(
cfg["api_base"],
"user/qry/device/check_version/v3",
{
"device_uid": cfg["device_uid"],
"host_version": v,
"wifi_version": v,
"is_lite": False,
"zone_id": 2,
},
cfg["api_token"],
)
summary = json.dumps(candidate)[:200] if isinstance(candidate, dict) else repr(candidate)[:200]
attempts_log.append({"version": v, "summary": summary})
if not isinstance(candidate, dict):
continue
urls_found = _extract_urls(candidate)
if urls_found:
log(f"FW: got {len(urls_found)} URL(s) using version={v!r}", C_SUCCESS)
result = candidate
used_version = v
break
if candidate.get("msg") == "success" and best_success is None:
best_success = (v, candidate)
log(f"FW: no URLs at version={v!r}, response={summary}", C_INFO)
if result is None:
if best_success is not None:
used_version, result = best_success
log(f"FW: keeping best success response from version={used_version!r}", C_INFO)
else:
result = candidate if isinstance(candidate, dict) else {"error": "all versions failed"}
if not isinstance(result, dict):
log(f"FW: bad response: {result!r}", C_ERROR)
return {"ok": False, "error": "bad response", "raw": result}
# Save the raw response next to firmware files for the record
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
meta_path = os.path.join(FW_DIR, f"check_version_{ts}.json")
try:
with open(meta_path, "w") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
except OSError as e:
log(f"FW: cannot save meta: {e}", C_ERROR)
urls = _extract_urls(result)
log(f"FW: response saved -> {meta_path}", C_INFO)
log(f"FW: found {len(urls)} URL(s) in response", C_INFO)
for u in urls:
log(f"{u}", C_TRAFFIC)
downloads = []
for url in urls:
# Only attempt downloads of plausible binary blobs
lower = url.lower()
if not any(lower.endswith(ext) or ext in lower for ext in
(".bin", ".rar", ".zip", ".tar", ".gz", ".img", ".pkg",
"/firmware", "/ota", "ubiaota", "update")):
log(f" skip (not firmware-looking): {url}", C_INFO)
continue
fname = _safe_filename(url)
dest = os.path.join(FW_DIR, fname)
try:
log(f"FW: downloading {url}", C_INFO)
n, sha, ct = _download(url, dest)
log(f"FW: ✓ {fname} {n}B sha256={sha[:16]}… ct={ct}",
C_IMPORTANT if n > 1024 else C_TRAFFIC)
downloads.append({
"url": url, "path": dest, "bytes": n,
"sha256": sha, "content_type": ct,
})
except urllib.error.HTTPError as e:
log(f"FW: HTTP {e.code} on {url}", C_ERROR)
except urllib.error.URLError as e:
log(f"FW: URL error: {e.reason} on {url}", C_ERROR)
except (socket.timeout, OSError) as e:
log(f"FW: download failed: {e}", C_ERROR)
if not downloads:
log("FW: nothing downloaded — check_version returned no firmware URLs", C_ERROR)
return {
"ok": bool(downloads),
"used_version": used_version,
"url_count": len(urls),
"downloads": downloads,
"attempts": attempts_log,
"raw": result,
}