Files
spy_hunter/modules/deep_scan.py

415 lines
14 KiB
Python
Raw Normal View History

2026-03-13 12:59:54 -07:00
# -*- coding: utf-8 -*-
# path: C:\Users\mdavi\PycharmProjects\SpyHunter\modules\deep_scan.py
import os
import subprocess
import time
import requests
import hashlib
from datetime import datetime
# ---------------------------------------------------------------------------
# Optional Androguard Imports
# ---------------------------------------------------------------------------
# Androguard is optional. The 3.x module layout is tried first; if that fails
# the 4.x layout is tried, where the ``bytecodes`` package was removed and the
# DEX parser class is exposed as ``DEX``. It is aliased to ``DalvikVMFormat``
# so the rest of this module can keep using one name.
# NOTE(review): 4.x import paths follow the upstream package layout — confirm
# against the Androguard version actually installed.
try:
    from androguard.core.bytecodes.apk import APK
    from androguard.core.bytecodes.dvm import DalvikVMFormat
    from androguard.core.analysis.analysis import Analysis
    ANDROGUARD_AVAILABLE = True
except ImportError:
    try:
        from androguard.core.apk import APK
        from androguard.core.dex import DEX as DalvikVMFormat
        from androguard.core.analysis.analysis import Analysis
        ANDROGUARD_AVAILABLE = True
    except ImportError:
        print("[WARN] Androguard not available; heuristic scans will be skipped.")
        ANDROGUARD_AVAILABLE = False
# ---------------------------------------------------------------------------
# Optional YARA Imports
# ---------------------------------------------------------------------------
# YARA is optional: remember whether the binding imported so that signature
# scanning can be skipped when it is missing.
try:
    import yara
except ImportError:
    YARA_AVAILABLE = False
    print("[WARN] YARA not available; signature scanning will be skipped.")
else:
    YARA_AVAILABLE = True
# ---------------------------------------------------------------------------
# Constants: Directories for pulled APKs and signature rules
# ---------------------------------------------------------------------------
# Every working directory is resolved against the current working directory
# of the process at import time.
_BASE_DIR = os.getcwd()
APK_PULL_DIR = os.path.join(_BASE_DIR, "pulled_apks")
SIGNATURES_DIR = os.path.join(_BASE_DIR, "signatures")
LOG_DIR = os.path.join(_BASE_DIR, "logs")
LOG_FILE = os.path.join(LOG_DIR, "action_log.txt")
# ---------------------------------------------------------------------------
# Dynamic Discovery via ADB
# ---------------------------------------------------------------------------
def get_installed_apks():
    """
    Discover installed APKs via ADB and pull them locally for analysis.

    Runs ``adb shell pm list packages -f`` and pulls every listed APK into
    APK_PULL_DIR, naming each file after its package with dots replaced by
    underscores. Lines that cannot be parsed and packages that cannot be
    pulled are reported and skipped.

    Returns a list of filepaths to the pulled APKs. The list is empty when
    the package listing fails or adb cannot be started.
    """
    os.makedirs(APK_PULL_DIR, exist_ok=True)
    try:
        proc = subprocess.run(
            ["adb", "shell", "pm", "list", "packages", "-f"],
            capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers an adb binary that is missing or not executable.
        print(f"[ERROR] Failed to list packages: {e}")
        return []

    prefix = "package:"
    apk_paths = []
    for line in proc.stdout.splitlines():
        if not line.startswith(prefix):
            continue
        # format: package:/data/app/.../base.apk=com.example.app
        # Split on the LAST '=': the install path may itself contain '='
        # (e.g. randomised directory names ending in '=='), whereas a package
        # name never does.
        remote_path, sep, package_name = line[len(prefix):].rpartition("=")
        remote_path = remote_path.strip()
        package_name = package_name.strip()
        if not sep or not remote_path or not package_name:
            print(f"[WARN] Could not pull '{line}': unrecognised package line")
            continue

        local_name = package_name.replace(".", "_") + ".apk"
        local_path = os.path.join(APK_PULL_DIR, local_name)
        print(f"[INFO] Pulling {package_name} from {remote_path}...")
        try:
            subprocess.run(["adb", "pull", remote_path, local_path], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # Best-effort: one unreadable APK must not abort the whole scan.
            print(f"[WARN] Could not pull '{line}': {e}")
            continue
        apk_paths.append(local_path)
    return apk_paths
# ---------------------------------------------------------------------------
# Signature Scanning (YARA/CSV)
# ---------------------------------------------------------------------------
def signature_scan(apk_paths):
    """
    Scan each APK with optional YARA rules.

    YARA rules are compiled once from every ``.yar``/``.yara`` file in
    SIGNATURES_DIR when YARA is available; otherwise no matching is done.
    Hashing, package extraction and matching are each best-effort: a failure
    is reported and the corresponding field is left at its empty value.

    Returns list of dicts:
    {
        app_name: str,            # basename of the APK file
        package: str or None,     # from Androguard, when available
        sha256: str or None,
        apk_path: str,
        yara_matches: List[str]   # names of the matching rules
    }
    """
    # compile YARA rules if available
    rules = None
    if YARA_AVAILABLE and os.path.isdir(SIGNATURES_DIR):
        files = [
            os.path.join(SIGNATURES_DIR, f)
            for f in os.listdir(SIGNATURES_DIR)
            if f.endswith((".yar", ".yara"))
        ]
        if files:
            try:
                rules = yara.compile(
                    filepaths={os.path.basename(f): f for f in files}
                )
            except Exception as e:
                print(f"[ERROR] Failed to compile YARA rules: {e}")

    results = []
    for path in apk_paths:
        # compute SHA256
        try:
            sha256 = _sha256_file(path)
        except OSError as e:
            print(f"[WARN] Could not hash {path}: {e}")
            sha256 = None

        # extract package via Androguard if available
        package = None
        if ANDROGUARD_AVAILABLE:
            try:
                package = APK(path).get_package()
            except Exception as e:
                # Best-effort, but no longer silent: without a package name
                # the entry cannot be frozen or uninstalled later.
                print(f"[WARN] Could not read package name from {path}: {e}")

        # run YARA
        yara_matches = []
        if rules:
            try:
                yara_matches = [m.rule for m in rules.match(path) or []]
            except Exception as e:
                print(f"[WARN] YARA scan error for {path}: {e}")

        results.append({
            "app_name": os.path.basename(path),
            "package": package,
            "sha256": sha256,
            "apk_path": path,
            "yara_matches": yara_matches,
        })
    return results


def _sha256_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 of the file at *path*, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Chunked reads keep memory use flat regardless of APK size.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# Logging Setup
# ---------------------------------------------------------------------------
def ensure_log_dir():
    """Create LOG_DIR (and any missing parents); do nothing if it exists."""
    os.makedirs(name=LOG_DIR, exist_ok=True)
def log_action(action, package, status, message=""):
    """
    Append one timestamped line describing an action to LOG_FILE.

    The line has the form
    ``[YYYY-MM-DD HH:MM:SS] ACTION | package | status | message``
    with the action upper-cased. The log directory is created first if
    needed.
    """
    ensure_log_dir()
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, mode="a", encoding="utf-8") as log:
        log.write(f"[{stamp}] {action.upper()} | {package} | {status} | {message}\n")
# ---------------------------------------------------------------------------
# abuse.ch Integration
# ---------------------------------------------------------------------------
def query_abuse_ch(sha256: str) -> bool:
    """
    Return True if abuse.ch confirms this hash, False otherwise.

    Posts a ``get_info`` query for *sha256* to the MalwareBazaar endpoint
    and treats a successful response whose JSON object has a non-empty
    ``data`` field as a confirmation. An empty or missing hash, a network
    error, a non-OK status or an unparsable body all yield False.
    """
    if not sha256:
        # Nothing to look up; avoid a pointless network round-trip.
        return False
    try:
        resp = requests.post(
            "https://mb-api.abuse.ch/api/v1/",
            data={"query": "get_info", "hash": sha256},
            timeout=10
        )
        if not resp.ok:
            return False
        payload = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f"[ERROR] abuse.ch query failed: {e}")
        return False
    return isinstance(payload, dict) and bool(payload.get("data"))
# ---------------------------------------------------------------------------
# Heuristics / Behavioral Detection
# ---------------------------------------------------------------------------
# Permissions whose presence alone makes heuristics_scan() flag an APK.
SUSPICIOUS_PERMS = {
    "android.permission.ACCESS_FINE_LOCATION",
    "android.permission.CAMERA",
    "android.permission.READ_CONTACTS",
    "android.permission.READ_SMS",
    "android.permission.RECORD_AUDIO",
}
# Case-insensitive substrings looked for in service and receiver names.
SUSPICIOUS_COMPONENT_KEYWORDS = (
    "tracker",
    "spy",
    "monitor",
    "surveil",
)
def heuristics_scan(apk_paths):
    """
    Perform heuristic analysis on each APK.

    Each APK contributes at most one hit: the first heuristic that fires
    (suspicious permissions, then suspicious service/receiver names, then
    reflection/dynamic loading in method source).

    Returns list of dicts: { apk_path, reason }.
    Skipped entirely if Androguard is unavailable.
    """
    if not ANDROGUARD_AVAILABLE:
        return []
    hits = []
    for path in apk_paths:
        try:
            reason = _first_heuristic_reason(APK(path))
        except Exception as e:
            # Best-effort: a malformed APK must not abort the remaining scans.
            print(f"[WARN] Heuristic error on {path}: {e}")
            continue
        if reason:
            hits.append({"apk_path": path, "reason": reason})
    return hits


def _first_heuristic_reason(apk):
    """Return the reason string of the first heuristic *apk* trips, or None."""
    flagged = set(apk.get_permissions()) & SUSPICIOUS_PERMS
    if flagged:
        # Sorted so the reason text is stable from run to run.
        return f"Suspicious perms: {', '.join(sorted(flagged))}"

    for comp in apk.get_services() + apk.get_receivers():
        if any(k in comp.lower() for k in SUSPICIOUS_COMPONENT_KEYWORDS):
            return f"Suspicious component: {comp}"

    # NOTE(review): get_source() presumably needs a decompiler attached to the
    # DEX object; if it does not return text this check never fires — confirm.
    dex = DalvikVMFormat(apk.get_dex())
    for method in dex.get_methods():
        src = method.get_source() or ""
        if "forName(" in src or "loadLibrary(" in src:
            return "Uses reflection/dynamic load"
    return None
# ---------------------------------------------------------------------------
# Freeze / Uninstall with Retry + Logging
# ---------------------------------------------------------------------------
def prompt_reboot_and_retry(pkg, action):
    """
    Offer to reboot the device and retry a failed action once.

    If the user answers "y", reboots via adb, waits for the user to press
    Enter, then re-runs the freeze command when *action* is "freeze" and the
    uninstall command otherwise. The outcome is printed and logged.
    """
    print(f"\n[NOTE] {action.title()} failed for {pkg}.")
    answer = input("Reboot device and retry? [y/N]: ").strip().lower()
    if answer == "y":
        try:
            subprocess.run(["adb", "reboot"], check=True)
            print("[INFO] Rebooting... press Enter once device is back online.")
            input()
            if action == "freeze":
                retry_cmd = ["adb", "shell", "pm", "disable-user", "--user", "0", pkg]
            else:
                retry_cmd = ["adb", "uninstall", pkg]
            subprocess.run(retry_cmd, check=True)
            print(f"[SUCCESS] Retry {action} succeeded for {pkg}.")
            log_action(action, pkg, "success_after_reboot")
        except subprocess.CalledProcessError:
            # Either the reboot or the retried command exited non-zero.
            print(f"[FAIL] Still unable to {action} {pkg}.")
            log_action(action, pkg, "final_fail_after_reboot")
def freeze_single_apk(info):
    """
    Disable (freeze) the package named in ``info["package"]`` for user 0.

    Tries ``pm disable-user`` up to three times, one second apart. On
    success the action is logged and the function returns. After three
    failures the failure is logged and the user is offered a reboot-and-retry.
    If adb itself cannot be started the failure is logged and no retry is
    attempted. Entries without a package name are reported and skipped.
    """
    pkg = info.get("package")
    if not pkg:
        print("[ERROR] No package name.")
        return
    print(f"[INFO] Freezing {pkg}...")
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            subprocess.run(
                ["adb", "shell", "pm", "disable-user", "--user", "0", pkg],
                check=True
            )
        except subprocess.CalledProcessError:
            print(f"[WARN] Attempt {attempt} failed.")
            # Only pause when another attempt follows.
            if attempt < max_attempts:
                time.sleep(1)
        except OSError as e:
            # adb missing or not executable: retrying or rebooting cannot help.
            print(f"[ERROR] Could not run adb: {e}")
            log_action("freeze", pkg, "fail", f"adb not runnable: {e}")
            return
        else:
            print(f"[SUCCESS] {pkg} frozen.")
            log_action("freeze", pkg, "success")
            return
    print(f"[FAIL] Could not freeze {pkg}.")
    log_action("freeze", pkg, "fail", "3 attempts")
    prompt_reboot_and_retry(pkg, "freeze")
def uninstall_single_apk(info):
    """
    Uninstall the package named in ``info["package"]`` via ``adb uninstall``.

    Tries up to three times, one second apart. On success the action is
    logged and the function returns. After three failures the failure is
    logged and the user is offered a reboot-and-retry. If adb itself cannot
    be started the failure is logged and no retry is attempted. Entries
    without a package name are reported and skipped.
    """
    pkg = info.get("package")
    if not pkg:
        print("[ERROR] No package name.")
        return
    print(f"[INFO] Uninstalling {pkg}...")
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            subprocess.run(["adb", "uninstall", pkg], check=True)
        except subprocess.CalledProcessError:
            print(f"[WARN] Attempt {attempt} failed.")
            # Only pause when another attempt follows.
            if attempt < max_attempts:
                time.sleep(1)
        except OSError as e:
            # adb missing or not executable: retrying or rebooting cannot help.
            print(f"[ERROR] Could not run adb: {e}")
            log_action("uninstall", pkg, "fail", f"adb not runnable: {e}")
            return
        else:
            print(f"[SUCCESS] {pkg} uninstalled.")
            log_action("uninstall", pkg, "success")
            return
    print(f"[FAIL] Could not uninstall {pkg}.")
    log_action("uninstall", pkg, "fail", "3 attempts")
    prompt_reboot_and_retry(pkg, "uninstall")
def freeze_apks(results):
    """Call freeze_single_apk() on every entry of *results*, in order."""
    for entry in results:
        freeze_single_apk(entry)
def uninstall_apks(results):
    """Call uninstall_single_apk() on every entry of *results*, in order."""
    for entry in results:
        uninstall_single_apk(entry)
# ---------------------------------------------------------------------------
# Results Display
# ---------------------------------------------------------------------------
def show_result_detail(r):
    """
    Print every field of one result entry and offer per-app actions.

    When the entry has a SHA-256, abuse.ch is queried and the verdict is
    printed. The prompt then loops: "a" freezes the app, "u" uninstalls it,
    "b" returns to the caller; any other input re-prompts.
    """
    print("\n--- App Details ---")
    for key in r:
        print(f"{key}: {r[key]}")

    digest = r.get("sha256")
    if digest:
        print("\nQuerying abuse.ch…")
        if query_abuse_ch(digest):
            print("[ALERT] Confirmed")
        else:
            print("[WARNING] Not found")

    handlers = {"a": freeze_single_apk, "u": uninstall_single_apk}
    while True:
        cmd = input("[a] Freeze | [u] Uninstall | [b] Back: ").strip().lower()
        if cmd == "b":
            return
        handler = handlers.get(cmd)
        if handler is not None:
            handler(r)
def show_results_info(results):
    """
    Page through *results* ten at a time in a simple text table.

    Columns are the 1-based index, app name (truncated to 28 characters),
    package and a "Y" marker for heuristic hits. Missing or None names and
    packages are shown as "-". Commands: "n" next page, "p" previous page,
    a number to open that entry's detail view, "b" to go back.
    """
    per_page, page = 10, 0
    total = len(results)
    # Ceiling division, but never fewer than one page so an empty list
    # reads "Page 1/1" rather than "Page 1/0".
    page_count = max(1, (total + per_page - 1) // per_page)
    while True:
        start, end = page * per_page, (page + 1) * per_page
        print(f"\n--- Page {page+1}/{page_count} ---")
        print("{:<3}{:<30}{:<20}{:<8}".format("#", "App", "Package", "Heur"))
        for idx, item in enumerate(results[start:end], start + 1):
            h = "Y" if item.get("heuristic") else ""
            # ``dict.get(key, default)`` still returns None when the key is
            # present with a None value, and None rejects a width format
            # spec, so fall back explicitly.
            app = str(item.get("app_name") or "-")[:28]
            pkg = str(item.get("package") or "-")
            print(f"{idx:<3}{app:<30}{pkg:<20}{h:<8}")
        cmd = input("[n]Next [p]Prev [#]Detail [b]Back: ").strip().lower()
        if cmd == "n" and end < total:
            page += 1
        elif cmd == "p" and page > 0:
            page -= 1
        elif cmd.isdigit() and 1 <= int(cmd) <= total:
            show_result_detail(results[int(cmd) - 1])
        elif cmd == "b":
            break
# ---------------------------------------------------------------------------
# End-of-Scan Menu
# ---------------------------------------------------------------------------
def show_post_scan_menu(static_results, apk_paths):
    """
    Merge static (YARA) results with heuristics and launch the menu.

    static_results: list of dicts from signature_scan()
    apk_paths: list of pulled APK file paths

    Heuristic hits are merged into the static entry with the same
    ``apk_path`` so the entry keeps its package name and hash; a hit with no
    matching static entry is added as its own row. As before, each dict in
    *static_results* gains a ``heuristic`` key in place.

    Option 1 lists every scanned APK. Options 2 and 3 act only on flagged
    entries (a YARA match or a heuristic hit), never on the full list.
    """
    heur_hits = heuristics_scan(apk_paths)

    combined = []
    by_path = {}
    for r in static_results:
        r["heuristic"] = False
        combined.append(r)
        by_path.setdefault(r.get("apk_path"), r)

    for h in heur_hits:
        entry = by_path.get(h["apk_path"])
        if entry is None:
            # No static result for this APK: keep a standalone row.
            entry = {
                "app_name": os.path.basename(h["apk_path"]),
                "package": None,
                "sha256": None,
                "apk_path": h["apk_path"],
            }
            combined.append(entry)
            by_path[h["apk_path"]] = entry
        entry["heuristic"] = True
        entry["reason"] = h["reason"]

    # Only these are "possible infected"; bulk actions must not touch the
    # clean packages that were merely scanned.
    flagged = [
        e for e in combined
        if e.get("heuristic") or e.get("yara_matches")
    ]

    while True:
        print("\n--- End of Scan Menu ---")
        print("1) Show Results and Information")
        print("2) Freeze Possible Infected APK")
        print("3) Uninstall Possible Infected APKs")
        print("4) Exit")
        choice = input("Select: ").strip()
        if choice == "1":
            show_results_info(combined)
        elif choice in ("2", "3"):
            if not flagged:
                print("[INFO] No flagged APKs to act on.")
            elif choice == "2":
                freeze_apks(flagged)
            else:
                uninstall_apks(flagged)
        elif choice == "4":
            break
        else:
            print("Invalid selection.")
# ---------------------------------------------------------------------------
# Entry Point
# ---------------------------------------------------------------------------
def main():
    """
    Run a full scan: pull installed APKs, signature-scan them, then open
    the post-scan menu.

    NOTE(review): the original note says this is invoked by spyhunter.py as
    modules.scan.main() — confirm the module name against the caller.
    """
    pulled = get_installed_apks()
    show_post_scan_menu(signature_scan(pulled), pulled)