Files
autarch/web/routes/settings.py
SsSnake da53899f66 AUTARCH v1.9 — remote monitoring, SSH manager, daemon, vault, cleanup
- Add Remote Monitoring Station with PIAP device profile system
- Add SSH/SSHD manager with fail2ban integration
- Add privileged daemon architecture for safe root operations
- Add encrypted vault, HAL memory, HAL auto-analyst
- Add network security suite, module creator, codex training
- Add start.sh launcher script and GTK3 desktop launcher
- Remove Output/ build artifacts, installer files, loose docs
- Update .gitignore for runtime data and build artifacts
- Update README for v1.9 with new launch method, screenshots, and features

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 06:59:06 -07:00

839 lines
34 KiB
Python

"""Settings route"""
import collections
import json
import logging
import os
import platform
import re
import subprocess
import threading
import time
from pathlib import Path
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, current_app, Response
from web.auth import login_required, hash_password, save_credentials, load_credentials
# ── Debug Console infrastructure ─────────────────────────────────────────────
_debug_buffer: collections.deque = collections.deque(maxlen=2000)
_debug_enabled: bool = False
_debug_handler_installed: bool = False
def _buf_append(level: str, name: str, raw: str, msg: str, exc: str = '') -> None:
"""Thread-safe append to the debug buffer."""
entry: dict = {'ts': time.time(), 'level': level, 'name': name, 'raw': raw, 'msg': msg}
if exc:
entry['exc'] = exc
_debug_buffer.append(entry)
class _DebugBufferHandler(logging.Handler):
"""Captures ALL log records into the in-memory debug buffer (always active)."""
def emit(self, record: logging.LogRecord) -> None:
try:
exc_text = ''
if record.exc_info:
import traceback as _tb
exc_text = ''.join(_tb.format_exception(*record.exc_info))
_buf_append(
level=record.levelname,
name=record.name,
raw=record.getMessage(),
msg=self.format(record),
exc=exc_text,
)
except Exception:
pass
class _PrintCapture:
"""Wraps sys.stdout or sys.stderr — passes through AND feeds lines to the debug buffer."""
def __init__(self, original, level: str = 'STDOUT'):
self._orig = original
self._level = level
self._line_buf = ''
def write(self, text: str) -> int:
self._orig.write(text)
self._line_buf += text
while '\n' in self._line_buf:
line, self._line_buf = self._line_buf.split('\n', 1)
if line.strip():
_buf_append(self._level, 'print', line, line)
return len(text)
def flush(self) -> None:
self._orig.flush()
def __getattr__(self, name):
return getattr(self._orig, name)
def _ensure_debug_handler() -> None:
"""Install logging handler + stdout/stderr capture once, at startup."""
global _debug_handler_installed
if _debug_handler_installed:
return
# Logging handler
handler = _DebugBufferHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter('%(name)s%(message)s'))
root = logging.getLogger()
root.addHandler(handler)
if root.level == logging.NOTSET or root.level > logging.DEBUG:
root.setLevel(logging.DEBUG)
# stdout / stderr capture
import sys as _sys
if not isinstance(_sys.stdout, _PrintCapture):
_sys.stdout = _PrintCapture(_sys.stdout, 'STDOUT')
if not isinstance(_sys.stderr, _PrintCapture):
_sys.stderr = _PrintCapture(_sys.stderr, 'STDERR')
_debug_handler_installed = True
# Install immediately so we capture from process start, not just after toggle
_ensure_debug_handler()
settings_bp = Blueprint('settings', __name__, url_prefix='/settings')
@settings_bp.route('/')
@login_required
def index():
config = current_app.autarch_config
return render_template('settings.html',
llm_backend=config.get('autarch', 'llm_backend', 'local'),
llama=config.get_llama_settings(),
transformers=config.get_transformers_settings(),
claude=config.get_claude_settings(),
huggingface=config.get_huggingface_settings(),
osint=config.get_osint_settings(),
pentest=config.get_pentest_settings(),
upnp=config.get_upnp_settings(),
debug_enabled=_debug_enabled,
)
@settings_bp.route('/password', methods=['POST'])
@login_required
def change_password():
new_pass = request.form.get('new_password', '')
confirm = request.form.get('confirm_password', '')
if not new_pass or len(new_pass) < 4:
flash('Password must be at least 4 characters.', 'error')
return redirect(url_for('settings.index'))
if new_pass != confirm:
flash('Passwords do not match.', 'error')
return redirect(url_for('settings.index'))
creds = load_credentials()
save_credentials(creds['username'], hash_password(new_pass), force_change=False)
flash('Password updated.', 'success')
return redirect(url_for('settings.index'))
@settings_bp.route('/osint', methods=['POST'])
@login_required
def update_osint():
config = current_app.autarch_config
config.set('osint', 'max_threads', request.form.get('max_threads', '8'))
config.set('osint', 'timeout', request.form.get('timeout', '8'))
config.set('osint', 'include_nsfw', 'true' if request.form.get('include_nsfw') else 'false')
config.save()
flash('OSINT settings updated.', 'success')
return redirect(url_for('settings.index'))
@settings_bp.route('/upnp', methods=['POST'])
@login_required
def update_upnp():
config = current_app.autarch_config
config.set('upnp', 'enabled', 'true' if request.form.get('enabled') else 'false')
config.set('upnp', 'internal_ip', request.form.get('internal_ip', '10.0.0.26'))
config.set('upnp', 'refresh_hours', request.form.get('refresh_hours', '12'))
config.set('upnp', 'mappings', request.form.get('mappings', ''))
config.save()
flash('UPnP settings updated.', 'success')
return redirect(url_for('settings.index'))
@settings_bp.route('/llm', methods=['POST'])
@login_required
def update_llm():
config = current_app.autarch_config
backend = request.form.get('backend', 'local')
if backend == 'local':
config.set('llama', 'model_path', request.form.get('model_path', ''))
config.set('llama', 'n_ctx', request.form.get('n_ctx', '4096'))
config.set('llama', 'n_threads', request.form.get('n_threads', '4'))
config.set('llama', 'n_gpu_layers', request.form.get('n_gpu_layers', '0'))
config.set('llama', 'n_batch', request.form.get('n_batch', '512'))
config.set('llama', 'temperature', request.form.get('temperature', '0.7'))
config.set('llama', 'top_p', request.form.get('top_p', '0.9'))
config.set('llama', 'top_k', request.form.get('top_k', '40'))
config.set('llama', 'repeat_penalty', request.form.get('repeat_penalty', '1.1'))
config.set('llama', 'max_tokens', request.form.get('max_tokens', '2048'))
config.set('llama', 'seed', request.form.get('seed', '-1'))
config.set('llama', 'rope_scaling_type', request.form.get('rope_scaling_type', '0'))
config.set('llama', 'mirostat_mode', request.form.get('mirostat_mode', '0'))
config.set('llama', 'mirostat_tau', request.form.get('mirostat_tau', '5.0'))
config.set('llama', 'mirostat_eta', request.form.get('mirostat_eta', '0.1'))
config.set('llama', 'flash_attn', 'true' if request.form.get('flash_attn') else 'false')
config.set('llama', 'gpu_backend', request.form.get('gpu_backend', 'cpu'))
elif backend == 'transformers':
config.set('transformers', 'model_path', request.form.get('model_path', ''))
config.set('transformers', 'device', request.form.get('device', 'auto'))
config.set('transformers', 'torch_dtype', request.form.get('torch_dtype', 'auto'))
config.set('transformers', 'load_in_8bit', 'true' if request.form.get('load_in_8bit') else 'false')
config.set('transformers', 'load_in_4bit', 'true' if request.form.get('load_in_4bit') else 'false')
config.set('transformers', 'llm_int8_enable_fp32_cpu_offload', 'true' if request.form.get('llm_int8_enable_fp32_cpu_offload') else 'false')
config.set('transformers', 'device_map', request.form.get('device_map', 'auto'))
config.set('transformers', 'trust_remote_code', 'true' if request.form.get('trust_remote_code') else 'false')
config.set('transformers', 'use_fast_tokenizer', 'true' if request.form.get('use_fast_tokenizer') else 'false')
config.set('transformers', 'padding_side', request.form.get('padding_side', 'left'))
config.set('transformers', 'do_sample', 'true' if request.form.get('do_sample') else 'false')
config.set('transformers', 'num_beams', request.form.get('num_beams', '1'))
config.set('transformers', 'temperature', request.form.get('temperature', '0.7'))
config.set('transformers', 'top_p', request.form.get('top_p', '0.9'))
config.set('transformers', 'top_k', request.form.get('top_k', '40'))
config.set('transformers', 'repetition_penalty', request.form.get('repetition_penalty', '1.1'))
config.set('transformers', 'max_tokens', request.form.get('max_tokens', '2048'))
elif backend == 'claude':
config.set('claude', 'model', request.form.get('model', 'claude-sonnet-4-20250514'))
api_key = request.form.get('api_key', '')
if api_key:
config._set_secret('claude_api_key', api_key, 'claude', 'api_key')
config.set('claude', 'max_tokens', request.form.get('max_tokens', '4096'))
config.set('claude', 'temperature', request.form.get('temperature', '0.7'))
elif backend == 'huggingface':
config.set('huggingface', 'model', request.form.get('model', 'mistralai/Mistral-7B-Instruct-v0.3'))
api_key = request.form.get('api_key', '')
if api_key:
config._set_secret('huggingface_api_key', api_key, 'huggingface', 'api_key')
config.set('huggingface', 'endpoint', request.form.get('endpoint', ''))
config.set('huggingface', 'provider', request.form.get('provider', 'auto'))
config.set('huggingface', 'max_tokens', request.form.get('max_tokens', '1024'))
config.set('huggingface', 'temperature', request.form.get('temperature', '0.7'))
config.set('huggingface', 'top_p', request.form.get('top_p', '0.9'))
config.set('huggingface', 'top_k', request.form.get('top_k', '40'))
config.set('huggingface', 'repetition_penalty', request.form.get('repetition_penalty', '1.1'))
config.set('huggingface', 'do_sample', 'true' if request.form.get('do_sample') else 'false')
config.set('huggingface', 'seed', request.form.get('seed', '-1'))
config.set('huggingface', 'stop_sequences', request.form.get('stop_sequences', ''))
elif backend == 'openai':
config.set('openai', 'model', request.form.get('model', 'gpt-4o'))
api_key = request.form.get('api_key', '')
if api_key:
config._set_secret('openai_api_key', api_key, 'openai', 'api_key')
config.set('openai', 'base_url', request.form.get('base_url', 'https://api.openai.com/v1'))
config.set('openai', 'max_tokens', request.form.get('max_tokens', '4096'))
config.set('openai', 'temperature', request.form.get('temperature', '0.7'))
config.set('openai', 'top_p', request.form.get('top_p', '1.0'))
config.set('openai', 'frequency_penalty', request.form.get('frequency_penalty', '0.0'))
config.set('openai', 'presence_penalty', request.form.get('presence_penalty', '0.0'))
# Switch active backend
config.set('autarch', 'llm_backend', backend)
config.save()
_log = logging.getLogger('autarch.settings')
_log.info(f"[Settings] LLM backend switched to: {backend}")
# Reset LLM instance so next request triggers fresh load
try:
from core.llm import reset_llm
reset_llm()
_log.info("[Settings] LLM instance reset — will reload on next chat request")
except Exception as exc:
_log.error(f"[Settings] reset_llm() error: {exc}", exc_info=True)
flash(f'LLM backend switched to {backend} and settings saved.', 'success')
return redirect(url_for('settings.llm_settings'))
# ── LLM Settings Sub-Page ─────────────────────────────────────────────────────
@settings_bp.route('/llm')
@login_required
def llm_settings():
config = current_app.autarch_config
from core.paths import get_app_dir
default_models_dir = str(get_app_dir() / 'models')
return render_template('llm_settings.html',
llm_backend=config.get('autarch', 'llm_backend', 'local'),
llama=config.get_llama_settings(),
transformers=config.get_transformers_settings(),
claude=config.get_claude_settings(),
openai=config.get_openai_settings(),
huggingface=config.get_huggingface_settings(),
agents=config.get_agents_settings(),
mcp_port=config.get('web', 'mcp_port', fallback='8081'),
default_models_dir=default_models_dir,
)
@settings_bp.route('/llm/load', methods=['POST'])
@login_required
def llm_load():
"""Force-load the currently configured LLM backend and return status."""
_log = logging.getLogger('autarch.settings')
try:
from core.llm import reset_llm, get_llm
from core.config import get_config
config = get_config()
backend = config.get('autarch', 'llm_backend', 'local')
_log.info(f"[LLM Load] Requested by user — backend: {backend}")
reset_llm()
llm = get_llm()
model_name = llm.model_name if hasattr(llm, 'model_name') else 'unknown'
_log.info(f"[LLM Load] Success — backend: {backend} | model: {model_name}")
return jsonify({'ok': True, 'backend': backend, 'model_name': model_name})
except Exception as exc:
_log.error(f"[LLM Load] Failed: {exc}", exc_info=True)
return jsonify({'ok': False, 'error': str(exc)})
@settings_bp.route('/llm/claude-models', methods=['POST'])
@login_required
def llm_claude_models():
"""Fetch available Claude models from the Anthropic API."""
_log = logging.getLogger('autarch.settings')
try:
import anthropic
except ImportError:
return jsonify({'ok': False, 'error': 'anthropic package not installed'})
config = current_app.autarch_config
api_key = config.get('claude', 'api_key', fallback='') or os.environ.get('ANTHROPIC_API_KEY', '')
if not api_key:
return jsonify({'ok': False, 'error': 'No Claude API key configured'})
try:
client = anthropic.Anthropic(api_key=api_key)
resp = client.models.list(limit=100)
models = []
for m in resp.data:
models.append({
'id': m.id,
'name': getattr(m, 'display_name', m.id),
'created': getattr(m, 'created_at', None),
})
# Sort: newest first, then alphabetical
models.sort(key=lambda x: x['id'])
return jsonify({'ok': True, 'models': models})
except Exception as exc:
_log.error(f"[Claude Models] API error: {exc}", exc_info=True)
return jsonify({'ok': False, 'error': str(exc)})
@settings_bp.route('/agents/save', methods=['POST'])
@login_required
def agents_save():
"""Save agent configuration settings."""
_log = logging.getLogger('autarch.settings')
config = current_app.autarch_config
data = request.get_json(silent=True) or {}
for key in ('backend', 'local_max_steps', 'local_verbose',
'claude_enabled', 'claude_model', 'claude_max_tokens', 'claude_max_steps',
'openai_enabled', 'openai_model', 'openai_base_url', 'openai_max_tokens', 'openai_max_steps'):
if key in data:
val = data[key]
if isinstance(val, bool):
val = 'true' if val else 'false'
config.set('agents', key, str(val))
config.save()
_log.info(f"[Agents] Settings saved — backend: {data.get('backend', '?')}")
return jsonify({'ok': True})
@settings_bp.route('/llm/scan-models', methods=['POST'])
@login_required
def llm_scan_models():
"""Scan a folder for supported local model files and return a list."""
data = request.get_json(silent=True) or {}
folder = data.get('folder', '').strip()
if not folder:
return jsonify({'ok': False, 'error': 'No folder provided'})
folder_path = Path(folder)
if not folder_path.is_dir():
return jsonify({'ok': False, 'error': f'Directory not found: {folder}'})
models = []
try:
# GGUF / GGML / legacy bin files (single-file models)
for ext in ('*.gguf', '*.ggml', '*.bin'):
for p in sorted(folder_path.glob(ext)):
size_mb = p.stat().st_size / (1024 * 1024)
models.append({
'name': p.name,
'path': str(p),
'type': 'gguf' if p.suffix in ('.gguf', '.ggml') else 'bin',
'size_mb': round(size_mb, 1),
})
# SafeTensors model directories (contain config.json + *.safetensors)
for child in sorted(folder_path.iterdir()):
if not child.is_dir():
continue
has_config = (child / 'config.json').exists()
has_st = any(child.glob('*.safetensors'))
has_st_index = (child / 'model.safetensors.index.json').exists()
if has_config and (has_st or has_st_index):
total_mb = sum(
p.stat().st_size for p in child.glob('*.safetensors')
) / (1024 * 1024)
models.append({
'name': child.name + '/',
'path': str(child),
'type': 'safetensors',
'size_mb': round(total_mb, 1),
})
# Also scan one level of subdirectories for GGUF files
for child in sorted(folder_path.iterdir()):
if not child.is_dir():
continue
for ext in ('*.gguf', '*.ggml'):
for p in sorted(child.glob(ext)):
size_mb = p.stat().st_size / (1024 * 1024)
models.append({
'name': child.name + '/' + p.name,
'path': str(p),
'type': 'gguf',
'size_mb': round(size_mb, 1),
})
return jsonify({'ok': True, 'models': models, 'folder': str(folder_path)})
except PermissionError as e:
return jsonify({'ok': False, 'error': f'Permission denied: {e}'})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
@settings_bp.route('/llm/hf-verify', methods=['POST'])
@login_required
def llm_hf_verify():
"""Verify a HuggingFace token and return account info."""
data = request.get_json(silent=True) or {}
token = data.get('token', '').strip()
if not token:
return jsonify({'ok': False, 'error': 'No token provided'})
try:
from huggingface_hub import HfApi
api = HfApi(token=token)
info = api.whoami()
return jsonify({'ok': True, 'username': info.get('name', ''), 'email': info.get('email', '')})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
# ── MCP Server API ───────────────────────────────────────────
@settings_bp.route('/mcp/save', methods=['POST'])
@login_required
def mcp_save():
"""Save MCP server configuration."""
_log = logging.getLogger('autarch.settings')
config = current_app.autarch_config
data = request.get_json(silent=True) or {}
mcp_keys = ('enabled', 'auto_start', 'transport', 'host', 'port', 'log_level',
'instructions', 'auth_enabled', 'auth_token', 'rate_limit',
'mask_errors', 'request_timeout', 'max_message_size', 'cors_origins',
'ssl_enabled', 'ssl_cert', 'ssl_key', 'disabled_tools',
'nmap_timeout', 'tcpdump_timeout', 'whois_timeout', 'dns_timeout',
'geoip_timeout', 'geoip_endpoint')
for key in mcp_keys:
if key in data:
val = data[key]
if isinstance(val, bool):
val = 'true' if val else 'false'
config.set('mcp', key, str(val))
config.save()
_log.info(f"[MCP] Settings saved")
return jsonify({'ok': True})
@settings_bp.route('/mcp/generate-token', methods=['POST'])
@login_required
def mcp_generate_token():
"""Generate a new random auth token for MCP."""
import secrets
token = secrets.token_urlsafe(32)
config = current_app.autarch_config
config.set('mcp', 'auth_token', token)
config.save()
return jsonify({'ok': True, 'token': token})
@settings_bp.route('/mcp/status', methods=['POST'])
@login_required
def mcp_status():
try:
from core.mcp_server import get_server_status, get_autarch_tools
status = get_server_status()
tools = [{'name': t['name'], 'description': t['description']} for t in get_autarch_tools()]
return jsonify({'ok': True, 'status': status, 'tools': tools})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
@settings_bp.route('/mcp/start', methods=['POST'])
@login_required
def mcp_start():
try:
from core.mcp_server import start_sse_server
config = current_app.autarch_config
port = config.get_int('mcp', 'port', 8081)
host = config.get('mcp', 'host', '0.0.0.0')
result = start_sse_server(host=host, port=port)
return jsonify(result)
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
@settings_bp.route('/mcp/stop', methods=['POST'])
@login_required
def mcp_stop():
try:
from core.mcp_server import stop_sse_server
result = stop_sse_server()
return jsonify(result)
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
@settings_bp.route('/mcp/config', methods=['POST'])
@login_required
def mcp_config():
try:
from core.mcp_server import get_mcp_config_snippet
return jsonify({'ok': True, 'config': get_mcp_config_snippet()})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
# ── Discovery API ────────────────────────────────────────────
@settings_bp.route('/discovery/status', methods=['POST'])
@login_required
def discovery_status():
try:
from core.discovery import get_discovery_manager
mgr = get_discovery_manager()
return jsonify({'ok': True, 'status': mgr.get_status()})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
@settings_bp.route('/discovery/start', methods=['POST'])
@login_required
def discovery_start():
try:
from core.discovery import get_discovery_manager
mgr = get_discovery_manager()
results = mgr.start_all()
return jsonify({'ok': True, 'results': results})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
@settings_bp.route('/discovery/stop', methods=['POST'])
@login_required
def discovery_stop():
try:
from core.discovery import get_discovery_manager
mgr = get_discovery_manager()
results = mgr.stop_all()
return jsonify({'ok': True, 'results': results})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)})
# ── Debug Console API ─────────────────────────────────────────────────────────
@settings_bp.route('/debug/toggle', methods=['POST'])
@login_required
def debug_toggle():
"""Enable or disable the debug console UI (capture always runs)."""
global _debug_enabled
data = request.get_json(silent=True) or {}
_debug_enabled = bool(data.get('enabled', False))
if _debug_enabled:
logging.getLogger('autarch.debug').info('Debug console opened')
return jsonify({'ok': True, 'enabled': _debug_enabled})
@settings_bp.route('/debug/stream')
@login_required
def debug_stream():
"""SSE stream — pushes log records to the browser as they arrive.
On connect: sends the last 200 buffered entries as history, then streams
new entries live. Handles deque wrap-around correctly.
"""
def generate():
buf = list(_debug_buffer)
# Send last 200 entries as catch-up history
history_start = max(0, len(buf) - 200)
for entry in buf[history_start:]:
yield f"data: {json.dumps(entry)}\n\n"
sent = len(buf)
while True:
time.sleep(0.2)
buf = list(_debug_buffer)
n = len(buf)
if sent > n:
# deque wrapped; re-orient to current tail
sent = n
while sent < n:
yield f"data: {json.dumps(buf[sent])}\n\n"
sent += 1
yield ': keepalive\n\n'
return Response(generate(), mimetype='text/event-stream',
headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@settings_bp.route('/debug/clear', methods=['POST'])
@login_required
def debug_clear():
"""Clear the in-memory debug buffer."""
_debug_buffer.clear()
return jsonify({'ok': True})
@settings_bp.route('/debug/test', methods=['POST'])
@login_required
def debug_test():
"""Emit one log record at each level so the user can verify the debug window."""
log = logging.getLogger('autarch.test')
log.debug('DEBUG — detailed diagnostic info, variable states')
log.info('INFO — normal operation: module loaded, connection established')
log.warning('WARNING — something unexpected but recoverable')
log.error('ERROR — an operation failed, check the details below')
try:
raise ValueError('Example exception to show stack trace capture')
except ValueError:
log.exception('EXCEPTION — error with full traceback')
return jsonify({'ok': True, 'sent': 5})
# ==================== MCP SERVER ====================
@settings_bp.route('/mcp')
@login_required
def mcp_settings():
"""MCP Server configuration and management page."""
config = current_app.autarch_config
return render_template('mcp_settings.html',
mcp=config.get_mcp_settings(),
)
# ==================== DEPENDENCIES ====================
@settings_bp.route('/deps')
@login_required
def deps_index():
"""Dependencies management page."""
return render_template('system_deps.html')
@settings_bp.route('/deps/system-check', methods=['POST'])
@login_required
def deps_system_check():
"""Check non-Python system tools availability."""
import shutil
tools_to_check = {
'nmap': {'cmd': 'nmap', 'version_flag': '--version'},
'tshark': {'cmd': 'tshark', 'version_flag': '--version'},
'tcpdump': {'cmd': 'tcpdump', 'version_flag': '--version'},
'msfconsole': {'cmd': 'msfconsole', 'version_flag': '--version'},
'wg': {'cmd': 'wg', 'version_flag': '--version'},
'node': {'cmd': 'node', 'version_flag': '--version'},
'go': {'cmd': 'go', 'version_flag': 'version'},
'adb': {'cmd': 'adb', 'version_flag': 'version'},
'upnpc': {'cmd': 'upnpc', 'version_flag': '--help'},
'whois': {'cmd': 'whois', 'version_flag': '--version'},
'aircrack': {'cmd': 'aircrack-ng', 'version_flag': '--version'},
'mdk4': {'cmd': 'mdk4', 'version_flag': '--help'},
'sslstrip': {'cmd': 'sslstrip', 'version_flag': '-h'},
'iw': {'cmd': 'iw', 'version_flag': '--version'},
'nmcli': {'cmd': 'nmcli', 'version_flag': '--version'},
'hostapd': {'cmd': 'hostapd', 'version_flag': '-v'},
'dnsmasq': {'cmd': 'dnsmasq', 'version_flag': '--version'},
'nft': {'cmd': 'nft', 'version_flag': '--version'},
'torch': {'python_check': True},
}
results = {}
for name, info in tools_to_check.items():
if info.get('python_check'):
try:
import importlib
mod = importlib.import_module('torch')
ver = getattr(mod, '__version__', 'unknown')
cuda = 'CUDA' if getattr(mod, 'cuda', None) and mod.cuda.is_available() else 'CPU'
results[name] = {'found': True, 'version': f'{ver} ({cuda})'}
except ImportError:
results[name] = {'found': False}
continue
cmd = info['cmd']
path = shutil.which(cmd)
# Also check bundled tools
if not path:
from core.paths import find_tool
found = find_tool(cmd)
if found:
path = str(found)
if path:
version = ''
try:
r = subprocess.run([path, info['version_flag']],
capture_output=True, text=True, timeout=5)
output = (r.stdout + r.stderr).strip()
# Extract first line with a version-like pattern
for line in output.split('\n')[:3]:
if any(c.isdigit() for c in line):
version = line.strip()[:80]
break
except Exception:
version = 'found'
results[name] = {'found': True, 'version': version or 'found'}
else:
results[name] = {'found': False}
return jsonify({'ok': True, 'tools': results})
@settings_bp.route('/deps/check', methods=['POST'])
@login_required
def deps_check():
"""Check all system dependencies."""
import sys as _sys
groups = {
'core': {
'flask': 'import flask; print(flask.__version__)',
'jinja2': 'import jinja2; print(jinja2.__version__)',
'requests': 'import requests; print(requests.__version__)',
'cryptography': 'import cryptography; print(cryptography.__version__)',
},
'llm': {
'llama-cpp-python': 'import llama_cpp; print(llama_cpp.__version__)',
'transformers': 'import transformers; print(transformers.__version__)',
'anthropic': 'import anthropic; print(anthropic.__version__)',
},
'training': {
'torch': 'import torch; print(torch.__version__)',
'peft': 'import peft; print(peft.__version__)',
'datasets': 'import datasets; print(datasets.__version__)',
'trl': 'import trl; print(trl.__version__)',
'accelerate': 'import accelerate; print(accelerate.__version__)',
'bitsandbytes': 'import bitsandbytes; print(bitsandbytes.__version__)',
'unsloth': 'import unsloth; print(unsloth.__version__)',
},
'network': {
'scapy': 'import scapy; print(scapy.VERSION)',
'pyshark': 'import pyshark; print(pyshark.__version__)',
'miniupnpc': 'import miniupnpc; print("installed")',
'msgpack': 'import msgpack; print(msgpack.version)',
'paramiko': 'import paramiko; print(paramiko.__version__)',
},
'hardware': {
'pyserial': 'import serial; print(serial.__version__)',
'esptool': 'import esptool; print(esptool.__version__)',
'adb-shell': 'import adb_shell; print("installed")',
},
}
results = {}
for group, packages in groups.items():
results[group] = {}
for name, cmd in packages.items():
try:
result = subprocess.run(
[_sys.executable, '-c', cmd],
capture_output=True, text=True, timeout=15
)
if result.returncode == 0:
results[group][name] = {'installed': True, 'version': result.stdout.strip()}
else:
results[group][name] = {'installed': False, 'version': None}
except Exception:
results[group][name] = {'installed': False, 'version': None}
# GPU info
gpu = {}
try:
result = subprocess.run(
[_sys.executable, '-c',
'import torch; print(torch.cuda.is_available()); '
'print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else "none"); '
'print(torch.version.cuda or "none")'],
capture_output=True, text=True, timeout=15
)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
gpu['cuda_available'] = lines[0].strip() == 'True'
gpu['device'] = lines[1].strip() if len(lines) > 1 else 'none'
gpu['cuda_version'] = lines[2].strip() if len(lines) > 2 else 'none'
except Exception:
gpu['cuda_available'] = False
results['gpu'] = gpu
# Python info
import sys as _s
results['python'] = {
'version': _s.version.split()[0],
'executable': _s.executable,
'platform': platform.platform(),
}
return jsonify(results)
@settings_bp.route('/deps/install', methods=['POST'])
@login_required
def deps_install():
"""Install packages."""
import sys as _sys
data = request.get_json(silent=True) or {}
packages = data.get('packages', [])
if not packages:
return jsonify({'error': 'No packages specified'}), 400
results = []
for pkg in packages:
# Sanitize package name
if not re.match(r'^[a-zA-Z0-9_\-\[\]]+$', pkg):
results.append({'package': pkg, 'success': False, 'output': 'Invalid package name'})
continue
try:
result = subprocess.run(
[_sys.executable, '-m', 'pip', 'install', pkg, '--quiet'],
capture_output=True, text=True, timeout=300
)
results.append({
'package': pkg,
'success': result.returncode == 0,
'output': result.stdout.strip() or result.stderr.strip()[:200],
})
except Exception as e:
results.append({'package': pkg, 'success': False, 'output': str(e)[:200]})
return jsonify({'results': results})