Files

100 lines
2.5 KiB
Python
Raw Permalink Normal View History

"""Shared logging and hex formatting utilities"""
import os
import threading
from datetime import datetime
from collections import deque
# Held by log() while it appends to log_lines and writes to the log file.
lock = threading.Lock()

# In-memory buffer of the most recent (line, color) entries; old entries
# fall off the front once 2000 are stored.
log_lines = deque(maxlen=2000)

# Active log file handle and the path it was opened from.
# Both stay None until init_logfile() is called.
_logfile = None
_logfile_path = None

# Size at which _maybe_rotate() renames the active log file and starts anew.
LOG_MAX_BYTES = 1024 ** 3  # 1 GiB
_log_rotate_lock = threading.Lock()

# Color codes for TUI
C_NONE, C_ERROR, C_SUCCESS, C_INFO, C_TRAFFIC, C_IMPORTANT = range(6)
def init_logfile(path):
    """Open *path* in append mode and make it the active log file.

    The parent directory is created when *path* has one. A bare filename
    (no directory component) is opened relative to the current working
    directory. If a log file is already open, it is closed once the new one
    has been opened successfully, so repeated calls do not leak handles.

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            opened. The previously active log file is left untouched then.
    """
    global _logfile, _logfile_path
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)
    handle = open(path, "a")
    previous = _logfile
    _logfile_path = path
    _logfile = handle
    if previous is not None:
        try:
            previous.close()
        except OSError:
            # Failing to flush the old file must not undo the switch.
            pass
def close_logfile():
    """Close the active log file, if one is open, and clear the handle."""
    global _logfile
    if not _logfile:
        return
    _logfile.close()
    _logfile = None
def _maybe_rotate():
    """Rotate the active log file if it exceeds LOG_MAX_BYTES.

    The size is checked once without the rotation lock (cheap common case)
    and again while holding it. On rotation the current file is closed and
    renamed to ``<path>.<YYYYmmdd_HHMMSS>``, then a fresh file is opened at
    the original path and a marker line is written to it.

    Failures are handled on a best-effort basis and never raised:
    if the rename fails, logging continues in append mode on the same path;
    if the path cannot be reopened, file logging is disabled by setting the
    handle to None.
    """
    global _logfile
    if not _logfile or not _logfile_path:
        return
    try:
        size = os.fstat(_logfile.fileno()).st_size
    except (OSError, ValueError):
        # fileno() raises ValueError (not OSError) on a closed handle.
        return
    if size < LOG_MAX_BYTES:
        return

    with _log_rotate_lock:
        stale = _logfile
        if stale is None:
            return
        # Re-check under the lock: the file may have been rotated already.
        try:
            if os.fstat(stale.fileno()).st_size < LOG_MAX_BYTES:
                return
        except (OSError, ValueError):
            return

        rotated = False
        try:
            stale.close()
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            os.rename(_logfile_path, f"{_logfile_path}.{ts}")
            rotated = True
        except (OSError, ValueError):
            # Could not rotate; fall through and keep appending to the
            # same path. The old handle is closed at this point.
            pass

        # Open exactly one replacement handle so none is leaked.
        try:
            fresh = open(_logfile_path, "a")
        except OSError:
            _logfile = None
            return
        _logfile = fresh

        if rotated:
            try:
                fresh.write(f"[{datetime.now().strftime('%H:%M:%S')}] log rotated (>1GB)\n")
                fresh.flush()
            except (OSError, ValueError):
                # The marker line is informational; keep the new handle.
                pass
def log(msg, color=C_NONE):
    """Record a timestamped message in memory and in the active log file.

    The line ``[HH:MM:SS] msg`` is appended to ``log_lines`` together with
    *color*. If a log file is open, the line is also written and flushed
    there, and the file is then checked for rotation.

    Args:
        msg: Message to record; formatted into the line with an f-string.
        color: One of the ``C_*`` color codes stored alongside the line.

    File write errors are deliberately ignored so that logging never
    interrupts the caller.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    line = f"[{stamp}] {msg}"
    with lock:
        log_lines.append((line, color))
        # Snapshot the handle so it cannot turn into None between the
        # check and the write if another thread closes the log file.
        handle = _logfile
        if handle:
            try:
                handle.write(line + "\n")
                handle.flush()
            except (OSError, ValueError):
                # OSError: disk/IO failure. ValueError: closed handle or
                # an unencodable character. Best effort only.
                pass
            # Rotate while still holding the lock so no other log() call
            # can write to a handle that is being closed and renamed.
            _maybe_rotate()
def hexdump(data, max_bytes=128):
    """Format *data* as hex-dump text, 16 bytes per row.

    Each row holds the offset in hex, the bytes as two-digit hex values and
    their printable-ASCII rendering (non-printable bytes shown as ``.``).
    At most *max_bytes* bytes are shown. When *data* is longer, a trailing
    line reports the total length.

    Args:
        data: Sequence whose items are ints when iterated or sliced
            (assumes bytes-like input; verify against callers).
        max_bytes: Maximum number of bytes to render.

    Returns:
        The rows joined with newlines; an empty string for empty input.
    """
    total = len(data)
    limit = min(total, max_bytes)
    rows = []
    for offset in range(0, limit, 16):
        # Clip the last row so no bytes beyond max_bytes are shown when
        # max_bytes is not a multiple of 16.
        chunk = data[offset:min(offset + 16, limit)]
        hx = " ".join(f"{b:02x}" for b in chunk)
        asc = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
        rows.append(f" {offset:04x} {hx:<48} {asc}")
    if total > max_bytes:
        rows.append(f" ... ({total} bytes total)")
    return "\n".join(rows)
def save_raw(log_dir, name, data):
    """Write *data* to a new ``.bin`` file under *log_dir* and return its path.

    The file is named ``<name>_<unix-seconds>.bin``. If that name is already
    taken (for example, two saves of the same *name* within one second), a
    numeric suffix ``_1``, ``_2``, ... is added before ``.bin`` so an earlier
    capture is never overwritten.

    Args:
        log_dir: Directory to write into; created if missing.
        name: Filename prefix.
        data: Bytes to write.

    Returns:
        The path of the file that was written.
    """
    import time
    os.makedirs(log_dir, exist_ok=True)
    base = f"{log_dir}/{name}_{int(time.time())}"
    path = f"{base}.bin"
    attempt = 0
    while True:
        try:
            # "x" mode fails instead of truncating an existing file.
            out = open(path, "xb")
        except FileExistsError:
            attempt += 1
            path = f"{base}_{attempt}.bin"
            continue
        with out:
            out.write(data)
        return path