Files
setec_cdm/setec-web/audit.py
DigiJ 9e839ee826 Initial commit — SETEC LABS Manager (Setec_CDM)
Flask-based VPS management panel with SSH remote command execution.
Includes E2E encrypted SSH tunnel (AES-256-GCM + Go agent), setup wizard,
security hardening tools, DNS management, firewall configs, monitoring,
backup, and .sec patch update system.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 12:39:02 -07:00

70 lines
2.0 KiB
Python

"""Audit logging — log all actions with timestamp, user, action, target."""
import json
import os
import time
from collections import deque
from datetime import datetime, timezone
# Per-user directory that holds the audit trail: ".setec-mgr" under the home folder.
AUDIT_DIR = os.path.join(
    os.path.expanduser("~"),
    ".setec-mgr",
)
# Active log file inside that directory.
AUDIT_FILE = os.path.join(AUDIT_DIR, "audit.log")
# Rotation threshold in bytes (5 MiB); a larger log is renamed aside.
MAX_LOG_SIZE = 5 * 1024 ** 2
def log(action, target="", details="", user="admin", ip=""):
    """Append one audit entry to the audit log as a single JSON line.

    The entry records the current UTC time (ISO 8601 text with a trailing
    "Z") under "ts", followed by the caller-supplied fields.

    Args:
        action: Value stored under the "action" key.
        target: Value stored under the "target" key.
        details: Value stored under the "details" key.
        user: Value stored under the "user" key.
        ip: Value stored under the "ip" key.

    Raises:
        OSError: If the audit directory or file cannot be created or written.
    """
    os.makedirs(AUDIT_DIR, exist_ok=True)
    _rotate_if_needed()
    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the text stays "<iso>Z" instead of "<iso>+00:00Z".
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    entry = {
        "ts": stamp,
        "user": user,
        "ip": ip,
        "action": action,
        "target": target,
        "details": details,
    }
    # default=str is deliberate: a value that is not JSON-native (an exception
    # object, a datetime, ...) is recorded as its str() form rather than
    # raising TypeError and breaking the action being audited.
    record = json.dumps(entry, default=str)
    with open(AUDIT_FILE, "a", encoding="utf-8") as f:
        f.write(record + "\n")
def _rotate_if_needed():
    """Rotate the audit log if it exceeds MAX_LOG_SIZE.

    The current file is renamed to "<AUDIT_FILE>.<unix-seconds>"; when that
    name is already taken, a numeric suffix is appended so an earlier rotated
    log is never overwritten. A missing log file is not an error.
    """
    try:
        # Ask for the size directly instead of exists()+getsize(): the file
        # may disappear between the two calls.
        oversized = os.path.getsize(AUDIT_FILE) > MAX_LOG_SIZE
    except FileNotFoundError:
        return
    if not oversized:
        return

    base = f"{AUDIT_FILE}.{int(time.time())}"
    rotated = base
    attempt = 0
    # Two rotations within the same second would otherwise share a name;
    # os.rename would then replace the older file (POSIX) or fail (Windows).
    while os.path.exists(rotated):
        attempt += 1
        rotated = f"{base}.{attempt}"

    try:
        os.rename(AUDIT_FILE, rotated)
    except FileNotFoundError:
        # The log was moved away after the size check; nothing left to rotate.
        pass
def get_recent(count=100):
    """Return the entries found in the last ``count`` lines of the audit log.

    Lines that are not valid JSON, or that decode to something other than a
    JSON object, are skipped, so fewer than ``count`` entries may be returned.

    Args:
        count: Number of trailing log lines to inspect. Zero or a negative
            value yields an empty list.

    Returns:
        A list of entry dicts in file order (oldest first); empty when the
        log file does not exist.
    """
    # Guard explicitly: a "[-count:]" slice with count == 0 selects everything.
    if count <= 0:
        return []
    try:
        with open(AUDIT_FILE, "r", encoding="utf-8") as f:
            # A bounded deque keeps only the tail while streaming the file,
            # instead of loading every line into memory first.
            tail = deque(f, maxlen=count)
    except FileNotFoundError:
        return []

    entries = []
    for line in tail:
        try:
            entry = json.loads(line.strip())
        except json.JSONDecodeError:
            continue
        if isinstance(entry, dict):
            entries.append(entry)
    return entries
def search(query="", action_filter="", limit=200):
    """Search the audit log and return matching entries in file order.

    Args:
        query: Case-insensitive substring to look for in the stored line.
            An empty string matches every entry.
        action_filter: When non-empty, only entries whose "action" equals
            this value are returned.
        limit: Maximum number of entries to return. Zero or a negative value
            yields an empty list.

    Returns:
        A list of entry dicts; empty when the log file does not exist.
        Lines that are not valid JSON objects are skipped.
    """
    if limit <= 0:
        return []
    needle = query.lower() if query else ""

    try:
        handle = open(AUDIT_FILE, "r", encoding="utf-8")
    except FileNotFoundError:
        return []

    results = []
    with handle:
        for line in handle:
            try:
                entry = json.loads(line.strip())
            except json.JSONDecodeError:
                continue
            # A line such as "123" or "[]" is valid JSON but has no .get().
            if not isinstance(entry, dict):
                continue
            if action_filter and entry.get("action") != action_filter:
                continue
            if needle and needle not in line.lower():
                # The raw line may hold non-ASCII text as \uXXXX escapes
                # (json.dumps default), so retry against the unescaped form
                # before rejecting the entry.
                unescaped = json.dumps(entry, ensure_ascii=False).lower()
                if needle not in unescaped:
                    continue
            results.append(entry)
            if len(results) >= limit:
                break
    return results