KernelSU-Next/Magisk/APatch module providing:
- Threat scanner with 11,000+ IOCs (stalkerware, Pegasus, government spyware)
- FrostGuard file integrity monitor (pseudo-locked-bootloader)
- Encryption key wiper / BFU mode transition
- Forensic Shield (anti-Cellebrite with 71 known binary hashes)
- Silent SMS detection (Type-0, Class-0, WAP Push)
- Network monitor with C2/tracker domain and IP blocking
- vigild daemon with periodic scanning and alert management
- Full CLI interface
556 lines
24 KiB
Python
556 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Vigil IOC Database Builder
|
|
Extracts Indicators of Compromise from research repositories and writes
|
|
unified indicator files for the Vigil anti-surveillance module.
|
|
"""
|
|
|
|
import csv
|
|
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import OrderedDict
|
|
|
|
# Input and output locations. The defaults are the original hard-coded paths;
# the VIGIL_RESEARCH_DIR / VIGIL_IOC_OUT_DIR environment variables override
# them so the builder can run on a machine with a different layout. An empty
# variable is treated as unset.
RESEARCH = os.environ.get("VIGIL_RESEARCH_DIR") or "/home/snake/research/repos"
OUT_DIR = os.environ.get("VIGIL_IOC_OUT_DIR") or "/home/snake/vigil/vigil/ioc"

# Create the output directory up front so the writers can assume it exists.
os.makedirs(OUT_DIR, exist_ok=True)
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Helpers
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def dedup_sorted(lines):
    """Drop blank entries and duplicates, then return the rest sorted.

    Entries that are empty or whitespace-only are discarded. Comparison is
    case-sensitive and the surviving entries are returned exactly as given
    (they are not stripped).
    """
    unique = {entry for entry in lines if entry.strip()}
    return sorted(unique)
|
|
|
|
|
|
def write_ioc(filename, lines, header=""):
    """Write a deduplicated, sorted indicator file into OUT_DIR.

    Args:
        filename: Name of the file to create inside OUT_DIR.
        lines: Iterable of indicator records, one string per record.
        header: Optional text written verbatim before the records; it may
            span several lines.

    Returns:
        The number of records written (the header is not counted).
    """
    path = os.path.join(OUT_DIR, filename)
    records = dedup_sorted(lines)
    # Threat names come from third-party feeds and may contain non-ASCII
    # characters, so pin the encoding (and the line ending) instead of
    # depending on the platform defaults.
    with open(path, "w", encoding="utf-8", newline="\n") as f:
        if header:
            f.write(header + "\n")
        f.writelines(record + "\n" for record in records)
    return len(records)
|
|
|
|
|
|
def read_file(path):
    """Return the text content of *path*, or "" if it is not a regular file.

    The file is always decoded as UTF-8, independent of the system locale.
    A leading byte-order mark is dropped (otherwise it ends up glued to the
    first CSV column name and that column can no longer be looked up), and
    undecodable bytes are replaced instead of raising.
    """
    if not os.path.isfile(path):
        return ""
    with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
        return f.read()
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# YAML parser (no pyyaml dependency -- simple state-machine parser)
|
|
# Parses stalkerware-indicators/ioc.yaml
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def parse_stalkerware_yaml(path):
    """Parse the stalkerware-indicators ``ioc.yaml`` without a YAML library.

    The parser is a small line-oriented state machine that only understands
    the known structure of that file::

        - name: ThreatName
          type: stalkerware
          packages:
          - com.example.pkg
          certificates:
          - DEADBEEF...
          websites:
          - example.com
          distribution:
          - dist.example.com
          c2:
            ips:
            - 1.2.3.4
            domains:
            - c2.example.com

    Args:
        path: Location of the YAML file.

    Returns:
        A 4-tuple ``(packages, certificates, domains, ips)`` of lists.
        ``packages`` holds ``(package, threat_name, type)`` tuples,
        ``certificates`` holds ``(hash, threat_name)``, ``domains`` holds
        ``(domain, threat_name, category)`` where category is ``"tracking"``
        (websites), ``"distribution"`` or ``"c2"``, and ``ips`` holds
        ``(ip, threat_name)``. All four lists are empty when the file is
        missing or empty.
    """
    text = read_file(path)
    if not text:
        return [], [], [], []

    default_type = "stalkerware"
    # Entry-level keys that own a list; seeing one of them also ends a c2 block.
    entry_sections = ("packages", "certificates", "websites", "distribution")
    # Keys that are only meaningful nested under "c2:".
    c2_sections = {"ips": "c2_ips", "domains": "c2_domains"}

    entry_re = re.compile(r'^- name:\s*(.+)')
    type_re = re.compile(r'^\s+type:\s*(.+)')
    header_re = re.compile(r'^\s+([\w.-]+):\s*$')
    item_re = re.compile(r'^\s+- (.+)')

    def unquote(value):
        """Strip whitespace and one pair of matching surrounding quotes."""
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            return value[1:-1]
        return value

    packages = []      # (pkg, threat_name, type)
    certificates = []  # (hash, threat_name)
    domains = []       # (domain, threat_name, category)
    ips = []           # (ip, threat_name)

    current_name = None
    current_type = default_type
    current_section = None
    in_c2 = False

    for raw in text.splitlines():
        line = raw.rstrip()
        if not line or line.lstrip().startswith("#"):
            continue

        # Start of a new top-level entry.
        m = entry_re.match(line)
        if m:
            current_name = unquote(m.group(1))
            # Reset per entry: an entry without its own "type:" must not
            # inherit the type of whichever entry preceded it.
            current_type = default_type
            current_section = None
            in_c2 = False
            continue

        m = type_re.match(line)
        if m:
            current_type = unquote(m.group(1))
            continue

        # "key:" with no inline value opens a section.
        m = header_re.match(line)
        if m:
            key = m.group(1)
            if key == "c2":
                in_c2 = True
                current_section = None
            elif in_c2 and key in c2_sections:
                current_section = c2_sections[key]
            elif key == "names":
                # Aliases: remembered as a section so its items are skipped.
                current_section = "names"
            elif key in entry_sections:
                current_section = key
                in_c2 = False
            else:
                # Unknown key: make sure its items are not attributed to
                # whatever section happened to be open before it.
                current_section = None
            continue

        m = item_re.match(line)
        if not (m and current_name):
            continue
        val = unquote(m.group(1))
        if current_section == "packages":
            packages.append((val, current_name, current_type))
        elif current_section == "certificates":
            certificates.append((val, current_name))
        elif current_section == "websites":
            domains.append((val, current_name, "tracking"))
        elif current_section == "distribution":
            domains.append((val, current_name, "distribution"))
        elif current_section == "c2_domains":
            domains.append((val, current_name, "c2"))
        elif current_section == "c2_ips":
            ips.append((val, current_name))
        # Items of "names" and of unknown sections are ignored.

    return packages, certificates, domains, ips
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 1. PACKAGES
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_packages():
    """Collect known-bad Android package names and write ``packages.txt``.

    Sources (a missing source simply contributes nothing):
      1. stalkerware-indicators ``ioc.yaml`` package lists.
      2. threat-research CSVs: rows whose indicator type mentions both
         "android" and "package". The threat name is the row's comment, or
         the CSV file name without extension when there is no comment.
      3. isdi ``app-flags.csv``: rows whose flag is ``spyware``.
      4. The ``ROOT_PACKAGES`` list literal in MVT's Android utils module.

    Returns:
        Number of unique records written.
    """
    lines = []

    # Source 1: stalkerware-indicators
    pkgs, _, _, _ = parse_stalkerware_yaml(
        os.path.join(RESEARCH, "stalkerware-indicators/ioc.yaml"))
    lines.extend(f"{pkg}|{name}|{typ}" for pkg, name, typ in pkgs)

    # Source 2: threat-research indicators -- android package rows in CSVs
    tr_csv_dir = os.path.join(RESEARCH, "threat-research/indicators/csv")
    if os.path.isdir(tr_csv_dir):
        for root, _dirs, files in os.walk(tr_csv_dir):
            for fn in files:
                if not fn.endswith(".csv"):
                    continue
                fpath = os.path.join(root, fn)
                file_stem = os.path.splitext(fn)[0]
                try:
                    reader = csv.DictReader(io.StringIO(read_file(fpath)))
                    for row in reader:
                        itype = (row.get("indicator_type") or row.get("type") or "").strip().lower()
                        val = (row.get("indicator_value") or row.get("value") or "").strip()
                        if not ("android" in itype and "package" in itype and val):
                            continue
                        # Comments are free text: keep the record on one line
                        # and keep the "|" column separator out of the name.
                        comment = re.sub(r'[|\r\n]+', ' ', row.get("comment") or "").strip()
                        threat = comment if comment else file_stem
                        lines.append(f"{val}|{threat}|spyware")
                except (OSError, csv.Error) as exc:
                    # Best effort: keep what was parsed, but say what was lost.
                    print(f"warning: skipping rest of {fpath}: {exc}", file=sys.stderr)

    # Source 3: isdi app-flags.csv -- appId where flag is spyware
    isdi_path = os.path.join(RESEARCH, "isdi/static_data/app-flags.csv")
    if os.path.isfile(isdi_path):
        reader = csv.DictReader(io.StringIO(read_file(isdi_path)))
        for row in reader:
            flag = (row.get("flag") or "").strip().lower()
            app_id = (row.get("appId") or "").strip()
            if flag == "spyware" and app_id:
                title = (row.get("title") or "").strip()
                threat = title.split(",")[0].split("|")[0].strip() if title else "Unknown"
                lines.append(f"{app_id}|{threat}|stalkerware")

    # Source 4: MVT ROOT_PACKAGES
    mvt_utils = os.path.join(RESEARCH, "mvt/src/mvt/android/utils.py")
    if os.path.isfile(mvt_utils):
        content = read_file(mvt_utils)
        # Anchor on the "=" so that a type annotation such as
        # "ROOT_PACKAGES: List[str] = [...]" does not make the pattern
        # capture the annotation's brackets instead of the list literal.
        m = re.search(r'ROOT_PACKAGES\b[^=\n]*=\s*\[(.*?)\]', content, re.DOTALL)
        if m:
            for pkg in re.findall(r'''["']([^"']+)["']''', m.group(1)):
                lines.append(f"{pkg}|MVT_RootDetection|forensic")

    return write_ioc("packages.txt", lines,
                     "# Vigil IOC: Malicious/stalkerware Android packages\n"
                     "# Format: package_name|threat_name|category")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 2. CERTIFICATES
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_certificates():
    """Write ``certificates.txt`` from the stalkerware-indicators YAML.

    Each record is ``hash|threat_name|hash_type``. The hash type is guessed
    purely from the length of the hash string (40 -> SHA1, 64 -> SHA256,
    32 -> MD5, anything else -> ``unknown``).

    Returns:
        Number of unique records written.
    """
    type_by_length = {40: "SHA1", 64: "SHA256", 32: "MD5"}

    yaml_path = os.path.join(RESEARCH, "stalkerware-indicators/ioc.yaml")
    certs = parse_stalkerware_yaml(yaml_path)[1]

    records = []
    for digest, threat in certs:
        digest = digest.strip()
        kind = type_by_length.get(len(digest), "unknown")
        records.append(f"{digest}|{threat}|{kind}")

    header = ("# Vigil IOC: Malicious signing certificate hashes\n"
              "# Format: hash|threat_name|hash_type")
    return write_ioc("certificates.txt", records, header)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 3. DOMAINS
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_domains():
    """Collect C2, distribution and tracking domains and write ``domains.txt``.

    Sources (each is skipped when absent):
      1. stalkerware-indicators ``ioc.yaml`` (websites, distribution, c2).
      2. mobiletrackers ``list.txt`` -- one domain per line.
      3. malware-indicators CSVs -- rows typed ``domain`` / ``domain_name``;
         the threat name is the CSV's parent directory.
      4. threat-research CSVs -- same row types; the threat name is the CSV
         file name without its extension.

    Returns:
        Number of unique records written.
    """
    lines = []
    domain_re = re.compile(r'^[a-zA-Z0-9][\w.-]+\.[a-zA-Z]{2,}$')
    domain_types = ("domain", "domain_name")

    def csv_rows(top):
        """Yield ``(csv_path, row_dict)`` for every row of every CSV below *top*."""
        if not os.path.isdir(top):
            return
        for root, dirs, files in os.walk(top):
            # Prune in place so os.walk never descends into VCS metadata.
            dirs[:] = [d for d in dirs if d != ".git"]
            for fn in files:
                if not fn.endswith(".csv"):
                    continue
                fpath = os.path.join(root, fn)
                try:
                    for row in csv.DictReader(io.StringIO(read_file(fpath))):
                        yield fpath, row
                except (OSError, csv.Error) as exc:
                    # Best effort: keep what was parsed, but say what was lost.
                    print(f"warning: skipping rest of {fpath}: {exc}", file=sys.stderr)

    # Source 1: stalkerware-indicators
    _, _, doms, _ = parse_stalkerware_yaml(
        os.path.join(RESEARCH, "stalkerware-indicators/ioc.yaml"))
    lines.extend(f"{d}|{name}|{cat}" for d, name, cat in doms)

    # Source 2: mobiletrackers list.txt
    mt_path = os.path.join(RESEARCH, "mobiletrackers/list.txt")
    if os.path.isfile(mt_path):
        for line in read_file(mt_path).splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # Only accept lines that look like a bare domain name.
            if domain_re.match(line):
                lines.append(f"{line}|MobileTracker|tracking")

    # Source 3: malware-indicators CSVs ("type"/"value" columns preferred)
    for fpath, row in csv_rows(os.path.join(RESEARCH, "malware-indicators")):
        itype = (row.get("type") or row.get("indicator_type") or "").strip().lower()
        val = (row.get("value") or row.get("indicator_value") or "").strip().strip('"')
        if itype in domain_types and val:
            parent = os.path.basename(os.path.dirname(fpath))
            lines.append(f"{val}|{parent}|c2")

    # Source 4: threat-research CSVs ("indicator_*" columns preferred)
    for fpath, row in csv_rows(os.path.join(RESEARCH, "threat-research/indicators/csv")):
        itype = (row.get("indicator_type") or row.get("type") or "").strip().lower()
        val = (row.get("indicator_value") or row.get("value") or "").strip().strip('"')
        if itype in domain_types and val:
            threat = os.path.splitext(os.path.basename(fpath))[0]
            lines.append(f"{val}|{threat}|c2")

    return write_ioc("domains.txt", lines,
                     "# Vigil IOC: C2 and tracking domains\n"
                     "# Format: domain|threat_name|category")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 4. IPS
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_ips():
    """Collect malicious IP addresses and write ``ips.txt``.

    Sources (each is skipped when absent):
      1. stalkerware-indicators ``ioc.yaml`` c2 IPs.
      2. malware-indicators CSVs -- rows typed ``ip-dst`` / ``ip-src``; the
         threat name is the CSV's parent directory.
      3. threat-research CSVs -- rows typed ``ip_address`` / ``ip-dst`` /
         ``ip-src``; the threat name is the CSV file name without extension.

    Every record is written with category ``c2``.

    Returns:
        Number of unique records written.
    """
    lines = []

    def csv_rows(top):
        """Yield ``(csv_path, row_dict)`` for every row of every CSV below *top*."""
        if not os.path.isdir(top):
            return
        for root, dirs, files in os.walk(top):
            # Prune in place so os.walk never descends into VCS metadata.
            dirs[:] = [d for d in dirs if d != ".git"]
            for fn in files:
                if not fn.endswith(".csv"):
                    continue
                fpath = os.path.join(root, fn)
                try:
                    for row in csv.DictReader(io.StringIO(read_file(fpath))):
                        yield fpath, row
                except (OSError, csv.Error) as exc:
                    # Best effort: keep what was parsed, but say what was lost.
                    print(f"warning: skipping rest of {fpath}: {exc}", file=sys.stderr)

    # Source 1: stalkerware-indicators
    _, _, _, ip_list = parse_stalkerware_yaml(
        os.path.join(RESEARCH, "stalkerware-indicators/ioc.yaml"))
    lines.extend(f"{ip}|{name}|c2" for ip, name in ip_list)

    # Source 2: malware-indicators CSVs -- only the "type"/"value" columns
    for fpath, row in csv_rows(os.path.join(RESEARCH, "malware-indicators")):
        itype = (row.get("type") or "").strip().lower()
        val = (row.get("value") or "").strip().strip('"')
        if itype in ("ip-dst", "ip-src") and val:
            parent = os.path.basename(os.path.dirname(fpath))
            lines.append(f"{val}|{parent}|c2")

    # Source 3: threat-research CSVs
    for fpath, row in csv_rows(os.path.join(RESEARCH, "threat-research/indicators/csv")):
        itype = (row.get("indicator_type") or row.get("type") or "").strip().lower()
        val = (row.get("indicator_value") or row.get("value") or "").strip().strip('"')
        if itype in ("ip_address", "ip-dst", "ip-src") and val:
            threat = os.path.splitext(os.path.basename(fpath))[0]
            lines.append(f"{val}|{threat}|c2")

    return write_ioc("ips.txt", lines,
                     "# Vigil IOC: Malicious IPs\n"
                     "# Format: ip|threat_name|category")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 5. HASHES
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_hashes():
    """Collect malicious file hashes and write ``hashes.txt``.

    Sources (each is skipped when absent):
      1. CSVs directly inside stalkerware-indicators ``generated/``: any
         column whose name contains ``sha256`` / ``sha1`` / ``md5`` and whose
         value is a hex digest of the matching length. The threat name is
         the row's ``app`` or ``name`` column, else the CSV file name.
      2. malware-indicators CSVs: rows whose ``type`` names a hash algorithm,
         plus any ``MD5`` column; the threat name is the parent directory.
      3. threat-research CSVs: rows whose indicator type names a hash
         algorithm; the threat name is the file name without extension.

    A value is only accepted when it has the digest length of its algorithm
    and consists solely of hex characters.

    Returns:
        Number of unique records written.
    """
    lines = []
    hex_re = re.compile(r'^[0-9a-fA-F]+$')
    # (column-name marker, digest length, output label), in match priority.
    column_kinds = (("sha256", 64, "SHA256"), ("sha1", 40, "SHA1"), ("md5", 32, "MD5"))
    # Indicator-type value -> (output label, digest length).
    type_kinds = {
        "sha256": ("SHA256", 64), "hash_sha256": ("SHA256", 64),
        "sha1": ("SHA1", 40), "hash_sha1": ("SHA1", 40),
        "md5": ("MD5", 32), "hash_md5": ("MD5", 32),
    }

    def is_digest(value, length):
        """True when *value* is exactly *length* hex characters."""
        return len(value) == length and hex_re.match(value) is not None

    def csv_rows(top):
        """Yield ``(csv_path, row_dict)`` for every row of every CSV below *top*."""
        if not os.path.isdir(top):
            return
        for root, dirs, files in os.walk(top):
            # Prune in place so os.walk never descends into VCS metadata.
            dirs[:] = [d for d in dirs if d != ".git"]
            for fn in files:
                if not fn.endswith(".csv"):
                    continue
                fpath = os.path.join(root, fn)
                try:
                    for row in csv.DictReader(io.StringIO(read_file(fpath))):
                        yield fpath, row
                except (OSError, csv.Error) as exc:
                    # Best effort: keep what was parsed, but say what was lost.
                    print(f"warning: skipping rest of {fpath}: {exc}", file=sys.stderr)

    # Source 1: stalkerware-indicators generated CSVs -- hash-like columns
    gen_dir = os.path.join(RESEARCH, "stalkerware-indicators/generated")
    if os.path.isdir(gen_dir):
        for fn in os.listdir(gen_dir):
            if not fn.endswith(".csv"):
                continue
            fpath = os.path.join(gen_dir, fn)
            try:
                for row in csv.DictReader(io.StringIO(read_file(fpath))):
                    app = row.get("app") or row.get("name") or fn
                    for key, raw in row.items():
                        # DictReader stores surplus fields under the key None
                        # (as a list) and fills short rows with None values;
                        # neither is a named hash column.
                        if not isinstance(key, str) or not isinstance(raw, str):
                            continue
                        column = key.lower()
                        val = raw.strip()
                        for marker, length, label in column_kinds:
                            if marker in column and is_digest(val, length):
                                lines.append(f"{val}|{app}|{label}")
                                break
            except (OSError, csv.Error) as exc:
                print(f"warning: skipping rest of {fpath}: {exc}", file=sys.stderr)

    # Source 2: malware-indicators CSVs -- typed rows and a named MD5 column
    for fpath, row in csv_rows(os.path.join(RESEARCH, "malware-indicators")):
        parent = os.path.basename(os.path.dirname(fpath))
        itype = (row.get("type") or "").strip().lower()
        val = (row.get("value") or "").strip().strip('"')
        if itype in type_kinds:
            label, length = type_kinds[itype]
            if is_digest(val, length):
                lines.append(f"{val}|{parent}|{label}")
        # Some files (like hashes.csv) carry the digest in a column named MD5.
        md5 = (row.get("MD5") or "").strip()
        if is_digest(md5, 32):
            lines.append(f"{md5}|{parent}|MD5")

    # Source 3: threat-research CSVs
    for fpath, row in csv_rows(os.path.join(RESEARCH, "threat-research/indicators/csv")):
        itype = (row.get("indicator_type") or row.get("type") or "").strip().lower()
        val = (row.get("indicator_value") or row.get("value") or "").strip().strip('"')
        if itype in type_kinds:
            label, length = type_kinds[itype]
            if is_digest(val, length):
                threat = os.path.splitext(os.path.basename(fpath))[0]
                lines.append(f"{val}|{threat}|{label}")

    return write_ioc("hashes.txt", lines,
                     "# Vigil IOC: Malicious file hashes\n"
                     "# Format: hash|threat_name|hash_type")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 6. HOSTS (blocklist format)
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_hosts():
    """Build ``hosts.txt`` from the domains already written to ``domains.txt``.

    Reads ``domains.txt`` from OUT_DIR, takes the first ``|``-separated
    column of every non-comment line and, when it looks like a plain domain
    name, emits a ``0.0.0.0 <domain>`` blocklist entry. If ``domains.txt``
    does not exist, only the header is written.

    Returns:
        Number of unique host entries written.
    """
    domains_path = os.path.join(OUT_DIR, "domains.txt")
    domain_re = re.compile(r'^[a-zA-Z0-9][\w.-]+\.[a-zA-Z]{2,}$')
    lines = []
    if os.path.isfile(domains_path):
        # Use a context manager so the handle is closed deterministically;
        # undecodable bytes are replaced rather than aborting the build.
        with open(domains_path, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                domain = line.split("|")[0].strip()
                if domain and domain_re.match(domain):
                    lines.append(f"0.0.0.0 {domain}")

    return write_ioc("hosts.txt", lines,
                     "# Vigil IOC: Hosts blocklist (C2 + tracking domains)\n"
                     "# Format: 0.0.0.0 domain")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# 7. CELLEBRITE HASHES
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def build_cellebrite_hashes():
    """Write ``cellebrite_hashes.txt`` from the lockup ``LockUpService.java``.

    Two Java ``String[]`` initialisers are located by name and every
    double-quoted 64-hex-character literal inside them becomes a SHA256
    record: ``CB_ELEVATOR_HASHES`` entries are labelled
    ``Cellebrite_UFED_Elevator`` and ``bannedKeys`` entries
    ``Cellebrite_BannedKey``. If the Java file is absent, only the header
    is written.

    Returns:
        Number of unique records written.
    """
    records = []
    java_path = os.path.join(
        RESEARCH, "lockup/app/src/main/java/com/lockup/LockUpService.java")

    if os.path.isfile(java_path):
        source = read_file(java_path)
        # (pattern capturing the array initialiser body, threat label)
        arrays = (
            (r'CB_ELEVATOR_HASHES\s*=\s*new\s+String\[\]\s*\{(.*?)\}',
             "Cellebrite_UFED_Elevator"),
            (r'bannedKeys\s*=\s*new\s+String\[\]\s*\{(.*?)\}',
             "Cellebrite_BannedKey"),
        )
        for pattern, label in arrays:
            found = re.search(pattern, source, re.DOTALL)
            if found is None:
                continue
            digests = re.findall(r'"([0-9a-fA-F]{64})"', found.group(1))
            records.extend(f"{digest}|{label}|SHA256" for digest in digests)

    header = ("# Vigil IOC: Cellebrite forensic tool hashes\n"
              "# Format: hash|threat_name|hash_type")
    return write_ioc("cellebrite_hashes.txt", records, header)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Main
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Run every builder in order and print a per-file summary table."""
    banner = "=" * 60
    print(banner)
    print("Vigil IOC Database Builder")
    print(banner)

    # Order matters: build_hosts reads the domains.txt that build_domains
    # writes, so it has to run after it.
    builders = (
        ("packages.txt", build_packages),
        ("certificates.txt", build_certificates),
        ("domains.txt", build_domains),
        ("ips.txt", build_ips),
        ("hashes.txt", build_hashes),
        ("hosts.txt", build_hosts),
        ("cellebrite_hashes.txt", build_cellebrite_hashes),
    )
    counts = {}
    for name, builder in builders:
        counts[name] = builder()

    print()
    for name, count in counts.items():
        print(f" {name:30s} {count:>6d} indicators")
    total = sum(counts.values())
    print(f" {'TOTAL':30s} {total:>6d} indicators")
    print()
    print(f"Output directory: {OUT_DIR}")
    print("Done.")


# Only build when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|