Major RCS/SMS exploitation rewrite (v2.0): - bugle_db direct extraction (plaintext messages, no decryption needed) - CVE-2024-0044 run-as privilege escalation (Android 12-13) - AOSP RCS provider queries (content://rcs/) - Archon app relay for Shizuku-elevated bugle_db access - 7-tab web UI: Extract, Database, Forge, Modify, Exploit, Backup, Monitor - SQL query interface for extracted databases - Full backup/restore/clone with SMS Backup & Restore XML support - Known CVE database (CVE-2023-24033, CVE-2024-49415, CVE-2025-48593) - IMS/RCS diagnostics, Phenotype verbose logging, Pixel tools New modules: Starlink hack, SMS forge, SDR drone detection Archon Android app: RCS messaging module with Shizuku integration Updated manuals to v2.3, 60 web blueprints confirmed Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1483 lines
60 KiB
Python
1483 lines
60 KiB
Python
"""AUTARCH Container Security
|
|
|
|
Docker auditing, Kubernetes assessment, container image scanning,
|
|
escape detection, Dockerfile linting, and runtime monitoring.
|
|
"""
|
|
|
|
DESCRIPTION = "Container security — Docker & Kubernetes auditing"
|
|
AUTHOR = "darkHal"
|
|
VERSION = "1.0"
|
|
CATEGORY = "defense"
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
import platform
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
try:
|
|
from core.paths import get_data_dir, find_tool
|
|
except ImportError:
|
|
def get_data_dir():
|
|
return str(Path(__file__).parent.parent / 'data')
|
|
|
|
import shutil
|
|
|
|
def find_tool(name):
|
|
return shutil.which(name)
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
try:
|
|
from core.banner import Colors, clear_screen, display_banner
|
|
except ImportError:
|
|
class Colors:
|
|
RED = YELLOW = GREEN = CYAN = WHITE = DIM = RESET = BOLD = ''
|
|
|
|
def clear_screen():
|
|
pass
|
|
|
|
def display_banner():
|
|
pass
|
|
|
|
|
|
# ── Dangerous Docker capabilities ───────────────────────────────────────────
|
|
|
|
DANGEROUS_CAPS = [
|
|
'SYS_ADMIN', 'NET_ADMIN', 'SYS_PTRACE', 'SYS_RAWIO',
|
|
'DAC_OVERRIDE', 'FOWNER', 'NET_RAW', 'MKNOD', 'SYS_CHROOT',
|
|
'AUDIT_WRITE', 'SETFCAP', 'MAC_OVERRIDE', 'MAC_ADMIN',
|
|
'SYSLOG', 'DAC_READ_SEARCH', 'LINUX_IMMUTABLE', 'SYS_BOOT',
|
|
'SYS_MODULE', 'SYS_TIME', 'KILL',
|
|
]
|
|
|
|
SENSITIVE_MOUNTS = [
|
|
'/var/run/docker.sock', '/run/docker.sock',
|
|
'/proc', '/sys', '/dev', '/etc/shadow', '/etc/passwd',
|
|
'/root', '/home', '/var/log',
|
|
]
|
|
|
|
DEFAULT_SECCOMP_PROFILE = 'runtime/default'
|
|
|
|
# ── Dockerfile Lint Rules ───────────────────────────────────────────────────
|
|
|
|
DOCKERFILE_RULES = {
|
|
'DL001': {'severity': 'high', 'title': 'FROM uses :latest tag',
|
|
'desc': 'Pin image versions for reproducible builds.'},
|
|
'DL002': {'severity': 'high', 'title': 'No USER directive',
|
|
'desc': 'Container runs as root by default. Add a USER directive.'},
|
|
'DL003': {'severity': 'medium', 'title': 'ADD used instead of COPY',
|
|
'desc': 'Use COPY for local files. ADD auto-extracts and supports URLs.'},
|
|
'DL004': {'severity': 'high', 'title': 'Secrets in ENV/ARG',
|
|
'desc': 'Avoid passing secrets via ENV or ARG. Use build secrets.'},
|
|
'DL005': {'severity': 'low', 'title': 'Missing HEALTHCHECK',
|
|
'desc': 'Add HEALTHCHECK for container orchestration readiness.'},
|
|
'DL006': {'severity': 'medium', 'title': 'apt-get without --no-install-recommends',
|
|
'desc': 'Use --no-install-recommends to reduce image size.'},
|
|
'DL007': {'severity': 'low', 'title': 'Missing cache cleanup',
|
|
'desc': 'Run apt-get clean / rm -rf /var/lib/apt/lists/* after install.'},
|
|
'DL008': {'severity': 'medium', 'title': 'EXPOSE all interfaces',
|
|
'desc': 'Avoid EXPOSE with 0.0.0.0; bind to specific interfaces.'},
|
|
'DL009': {'severity': 'high', 'title': 'COPY / ADD of sensitive files',
|
|
'desc': 'Avoid copying .env, credentials, or private keys into image.'},
|
|
'DL010': {'severity': 'medium', 'title': 'Using sudo in RUN',
|
|
'desc': 'Avoid sudo in Dockerfiles. Use USER directive instead.'},
|
|
'DL011': {'severity': 'low', 'title': 'Multiple consecutive RUN commands',
|
|
'desc': 'Chain RUN commands with && to reduce layers.'},
|
|
}
|
|
|
|
SECRET_PATTERNS = re.compile(
|
|
r'(password|secret|token|api_key|apikey|access_key|private_key|'
|
|
r'aws_secret|db_pass|database_url|auth_token)',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
SENSITIVE_FILE_PATTERNS = re.compile(
|
|
r'\.(pem|key|p12|pfx|env|credentials|htpasswd|pgpass)$',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
|
|
# ── ContainerSecurity Class ─────────────────────────────────────────────────
|
|
|
|
class ContainerSecurity:
|
|
"""Docker and Kubernetes security auditing engine."""
|
|
|
|
_instance = None
|
|
|
|
def __init__(self):
|
|
data = Path(str(get_data_dir())) / 'container_sec'
|
|
data.mkdir(parents=True, exist_ok=True)
|
|
self._data_dir = data
|
|
self._results_path = data / 'results.json'
|
|
self._results = {
|
|
'docker_host': [],
|
|
'container_audits': {},
|
|
'image_scans': {},
|
|
'dockerfile_lints': [],
|
|
'k8s_audits': {},
|
|
'escape_checks': {},
|
|
'timestamp': None,
|
|
}
|
|
self._is_win = platform.system() == 'Windows'
|
|
|
|
# ── helpers ──────────────────────────────────────────────────────────────
|
|
|
|
def _run(self, cmd: str, timeout: int = 30) -> tuple:
|
|
"""Run a shell command. Returns (success: bool, stdout: str)."""
|
|
try:
|
|
result = subprocess.run(
|
|
cmd, shell=True, capture_output=True, text=True, timeout=timeout
|
|
)
|
|
return result.returncode == 0, result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
return False, 'Command timed out'
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
def _run_json(self, cmd: str, timeout: int = 30) -> tuple:
|
|
"""Run command expecting JSON output. Returns (success, parsed_data)."""
|
|
ok, raw = self._run(cmd, timeout=timeout)
|
|
if not ok:
|
|
return False, raw
|
|
try:
|
|
return True, json.loads(raw)
|
|
except (json.JSONDecodeError, ValueError):
|
|
return False, raw
|
|
|
|
def _save_results(self):
|
|
self._results['timestamp'] = datetime.utcnow().isoformat()
|
|
try:
|
|
with open(self._results_path, 'w') as f:
|
|
json.dump(self._results, f, indent=2, default=str)
|
|
except Exception:
|
|
pass
|
|
|
|
# ── tool checks ──────────────────────────────────────────────────────────
|
|
|
|
def check_docker_installed(self) -> dict:
|
|
"""Check if Docker CLI is available."""
|
|
docker = find_tool('docker')
|
|
if not docker:
|
|
return {'installed': False, 'path': None, 'version': None}
|
|
ok, ver = self._run(f'"{docker}" --version')
|
|
return {
|
|
'installed': True,
|
|
'path': docker,
|
|
'version': ver if ok else 'unknown',
|
|
}
|
|
|
|
def check_kubectl_installed(self) -> dict:
|
|
"""Check if kubectl CLI is available."""
|
|
kubectl = find_tool('kubectl')
|
|
if not kubectl:
|
|
return {'installed': False, 'path': None, 'version': None, 'context': None}
|
|
ok, ver = self._run(f'"{kubectl}" version --client --short 2>/dev/null || "{kubectl}" version --client')
|
|
ctx_ok, ctx = self._run(f'"{kubectl}" config current-context 2>/dev/null')
|
|
return {
|
|
'installed': True,
|
|
'path': kubectl,
|
|
'version': ver if ok else 'unknown',
|
|
'context': ctx if ctx_ok else None,
|
|
}
|
|
|
|
# ── Docker Host Audit ────────────────────────────────────────────────────
|
|
|
|
def audit_docker_host(self) -> list:
|
|
"""Comprehensive Docker host security audit."""
|
|
findings = []
|
|
docker = find_tool('docker')
|
|
if not docker:
|
|
return [{'check': 'Docker CLI', 'severity': 'critical',
|
|
'status': 'fail', 'detail': 'Docker not found on system'}]
|
|
|
|
# 1. Daemon configuration
|
|
daemon_cfg_path = '/etc/docker/daemon.json'
|
|
if self._is_win:
|
|
daemon_cfg_path = os.path.expandvars(r'%ProgramData%\docker\config\daemon.json')
|
|
|
|
daemon_cfg = {}
|
|
if os.path.isfile(daemon_cfg_path):
|
|
try:
|
|
with open(daemon_cfg_path) as f:
|
|
daemon_cfg = json.load(f)
|
|
findings.append({
|
|
'check': 'Daemon Config',
|
|
'severity': 'info',
|
|
'status': 'pass',
|
|
'detail': f'Found {daemon_cfg_path}',
|
|
})
|
|
except Exception as e:
|
|
findings.append({
|
|
'check': 'Daemon Config',
|
|
'severity': 'medium',
|
|
'status': 'warn',
|
|
'detail': f'Cannot parse {daemon_cfg_path}: {e}',
|
|
})
|
|
else:
|
|
findings.append({
|
|
'check': 'Daemon Config',
|
|
'severity': 'medium',
|
|
'status': 'warn',
|
|
'detail': f'No daemon.json found at {daemon_cfg_path}',
|
|
})
|
|
|
|
# 2. Docker socket permissions (Linux only)
|
|
if not self._is_win:
|
|
sock = '/var/run/docker.sock'
|
|
if os.path.exists(sock):
|
|
try:
|
|
stat = os.stat(sock)
|
|
mode = oct(stat.st_mode)[-3:]
|
|
world_rw = mode[2] in ('6', '7', '2', '3')
|
|
if world_rw:
|
|
findings.append({
|
|
'check': 'Docker Socket Permissions',
|
|
'severity': 'high',
|
|
'status': 'fail',
|
|
'detail': f'{sock} is world-accessible (mode {mode}). Restrict to docker group.',
|
|
})
|
|
else:
|
|
findings.append({
|
|
'check': 'Docker Socket Permissions',
|
|
'severity': 'info',
|
|
'status': 'pass',
|
|
'detail': f'{sock} permissions: {mode}',
|
|
})
|
|
except Exception:
|
|
findings.append({
|
|
'check': 'Docker Socket Permissions',
|
|
'severity': 'low',
|
|
'status': 'warn',
|
|
'detail': 'Cannot stat docker socket',
|
|
})
|
|
|
|
# 3. TLS configuration
|
|
tls_verify = daemon_cfg.get('tls', False) or daemon_cfg.get('tlsverify', False)
|
|
if tls_verify:
|
|
findings.append({
|
|
'check': 'TLS Configuration',
|
|
'severity': 'info',
|
|
'status': 'pass',
|
|
'detail': 'Docker daemon TLS is enabled',
|
|
})
|
|
else:
|
|
findings.append({
|
|
'check': 'TLS Configuration',
|
|
'severity': 'medium',
|
|
'status': 'warn',
|
|
'detail': 'Docker daemon TLS is not configured in daemon.json',
|
|
})
|
|
|
|
# 4. User namespace remapping
|
|
userns = daemon_cfg.get('userns-remap', '')
|
|
if userns:
|
|
findings.append({
|
|
'check': 'User Namespace Remapping',
|
|
'severity': 'info',
|
|
'status': 'pass',
|
|
'detail': f'Remapped to: {userns}',
|
|
})
|
|
else:
|
|
findings.append({
|
|
'check': 'User Namespace Remapping',
|
|
'severity': 'medium',
|
|
'status': 'warn',
|
|
'detail': 'Not enabled. Containers run as host UID 0.',
|
|
})
|
|
|
|
# 5. Content trust
|
|
content_trust = os.environ.get('DOCKER_CONTENT_TRUST', '0')
|
|
if content_trust == '1':
|
|
findings.append({
|
|
'check': 'Content Trust (DCT)',
|
|
'severity': 'info',
|
|
'status': 'pass',
|
|
'detail': 'DOCKER_CONTENT_TRUST=1 — signed images enforced',
|
|
})
|
|
else:
|
|
findings.append({
|
|
'check': 'Content Trust (DCT)',
|
|
'severity': 'low',
|
|
'status': 'warn',
|
|
'detail': 'DOCKER_CONTENT_TRUST not set. Unsigned images accepted.',
|
|
})
|
|
|
|
# 6. Live restore
|
|
live_restore = daemon_cfg.get('live-restore', False)
|
|
if live_restore:
|
|
findings.append({
|
|
'check': 'Live Restore',
|
|
'severity': 'info',
|
|
'status': 'pass',
|
|
'detail': 'Containers survive daemon restarts',
|
|
})
|
|
else:
|
|
findings.append({
|
|
'check': 'Live Restore',
|
|
'severity': 'low',
|
|
'status': 'warn',
|
|
'detail': 'live-restore not enabled in daemon.json',
|
|
})
|
|
|
|
# 7. Logging driver
|
|
log_driver = daemon_cfg.get('log-driver', 'json-file')
|
|
log_opts = daemon_cfg.get('log-opts', {})
|
|
max_size = log_opts.get('max-size', 'unlimited')
|
|
findings.append({
|
|
'check': 'Logging Driver',
|
|
'severity': 'low' if log_driver == 'json-file' and max_size == 'unlimited' else 'info',
|
|
'status': 'warn' if max_size == 'unlimited' else 'pass',
|
|
'detail': f'Driver: {log_driver}, max-size: {max_size}',
|
|
})
|
|
|
|
# 8. Docker info — check swarm, runtimes
|
|
ok, info_raw = self._run(f'"{docker}" info --format "{{{{json .}}}}"')
|
|
if ok:
|
|
try:
|
|
info = json.loads(info_raw)
|
|
# Check default runtime
|
|
rt = info.get('DefaultRuntime', 'runc')
|
|
findings.append({
|
|
'check': 'Default Runtime',
|
|
'severity': 'info',
|
|
'status': 'pass' if rt in ('runc', 'crun') else 'info',
|
|
'detail': f'Runtime: {rt}',
|
|
})
|
|
# Swarm mode
|
|
swarm = info.get('Swarm', {})
|
|
swarm_active = swarm.get('LocalNodeState', 'inactive') == 'active'
|
|
if swarm_active:
|
|
findings.append({
|
|
'check': 'Swarm Mode',
|
|
'severity': 'info',
|
|
'status': 'info',
|
|
'detail': 'Swarm is active. Ensure manager auto-lock is enabled.',
|
|
})
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
self._results['docker_host'] = findings
|
|
self._save_results()
|
|
return findings
|
|
|
|
# ── Container Listing / Inspection ───────────────────────────────────────
|
|
|
|
def list_containers(self, all: bool = True) -> list:
|
|
"""List Docker containers."""
|
|
docker = find_tool('docker')
|
|
if not docker:
|
|
return []
|
|
|
|
flag = '-a' if all else ''
|
|
fmt = '{{json .}}'
|
|
ok, raw = self._run(f'"{docker}" ps {flag} --format "{fmt}"')
|
|
if not ok:
|
|
return []
|
|
|
|
containers = []
|
|
for line in raw.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
c = json.loads(line)
|
|
containers.append({
|
|
'id': c.get('ID', ''),
|
|
'name': c.get('Names', ''),
|
|
'image': c.get('Image', ''),
|
|
'status': c.get('Status', ''),
|
|
'ports': c.get('Ports', ''),
|
|
'created': c.get('CreatedAt', ''),
|
|
'state': c.get('State', ''),
|
|
})
|
|
except (json.JSONDecodeError, ValueError):
|
|
continue
|
|
return containers
|
|
|
|
def inspect_container(self, container_id: str) -> dict:
|
|
"""Inspect a container and extract security-relevant config."""
|
|
docker = find_tool('docker')
|
|
if not docker:
|
|
return {'error': 'Docker not found'}
|
|
|
|
ok, data = self._run_json(f'"{docker}" inspect {container_id}')
|
|
if not ok or not isinstance(data, list) or len(data) == 0:
|
|
return {'error': f'Cannot inspect container {container_id}'}
|
|
|
|
info = data[0]
|
|
host_cfg = info.get('HostConfig', {})
|
|
cfg = info.get('Config', {})
|
|
|
|
# Capabilities
|
|
cap_add = host_cfg.get('CapAdd') or []
|
|
cap_drop = host_cfg.get('CapDrop') or []
|
|
|
|
# Mounts
|
|
mounts = []
|
|
for m in info.get('Mounts', []):
|
|
mounts.append({
|
|
'source': m.get('Source', ''),
|
|
'destination': m.get('Destination', ''),
|
|
'mode': m.get('Mode', ''),
|
|
'rw': m.get('RW', True),
|
|
'type': m.get('Type', ''),
|
|
})
|
|
|
|
# Security options
|
|
sec_opts = host_cfg.get('SecurityOpt') or []
|
|
|
|
return {
|
|
'id': info.get('Id', '')[:12],
|
|
'name': info.get('Name', '').lstrip('/'),
|
|
'image': cfg.get('Image', ''),
|
|
'privileged': host_cfg.get('Privileged', False),
|
|
'cap_add': cap_add,
|
|
'cap_drop': cap_drop,
|
|
'mounts': mounts,
|
|
'network_mode': host_cfg.get('NetworkMode', ''),
|
|
'user': cfg.get('User', '') or 'root',
|
|
'pid_mode': host_cfg.get('PidMode', ''),
|
|
'ipc_mode': host_cfg.get('IpcMode', ''),
|
|
'read_only_rootfs': host_cfg.get('ReadonlyRootfs', False),
|
|
'security_opt': sec_opts,
|
|
'memory_limit': host_cfg.get('Memory', 0),
|
|
'cpu_shares': host_cfg.get('CpuShares', 0),
|
|
'pids_limit': host_cfg.get('PidsLimit', 0),
|
|
'restart_policy': host_cfg.get('RestartPolicy', {}).get('Name', ''),
|
|
'env': cfg.get('Env', []),
|
|
}
|
|
|
|
# ── Container Security Audit ─────────────────────────────────────────────
|
|
|
|
def audit_container(self, container_id: str) -> dict:
|
|
"""Full security audit of a running container."""
|
|
info = self.inspect_container(container_id)
|
|
if 'error' in info:
|
|
return info
|
|
|
|
findings = []
|
|
passed = 0
|
|
total = 0
|
|
|
|
def check(name, ok, detail='', severity='medium'):
|
|
nonlocal passed, total
|
|
total += 1
|
|
if ok:
|
|
passed += 1
|
|
findings.append({
|
|
'check': name,
|
|
'status': 'pass' if ok else 'fail',
|
|
'severity': severity if not ok else 'info',
|
|
'detail': detail,
|
|
})
|
|
|
|
# 1. Privileged mode
|
|
check('Privileged Mode',
|
|
not info['privileged'],
|
|
'Container is running in privileged mode!' if info['privileged']
|
|
else 'Not privileged',
|
|
severity='critical')
|
|
|
|
# 2. Dangerous capabilities
|
|
dangerous_found = [c for c in info['cap_add'] if c in DANGEROUS_CAPS]
|
|
check('Capabilities',
|
|
len(dangerous_found) == 0,
|
|
f'Dangerous capabilities added: {", ".join(dangerous_found)}' if dangerous_found
|
|
else f'No dangerous capabilities ({len(info["cap_drop"])} dropped)',
|
|
severity='high')
|
|
|
|
# 3. Sensitive mounts
|
|
sensitive_found = []
|
|
for m in info['mounts']:
|
|
for s in SENSITIVE_MOUNTS:
|
|
if m['destination'].startswith(s) or m['source'].startswith(s):
|
|
sensitive_found.append(f'{m["source"]} -> {m["destination"]}')
|
|
break
|
|
check('Sensitive Mounts',
|
|
len(sensitive_found) == 0,
|
|
f'Sensitive paths mounted: {"; ".join(sensitive_found)}' if sensitive_found
|
|
else 'No sensitive host paths mounted',
|
|
severity='high')
|
|
|
|
# 4. Running as root
|
|
check('User',
|
|
info['user'] not in ('', 'root', '0'),
|
|
f'Running as: {info["user"]}' if info['user'] not in ('', 'root', '0')
|
|
else 'Running as root. Use USER directive.',
|
|
severity='medium')
|
|
|
|
# 5. Read-only root filesystem
|
|
check('Read-only Rootfs',
|
|
info['read_only_rootfs'],
|
|
'Root filesystem is read-only' if info['read_only_rootfs']
|
|
else 'Root filesystem is writable. Consider --read-only.',
|
|
severity='low')
|
|
|
|
# 6. Resource limits — memory
|
|
check('Memory Limit',
|
|
info['memory_limit'] > 0,
|
|
f'Memory limit: {info["memory_limit"] // (1024*1024)}MB' if info['memory_limit'] > 0
|
|
else 'No memory limit set. Container can exhaust host memory.',
|
|
severity='medium')
|
|
|
|
# 7. Resource limits — PID
|
|
pids = info['pids_limit']
|
|
has_pids = pids is not None and pids > 0 and pids != -1
|
|
check('PID Limit',
|
|
has_pids,
|
|
f'PID limit: {pids}' if has_pids
|
|
else 'No PID limit. Fork bomb possible.',
|
|
severity='low')
|
|
|
|
# 8. Seccomp profile
|
|
seccomp_set = any('seccomp' in opt for opt in info['security_opt'])
|
|
no_seccomp = any('seccomp=unconfined' in opt for opt in info['security_opt'])
|
|
check('Seccomp Profile',
|
|
seccomp_set and not no_seccomp,
|
|
'Seccomp profile disabled (unconfined)!' if no_seccomp
|
|
else ('Custom seccomp profile applied' if seccomp_set
|
|
else 'Default seccomp profile (OK for Docker default)'),
|
|
severity='high' if no_seccomp else 'low')
|
|
|
|
# 9. AppArmor profile
|
|
apparmor_set = any('apparmor' in opt for opt in info['security_opt'])
|
|
no_apparmor = any('apparmor=unconfined' in opt for opt in info['security_opt'])
|
|
check('AppArmor Profile',
|
|
not no_apparmor,
|
|
'AppArmor disabled (unconfined)!' if no_apparmor
|
|
else ('AppArmor profile applied' if apparmor_set
|
|
else 'No explicit AppArmor profile (using Docker default)'),
|
|
severity='medium' if no_apparmor else 'low')
|
|
|
|
# 10. Network mode
|
|
check('Network Mode',
|
|
info['network_mode'] not in ('host',),
|
|
f'Network mode: {info["network_mode"]}',
|
|
severity='high' if info['network_mode'] == 'host' else 'info')
|
|
|
|
# 11. PID mode
|
|
check('PID Mode',
|
|
info['pid_mode'] != 'host',
|
|
'PID namespace shared with host!' if info['pid_mode'] == 'host'
|
|
else f'PID mode: {info["pid_mode"] or "container (isolated)"}',
|
|
severity='high')
|
|
|
|
# 12. Secrets in environment
|
|
env_secrets = []
|
|
for e in info.get('env', []):
|
|
key = e.split('=', 1)[0] if '=' in e else e
|
|
if SECRET_PATTERNS.search(key):
|
|
env_secrets.append(key)
|
|
check('Environment Secrets',
|
|
len(env_secrets) == 0,
|
|
f'Possible secrets in ENV: {", ".join(env_secrets)}' if env_secrets
|
|
else 'No obvious secrets in environment variables',
|
|
severity='medium')
|
|
|
|
score = int((passed / total) * 100) if total > 0 else 0
|
|
|
|
result = {
|
|
'container_id': container_id,
|
|
'name': info.get('name', ''),
|
|
'image': info.get('image', ''),
|
|
'score': score,
|
|
'passed': passed,
|
|
'total': total,
|
|
'findings': findings,
|
|
}
|
|
|
|
self._results['container_audits'][container_id] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
# ── Image Operations ─────────────────────────────────────────────────────
|
|
|
|
def list_images(self) -> list:
|
|
"""List local Docker images."""
|
|
docker = find_tool('docker')
|
|
if not docker:
|
|
return []
|
|
|
|
fmt = '{{json .}}'
|
|
ok, raw = self._run(f'"{docker}" images --format "{fmt}"')
|
|
if not ok:
|
|
return []
|
|
|
|
images = []
|
|
for line in raw.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
img = json.loads(line)
|
|
images.append({
|
|
'id': img.get('ID', ''),
|
|
'repo': img.get('Repository', ''),
|
|
'tag': img.get('Tag', ''),
|
|
'size': img.get('Size', ''),
|
|
'created': img.get('CreatedAt', img.get('CreatedSince', '')),
|
|
})
|
|
except (json.JSONDecodeError, ValueError):
|
|
continue
|
|
return images
|
|
|
|
def scan_image(self, image_name: str) -> dict:
|
|
"""Scan a container image for CVEs using trivy or grype."""
|
|
# Try trivy first
|
|
trivy = find_tool('trivy')
|
|
if trivy:
|
|
ok, raw = self._run(
|
|
f'"{trivy}" image --format json --severity CRITICAL,HIGH,MEDIUM,LOW '
|
|
f'--quiet "{image_name}"',
|
|
timeout=120
|
|
)
|
|
if ok:
|
|
return self._parse_trivy(raw, image_name)
|
|
|
|
# Fallback to grype
|
|
grype = find_tool('grype')
|
|
if grype:
|
|
ok, raw = self._run(
|
|
f'"{grype}" "{image_name}" -o json --quiet',
|
|
timeout=120
|
|
)
|
|
if ok:
|
|
return self._parse_grype(raw, image_name)
|
|
|
|
return {
|
|
'image': image_name,
|
|
'scanner': None,
|
|
'error': 'No scanner available. Install trivy or grype.',
|
|
'vulnerabilities': [],
|
|
'summary': {},
|
|
}
|
|
|
|
def _parse_trivy(self, raw: str, image_name: str) -> dict:
|
|
"""Parse Trivy JSON output."""
|
|
vulns = []
|
|
summary = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
|
|
try:
|
|
data = json.loads(raw)
|
|
results = data.get('Results', [])
|
|
for r in results:
|
|
for v in r.get('Vulnerabilities', []):
|
|
sev = v.get('Severity', 'UNKNOWN').upper()
|
|
entry = {
|
|
'cve': v.get('VulnerabilityID', ''),
|
|
'severity': sev,
|
|
'package': v.get('PkgName', ''),
|
|
'installed_version': v.get('InstalledVersion', ''),
|
|
'fixed_version': v.get('FixedVersion', ''),
|
|
'title': v.get('Title', ''),
|
|
}
|
|
vulns.append(entry)
|
|
if sev in summary:
|
|
summary[sev] += 1
|
|
except (json.JSONDecodeError, ValueError):
|
|
return {'image': image_name, 'scanner': 'trivy',
|
|
'error': 'Failed to parse trivy output', 'vulnerabilities': [], 'summary': {}}
|
|
|
|
result = {
|
|
'image': image_name,
|
|
'scanner': 'trivy',
|
|
'vulnerabilities': vulns,
|
|
'summary': summary,
|
|
'total': len(vulns),
|
|
}
|
|
self._results['image_scans'][image_name] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
def _parse_grype(self, raw: str, image_name: str) -> dict:
|
|
"""Parse Grype JSON output."""
|
|
vulns = []
|
|
summary = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
|
|
try:
|
|
data = json.loads(raw)
|
|
for m in data.get('matches', []):
|
|
v = m.get('vulnerability', {})
|
|
sev = v.get('severity', 'Unknown').upper()
|
|
pkg = m.get('artifact', {})
|
|
fixed = ''
|
|
fix_vers = v.get('fix', {}).get('versions', [])
|
|
if fix_vers:
|
|
fixed = fix_vers[0]
|
|
entry = {
|
|
'cve': v.get('id', ''),
|
|
'severity': sev,
|
|
'package': pkg.get('name', ''),
|
|
'installed_version': pkg.get('version', ''),
|
|
'fixed_version': fixed,
|
|
'title': v.get('description', '')[:120],
|
|
}
|
|
vulns.append(entry)
|
|
if sev in summary:
|
|
summary[sev] += 1
|
|
except (json.JSONDecodeError, ValueError):
|
|
return {'image': image_name, 'scanner': 'grype',
|
|
'error': 'Failed to parse grype output', 'vulnerabilities': [], 'summary': {}}
|
|
|
|
result = {
|
|
'image': image_name,
|
|
'scanner': 'grype',
|
|
'vulnerabilities': vulns,
|
|
'summary': summary,
|
|
'total': len(vulns),
|
|
}
|
|
self._results['image_scans'][image_name] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
# ── Dockerfile Linting ───────────────────────────────────────────────────
|
|
|
|
def lint_dockerfile(self, content: str) -> list:
|
|
"""Lint a Dockerfile for security issues."""
|
|
findings = []
|
|
lines = content.splitlines()
|
|
has_user = False
|
|
has_healthcheck = False
|
|
consecutive_run = 0
|
|
max_consecutive_run = 0
|
|
|
|
for i, raw_line in enumerate(lines, 1):
|
|
line = raw_line.strip()
|
|
if not line or line.startswith('#'):
|
|
consecutive_run = 0
|
|
continue
|
|
|
|
upper = line.upper()
|
|
|
|
# FROM :latest
|
|
if upper.startswith('FROM '):
|
|
img = line[5:].strip().split(' ')[0]
|
|
if ':' not in img or img.endswith(':latest'):
|
|
findings.append({
|
|
'rule': 'DL001', 'line': i,
|
|
'severity': DOCKERFILE_RULES['DL001']['severity'],
|
|
'title': DOCKERFILE_RULES['DL001']['title'],
|
|
'detail': f'Image "{img}" — pin a specific version tag.',
|
|
})
|
|
|
|
# USER directive
|
|
if upper.startswith('USER '):
|
|
has_user = True
|
|
|
|
# HEALTHCHECK
|
|
if upper.startswith('HEALTHCHECK '):
|
|
has_healthcheck = True
|
|
|
|
# ADD vs COPY
|
|
if upper.startswith('ADD ') and not line.strip().startswith('ADD --from'):
|
|
parts = line[4:].strip()
|
|
# Skip if it's a URL (ADD has valid URL use)
|
|
if not parts.startswith('http://') and not parts.startswith('https://'):
|
|
findings.append({
|
|
'rule': 'DL003', 'line': i,
|
|
'severity': DOCKERFILE_RULES['DL003']['severity'],
|
|
'title': DOCKERFILE_RULES['DL003']['title'],
|
|
'detail': f'Line {i}: prefer COPY over ADD for local files.',
|
|
})
|
|
|
|
# Secrets in ENV/ARG
|
|
if upper.startswith('ENV ') or upper.startswith('ARG '):
|
|
key = line.split()[1] if len(line.split()) > 1 else ''
|
|
key = key.split('=')[0]
|
|
if SECRET_PATTERNS.search(key):
|
|
findings.append({
|
|
'rule': 'DL004', 'line': i,
|
|
'severity': DOCKERFILE_RULES['DL004']['severity'],
|
|
'title': DOCKERFILE_RULES['DL004']['title'],
|
|
'detail': f'Line {i}: "{key}" looks like a secret. Use --secret instead.',
|
|
})
|
|
|
|
# apt-get without --no-install-recommends
|
|
if 'apt-get install' in line and '--no-install-recommends' not in line:
|
|
findings.append({
|
|
'rule': 'DL006', 'line': i,
|
|
'severity': DOCKERFILE_RULES['DL006']['severity'],
|
|
'title': DOCKERFILE_RULES['DL006']['title'],
|
|
'detail': f'Line {i}: add --no-install-recommends to reduce image size.',
|
|
})
|
|
|
|
# COPY/ADD of sensitive files
|
|
if upper.startswith('COPY ') or upper.startswith('ADD '):
|
|
if SENSITIVE_FILE_PATTERNS.search(line):
|
|
findings.append({
|
|
'rule': 'DL009', 'line': i,
|
|
'severity': DOCKERFILE_RULES['DL009']['severity'],
|
|
'title': DOCKERFILE_RULES['DL009']['title'],
|
|
'detail': f'Line {i}: copying potentially sensitive file into image.',
|
|
})
|
|
|
|
# sudo in RUN
|
|
if upper.startswith('RUN ') and 'sudo ' in line:
|
|
findings.append({
|
|
'rule': 'DL010', 'line': i,
|
|
'severity': DOCKERFILE_RULES['DL010']['severity'],
|
|
'title': DOCKERFILE_RULES['DL010']['title'],
|
|
'detail': f'Line {i}: avoid sudo in Dockerfiles.',
|
|
})
|
|
|
|
# Consecutive RUN
|
|
if upper.startswith('RUN '):
|
|
consecutive_run += 1
|
|
if consecutive_run > max_consecutive_run:
|
|
max_consecutive_run = consecutive_run
|
|
else:
|
|
consecutive_run = 0
|
|
|
|
# Post-scan checks
|
|
if not has_user:
|
|
findings.append({
|
|
'rule': 'DL002', 'line': 0,
|
|
'severity': DOCKERFILE_RULES['DL002']['severity'],
|
|
'title': DOCKERFILE_RULES['DL002']['title'],
|
|
'detail': 'No USER directive found. Container will run as root.',
|
|
})
|
|
|
|
if not has_healthcheck:
|
|
findings.append({
|
|
'rule': 'DL005', 'line': 0,
|
|
'severity': DOCKERFILE_RULES['DL005']['severity'],
|
|
'title': DOCKERFILE_RULES['DL005']['title'],
|
|
'detail': 'No HEALTHCHECK instruction. Add one for orchestration.',
|
|
})
|
|
|
|
if max_consecutive_run >= 3:
|
|
findings.append({
|
|
'rule': 'DL011', 'line': 0,
|
|
'severity': DOCKERFILE_RULES['DL011']['severity'],
|
|
'title': DOCKERFILE_RULES['DL011']['title'],
|
|
'detail': f'{max_consecutive_run} consecutive RUN commands. Chain with && to reduce layers.',
|
|
})
|
|
|
|
# Check for missing cache cleanup
|
|
if 'apt-get install' in content and 'rm -rf /var/lib/apt/lists' not in content:
|
|
findings.append({
|
|
'rule': 'DL007', 'line': 0,
|
|
'severity': DOCKERFILE_RULES['DL007']['severity'],
|
|
'title': DOCKERFILE_RULES['DL007']['title'],
|
|
'detail': 'apt-get install used without cleaning /var/lib/apt/lists/*.',
|
|
})
|
|
|
|
self._results['dockerfile_lints'] = findings
|
|
self._save_results()
|
|
return findings
|
|
|
|
# ── Container Escape Detection ───────────────────────────────────────────
|
|
|
|
def check_escape_vectors(self, container_id: str) -> dict:
|
|
"""Check for container escape possibilities."""
|
|
info = self.inspect_container(container_id)
|
|
if 'error' in info:
|
|
return info
|
|
|
|
vectors = []
|
|
|
|
def vec(name, risk, exploitable, detail):
|
|
vectors.append({
|
|
'vector': name,
|
|
'risk': risk,
|
|
'exploitable': exploitable,
|
|
'detail': detail,
|
|
})
|
|
|
|
# 1. Privileged mode — full escape
|
|
if info['privileged']:
|
|
vec('Privileged Mode', 'critical', True,
|
|
'Container has full access to host devices and kernel. '
|
|
'Trivial escape via mounting host filesystem.')
|
|
|
|
# 2. Docker socket mount
|
|
sock_mounted = any(
|
|
'/var/run/docker.sock' in m.get('source', '') or
|
|
'/run/docker.sock' in m.get('source', '')
|
|
for m in info['mounts']
|
|
)
|
|
if sock_mounted:
|
|
vec('Docker Socket Mount', 'critical', True,
|
|
'Docker socket mounted inside container. Attacker can spawn '
|
|
'privileged containers on the host.')
|
|
|
|
# 3. SYS_ADMIN capability
|
|
if 'SYS_ADMIN' in info.get('cap_add', []):
|
|
vec('SYS_ADMIN Capability', 'high', True,
|
|
'SYS_ADMIN allows mounting filesystems, modifying cgroups. '
|
|
'Combined with other misconfigs, can lead to escape.')
|
|
|
|
# 4. SYS_PTRACE capability
|
|
if 'SYS_PTRACE' in info.get('cap_add', []):
|
|
vec('SYS_PTRACE Capability', 'high', True,
|
|
'SYS_PTRACE allows process injection and debugging. '
|
|
'Can be used to escape via process injection into host PID.')
|
|
|
|
# 5. Host PID namespace
|
|
if info.get('pid_mode') == 'host':
|
|
vec('Host PID Namespace', 'high', True,
|
|
'Container shares PID namespace with host. Processes visible '
|
|
'and injectable from container.')
|
|
|
|
# 6. Host network namespace
|
|
if info.get('network_mode') == 'host':
|
|
vec('Host Network Namespace', 'medium', False,
|
|
'Container shares host network stack. Can sniff host traffic '
|
|
'and access services on localhost.')
|
|
|
|
# 7. /proc write access
|
|
proc_mounted = any(
|
|
m.get('destination', '').startswith('/proc') and m.get('rw', True)
|
|
for m in info['mounts']
|
|
)
|
|
if proc_mounted:
|
|
vec('/proc Write Access', 'high', True,
|
|
'Writable /proc mount can enable kernel parameter modification '
|
|
'and cgroup escape techniques.')
|
|
|
|
# 8. Kernel version (check for known container escape CVEs)
|
|
ok, uname = self._run('uname -r 2>/dev/null')
|
|
if ok and uname:
|
|
kernel = uname.strip()
|
|
# Known vulnerable kernel ranges (simplified check)
|
|
vec('Kernel Version', 'info', False,
|
|
f'Host kernel: {kernel}. Check against CVE-2022-0185, '
|
|
f'CVE-2022-0847 (DirtyPipe), CVE-2021-22555.')
|
|
|
|
# 9. Cgroup escape
|
|
if info['privileged'] or 'SYS_ADMIN' in info.get('cap_add', []):
|
|
vec('Cgroup Escape', 'critical' if info['privileged'] else 'high', True,
|
|
'Privileged + cgroup v1 release_agent technique enables full '
|
|
'host command execution.')
|
|
|
|
# 10. Seccomp disabled
|
|
if any('seccomp=unconfined' in opt for opt in info.get('security_opt', [])):
|
|
vec('Seccomp Disabled', 'medium', False,
|
|
'No seccomp filter. All syscalls available including '
|
|
'those needed for escape techniques.')
|
|
|
|
# 11. AppArmor disabled
|
|
if any('apparmor=unconfined' in opt for opt in info.get('security_opt', [])):
|
|
vec('AppArmor Disabled', 'medium', False,
|
|
'No AppArmor confinement. Reduced protection against '
|
|
'filesystem and network abuse.')
|
|
|
|
risk_score = 0
|
|
for v in vectors:
|
|
w = {'critical': 40, 'high': 25, 'medium': 10, 'low': 5, 'info': 0}
|
|
risk_score += w.get(v['risk'], 0)
|
|
risk_score = min(risk_score, 100)
|
|
|
|
result = {
|
|
'container_id': container_id,
|
|
'name': info.get('name', ''),
|
|
'vectors': vectors,
|
|
'risk_score': risk_score,
|
|
'total_vectors': len(vectors),
|
|
'exploitable': sum(1 for v in vectors if v['exploitable']),
|
|
}
|
|
|
|
self._results['escape_checks'][container_id] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
# ── Kubernetes Operations ────────────────────────────────────────────────
|
|
|
|
def _kubectl(self, args: str, timeout: int = 30) -> tuple:
|
|
kubectl = find_tool('kubectl')
|
|
if not kubectl:
|
|
return False, 'kubectl not found'
|
|
return self._run(f'"{kubectl}" {args}', timeout=timeout)
|
|
|
|
def _kubectl_json(self, args: str, timeout: int = 30) -> tuple:
|
|
kubectl = find_tool('kubectl')
|
|
if not kubectl:
|
|
return False, 'kubectl not found'
|
|
return self._run_json(f'"{kubectl}" {args} -o json', timeout=timeout)
|
|
|
|
def k8s_get_namespaces(self) -> list:
|
|
"""List Kubernetes namespaces."""
|
|
ok, data = self._kubectl_json('get namespaces')
|
|
if not ok:
|
|
return []
|
|
namespaces = []
|
|
for item in data.get('items', []):
|
|
meta = item.get('metadata', {})
|
|
namespaces.append({
|
|
'name': meta.get('name', ''),
|
|
'status': item.get('status', {}).get('phase', ''),
|
|
'age': meta.get('creationTimestamp', ''),
|
|
})
|
|
return namespaces
|
|
|
|
def k8s_get_pods(self, namespace: str = 'default') -> list:
|
|
"""List pods in a namespace."""
|
|
ok, data = self._kubectl_json(f'get pods -n {namespace}')
|
|
if not ok:
|
|
return []
|
|
pods = []
|
|
for item in data.get('items', []):
|
|
meta = item.get('metadata', {})
|
|
spec = item.get('spec', {})
|
|
status = item.get('status', {})
|
|
containers = [c.get('name', '') for c in spec.get('containers', [])]
|
|
pod_status = status.get('phase', 'Unknown')
|
|
conditions = status.get('conditions', [])
|
|
ready = any(c.get('type') == 'Ready' and c.get('status') == 'True'
|
|
for c in conditions)
|
|
pods.append({
|
|
'name': meta.get('name', ''),
|
|
'namespace': meta.get('namespace', namespace),
|
|
'status': pod_status,
|
|
'ready': ready,
|
|
'containers': containers,
|
|
'node': spec.get('nodeName', ''),
|
|
'age': meta.get('creationTimestamp', ''),
|
|
'restart_count': sum(
|
|
cs.get('restartCount', 0)
|
|
for cs in status.get('containerStatuses', [])
|
|
),
|
|
})
|
|
return pods
|
|
|
|
def k8s_audit_rbac(self, namespace: Optional[str] = None) -> dict:
|
|
"""Audit RBAC for overly permissive bindings."""
|
|
findings = []
|
|
|
|
# Cluster role bindings
|
|
ok, data = self._kubectl_json('get clusterrolebindings')
|
|
if ok:
|
|
for item in data.get('items', []):
|
|
meta = item.get('metadata', {})
|
|
role_ref = item.get('roleRef', {})
|
|
subjects = item.get('subjects', [])
|
|
|
|
if role_ref.get('name') == 'cluster-admin':
|
|
for subj in subjects:
|
|
findings.append({
|
|
'severity': 'critical',
|
|
'type': 'cluster-admin binding',
|
|
'binding': meta.get('name', ''),
|
|
'subject': f'{subj.get("kind", "")}/{subj.get("name", "")}',
|
|
'detail': 'cluster-admin grants full cluster access',
|
|
})
|
|
|
|
# Check for wildcard permissions in cluster roles
|
|
ok, data = self._kubectl_json('get clusterroles')
|
|
if ok:
|
|
for item in data.get('items', []):
|
|
meta = item.get('metadata', {})
|
|
role_name = meta.get('name', '')
|
|
for rule in item.get('rules', []):
|
|
verbs = rule.get('verbs', [])
|
|
resources = rule.get('resources', [])
|
|
api_groups = rule.get('apiGroups', [])
|
|
if '*' in verbs and '*' in resources:
|
|
findings.append({
|
|
'severity': 'high',
|
|
'type': 'wildcard permissions',
|
|
'binding': role_name,
|
|
'subject': '',
|
|
'detail': f'Role "{role_name}" has wildcard verbs and resources '
|
|
f'on apiGroups: {api_groups}',
|
|
})
|
|
|
|
# Check service account token automount
|
|
ns_flag = f'-n {namespace}' if namespace else '--all-namespaces'
|
|
ok, data = self._kubectl_json(f'get serviceaccounts {ns_flag}')
|
|
if ok:
|
|
for item in data.get('items', []):
|
|
meta = item.get('metadata', {})
|
|
automount = item.get('automountServiceAccountToken', True)
|
|
if automount and meta.get('name') != 'default':
|
|
findings.append({
|
|
'severity': 'low',
|
|
'type': 'token automount',
|
|
'binding': meta.get('name', ''),
|
|
'subject': f'namespace/{meta.get("namespace", "")}',
|
|
'detail': f'SA "{meta.get("name")}" has automountServiceAccountToken enabled',
|
|
})
|
|
|
|
result = {'findings': findings, 'total': len(findings)}
|
|
self._results['k8s_audits']['rbac'] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
def k8s_check_secrets(self, namespace: str = 'default') -> dict:
|
|
"""Check for exposed or unencrypted secrets."""
|
|
findings = []
|
|
|
|
ok, data = self._kubectl_json(f'get secrets -n {namespace}')
|
|
if not ok:
|
|
return {'error': 'Cannot list secrets', 'findings': []}
|
|
|
|
for item in data.get('items', []):
|
|
meta = item.get('metadata', {})
|
|
secret_type = item.get('type', '')
|
|
secret_name = meta.get('name', '')
|
|
data_keys = list((item.get('data') or {}).keys())
|
|
|
|
# Check for default token (legacy, pre-1.24)
|
|
if secret_type == 'kubernetes.io/service-account-token':
|
|
findings.append({
|
|
'severity': 'info',
|
|
'name': secret_name,
|
|
'type': secret_type,
|
|
'detail': f'SA token secret with keys: {", ".join(data_keys)}',
|
|
})
|
|
|
|
# Check for Opaque secrets with suspicious names
|
|
if secret_type == 'Opaque':
|
|
for key in data_keys:
|
|
if SECRET_PATTERNS.search(key):
|
|
findings.append({
|
|
'severity': 'medium',
|
|
'name': secret_name,
|
|
'type': secret_type,
|
|
'detail': f'Key "{key}" may contain credentials',
|
|
})
|
|
|
|
# Check which pods mount secrets
|
|
ok, pod_data = self._kubectl_json(f'get pods -n {namespace}')
|
|
if ok:
|
|
for pod in pod_data.get('items', []):
|
|
pod_name = pod.get('metadata', {}).get('name', '')
|
|
volumes = pod.get('spec', {}).get('volumes', [])
|
|
for vol in volumes:
|
|
if vol.get('secret'):
|
|
findings.append({
|
|
'severity': 'info',
|
|
'name': vol['secret'].get('secretName', ''),
|
|
'type': 'mounted',
|
|
'detail': f'Secret mounted in pod "{pod_name}"',
|
|
})
|
|
|
|
result = {'findings': findings, 'total': len(findings), 'namespace': namespace}
|
|
self._results['k8s_audits']['secrets'] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
def k8s_check_network_policies(self, namespace: str = 'default') -> dict:
|
|
"""Check if network policies exist and find unprotected pods."""
|
|
findings = []
|
|
|
|
ok, data = self._kubectl_json(f'get networkpolicies -n {namespace}')
|
|
policies = data.get('items', []) if ok else []
|
|
|
|
if not policies:
|
|
findings.append({
|
|
'severity': 'high',
|
|
'type': 'no_policies',
|
|
'detail': f'No NetworkPolicies found in namespace "{namespace}". '
|
|
f'All pod-to-pod traffic is allowed.',
|
|
})
|
|
return {'findings': findings, 'total': 1, 'namespace': namespace,
|
|
'policy_count': 0, 'unprotected_pods': []}
|
|
|
|
# Collect pod selectors covered by policies
|
|
covered_labels = set()
|
|
for pol in policies:
|
|
spec = pol.get('spec', {})
|
|
selector = spec.get('podSelector', {})
|
|
match_labels = selector.get('matchLabels', {})
|
|
if not match_labels:
|
|
covered_labels.add('__all__')
|
|
else:
|
|
for k, v in match_labels.items():
|
|
covered_labels.add(f'{k}={v}')
|
|
|
|
# Check pods without matching policies
|
|
unprotected = []
|
|
if '__all__' not in covered_labels:
|
|
ok, pod_data = self._kubectl_json(f'get pods -n {namespace}')
|
|
if ok:
|
|
for pod in pod_data.get('items', []):
|
|
meta = pod.get('metadata', {})
|
|
labels = meta.get('labels', {})
|
|
pod_labels = {f'{k}={v}' for k, v in labels.items()}
|
|
if not pod_labels.intersection(covered_labels):
|
|
unprotected.append(meta.get('name', ''))
|
|
|
|
if unprotected:
|
|
findings.append({
|
|
'severity': 'medium',
|
|
'type': 'unprotected_pods',
|
|
'detail': f'{len(unprotected)} pod(s) not covered by any NetworkPolicy',
|
|
})
|
|
|
|
result = {
|
|
'findings': findings,
|
|
'total': len(findings),
|
|
'namespace': namespace,
|
|
'policy_count': len(policies),
|
|
'unprotected_pods': unprotected,
|
|
}
|
|
self._results['k8s_audits']['network_policies'] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
def k8s_audit_pod(self, pod_name: str, namespace: str = 'default') -> dict:
|
|
"""Security audit of a Kubernetes pod."""
|
|
ok, data = self._kubectl_json(f'get pod {pod_name} -n {namespace}')
|
|
if not ok:
|
|
return {'error': f'Cannot get pod {pod_name}'}
|
|
|
|
spec = data.get('spec', {})
|
|
findings = []
|
|
passed = 0
|
|
total = 0
|
|
|
|
def check(name, ok, detail='', severity='medium'):
|
|
nonlocal passed, total
|
|
total += 1
|
|
if ok:
|
|
passed += 1
|
|
findings.append({
|
|
'check': name,
|
|
'status': 'pass' if ok else 'fail',
|
|
'severity': severity if not ok else 'info',
|
|
'detail': detail,
|
|
})
|
|
|
|
# Host namespaces
|
|
check('Host Network',
|
|
not spec.get('hostNetwork', False),
|
|
'Pod uses host network namespace!' if spec.get('hostNetwork') else 'Isolated',
|
|
severity='high')
|
|
check('Host PID',
|
|
not spec.get('hostPID', False),
|
|
'Pod uses host PID namespace!' if spec.get('hostPID') else 'Isolated',
|
|
severity='high')
|
|
check('Host IPC',
|
|
not spec.get('hostIPC', False),
|
|
'Pod uses host IPC namespace!' if spec.get('hostIPC') else 'Isolated',
|
|
severity='high')
|
|
|
|
# Per-container checks
|
|
for container in spec.get('containers', []):
|
|
c_name = container.get('name', 'unknown')
|
|
sec_ctx = container.get('securityContext', {})
|
|
|
|
# Privileged
|
|
priv = sec_ctx.get('privileged', False)
|
|
check(f'{c_name}: Privileged',
|
|
not priv,
|
|
'Container is privileged!' if priv else 'Not privileged',
|
|
severity='critical')
|
|
|
|
# Run as root
|
|
run_as_user = sec_ctx.get('runAsUser')
|
|
run_as_non_root = sec_ctx.get('runAsNonRoot', False)
|
|
is_root = run_as_user == 0 or (run_as_user is None and not run_as_non_root)
|
|
check(f'{c_name}: Root User',
|
|
not is_root,
|
|
f'Runs as UID {run_as_user}' if run_as_user and run_as_user != 0
|
|
else ('runAsNonRoot=true' if run_as_non_root else 'May run as root'),
|
|
severity='medium')
|
|
|
|
# Read-only root filesystem
|
|
ro = sec_ctx.get('readOnlyRootFilesystem', False)
|
|
check(f'{c_name}: Read-only Rootfs',
|
|
ro,
|
|
'Root filesystem is read-only' if ro else 'Writable root filesystem',
|
|
severity='low')
|
|
|
|
# Resource limits
|
|
resources = container.get('resources', {})
|
|
limits = resources.get('limits', {})
|
|
has_limits = bool(limits.get('memory') or limits.get('cpu'))
|
|
check(f'{c_name}: Resource Limits',
|
|
has_limits,
|
|
f'Limits: {limits}' if has_limits else 'No resource limits set',
|
|
severity='medium')
|
|
|
|
# Capabilities
|
|
caps = sec_ctx.get('capabilities', {})
|
|
cap_add = caps.get('add', [])
|
|
dangerous = [c for c in cap_add if c in DANGEROUS_CAPS]
|
|
all_dropped = 'ALL' in caps.get('drop', [])
|
|
check(f'{c_name}: Capabilities',
|
|
len(dangerous) == 0 and (all_dropped or not cap_add),
|
|
f'Dangerous caps: {", ".join(dangerous)}' if dangerous
|
|
else ('All capabilities dropped' if all_dropped else 'Default capabilities'),
|
|
severity='high' if dangerous else 'info')
|
|
|
|
# Privilege escalation
|
|
allow_escalation = sec_ctx.get('allowPrivilegeEscalation', True)
|
|
check(f'{c_name}: Privilege Escalation',
|
|
not allow_escalation,
|
|
'allowPrivilegeEscalation=true' if allow_escalation
|
|
else 'Privilege escalation disabled',
|
|
severity='medium')
|
|
|
|
# Service account
|
|
sa = spec.get('serviceAccountName', 'default')
|
|
automount = spec.get('automountServiceAccountToken', True)
|
|
check('Service Account',
|
|
sa != 'default' or not automount,
|
|
f'SA: {sa}, automount: {automount}',
|
|
severity='low')
|
|
|
|
score = int((passed / total) * 100) if total > 0 else 0
|
|
result = {
|
|
'pod': pod_name,
|
|
'namespace': namespace,
|
|
'score': score,
|
|
'passed': passed,
|
|
'total': total,
|
|
'findings': findings,
|
|
}
|
|
self._results['k8s_audits'][f'pod:{namespace}/{pod_name}'] = result
|
|
self._save_results()
|
|
return result
|
|
|
|
# ── Export ────────────────────────────────────────────────────────────────
|
|
|
|
def export_results(self, fmt: str = 'json') -> dict:
|
|
"""Export all audit results."""
|
|
self._results['timestamp'] = datetime.utcnow().isoformat()
|
|
if fmt == 'json':
|
|
path = self._data_dir / f'container_sec_export_{int(time.time())}.json'
|
|
with open(path, 'w') as f:
|
|
json.dump(self._results, f, indent=2, default=str)
|
|
return {'path': str(path), 'format': 'json', 'success': True}
|
|
return {'error': f'Unsupported format: {fmt}'}
|
|
|
|
|
|
# ── Singleton ────────────────────────────────────────────────────────────────
|
|
|
|
_instance = None
|
|
|
|
|
|
def get_container_sec() -> ContainerSecurity:
|
|
global _instance
|
|
if _instance is None:
|
|
_instance = ContainerSecurity()
|
|
return _instance
|
|
|
|
|
|
# ── CLI Entry Point ──────────────────────────────────────────────────────────
|
|
|
|
def run():
|
|
"""CLI entry point for Container Security module."""
|
|
cs = get_container_sec()
|
|
|
|
while True:
|
|
print(f"\n{'='*60}")
|
|
print(f" Container Security")
|
|
print(f"{'='*60}")
|
|
print()
|
|
print(" 1 — Audit Docker Host")
|
|
print(" 2 — List Containers")
|
|
print(" 3 — Audit Container")
|
|
print(" 4 — Scan Image")
|
|
print(" 5 — Lint Dockerfile")
|
|
print(" 6 — K8s Pods")
|
|
print(" 7 — K8s RBAC Audit")
|
|
print(" 0 — Back")
|
|
print()
|
|
|
|
choice = input(" > ").strip()
|
|
|
|
if choice == '0':
|
|
break
|
|
|
|
elif choice == '1':
|
|
print("\n [*] Auditing Docker host...")
|
|
findings = cs.audit_docker_host()
|
|
if not findings:
|
|
print(" [-] No findings.")
|
|
for f in findings:
|
|
sev = f.get('severity', 'info').upper()
|
|
status = f.get('status', 'info').upper()
|
|
color = {'CRITICAL': Colors.RED, 'HIGH': Colors.RED,
|
|
'MEDIUM': Colors.YELLOW, 'LOW': Colors.CYAN,
|
|
'INFO': Colors.GREEN}.get(sev, Colors.WHITE)
|
|
print(f" {color}[{sev}]{Colors.RESET} {f['check']}: {f['detail']}")
|
|
|
|
elif choice == '2':
|
|
containers = cs.list_containers(all=True)
|
|
if not containers:
|
|
print(" [-] No containers found.")
|
|
else:
|
|
print(f"\n {'ID':<14} {'Name':<25} {'Image':<30} {'Status':<15}")
|
|
print(f" {'-'*14} {'-'*25} {'-'*30} {'-'*15}")
|
|
for c in containers:
|
|
print(f" {c['id']:<14} {c['name']:<25} {c['image']:<30} {c['status']:<15}")
|
|
|
|
elif choice == '3':
|
|
cid = input(" Container ID or name: ").strip()
|
|
if cid:
|
|
print(f"\n [*] Auditing container {cid}...")
|
|
result = cs.audit_container(cid)
|
|
if 'error' in result:
|
|
print(f" [!] {result['error']}")
|
|
else:
|
|
print(f"\n Security Score: {result['score']}% ({result['passed']}/{result['total']})")
|
|
for f in result['findings']:
|
|
sym = '+' if f['status'] == 'pass' else '!'
|
|
color = Colors.GREEN if f['status'] == 'pass' else Colors.YELLOW
|
|
print(f" {color}[{sym}]{Colors.RESET} {f['check']}: {f['detail']}")
|
|
|
|
elif choice == '4':
|
|
img = input(" Image name (e.g., nginx:latest): ").strip()
|
|
if img:
|
|
print(f"\n [*] Scanning {img} for vulnerabilities...")
|
|
result = cs.scan_image(img)
|
|
if result.get('error'):
|
|
print(f" [!] {result['error']}")
|
|
else:
|
|
s = result.get('summary', {})
|
|
print(f" Scanner: {result.get('scanner', '?')}")
|
|
print(f" Total: {result.get('total', 0)} vulnerabilities")
|
|
print(f" Critical: {s.get('CRITICAL', 0)} High: {s.get('HIGH', 0)} "
|
|
f"Medium: {s.get('MEDIUM', 0)} Low: {s.get('LOW', 0)}")
|
|
for v in result.get('vulnerabilities', [])[:20]:
|
|
print(f" {v['severity']:<8} {v['cve']:<18} {v['package']:<20} "
|
|
f"{v['installed_version']} -> {v.get('fixed_version', 'n/a')}")
|
|
|
|
elif choice == '5':
|
|
path = input(" Path to Dockerfile: ").strip()
|
|
if path and os.path.isfile(path):
|
|
with open(path) as f:
|
|
content = f.read()
|
|
findings = cs.lint_dockerfile(content)
|
|
if not findings:
|
|
print(" [+] No issues found.")
|
|
else:
|
|
print(f"\n Found {len(findings)} issue(s):")
|
|
for f in findings:
|
|
sev = f.get('severity', 'info').upper()
|
|
line = f"line {f['line']}" if f.get('line') else 'general'
|
|
print(f" [{sev}] {f['rule']}: {f['title']} ({line})")
|
|
print(f" {f['detail']}")
|
|
else:
|
|
print(" [!] File not found.")
|
|
|
|
elif choice == '6':
|
|
ns = input(" Namespace (default): ").strip() or 'default'
|
|
pods = cs.k8s_get_pods(namespace=ns)
|
|
if not pods:
|
|
print(" [-] No pods found.")
|
|
else:
|
|
print(f"\n {'Name':<35} {'Status':<12} {'Node':<20} {'Restarts':<10}")
|
|
print(f" {'-'*35} {'-'*12} {'-'*20} {'-'*10}")
|
|
for p in pods:
|
|
print(f" {p['name']:<35} {p['status']:<12} {p['node']:<20} {p['restart_count']:<10}")
|
|
|
|
elif choice == '7':
|
|
ns = input(" Namespace (blank for all): ").strip() or None
|
|
print("\n [*] Auditing RBAC...")
|
|
result = cs.k8s_audit_rbac(namespace=ns)
|
|
if not result.get('findings'):
|
|
print(" [+] No RBAC issues found.")
|
|
else:
|
|
print(f" Found {result['total']} issue(s):")
|
|
for f in result['findings']:
|
|
sev = f.get('severity', 'info').upper()
|
|
print(f" [{sev}] {f['type']}: {f.get('binding', '')} — {f['detail']}")
|