From 6c8e9235c9ad2ce22eaadd6cdac5c23763533852 Mon Sep 17 00:00:00 2001 From: DigiJ Date: Tue, 3 Mar 2026 15:14:39 -0800 Subject: [PATCH] =?UTF-8?q?RCS=20extraction=20via=20MMS=20content=20provid?= =?UTF-8?q?er=20=E2=80=94=20no=20root/exploit=20needed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Discovery: Google Messages writes ALL RCS messages to content://mms/ as MMS records. Message body in content://mms/{id}/part (ct=text/plain). RCS metadata (group name, SIP URI) protobuf-encoded in tr_id field. Sender addresses in content://mms/{id}/addr. Tested on Pixel 10 Pro Fold, Android 16, Feb 2026 patch — works at UID 2000 with zero exploits, zero root, zero Shizuku. New methods: - read_rcs_via_mms(): extract RCS+MMS with body, addresses, metadata - read_rcs_only(): filter to RCS messages only (proto: in tr_id) - read_rcs_threads(): unique conversation threads with latest message - backup_rcs_to_xml(): full SMS+MMS+RCS backup in SMS Backup & Restore XML Fixed _content_query() Windows quoting (single quotes for sort/where). New routes: /rcs-via-mms, /rcs-only, /rcs-threads, /backup-rcs-xml Co-Authored-By: Claude Opus 4.6 (1M context) --- modules/rcs_tools.py | 213 +++++++++++++++++++++++++++++++++++++++- web/routes/rcs_tools.py | 32 ++++++ 2 files changed, 243 insertions(+), 2 deletions(-) diff --git a/modules/rcs_tools.py b/modules/rcs_tools.py index 73e1c35..6442e04 100644 --- a/modules/rcs_tools.py +++ b/modules/rcs_tools.py @@ -298,13 +298,15 @@ class RCSTools: def _content_query(self, uri: str, projection: str = '', where: str = '', sort: str = '', limit: int = 0) -> List[Dict[str, str]]: + # Build shell command — use single quotes for sort/where to avoid + # Windows double-quote stripping issues cmd = f'shell content query --uri {uri}' if projection: cmd += f' --projection {projection}' if where: - cmd += f' --where "{where}"' + cmd += f" --where '{where}'" if sort: - cmd += f' --sort "{sort}"' + cmd += f" --sort '{sort}'" output = self._run_adb(cmd, timeout=30) rows = self._parse_content_query(output) if limit > 0: @@ -659,6 +661,121 @@ class RCSTools: row['date_formatted'] = self._format_ts(int(row['date']) * 1000) return rows + def read_rcs_via_mms(self, thread_id: Optional[int] = None, limit: int = 200) -> List[Dict[str, Any]]: + """Read RCS messages via the MMS content provider. + + DISCOVERY: Google Messages writes ALL RCS messages to content://mms/ + as MMS records. The message body is in content://mms/{id}/part + (ct=text/plain). RCS metadata (group name, SIP conference URI) is + protobuf-encoded in the tr_id field. Sender addresses are in + content://mms/{id}/addr. + + This works on ANY Android with ADB access (UID 2000) — no root, + no exploits, no Shizuku needed. Tested on Pixel 10 Pro Fold, + Android 16, February 2026 patch. + """ + # Get MMS entries (which include RCS messages synced by Google Messages) + where = f'thread_id={thread_id}' if thread_id else '' + mms_rows = self._content_query( + MMS_URI, + projection='_id:thread_id:date:msg_box:sub:text_only:tr_id', + where=where, + sort='date DESC', + limit=limit, + ) + + messages = [] + for row in mms_rows: + mms_id = row.get('_id') + if not mms_id: + continue + + msg = { + '_id': mms_id, + 'thread_id': row.get('thread_id'), + 'date': row.get('date'), + 'msg_box': row.get('msg_box'), + 'direction': 'incoming' if row.get('msg_box') == '1' else 'outgoing', + 'tr_id': row.get('tr_id', ''), + 'is_rcs': False, + 'body': '', + 'addresses': [], + } + + # Check if this is an RCS message (tr_id starts with "proto:") + tr_id = row.get('tr_id', '') or '' + if tr_id.startswith('proto:'): + msg['is_rcs'] = True + msg['protocol_name'] = 'RCS' + # Try to decode group/conversation name from protobuf + msg['rcs_metadata'] = tr_id[:100] + else: + msg['protocol_name'] = 'MMS' + + # Get message body from parts + parts = self._content_query( + f'content://mms/{mms_id}/part', + projection='_id:ct:text', + ) + for p in parts: + if p.get('ct') == 'text/plain' and p.get('text'): + msg['body'] = p['text'] + break + + # Get sender/recipient addresses + addrs = self._content_query( + f'content://mms/{mms_id}/addr', + projection='address:type', + ) + for a in addrs: + addr = a.get('address', '') + addr_type = a.get('type', '') + if addr and addr != 'insert-address-token': + msg['addresses'].append({'address': addr, 'type': addr_type}) + # Type 137 = FROM, 151 = BCC/self, 130 = TO + if addr_type == '137': + msg['sender'] = addr + + # Format timestamp (MMS dates are in seconds, not ms) + if msg['date']: + try: + ts = int(msg['date']) + if ts < 10000000000: # seconds + ts *= 1000 + msg['date_ms'] = ts + msg['date_formatted'] = self._format_ts(ts) + except (ValueError, TypeError): + pass + + messages.append(msg) + + return messages + + def read_rcs_only(self, limit: int = 200) -> List[Dict[str, Any]]: + """Read ONLY RCS messages (filter MMS entries with proto: in tr_id).""" + all_mms = self.read_rcs_via_mms(limit=limit * 2) + return [m for m in all_mms if m.get('is_rcs')][:limit] + + def read_rcs_threads(self) -> List[Dict[str, Any]]: + """Get unique RCS conversation threads with latest message.""" + all_rcs = self.read_rcs_via_mms(limit=5000) + threads = {} + for msg in all_rcs: + tid = msg.get('thread_id') + if tid and tid not in threads: + threads[tid] = { + 'thread_id': tid, + 'latest_message': msg.get('body', '')[:100], + 'latest_date': msg.get('date_formatted', ''), + 'is_rcs': msg.get('is_rcs', False), + 'direction': msg.get('direction'), + 'addresses': msg.get('addresses', []), + 'message_count': 0, + } + if tid in threads: + threads[tid]['message_count'] += 1 + return list(threads.values()) + def read_conversations(self, limit: int = 100) -> List[Dict[str, Any]]: rows = self._content_query(MMS_SMS_CONVERSATIONS_URI, limit=limit) return rows @@ -1654,6 +1771,98 @@ class RCSTools: # §10 DATABASE BACKUP & CLONE # ══════════════════════════════════════════════════════════════════════ + def backup_rcs_to_xml(self) -> Dict[str, Any]: + """Backup all RCS messages to SMS Backup & Restore compatible XML. + + Reads RCS messages from the MMS content provider (where Google Messages + syncs them as MMS records), extracts the plaintext body from parts, + and writes them in SMS Backup & Restore XML format. + + Works on any Android with ADB — no root, no exploits. + """ + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + # Get ALL messages via MMS provider (includes both MMS and RCS) + all_msgs = self.read_rcs_via_mms(limit=10000) + rcs_msgs = [m for m in all_msgs if m.get('is_rcs')] + all_sms = self.read_sms_database(limit=10000) + + # Build XML + total = len(all_sms) + len(all_msgs) + root = ET.Element('smses', count=str(total), backup_date=str(self._ts_ms()), + type='full', autarch_version='2.3') + + # Add SMS messages + for msg in all_sms: + attrs = { + 'protocol': str(msg.get('protocol', '0') or '0'), + 'address': str(msg.get('address', '') or ''), + 'date': str(msg.get('date', '') or ''), + 'type': str(msg.get('type', '1') or '1'), + 'body': str(msg.get('body', '') or ''), + 'read': str(msg.get('read', '1') or '1'), + 'status': str(msg.get('status', '-1') or '-1'), + 'locked': str(msg.get('locked', '0') or '0'), + 'date_sent': str(msg.get('date_sent', '0') or '0'), + 'readable_date': str(msg.get('date_formatted', '') or ''), + 'contact_name': str(msg.get('contact_name', '(Unknown)') or '(Unknown)'), + 'msg_protocol': 'SMS', + } + ET.SubElement(root, 'sms', **attrs) + + # Add RCS/MMS messages + for msg in all_msgs: + # Get sender address + sender = msg.get('sender', '') + if not sender and msg.get('addresses'): + for a in msg['addresses']: + if a.get('address') and a['address'] != 'insert-address-token': + sender = a['address'] + break + ts_ms = msg.get('date_ms', msg.get('date', '0')) + msg_type = '1' if msg.get('direction') == 'incoming' else '2' + attrs = { + 'protocol': '0', + 'address': sender, + 'date': str(ts_ms), + 'type': msg_type, + 'body': str(msg.get('body', '') or ''), + 'read': '1', + 'status': '-1', + 'locked': '0', + 'date_sent': str(ts_ms), + 'readable_date': str(msg.get('date_formatted', '') or ''), + 'contact_name': '(Unknown)', + 'msg_protocol': 'RCS' if msg.get('is_rcs') else 'MMS', + 'thread_id': str(msg.get('thread_id', '') or ''), + } + # Add RCS metadata if available + if msg.get('is_rcs') and msg.get('rcs_metadata'): + attrs['rcs_tr_id'] = msg['rcs_metadata'] + # Add all addresses as comma-separated for group chats + if len(msg.get('addresses', [])) > 1: + attrs['group_addresses'] = ','.join( + a['address'] for a in msg['addresses'] + if a.get('address') and a['address'] != 'insert-address-token' + ) + ET.SubElement(root, 'sms', **attrs) + + # Write to file + backup_path = self._backups_dir / f'rcs_backup_{timestamp}.xml' + tree = ET.ElementTree(root) + ET.indent(tree, space=' ') + tree.write(str(backup_path), encoding='unicode', xml_declaration=True) + + return { + 'ok': True, + 'path': str(backup_path), + 'sms_count': len(all_sms), + 'mms_count': len(all_msgs) - len(rcs_msgs), + 'rcs_count': len(rcs_msgs), + 'total': total, + 'message': f'Backup saved: {len(all_sms)} SMS + {len(rcs_msgs)} RCS + ' + f'{len(all_msgs) - len(rcs_msgs)} MMS = {total} total', + } + def full_backup(self, fmt: str = 'json') -> Dict[str, Any]: """Complete SMS/MMS/RCS backup.""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') diff --git a/web/routes/rcs_tools.py b/web/routes/rcs_tools.py index 91ddd11..8c3f1ff 100644 --- a/web/routes/rcs_tools.py +++ b/web/routes/rcs_tools.py @@ -162,6 +162,38 @@ def mms(): return jsonify({'ok': True, 'messages': msgs, 'count': len(msgs)}) +@rcs_tools_bp.route('/rcs-via-mms') +@login_required +def rcs_via_mms(): + thread_id = request.args.get('thread_id') + tid = int(thread_id) if thread_id else None + limit = int(request.args.get('limit', 200)) + msgs = _get_rcs().read_rcs_via_mms(tid, limit) + rcs_count = sum(1 for m in msgs if m.get('is_rcs')) + return jsonify({'ok': True, 'messages': msgs, 'count': len(msgs), 'rcs_count': rcs_count}) + + +@rcs_tools_bp.route('/rcs-only') +@login_required +def rcs_only(): + limit = int(request.args.get('limit', 200)) + msgs = _get_rcs().read_rcs_only(limit) + return jsonify({'ok': True, 'messages': msgs, 'count': len(msgs)}) + + +@rcs_tools_bp.route('/rcs-threads') +@login_required +def rcs_threads(): + threads = _get_rcs().read_rcs_threads() + return jsonify({'ok': True, 'threads': threads, 'count': len(threads)}) + + +@rcs_tools_bp.route('/backup-rcs-xml', methods=['POST']) +@login_required +def backup_rcs_xml(): + return jsonify(_get_rcs().backup_rcs_to_xml()) + + @rcs_tools_bp.route('/drafts') @login_required def drafts():