275 lines
9.1 KiB
Python
275 lines
9.1 KiB
Python
|
|
"""
|
||
|
|
AUTARCH UPnP Manager

Manages UPnP port forwarding via miniupnpc (the upnpc CLI).
|
||
|
|
"""
|
||
|
|
|
||
|
|
import subprocess
|
||
|
|
import re
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import List, Dict, Optional, Tuple
|
||
|
|
|
||
|
|
from core.paths import find_tool
|
||
|
|
|
||
|
|
|
||
|
|
class UPnPManager:
    """UPnP port forwarding manager wrapping the upnpc CLI.

    Every public operation reports failure through its return value (a
    ``(success, message)`` tuple or a result dict) instead of raising, so
    callers can show the message directly.

    ``config`` is expected to provide ``get(section, key, fallback=...)``,
    ``set(section, key, value)`` and ``save()``; it may be ``None``, in
    which case nothing is read from or persisted to configuration.
    """

    # Substring that identifies the refresh job inside a crontab.
    _CRON_MARKER = 'upnp-refresh'
    _VALID_PROTOCOLS = ('TCP', 'UDP')
    # Fallback LAN address used when the config does not provide one.
    _DEFAULT_INTERNAL_IP = '10.0.0.26'

    def __init__(self, config=None):
        self.config = config
        self._upnpc = find_tool('upnpc')

    def is_available(self) -> bool:
        """Check if upnpc is installed."""
        return self._upnpc is not None

    def _run(self, args: list, timeout: int = 15) -> Tuple[bool, str]:
        """Run upnpc with arguments and return (success, output).

        Args:
            args: Command-line arguments placed after the upnpc executable.
            timeout: Seconds to wait before giving up on the process.

        Returns:
            ``(True, output)`` when upnpc exits with status 0, otherwise
            ``(False, message)``. ``output`` is stdout followed by stderr.
        """
        if not self._upnpc:
            return False, "upnpc not found. Install miniupnpc."
        try:
            result = subprocess.run(
                [self._upnpc] + args,
                capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return False, "Command timed out"
        except Exception as e:
            # Deliberately broad: the tuple contract means this never raises.
            return False, str(e)
        output = result.stdout + result.stderr
        return result.returncode == 0, output.strip()

    @staticmethod
    def _parse_port(value) -> Optional[int]:
        """Return ``value`` as a port number in 1-65535, or None if invalid."""
        try:
            port = int(value)
        except (TypeError, ValueError):
            return None
        return port if 1 <= port <= 65535 else None

    def list_mappings(self) -> Tuple[bool, str]:
        """List current UPnP port mappings."""
        return self._run(['-l'])

    def add_mapping(self, internal_ip: str, internal_port: int,
                    external_port: int, protocol: str,
                    description: str = "AUTARCH") -> Tuple[bool, str]:
        """Add a UPnP port mapping.

        Args:
            internal_ip: LAN IP to forward to
            internal_port: Internal port number
            external_port: External port number
            protocol: TCP or UDP
            description: Mapping description

        Returns:
            ``(success, message)``. Invalid protocol or ports are rejected
            before upnpc is invoked.
        """
        protocol = protocol.strip().upper()
        if protocol not in self._VALID_PROTOCOLS:
            return False, "Protocol must be TCP or UDP"

        int_port = self._parse_port(internal_port)
        ext_port = self._parse_port(external_port)
        if int_port is None or ext_port is None:
            return False, "Ports must be integers between 1 and 65535"

        # upnpc takes the description through the ``-e`` option, which must
        # come before the command. The positional slot after the duration is
        # ``[remote host]``, so the description must not be placed there.
        return self._run([
            '-e', description,
            '-a', internal_ip,
            str(int_port), str(ext_port),
            protocol, '0',
        ])

    def remove_mapping(self, external_port: int, protocol: str) -> Tuple[bool, str]:
        """Remove a UPnP port mapping.

        Args:
            external_port: External port of the mapping to delete.
            protocol: TCP or UDP

        Returns:
            ``(success, message)``. Invalid input is rejected before upnpc
            is invoked, with the same messages as ``add_mapping``.
        """
        protocol = protocol.strip().upper()
        if protocol not in self._VALID_PROTOCOLS:
            return False, "Protocol must be TCP or UDP"
        ext_port = self._parse_port(external_port)
        if ext_port is None:
            return False, "Ports must be integers between 1 and 65535"
        return self._run(['-d', str(ext_port), protocol])

    def get_external_ip(self) -> Tuple[bool, str]:
        """Get the external IP via UPnP.

        Returns:
            ``(True, ip)`` when an ``ExternalIPAddress = x.x.x.x`` line is
            found in the upnpc output; otherwise ``(False, output)``.
        """
        # ``-s`` (connection status) prints the ExternalIPAddress line.
        # ``-e`` is the description option, not a command.
        success, output = self._run(['-s'])
        if not success:
            return False, output

        for line in output.splitlines():
            if 'ExternalIPAddress' not in line:
                continue
            _, sep, value = line.rpartition('=')
            if sep and value.strip():
                return True, value.strip()

        # Do not hand the whole status dump back as if it were an address.
        return False, output or "External IP address not reported by upnpc"

    def refresh_all(self) -> List[Dict]:
        """Re-add all configured port mappings.

        Each configured port is forwarded to the same port number on the
        configured internal IP.

        Returns:
            One dict per mapping with keys 'port', 'protocol', 'success'
            and 'message'.
        """
        internal_ip = self._get_internal_ip()
        results = []

        for mapping in self.load_mappings_from_config():
            port = mapping['port']
            proto = mapping['protocol']
            desc = mapping.get('description', 'AUTARCH')
            success, output = self.add_mapping(
                internal_ip, port, port, proto, desc
            )
            results.append({
                'port': port,
                'protocol': proto,
                'success': success,
                'message': output,
            })

        return results

    def _get_internal_ip(self) -> str:
        """Get the configured internal IP, or the built-in fallback."""
        if self.config:
            return self.config.get(
                'upnp', 'internal_ip', fallback=self._DEFAULT_INTERNAL_IP
            )
        return self._DEFAULT_INTERNAL_IP

    def load_mappings_from_config(self) -> List[Dict]:
        """Load port mappings from config file.

        Config format: mappings = 443:TCP,51820:UDP,8080:TCP

        Whitespace around ports and protocols is ignored. Entries with a
        non-numeric or out-of-range port, or a protocol other than TCP/UDP,
        are skipped.

        Returns:
            List of ``{'port': int, 'protocol': 'TCP' | 'UDP'}`` dicts.
        """
        if not self.config:
            return []

        mappings_str = self.config.get('upnp', 'mappings', fallback='')
        if not mappings_str:
            return []

        mappings = []
        for entry in mappings_str.split(','):
            port_text, sep, proto_text = entry.strip().partition(':')
            if not sep:
                continue
            port = self._parse_port(port_text.strip())
            protocol = proto_text.strip().upper()
            if port is None or protocol not in self._VALID_PROTOCOLS:
                continue
            mappings.append({'port': port, 'protocol': protocol})
        return mappings

    def save_mappings_to_config(self, mappings: List[Dict]):
        """Save port mappings to config file.

        Args:
            mappings: Dicts with 'port' and 'protocol' keys; serialized as
                a comma-separated ``port:protocol`` list.
        """
        if not self.config:
            return

        mappings_str = ','.join(
            f"{m['port']}:{m['protocol']}" for m in mappings
        )
        self.config.set('upnp', 'mappings', mappings_str)
        self.config.save()

    # --- Cron Management ---

    def _get_autarch_path(self) -> str:
        """Get the path to autarch.py."""
        from core.paths import get_app_dir
        return str(get_app_dir() / 'autarch.py')

    def _get_cron_command(self) -> str:
        """Get the cron command string for UPnP refresh."""
        import shlex

        # cron passes the command to a shell, so quote the path. Paths made
        # only of safe characters come back from shlex.quote unchanged.
        autarch_path = shlex.quote(self._get_autarch_path())
        return f'/usr/bin/python3 {autarch_path} --upnp-refresh > /dev/null 2>&1'

    @staticmethod
    def _read_crontab() -> Optional[List[str]]:
        """Return the crontab lines, or None if ``crontab -l`` exits non-zero."""
        result = subprocess.run(
            ['crontab', '-l'],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode != 0:
            return None
        return result.stdout.splitlines()

    @staticmethod
    def _write_crontab(content: str) -> subprocess.CompletedProcess:
        """Replace the crontab with ``content`` via ``crontab -``."""
        return subprocess.run(
            ['crontab', '-'],
            input=content, capture_output=True, text=True, timeout=5
        )

    def get_cron_status(self) -> Dict:
        """Check if UPnP cron job is installed.

        Returns:
            Dict with 'installed' (bool), 'interval' (str), 'line' (str).
            'interval' is like '12h', or '?h' when the schedule is not of
            the ``M */N ...`` form. All values besides 'installed' are None
            when no job is found or the crontab cannot be read.
        """
        not_installed = {'installed': False, 'interval': None, 'line': None}
        try:
            lines = self._read_crontab()
        except Exception:
            # Status queries are best-effort; treat any failure as "absent".
            return not_installed
        if lines is None:
            return not_installed

        for raw in lines:
            line = raw.strip()
            # Strip first so indented comment lines are also ignored.
            if not line or line.startswith('#'):
                continue
            if self._CRON_MARKER not in line:
                continue
            # Parse interval from cron expression
            match = re.match(r'^\d+\s+\*/(\d+)', line)
            interval = match.group(1) if match else '?'
            return {
                'installed': True,
                'interval': f'{interval}h',
                'line': line,
            }

        return not_installed

    def install_cron(self, interval_hours: int = 12) -> Tuple[bool, str]:
        """Install a crontab entry for periodic UPnP refresh.

        Any existing refresh entry is replaced in the same crontab write,
        so a failed install leaves the previous entry in place.

        Args:
            interval_hours: How often to refresh (in hours, 1-23)

        Returns:
            ``(success, message)``.
        """
        try:
            hours = int(interval_hours)
        except (TypeError, ValueError):
            return False, "Interval must be a whole number of hours (1-23)"
        # ``*/0`` is not a valid cron step, and the hour field only spans 0-23.
        if not 1 <= hours <= 23:
            return False, "Interval must be a whole number of hours (1-23)"

        cron_line = f'0 */{hours} * * * {self._get_cron_command()}'

        try:
            # A missing or unreadable crontab is treated as empty.
            existing = self._read_crontab() or []
            kept = [l for l in existing if self._CRON_MARKER not in l]
            # Avoid piling up blank lines before the appended entry.
            while kept and not kept[-1].strip():
                kept.pop()
            new_crontab = '\n'.join(kept + [cron_line]) + '\n'

            proc = self._write_crontab(new_crontab)
            if proc.returncode != 0:
                return False, proc.stderr

            # Save interval to config
            if self.config:
                self.config.set('upnp', 'refresh_hours', str(hours))
                self.config.save()
            return True, f"Cron job installed: every {hours} hours"
        except Exception as e:
            return False, str(e)

    def uninstall_cron(self) -> Tuple[bool, str]:
        """Remove the UPnP refresh cron job.

        Returns:
            ``(success, message)``. Having no crontab, or no refresh entry
            in it, counts as success.
        """
        try:
            lines = self._read_crontab()
            if lines is None:
                return True, "No crontab exists"

            # Filter out our line
            filtered = [l for l in lines if self._CRON_MARKER not in l]
            if len(lines) == len(filtered):
                return True, "No UPnP cron job found"

            proc = self._write_crontab('\n'.join(filtered) + '\n')
            if proc.returncode == 0:
                return True, "Cron job removed"
            return False, proc.stderr
        except Exception as e:
            return False, str(e)
|
||
|
|
|
||
|
|
|
||
|
|
# Process-wide UPnPManager instance, created lazily on first request.
_upnp_manager = None


def get_upnp_manager(config=None) -> UPnPManager:
    """Return the shared UPnPManager, building it on the first call.

    Args:
        config: Configuration object handed to the manager when it is
            first created. If omitted, the result of
            ``core.config.get_config()`` is used. Ignored once the shared
            instance already exists.

    Returns:
        The module-wide UPnPManager instance.
    """
    global _upnp_manager

    # Fast path: the instance was already built by an earlier call.
    if _upnp_manager is not None:
        return _upnp_manager

    if config is None:
        # Imported here, at first use, rather than at module load.
        from core.config import get_config
        config = get_config()

    _upnp_manager = UPnPManager(config)
    return _upnp_manager
|