"""Audit logging — log all actions with timestamp, user, action, target.""" import os import json import time from datetime import datetime AUDIT_DIR = os.path.join(os.path.expanduser("~"), ".setec-mgr") AUDIT_FILE = os.path.join(AUDIT_DIR, "audit.log") MAX_LOG_SIZE = 5 * 1024 * 1024 # 5MB, then rotate def log(action, target="", details="", user="admin", ip=""): """Append an audit entry.""" os.makedirs(AUDIT_DIR, exist_ok=True) _rotate_if_needed() entry = { "ts": datetime.utcnow().isoformat() + "Z", "user": user, "ip": ip, "action": action, "target": target, "details": details, } with open(AUDIT_FILE, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") def _rotate_if_needed(): """Rotate log if it exceeds MAX_LOG_SIZE.""" if os.path.exists(AUDIT_FILE) and os.path.getsize(AUDIT_FILE) > MAX_LOG_SIZE: rotated = AUDIT_FILE + f".{int(time.time())}" os.rename(AUDIT_FILE, rotated) def get_recent(count=100): """Return the last N audit entries.""" if not os.path.exists(AUDIT_FILE): return [] with open(AUDIT_FILE, "r", encoding="utf-8") as f: lines = f.readlines() entries = [] for line in lines[-count:]: try: entries.append(json.loads(line.strip())) except json.JSONDecodeError: pass return entries def search(query="", action_filter="", limit=200): """Search audit log.""" if not os.path.exists(AUDIT_FILE): return [] results = [] with open(AUDIT_FILE, "r", encoding="utf-8") as f: for line in f: try: entry = json.loads(line.strip()) if action_filter and entry.get("action") != action_filter: continue if query and query.lower() not in line.lower(): continue results.append(entry) if len(results) >= limit: break except json.JSONDecodeError: pass return results