Autarch/modules/upnp_manager.py

332 lines
12 KiB
Python
Raw Permalink Normal View History

"""
AUTARCH UPnP Port Manager Module
Manage UPnP port forwarding and cron refresh jobs
Requires: miniupnpc (upnpc command)
"""
import sys
from pathlib import Path
# Module metadata: plain module-level constants describing this module.
# NOTE(review): presumably read by the AUTARCH module loader/menu -- confirm.
DESCRIPTION = "UPnP port forwarding manager"
AUTHOR = "darkHal"
VERSION = "1.0"
CATEGORY = "defense"
# Put the parent of this file's directory at the front of sys.path.  This must
# run before the `core.*` imports below (presumably so they resolve when the
# module is loaded from the modules directory -- confirm against the loader).
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.banner import Colors, clear_screen
from core.config import get_config
from core.upnp import get_upnp_manager
def print_status(message: str, status: str = "info"):
    """Print *message* as a colored status line prefixed with ``[symbol]``.

    ``status`` picks the color/symbol pair ("info", "success", "warning",
    "error").  Any other value falls back to ``Colors.WHITE`` and ``*``.
    """
    palette = {
        "info": (Colors.CYAN, "*"),
        "success": (Colors.GREEN, "+"),
        "warning": (Colors.YELLOW, "!"),
        "error": (Colors.RED, "X"),
    }
    color, symbol = palette.get(status, (Colors.WHITE, "*"))
    print(f"{color}[{symbol}] {message}{Colors.RESET}")
def show_menu(upnp):
    """Display the UPnP manager menu.

    Prints a header showing the internal IP (from ``upnp._get_internal_ip()``)
    and the cron state (from ``upnp.get_cron_status()``), followed by the
    numbered menu entries.  Reads no input; the caller prompts for the choice.
    """
    cron = upnp.get_cron_status()
    cron_str = f"every {cron['interval']}" if cron['installed'] else "not installed"
    cron_color = Colors.GREEN if cron['installed'] else Colors.YELLOW
    internal_ip = upnp._get_internal_ip()

    # The rule was previously built from an empty string ('' * 40), which
    # rendered as nothing; use a visible 40-character horizontal line.
    separator = f"{Colors.DIM}{'─' * 40}{Colors.RESET}"

    print(f"\n{Colors.BOLD}{Colors.BLUE}UPnP Port Manager{Colors.RESET}")
    print(separator)
    print(f" Internal IP: {Colors.CYAN}{internal_ip}{Colors.RESET}")
    print(f" Cron: {cron_color}{cron_str}{Colors.RESET}")
    print(separator)

    entries = (
        ("1", "Show Current Mappings"),
        ("2", "Add Port Mapping"),
        ("3", "Remove Port Mapping"),
        ("4", "Refresh All Mappings"),
        ("5", "Show External IP"),
        ("6", "Cron Job Settings"),
        ("7", "Edit Internal IP"),
        ("8", "Edit Port Mappings Config"),
    )
    for key, label in entries:
        print(f" {Colors.BLUE}[{key}]{Colors.RESET} {label}")
    print(f" {Colors.RED}[0]{Colors.RESET} Back")
    print()
def show_mappings(upnp):
    """List the current UPnP port mappings, then wait for Enter.

    ``upnp.list_mappings()`` returns ``(success, output)``; the output is
    printed verbatim on success and as an error status line otherwise.
    """
    print(f"\n{Colors.BOLD}Current UPnP Mappings{Colors.RESET}")
    ok, listing = upnp.list_mappings()
    if not ok:
        print_status(listing, "error")
    else:
        print(listing)
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def add_mapping(upnp):
    """Interactively add a new UPnP port mapping.

    Prompts for external port, internal port (defaults to the external
    port), protocol (TCP/UDP, default TCP) and description (default
    ``AUTARCH``), then calls ``upnp.add_mapping``.  On success the user is
    offered to persist the mapping through ``upnp.save_mappings_to_config``.

    An empty external port aborts silently.  Non-numeric or out-of-range
    ports (valid range 1-65535) report "Invalid port number".  Ctrl-C at any
    prompt returns to the caller.
    """

    def parse_port(text):
        # int() raises ValueError for non-numeric text; raise the same
        # exception for out-of-range values so both share one error path.
        port = int(text)
        if not 1 <= port <= 65535:
            raise ValueError(f"port out of range: {port}")
        return port

    print(f"\n{Colors.BOLD}Add Port Mapping{Colors.RESET}")
    try:
        internal_ip = upnp._get_internal_ip()

        ext_port = input(" External port: ").strip()
        if not ext_port:
            return
        ext_port = parse_port(ext_port)

        int_port_str = input(f" Internal port [{ext_port}]: ").strip()
        int_port = parse_port(int_port_str) if int_port_str else ext_port

        proto = input(" Protocol (TCP/UDP) [TCP]: ").strip().upper() or 'TCP'
        if proto not in ('TCP', 'UDP'):
            print_status("Invalid protocol", "error")
            return

        desc = input(" Description [AUTARCH]: ").strip() or 'AUTARCH'

        success, output = upnp.add_mapping(internal_ip, int_port, ext_port, proto, desc)
        if not success:
            print_status(f"Failed: {output}", "error")
            return
        print_status(f"Mapping added: {ext_port}/{proto} -> {internal_ip}:{int_port}", "success")

        # Offer to save to config (anything other than "n" means yes).
        save = input("\n Save to config? (y/n) [y]: ").strip().lower()
        if save == 'n':
            return
        mappings = upnp.load_mappings_from_config()
        already_saved = any(
            m['port'] == ext_port and m['protocol'] == proto for m in mappings
        )
        if already_saved:
            print_status("Already in config", "info")
            return
        # NOTE: only the external port and protocol are persisted; the
        # internal port and description are not part of the saved entry.
        mappings.append({'port': ext_port, 'protocol': proto})
        upnp.save_mappings_to_config(mappings)
        print_status("Saved to config", "success")
    except ValueError:
        print_status("Invalid port number", "error")
    except KeyboardInterrupt:
        print()
def remove_mapping(upnp):
    """Interactively remove a UPnP port mapping.

    Prompts for the external port and protocol (TCP/UDP, default TCP), then
    calls ``upnp.remove_mapping``.  On success the user is offered to drop
    the matching entry from the saved config as well.

    An empty port aborts silently.  Non-numeric or out-of-range ports (valid
    range 1-65535) report "Invalid port number"; a protocol other than
    TCP/UDP reports "Invalid protocol", matching ``add_mapping``.  Ctrl-C at
    any prompt returns to the caller.
    """
    print(f"\n{Colors.BOLD}Remove Port Mapping{Colors.RESET}")
    try:
        ext_port = input(" External port: ").strip()
        if not ext_port:
            return
        ext_port = int(ext_port)
        if not 1 <= ext_port <= 65535:
            # Routed to the shared "Invalid port number" handler below.
            raise ValueError(f"port out of range: {ext_port}")

        proto = input(" Protocol (TCP/UDP) [TCP]: ").strip().upper() or 'TCP'
        if proto not in ('TCP', 'UDP'):
            print_status("Invalid protocol", "error")
            return

        success, output = upnp.remove_mapping(ext_port, proto)
        if not success:
            print_status(f"Failed: {output}", "error")
            return
        print_status(f"Mapping removed: {ext_port}/{proto}", "success")

        # Offer to remove from config (anything other than "n" means yes).
        remove = input("\n Remove from config? (y/n) [y]: ").strip().lower()
        if remove != 'n':
            mappings = [
                m for m in upnp.load_mappings_from_config()
                if not (m['port'] == ext_port and m['protocol'] == proto)
            ]
            upnp.save_mappings_to_config(mappings)
            print_status("Removed from config", "success")
    except ValueError:
        print_status("Invalid port number", "error")
    except KeyboardInterrupt:
        print()
def refresh_all(upnp):
    """Re-apply every mapping stored in the config and report each result.

    When the config holds no mappings a warning is shown instead.  Either
    way the function waits for Enter before returning.
    """
    pause_prompt = f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}"

    configured = upnp.load_mappings_from_config()
    if not configured:
        print_status("No mappings configured. Use option [8] to edit.", "warning")
        input(pause_prompt)
        return

    print(f"\n{Colors.BOLD}Refreshing {len(configured)} mapping(s)...{Colors.RESET}")
    for outcome in upnp.refresh_all():
        if not outcome['success']:
            print_status(
                f"{outcome['port']}/{outcome['protocol']}: {outcome['message']}",
                "error",
            )
            continue
        print_status(f"{outcome['port']}/{outcome['protocol']}: OK", "success")
    input(pause_prompt)
def show_external_ip(upnp):
    """Print the external IP reported by ``upnp.get_external_ip()``.

    That call returns ``(success, value)``; on failure the value is shown as
    the failure reason.  Waits for Enter before returning.
    """
    ok, result = upnp.get_external_ip()
    if not ok:
        print_status(f"Failed: {result}", "error")
    else:
        print(f"\n {Colors.BOLD}External IP:{Colors.RESET} {Colors.GREEN}{result}{Colors.RESET}")
    input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
def _prompt_and_install_cron(upnp):
    """Ask for a refresh interval (1-24 hours, default 12) and install the cron job."""
    raw = input(" Refresh interval (hours) [12]: ").strip()
    try:
        hours = int(raw) if raw else 12
    except ValueError:
        # Non-numeric input used to be swallowed silently; report it the
        # same way as an out-of-range value.
        print_status("Interval must be 1-24 hours", "error")
        return
    if not 1 <= hours <= 24:
        print_status("Interval must be 1-24 hours", "error")
        return
    success, msg = upnp.install_cron(hours)
    print_status(msg, "success" if success else "error")


def cron_settings(upnp):
    """Show the cron job status and let the user install, change or remove it.

    When a job is installed (per ``upnp.get_cron_status()``) the options are
    [1] change interval and [2] uninstall; otherwise only [1] install is
    offered.  Changing the interval and installing both go through
    ``upnp.install_cron(hours)``.  Any other choice, or Ctrl-C, returns
    without doing anything.
    """
    cron = upnp.get_cron_status()
    installed = cron['installed']

    print(f"\n{Colors.BOLD}Cron Job Settings{Colors.RESET}")
    # The rule was previously built from an empty string ('' * 40), which
    # rendered as nothing; use a visible 40-character horizontal line.
    print(f"{Colors.DIM}{'─' * 40}{Colors.RESET}")
    if installed:
        print(f" Status: {Colors.GREEN}Installed{Colors.RESET}")
        print(f" Interval: every {cron['interval']}")
        print(f" Entry: {Colors.DIM}{cron['line']}{Colors.RESET}")
        print()
        print(f" {Colors.BLUE}[1]{Colors.RESET} Change interval")
        print(f" {Colors.RED}[2]{Colors.RESET} Uninstall cron job")
        print(f" {Colors.DIM}[0]{Colors.RESET} Back")
    else:
        print(f" Status: {Colors.YELLOW}Not installed{Colors.RESET}")
        print()
        print(f" {Colors.BLUE}[1]{Colors.RESET} Install cron job")
        print(f" {Colors.DIM}[0]{Colors.RESET} Back")
    print()

    try:
        choice = input(f" {Colors.BOLD}>{Colors.RESET} ").strip()
        if choice == '0':
            return
        if choice == '1':
            # Same flow for "install" and "change interval".
            _prompt_and_install_cron(upnp)
        elif choice == '2' and installed:
            success, msg = upnp.uninstall_cron()
            print_status(msg, "success" if success else "error")
    except (ValueError, KeyboardInterrupt):
        print()
def edit_internal_ip(upnp):
    """Prompt for a new internal IP and store it under ``upnp.internal_ip``.

    Empty input, or input equal to the current value, leaves the config
    untouched and reports "Unchanged".  Input that is not a syntactically
    valid IPv4/IPv6 address is rejected with an error instead of being
    saved.  Ctrl-C returns to the caller.
    """
    # Local import keeps this function self-contained.
    import ipaddress

    config = get_config()
    current = upnp._get_internal_ip()
    print(f"\n Current internal IP: {Colors.CYAN}{current}{Colors.RESET}")
    try:
        new_ip = input(f" New internal IP [{current}]: ").strip()
        if not new_ip or new_ip == current:
            # Previously re-entering the current address printed nothing.
            print_status("Unchanged", "info")
            return
        try:
            ipaddress.ip_address(new_ip)
        except ValueError:
            print_status(f"Invalid IP address: {new_ip}", "error")
            return
        config.set('upnp', 'internal_ip', new_ip)
        config.save()
        print_status(f"Internal IP set to {new_ip}", "success")
    except KeyboardInterrupt:
        print()
def edit_mappings_config(upnp):
    """Show the port mappings stored in the config and add or delete one.

    Lists the saved ``port/protocol`` entries, then offers [a] add,
    [d] delete (only when entries exist) and [0] back.  Changes are written
    with ``upnp.save_mappings_to_config``; this function only edits the
    saved list and makes no other ``upnp`` calls.

    Invalid ports (non-numeric or outside 1-65535), invalid protocols and
    invalid list selections are reported and leave the config unchanged.
    Ctrl-C returns to the caller.
    """
    mappings = upnp.load_mappings_from_config()

    print(f"\n{Colors.BOLD}Configured Port Mappings{Colors.RESET}")
    # The rule was previously built from an empty string ('' * 40), which
    # rendered as nothing; use a visible 40-character horizontal line.
    print(f"{Colors.DIM}{'─' * 40}{Colors.RESET}")
    if mappings:
        for i, m in enumerate(mappings, 1):
            print(f" {Colors.BLUE}[{i}]{Colors.RESET} {m['port']}/{m['protocol']}")
    else:
        print(f" {Colors.DIM}(none configured){Colors.RESET}")
    print()
    print(f" {Colors.GREEN}[a]{Colors.RESET} Add mapping to config")
    if mappings:
        print(f" {Colors.RED}[d]{Colors.RESET} Delete mapping from config")
    print(f" {Colors.DIM}[0]{Colors.RESET} Back")
    print()

    try:
        choice = input(f" {Colors.BOLD}>{Colors.RESET} ").strip().lower()
        if choice == '0':
            return

        if choice == 'a':
            port = input(" Port: ").strip()
            if not port:
                return
            try:
                port = int(port)
            except ValueError:
                port = None
            if port is None or not 1 <= port <= 65535:
                print_status("Invalid port number", "error")
                return
            proto = input(" Protocol (TCP/UDP) [TCP]: ").strip().upper() or 'TCP'
            if proto not in ('TCP', 'UDP'):
                print_status("Invalid protocol", "error")
                return
            if any(m['port'] == port and m['protocol'] == proto for m in mappings):
                print_status("Already in config", "info")
                return
            mappings.append({'port': port, 'protocol': proto})
            upnp.save_mappings_to_config(mappings)
            print_status(f"Added {port}/{proto}", "success")

        elif choice == 'd' and mappings:
            raw_idx = input(" Number to delete: ").strip()
            try:
                # The list is shown 1-based; convert to a 0-based index.
                idx = int(raw_idx) - 1
            except ValueError:
                idx = -1
            if 0 <= idx < len(mappings):
                removed = mappings.pop(idx)
                upnp.save_mappings_to_config(mappings)
                print_status(f"Removed {removed['port']}/{removed['protocol']}", "success")
            else:
                print_status("Invalid selection", "error")
    except (ValueError, KeyboardInterrupt):
        print()
def run():
    """Entry point: run the interactive UPnP manager menu loop.

    Returns immediately, after showing an install hint, when
    ``upnp.is_available()`` is false.  Otherwise the menu is redrawn and the
    selected action dispatched until the user picks ``0`` or presses Ctrl-C.
    Unrecognized choices just redraw the menu.
    """
    upnp = get_upnp_manager(get_config())

    if not upnp.is_available():
        print_status("upnpc (miniupnpc) is not installed!", "error")
        print(f" {Colors.DIM}Install with: sudo apt install miniupnpc{Colors.RESET}")
        input(f"\n{Colors.DIM}Press Enter to go back...{Colors.RESET}")
        return

    # Menu key -> handler; every handler takes the manager as its only argument.
    handlers = {
        '1': show_mappings,
        '2': add_mapping,
        '3': remove_mapping,
        '4': refresh_all,
        '5': show_external_ip,
        '6': cron_settings,
        '7': edit_internal_ip,
        '8': edit_mappings_config,
    }

    while True:
        try:
            clear_screen()
            show_menu(upnp)
            selection = input(f" {Colors.BOLD}>{Colors.RESET} ").strip()
            if selection == '0':
                break
            handler = handlers.get(selection)
            if handler is not None:
                handler(upnp)
        except KeyboardInterrupt:
            break