Autarch/core/upnp.py

275 lines
9.1 KiB
Python
Raw Normal View History

"""
AUTARCH UPnP Manager
Manages UPnP port forwarding via miniupnpc (upnpc CLI)
"""
import subprocess
import re
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from core.paths import find_tool
class UPnPManager:
"""UPnP port forwarding manager wrapping the upnpc CLI."""
def __init__(self, config=None):
self.config = config
self._upnpc = find_tool('upnpc')
def is_available(self) -> bool:
"""Check if upnpc is installed."""
return self._upnpc is not None
def _run(self, args: list, timeout: int = 15) -> Tuple[bool, str]:
"""Run upnpc with arguments and return (success, output)."""
if not self._upnpc:
return False, "upnpc not found. Install miniupnpc."
try:
result = subprocess.run(
[self._upnpc] + args,
capture_output=True, text=True, timeout=timeout
)
output = result.stdout + result.stderr
return result.returncode == 0, output.strip()
except subprocess.TimeoutExpired:
return False, "Command timed out"
except Exception as e:
return False, str(e)
def list_mappings(self) -> Tuple[bool, str]:
"""List current UPnP port mappings."""
return self._run(['-l'])
def add_mapping(self, internal_ip: str, internal_port: int,
external_port: int, protocol: str,
description: str = "AUTARCH") -> Tuple[bool, str]:
"""Add a UPnP port mapping.
Args:
internal_ip: LAN IP to forward to
internal_port: Internal port number
external_port: External port number
protocol: TCP or UDP
description: Mapping description
"""
protocol = protocol.upper()
if protocol not in ('TCP', 'UDP'):
return False, "Protocol must be TCP or UDP"
return self._run([
'-a', internal_ip,
str(internal_port), str(external_port),
protocol, '0', description
])
def remove_mapping(self, external_port: int, protocol: str) -> Tuple[bool, str]:
"""Remove a UPnP port mapping."""
protocol = protocol.upper()
return self._run(['-d', str(external_port), protocol])
def get_external_ip(self) -> Tuple[bool, str]:
"""Get the external IP via UPnP."""
success, output = self._run(['-e'])
if success:
# Parse "ExternalIPAddress = x.x.x.x" from output
for line in output.splitlines():
if 'ExternalIPAddress' in line:
parts = line.split('=')
if len(parts) >= 2:
return True, parts[-1].strip()
# If no specific line found, return raw output
return True, output
return False, output
def refresh_all(self) -> List[Dict]:
"""Re-add all configured port mappings. Returns list of results."""
mappings = self.load_mappings_from_config()
internal_ip = self._get_internal_ip()
results = []
for mapping in mappings:
port = mapping['port']
proto = mapping['protocol']
desc = mapping.get('description', 'AUTARCH')
success, output = self.add_mapping(
internal_ip, port, port, proto, desc
)
results.append({
'port': port,
'protocol': proto,
'success': success,
'message': output
})
return results
def _get_internal_ip(self) -> str:
"""Get the configured internal IP."""
if self.config:
return self.config.get('upnp', 'internal_ip', fallback='10.0.0.26')
return '10.0.0.26'
def load_mappings_from_config(self) -> List[Dict]:
"""Load port mappings from config file.
Config format: mappings = 443:TCP,51820:UDP,8080:TCP
"""
if not self.config:
return []
mappings_str = self.config.get('upnp', 'mappings', fallback='')
if not mappings_str:
return []
mappings = []
for entry in mappings_str.split(','):
entry = entry.strip()
if ':' in entry:
parts = entry.split(':')
try:
mappings.append({
'port': int(parts[0]),
'protocol': parts[1].upper()
})
except (ValueError, IndexError):
continue
return mappings
def save_mappings_to_config(self, mappings: List[Dict]):
"""Save port mappings to config file."""
if not self.config:
return
mappings_str = ','.join(
f"{m['port']}:{m['protocol']}" for m in mappings
)
self.config.set('upnp', 'mappings', mappings_str)
self.config.save()
# --- Cron Management ---
def _get_autarch_path(self) -> str:
"""Get the path to autarch.py."""
from core.paths import get_app_dir
return str(get_app_dir() / 'autarch.py')
def _get_cron_command(self) -> str:
"""Get the cron command string for UPnP refresh."""
autarch_path = self._get_autarch_path()
return f'/usr/bin/python3 {autarch_path} --upnp-refresh > /dev/null 2>&1'
def get_cron_status(self) -> Dict:
"""Check if UPnP cron job is installed.
Returns:
Dict with 'installed' (bool), 'interval' (str), 'line' (str)
"""
try:
result = subprocess.run(
['crontab', '-l'],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
return {'installed': False, 'interval': None, 'line': None}
for line in result.stdout.splitlines():
if 'upnp-refresh' in line and not line.startswith('#'):
# Parse interval from cron expression
match = re.match(r'^\d+\s+\*/(\d+)', line)
interval = match.group(1) if match else '?'
return {
'installed': True,
'interval': f'{interval}h',
'line': line.strip()
}
return {'installed': False, 'interval': None, 'line': None}
except Exception:
return {'installed': False, 'interval': None, 'line': None}
def install_cron(self, interval_hours: int = 12) -> Tuple[bool, str]:
"""Install a crontab entry for periodic UPnP refresh.
Args:
interval_hours: How often to refresh (in hours)
"""
# First remove any existing entry
self.uninstall_cron()
cron_line = f'0 */{interval_hours} * * * {self._get_cron_command()}'
try:
# Get current crontab
result = subprocess.run(
['crontab', '-l'],
capture_output=True, text=True, timeout=5
)
existing = result.stdout if result.returncode == 0 else ''
# Append new entry
new_crontab = existing.rstrip('\n') + '\n' + cron_line + '\n'
# Install
proc = subprocess.run(
['crontab', '-'],
input=new_crontab, capture_output=True, text=True, timeout=5
)
if proc.returncode == 0:
# Save interval to config
if self.config:
self.config.set('upnp', 'refresh_hours', str(interval_hours))
self.config.save()
return True, f"Cron job installed: every {interval_hours} hours"
else:
return False, proc.stderr
except Exception as e:
return False, str(e)
def uninstall_cron(self) -> Tuple[bool, str]:
"""Remove the UPnP refresh cron job."""
try:
result = subprocess.run(
['crontab', '-l'],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
return True, "No crontab exists"
# Filter out our line
lines = result.stdout.splitlines()
filtered = [l for l in lines if 'upnp-refresh' not in l]
if len(lines) == len(filtered):
return True, "No UPnP cron job found"
new_crontab = '\n'.join(filtered) + '\n'
proc = subprocess.run(
['crontab', '-'],
input=new_crontab, capture_output=True, text=True, timeout=5
)
if proc.returncode == 0:
return True, "Cron job removed"
else:
return False, proc.stderr
except Exception as e:
return False, str(e)
# Singleton
_upnp_manager = None
def get_upnp_manager(config=None) -> UPnPManager:
"""Get the global UPnP manager instance."""
global _upnp_manager
if _upnp_manager is None:
if config is None:
from core.config import get_config
config = get_config()
_upnp_manager = UPnPManager(config)
return _upnp_manager