Files
autarch/web/routes/remote_monitor.py

275 lines
9.4 KiB
Python
Raw Permalink Normal View History

"""Remote Monitoring Station — load .piap device profiles and control remote radios."""
import configparser
import logging
import os
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path
from flask import Blueprint, render_template, request, jsonify
logger = logging.getLogger(__name__)
remote_monitor_bp = Blueprint('remote_monitor', __name__, url_prefix='/remote-monitor')
PIAP_DIR = Path(__file__).parent.parent.parent / 'data' / 'piap'
CAPTURE_DIR = Path(__file__).parent.parent.parent / 'data' / 'captures'
_ssh_sessions = {}
_capture_threads = {}
def _parse_piap(filepath):
"""Parse a .piap file into a dict structure."""
cfg = configparser.ConfigParser(interpolation=None)
cfg.read(filepath)
device = dict(cfg['device']) if 'device' in cfg else {}
connection = dict(cfg['connection']) if 'connection' in cfg else {}
radios = []
i = 0
while f'radio_{i}' in cfg:
radio = dict(cfg[f'radio_{i}'])
radio['index'] = i
if 'channels' in radio:
radio['channel_list'] = [c.strip() for c in radio['channels'].split(',')]
if 'modes' in radio:
radio['mode_list'] = [m.strip() for m in radio['modes'].split(',')]
radios.append(radio)
i += 1
features = dict(cfg['features']) if 'features' in cfg else {}
info_cmds = dict(cfg['info']) if 'info' in cfg else {}
return {
'device': device,
'connection': connection,
'radios': radios,
'features': features,
'info': info_cmds,
'filename': os.path.basename(filepath),
}
def _ssh_cmd(conn, cmd, timeout=15):
"""Run a command on the remote device over SSH."""
host = conn.get('host', '')
port = conn.get('port', '22')
user = conn.get('user', 'root')
auth = conn.get('auth', 'key')
key_path = conn.get('key_path', '')
password = conn.get('password', '')
ssh_timeout = conn.get('timeout', '10')
ssh_args = ['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'ConnectTimeout=' + ssh_timeout,
'-p', port]
if auth == 'key' and key_path:
ssh_args += ['-i', key_path]
ssh_args.append(f'{user}@{host}')
ssh_args.append(cmd)
try:
r = subprocess.run(ssh_args, capture_output=True, text=True, timeout=timeout)
return {'ok': r.returncode == 0, 'stdout': r.stdout.strip(), 'stderr': r.stderr.strip(), 'code': r.returncode}
except subprocess.TimeoutExpired:
return {'ok': False, 'stdout': '', 'stderr': 'timeout', 'code': -1}
except Exception as e:
return {'ok': False, 'stdout': '', 'stderr': str(e), 'code': -1}
def _expand_cmd(cmd_template, radio=None, channel=None, bssid=None, count=None, timestamp=None):
"""Replace {variables} in a command template."""
if not cmd_template:
return ''
cmd = cmd_template
if radio:
cmd = cmd.replace('{phy}', radio.get('phy', ''))
cmd = cmd.replace('{interface}', radio.get('interface', ''))
cmd = cmd.replace('{mon}', radio.get('monitor_interface', ''))
cmd = cmd.replace('{channels}', radio.get('channels', ''))
if channel:
cmd = cmd.replace('{channel}', str(channel))
elif radio:
cmd = cmd.replace('{channel}', radio.get('default_channel', '1'))
if bssid:
cmd = cmd.replace('{bssid}', bssid)
if count:
cmd = cmd.replace('{count}', str(count))
if timestamp is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
cmd = cmd.replace('{timestamp}', timestamp)
return cmd
# ── Routes ──────────────────────────────────────────────────────────────────
@remote_monitor_bp.route('/')
def index():
"""Main page — loads available .piap files for dropdown."""
piap_files = []
for f in sorted(PIAP_DIR.glob('*.piap')):
if f.name == 'template.piap':
continue
try:
p = _parse_piap(f)
piap_files.append({'filename': f.name, 'name': p['device'].get('name', f.stem)})
except Exception as e:
logger.warning("Failed to parse %s: %s", f, e)
return render_template('remote_monitor.html', piap_files=piap_files)
@remote_monitor_bp.route('/api/load', methods=['POST'])
def load_piap():
"""Load a .piap file and return its full config."""
filename = request.json.get('filename', '')
filepath = PIAP_DIR / filename
if not filepath.exists() or not filepath.suffix == '.piap':
return jsonify({'ok': False, 'error': 'File not found'}), 404
try:
data = _parse_piap(filepath)
return jsonify({'ok': True, 'data': data})
except Exception as e:
return jsonify({'ok': False, 'error': str(e)}), 500
@remote_monitor_bp.route('/api/connect', methods=['POST'])
def connect():
"""Test SSH connection to the remote device."""
conn = request.json.get('connection', {})
result = _ssh_cmd(conn, 'echo ok')
return jsonify(result)
@remote_monitor_bp.route('/api/info', methods=['POST'])
def device_info():
"""Get device info (uptime, memory, kernel, etc)."""
conn = request.json.get('connection', {})
info_cmds = request.json.get('info', {})
results = {}
for key, cmd in info_cmds.items():
if key.startswith('cmd_'):
label = key[4:]
r = _ssh_cmd(conn, cmd)
results[label] = r.get('stdout', r.get('stderr', ''))
return jsonify({'ok': True, 'info': results})
@remote_monitor_bp.route('/api/radio/status', methods=['POST'])
def radio_status():
"""Get status of a specific radio."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
cmd = _expand_cmd(radio.get('cmd_status', 'iw dev'), radio)
result = _ssh_cmd(conn, cmd)
return jsonify(result)
@remote_monitor_bp.route('/api/radio/monitor-on', methods=['POST'])
def monitor_on():
"""Enable monitor mode on a radio."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
channel = request.json.get('channel', radio.get('default_channel', '1'))
cmd = _expand_cmd(radio.get('cmd_monitor_on', ''), radio, channel=channel)
result = _ssh_cmd(conn, cmd)
return jsonify(result)
@remote_monitor_bp.route('/api/radio/monitor-off', methods=['POST'])
def monitor_off():
"""Disable monitor mode on a radio."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
cmd = _expand_cmd(radio.get('cmd_monitor_off', ''), radio)
result = _ssh_cmd(conn, cmd)
return jsonify(result)
@remote_monitor_bp.route('/api/radio/set-channel', methods=['POST'])
def set_channel():
"""Set channel on a monitor interface."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
channel = request.json.get('channel', '1')
cmd = _expand_cmd(radio.get('cmd_set_channel', ''), radio, channel=channel)
result = _ssh_cmd(conn, cmd)
return jsonify(result)
@remote_monitor_bp.route('/api/capture/start', methods=['POST'])
def capture_start():
"""Start packet capture on remote device."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
features = request.json.get('features', {})
cmd = _expand_cmd(features.get('cmd_capture_start', ''), radio)
result = _ssh_cmd(conn, cmd)
return jsonify(result)
@remote_monitor_bp.route('/api/capture/stop', methods=['POST'])
def capture_stop():
"""Stop packet capture on remote device."""
conn = request.json.get('connection', {})
features = request.json.get('features', {})
cmd = features.get('cmd_capture_stop', 'killall tcpdump 2>/dev/null')
result = _ssh_cmd(conn, cmd)
return jsonify(result)
@remote_monitor_bp.route('/api/scan', methods=['POST'])
def wifi_scan():
"""Run passive WiFi scan."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
features = request.json.get('features', {})
cmd = _expand_cmd(features.get('cmd_wifi_scan', ''), radio)
result = _ssh_cmd(conn, cmd, timeout=30)
return jsonify(result)
@remote_monitor_bp.route('/api/deauth', methods=['POST'])
def deauth():
"""Send deauth frames."""
conn = request.json.get('connection', {})
radio = request.json.get('radio', {})
features = request.json.get('features', {})
bssid = request.json.get('bssid', '')
count = request.json.get('count', '10')
cmd = _expand_cmd(features.get('cmd_deauth', ''), radio, bssid=bssid, count=count)
result = _ssh_cmd(conn, cmd, timeout=30)
return jsonify(result)
@remote_monitor_bp.route('/api/exec', methods=['POST'])
def exec_cmd():
"""Execute an arbitrary command on the remote device."""
conn = request.json.get('connection', {})
radio = request.json.get('radio')
cmd = request.json.get('cmd', '')
if radio:
cmd = _expand_cmd(cmd, radio)
result = _ssh_cmd(conn, cmd, timeout=30)
return jsonify(result)
@remote_monitor_bp.route('/api/piap/list')
def list_piaps():
"""List available .piap files."""
piap_files = []
for f in sorted(PIAP_DIR.glob('*.piap')):
if f.name == 'template.piap':
continue
try:
p = _parse_piap(f)
piap_files.append({'filename': f.name, 'name': p['device'].get('name', f.stem)})
except:
pass
return jsonify({'ok': True, 'files': piap_files})