Files
cam-mitm/api/firmware_fetch.py

205 lines
6.7 KiB
Python
Raw Normal View History

"""
Firmware download helper.
Calls the UBox cloud `check_version` endpoint, extracts every URL it sees in
the response, downloads each one to ~/dumps/javiscam_fw/, and reports sizes
and sha256 hashes. Real code — no stubs, no placeholders.
"""
import hashlib
import json
import os
import re
import socket
import urllib.request
import urllib.error
from datetime import datetime
from utils.log import log, C_INFO, C_SUCCESS, C_ERROR, C_TRAFFIC, C_IMPORTANT
from api import ubox_client
# Directory that receives the downloaded firmware files and the saved
# check_version responses ("~" is expanded to the current user's home).
FW_DIR = os.path.expanduser("~/dumps/javiscam_fw")
# Bytes pattern for an http(s) URL: everything from the scheme up to the next
# whitespace character or single/double quote.
URL_RE = re.compile(rb'https?://[^\s"\']+')
def _walk_strings(obj):
    """Lazily yield each str found in `obj`, descending into dict values and lists.

    Dict keys are not yielded, and any other type (numbers, None, tuples, ...)
    contributes nothing.
    """
    if isinstance(obj, str):
        yield obj
        return

    # Pick the collection of children to recurse into, if there is one.
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return

    for child in children:
        yield from _walk_strings(child)
def _extract_urls(obj):
    """Return a sorted list of the distinct URLs found in any string inside `obj`.

    Every string reachable through `_walk_strings` is scanned with `URL_RE`;
    because that pattern works on bytes, each string is encoded as UTF-8 for
    the search and each match is decoded back to str.
    """
    found = {
        hit.decode("utf-8", errors="replace")
        for text in _walk_strings(obj)
        for hit in URL_RE.findall(text.encode("utf-8"))
    }
    return sorted(found)
def _safe_filename(url):
    """Derive a filesystem-safe file name from the last path segment of `url`.

    The fragment ("#...") and query string ("?...") are dropped, the text
    after the final "/" is taken, and every character outside
    [A-Za-z0-9._-] is replaced with "_".

    Returns "firmware.bin" when no usable name remains: an empty segment
    (URL ending in "/"), or "." / "..", which would otherwise resolve to a
    directory once joined onto the download directory.
    """
    # Drop the fragment first, then the query, so neither leaks into the name.
    path = url.split("#", 1)[0].split("?", 1)[0]
    segment = path.rsplit("/", 1)[-1]
    name = re.sub(r"[^A-Za-z0-9._-]", "_", segment)
    if name in ("", ".", ".."):
        return "firmware.bin"
    return name
def _download(url, dest):
    """Stream `url` into the file `dest`, hashing the body as it is written.

    The body is first written to a sibling "<dest>.part" file and moved over
    `dest` only after the whole response has been read. A failed or
    interrupted transfer therefore never leaves a truncated file under the
    final name, and never destroys an earlier complete copy.

    Returns:
        (bytes_written, sha256_hexdigest, content_type), where content_type
        is the response's Content-Type header, or "?" when it is absent.

    Raises:
        Whatever urllib or the file operations raise; nothing is caught here
        except failures of the best-effort temp-file cleanup.
    """
    req = urllib.request.Request(url, headers={
        "User-Agent": "okhttp/4.9.1",
        "X-UbiaAPI-CallContext": "source=app&app=ubox&ver=1.1.360&osver=14",
    })
    partial = dest + ".part"
    total = 0
    digest = hashlib.sha256()
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            content_type = resp.headers.get("Content-Type", "?")
            with open(partial, "wb") as f:
                # read() returns b"" at end of stream, which stops the iterator.
                for chunk in iter(lambda: resp.read(64 * 1024), b""):
                    f.write(chunk)
                    digest.update(chunk)
                    total += len(chunk)
        # Publish the finished file under its final name in a single step.
        os.replace(partial, dest)
    finally:
        # Best-effort cleanup: after a successful os.replace the temp file is
        # already gone; after a failure it holds a truncated body. A cleanup
        # error must not mask the exception that is already propagating.
        try:
            os.remove(partial)
        except OSError:
            pass
    return total, digest.hexdigest(), content_type
def check_and_download(cfg, host_version=None):
    """
    Ask the cloud for a firmware update and download whatever it points to.

    Posts to the `check_version` endpoint through `ubox_client.api_post`,
    trying one or more version strings until a response containing at least
    one URL comes back. The chosen response is written to FW_DIR as
    `check_version_<timestamp>.json`, and every firmware-looking URL found in
    it is downloaded into FW_DIR.

    Args:
        cfg: mapping providing "api_token", "device_uid" and "api_base".
        host_version: version string to report to the cloud. When falsy, a
            built-in list of candidate versions is tried in order.

    Returns dict:
        {
          "ok": bool,                  # True if at least one file was downloaded
          "used_version": str or None, # version whose response was kept
          "url_count": int,            # URLs found in the kept response
          "downloads": [{"url","path","bytes","sha256","content_type"}],
          "attempts": [{"version","summary"}],
          "raw": <response json>,
          "error": optional str,       # present whenever "ok" is False
        }
    When the token or device_uid is missing, only "ok" and "error" are set.
    """
    # Function-scope import: only needed for the exception class caught in the
    # download loop below.
    import http.client

    if not cfg.get("api_token"):
        log("FW: not logged in", C_ERROR)
        return {"ok": False, "error": "not logged in"}
    if not cfg.get("device_uid"):
        log("FW: no device_uid — call /devices first", C_ERROR)
        return {"ok": False, "error": "no device_uid"}

    os.makedirs(FW_DIR, exist_ok=True)

    # Try several version formats — the cloud needs a string the device-side
    # comparator considers "older than current" to trigger a download URL.
    version_attempts = [host_version] if host_version else [
        "2604.0.0.1",   # same model prefix, ancient
        "1.0.0.0",
        "0.0.0.1",
        "2604.0.29.7",  # one below known shipped version
        "0",
        "",
    ]

    result = None
    used_version = None
    attempts_log = []     # one {"version", "summary"} dict per request made
    best_success = None   # first msg=success response, kept even without URLs
    candidate = None
    for v in version_attempts:
        log(f"FW: trying check_version with host_version={v!r}", C_INFO)
        candidate = ubox_client.api_post(
            cfg["api_base"],
            "user/qry/device/check_version/v3",
            {
                "device_uid": cfg["device_uid"],
                "host_version": v,
                "wifi_version": v,
                "is_lite": False,
                "zone_id": 2,
            },
            cfg["api_token"],
        )
        if isinstance(candidate, dict):
            summary = json.dumps(candidate)[:200]
        else:
            summary = repr(candidate)[:200]
        attempts_log.append({"version": v, "summary": summary})
        if not isinstance(candidate, dict):
            continue
        urls_found = _extract_urls(candidate)
        if urls_found:
            log(f"FW: got {len(urls_found)} URL(s) using version={v!r}", C_SUCCESS)
            result = candidate
            used_version = v
            break
        if candidate.get("msg") == "success" and best_success is None:
            best_success = (v, candidate)
        log(f"FW: no URLs at version={v!r}, response={summary}", C_INFO)

    # No response carried a URL: fall back to the first "success" response,
    # then to the last dict response, then to a synthetic error record. Every
    # branch leaves `result` as a dict.
    if result is None:
        if best_success is not None:
            used_version, result = best_success
            log(f"FW: keeping best success response from version={used_version!r}", C_INFO)
        elif isinstance(candidate, dict):
            result = candidate
        else:
            result = {"error": "all versions failed"}

    # Save the raw response next to firmware files for the record
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    meta_path = os.path.join(FW_DIR, f"check_version_{ts}.json")
    try:
        # Explicit UTF-8: ensure_ascii=False may emit non-ASCII text, which a
        # non-UTF-8 locale default encoding could fail to encode.
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
    except OSError as e:
        log(f"FW: cannot save meta: {e}", C_ERROR)
    else:
        # Only claim the file was saved when the write actually succeeded.
        log(f"FW: response saved -> {meta_path}", C_INFO)

    urls = _extract_urls(result)
    log(f"FW: found {len(urls)} URL(s) in response", C_INFO)
    for u in urls:
        log(f"{u}", C_TRAFFIC)

    # Substrings that mark a URL as a plausible firmware download.
    firmware_hints = (".bin", ".rar", ".zip", ".tar", ".gz", ".img", ".pkg",
                      "/firmware", "/ota", "ubiaota", "update")
    downloads = []
    claimed_names = set()
    for url in urls:
        # Only attempt downloads of plausible binary blobs
        lower = url.lower()
        if not any(hint in lower for hint in firmware_hints):
            log(f" skip (not firmware-looking): {url}", C_INFO)
            continue
        fname = _safe_filename(url)
        if fname in claimed_names:
            # A different URL in this run already maps to this name; prefix a
            # short URL hash so the second file does not overwrite the first.
            url_tag = hashlib.sha256(url.encode("utf-8")).hexdigest()[:8]
            fname = f"{url_tag}_{fname}"
        claimed_names.add(fname)
        dest = os.path.join(FW_DIR, fname)
        try:
            log(f"FW: downloading {url}", C_INFO)
            n, sha, ct = _download(url, dest)
            log(f"FW: ✓ {fname} {n}B sha256={sha[:16]}… ct={ct}",
                C_IMPORTANT if n > 1024 else C_TRAFFIC)
            downloads.append({
                "url": url, "path": dest, "bytes": n,
                "sha256": sha, "content_type": ct,
            })
        except urllib.error.HTTPError as e:
            log(f"FW: HTTP {e.code} on {url}", C_ERROR)
        except urllib.error.URLError as e:
            log(f"FW: URL error: {e.reason} on {url}", C_ERROR)
        except (http.client.HTTPException, OSError) as e:
            # OSError already covers socket.timeout. HTTPException covers
            # protocol failures such as a truncated chunked body, which would
            # otherwise abort the remaining downloads.
            log(f"FW: download failed: {e}", C_ERROR)

    outcome = {
        "ok": bool(downloads),
        "used_version": used_version,
        "url_count": len(urls),
        "downloads": downloads,
        "attempts": attempts_log,
        "raw": result,
    }
    if not downloads:
        log("FW: nothing downloaded — check_version returned no firmware URLs", C_ERROR)
        outcome["error"] = "nothing downloaded"
    return outcome