Full security platform with web dashboard, 16 Flask blueprints, 26 modules, autonomous AI agent, WebUSB hardware support, and Archon Android companion app. Includes Hash Toolkit, debug console, anti-stalkerware shield, Metasploit/RouterSploit integration, WireGuard VPN, OSINT reconnaissance, and multi-backend LLM support. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
332 lines
12 KiB
Python
332 lines
12 KiB
Python
"""
|
|
AUTARCH UPnP Port Manager Module
|
|
Manage UPnP port forwarding and cron refresh jobs
|
|
|
|
Requires: miniupnpc (upnpc command)
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Module metadata
|
|
DESCRIPTION = "UPnP port forwarding manager"
|
|
AUTHOR = "darkHal"
|
|
VERSION = "1.0"
|
|
CATEGORY = "defense"
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
from core.banner import Colors, clear_screen
|
|
from core.config import get_config
|
|
from core.upnp import get_upnp_manager
|
|
|
|
|
|
def print_status(message: str, status: str = "info"):
|
|
colors = {"info": Colors.CYAN, "success": Colors.GREEN, "warning": Colors.YELLOW, "error": Colors.RED}
|
|
symbols = {"info": "*", "success": "+", "warning": "!", "error": "X"}
|
|
print(f"{colors.get(status, Colors.WHITE)}[{symbols.get(status, '*')}] {message}{Colors.RESET}")
|
|
|
|
|
|
def show_menu(upnp):
|
|
"""Display the UPnP manager menu."""
|
|
cron = upnp.get_cron_status()
|
|
cron_str = f"every {cron['interval']}" if cron['installed'] else "not installed"
|
|
internal_ip = upnp._get_internal_ip()
|
|
|
|
print(f"\n{Colors.BOLD}{Colors.BLUE}UPnP Port Manager{Colors.RESET}")
|
|
print(f"{Colors.DIM}{'─' * 40}{Colors.RESET}")
|
|
print(f" Internal IP: {Colors.CYAN}{internal_ip}{Colors.RESET}")
|
|
print(f" Cron: {Colors.GREEN if cron['installed'] else Colors.YELLOW}{cron_str}{Colors.RESET}")
|
|
print(f"{Colors.DIM}{'─' * 40}{Colors.RESET}")
|
|
print(f" {Colors.BLUE}[1]{Colors.RESET} Show Current Mappings")
|
|
print(f" {Colors.BLUE}[2]{Colors.RESET} Add Port Mapping")
|
|
print(f" {Colors.BLUE}[3]{Colors.RESET} Remove Port Mapping")
|
|
print(f" {Colors.BLUE}[4]{Colors.RESET} Refresh All Mappings")
|
|
print(f" {Colors.BLUE}[5]{Colors.RESET} Show External IP")
|
|
print(f" {Colors.BLUE}[6]{Colors.RESET} Cron Job Settings")
|
|
print(f" {Colors.BLUE}[7]{Colors.RESET} Edit Internal IP")
|
|
print(f" {Colors.BLUE}[8]{Colors.RESET} Edit Port Mappings Config")
|
|
print(f" {Colors.RED}[0]{Colors.RESET} Back")
|
|
print()
|
|
|
|
|
|
def show_mappings(upnp):
|
|
"""Show current UPnP port mappings."""
|
|
print(f"\n{Colors.BOLD}Current UPnP Mappings{Colors.RESET}")
|
|
success, output = upnp.list_mappings()
|
|
if success:
|
|
print(output)
|
|
else:
|
|
print_status(output, "error")
|
|
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def add_mapping(upnp):
|
|
"""Add a new port mapping."""
|
|
print(f"\n{Colors.BOLD}Add Port Mapping{Colors.RESET}")
|
|
try:
|
|
internal_ip = upnp._get_internal_ip()
|
|
ext_port = input(f" External port: ").strip()
|
|
if not ext_port:
|
|
return
|
|
ext_port = int(ext_port)
|
|
|
|
int_port_str = input(f" Internal port [{ext_port}]: ").strip()
|
|
int_port = int(int_port_str) if int_port_str else ext_port
|
|
|
|
proto = input(f" Protocol (TCP/UDP) [TCP]: ").strip().upper()
|
|
if not proto:
|
|
proto = 'TCP'
|
|
if proto not in ('TCP', 'UDP'):
|
|
print_status("Invalid protocol", "error")
|
|
return
|
|
|
|
desc = input(f" Description [AUTARCH]: ").strip()
|
|
if not desc:
|
|
desc = 'AUTARCH'
|
|
|
|
success, output = upnp.add_mapping(internal_ip, int_port, ext_port, proto, desc)
|
|
if success:
|
|
print_status(f"Mapping added: {ext_port}/{proto} -> {internal_ip}:{int_port}", "success")
|
|
# Offer to save to config
|
|
save = input(f"\n Save to config? (y/n) [y]: ").strip().lower()
|
|
if save != 'n':
|
|
mappings = upnp.load_mappings_from_config()
|
|
# Check if already exists
|
|
exists = any(m['port'] == ext_port and m['protocol'] == proto for m in mappings)
|
|
if not exists:
|
|
mappings.append({'port': ext_port, 'protocol': proto})
|
|
upnp.save_mappings_to_config(mappings)
|
|
print_status("Saved to config", "success")
|
|
else:
|
|
print_status("Already in config", "info")
|
|
else:
|
|
print_status(f"Failed: {output}", "error")
|
|
except ValueError:
|
|
print_status("Invalid port number", "error")
|
|
except KeyboardInterrupt:
|
|
print()
|
|
|
|
|
|
def remove_mapping(upnp):
|
|
"""Remove a port mapping."""
|
|
print(f"\n{Colors.BOLD}Remove Port Mapping{Colors.RESET}")
|
|
try:
|
|
ext_port = input(f" External port: ").strip()
|
|
if not ext_port:
|
|
return
|
|
ext_port = int(ext_port)
|
|
|
|
proto = input(f" Protocol (TCP/UDP) [TCP]: ").strip().upper()
|
|
if not proto:
|
|
proto = 'TCP'
|
|
|
|
success, output = upnp.remove_mapping(ext_port, proto)
|
|
if success:
|
|
print_status(f"Mapping removed: {ext_port}/{proto}", "success")
|
|
# Offer to remove from config
|
|
remove = input(f"\n Remove from config? (y/n) [y]: ").strip().lower()
|
|
if remove != 'n':
|
|
mappings = upnp.load_mappings_from_config()
|
|
mappings = [m for m in mappings if not (m['port'] == ext_port and m['protocol'] == proto)]
|
|
upnp.save_mappings_to_config(mappings)
|
|
print_status("Removed from config", "success")
|
|
else:
|
|
print_status(f"Failed: {output}", "error")
|
|
except ValueError:
|
|
print_status("Invalid port number", "error")
|
|
except KeyboardInterrupt:
|
|
print()
|
|
|
|
|
|
def refresh_all(upnp):
|
|
"""Refresh all configured mappings."""
|
|
mappings = upnp.load_mappings_from_config()
|
|
if not mappings:
|
|
print_status("No mappings configured. Use option [8] to edit.", "warning")
|
|
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
return
|
|
|
|
print(f"\n{Colors.BOLD}Refreshing {len(mappings)} mapping(s)...{Colors.RESET}")
|
|
results = upnp.refresh_all()
|
|
for r in results:
|
|
if r['success']:
|
|
print_status(f"{r['port']}/{r['protocol']}: OK", "success")
|
|
else:
|
|
print_status(f"{r['port']}/{r['protocol']}: {r['message']}", "error")
|
|
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def show_external_ip(upnp):
|
|
"""Show external IP."""
|
|
success, ip = upnp.get_external_ip()
|
|
if success:
|
|
print(f"\n {Colors.BOLD}External IP:{Colors.RESET} {Colors.GREEN}{ip}{Colors.RESET}")
|
|
else:
|
|
print_status(f"Failed: {ip}", "error")
|
|
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
|
|
|
|
|
|
def cron_settings(upnp):
|
|
"""Manage cron job settings."""
|
|
cron = upnp.get_cron_status()
|
|
|
|
print(f"\n{Colors.BOLD}Cron Job Settings{Colors.RESET}")
|
|
print(f"{Colors.DIM}{'─' * 40}{Colors.RESET}")
|
|
|
|
if cron['installed']:
|
|
print(f" Status: {Colors.GREEN}Installed{Colors.RESET}")
|
|
print(f" Interval: every {cron['interval']}")
|
|
print(f" Entry: {Colors.DIM}{cron['line']}{Colors.RESET}")
|
|
print()
|
|
print(f" {Colors.BLUE}[1]{Colors.RESET} Change interval")
|
|
print(f" {Colors.RED}[2]{Colors.RESET} Uninstall cron job")
|
|
print(f" {Colors.DIM}[0]{Colors.RESET} Back")
|
|
else:
|
|
print(f" Status: {Colors.YELLOW}Not installed{Colors.RESET}")
|
|
print()
|
|
print(f" {Colors.BLUE}[1]{Colors.RESET} Install cron job")
|
|
print(f" {Colors.DIM}[0]{Colors.RESET} Back")
|
|
|
|
print()
|
|
try:
|
|
choice = input(f" {Colors.BOLD}>{Colors.RESET} ").strip()
|
|
|
|
if choice == '0':
|
|
return
|
|
|
|
if cron['installed']:
|
|
if choice == '1':
|
|
hours = input(f" Refresh interval (hours) [12]: ").strip()
|
|
hours = int(hours) if hours else 12
|
|
if hours < 1 or hours > 24:
|
|
print_status("Interval must be 1-24 hours", "error")
|
|
return
|
|
success, msg = upnp.install_cron(hours)
|
|
print_status(msg, "success" if success else "error")
|
|
elif choice == '2':
|
|
success, msg = upnp.uninstall_cron()
|
|
print_status(msg, "success" if success else "error")
|
|
else:
|
|
if choice == '1':
|
|
hours = input(f" Refresh interval (hours) [12]: ").strip()
|
|
hours = int(hours) if hours else 12
|
|
if hours < 1 or hours > 24:
|
|
print_status("Interval must be 1-24 hours", "error")
|
|
return
|
|
success, msg = upnp.install_cron(hours)
|
|
print_status(msg, "success" if success else "error")
|
|
except (ValueError, KeyboardInterrupt):
|
|
print()
|
|
|
|
|
|
def edit_internal_ip(upnp):
|
|
"""Edit the internal IP address."""
|
|
config = get_config()
|
|
current = upnp._get_internal_ip()
|
|
print(f"\n Current internal IP: {Colors.CYAN}{current}{Colors.RESET}")
|
|
try:
|
|
new_ip = input(f" New internal IP [{current}]: ").strip()
|
|
if new_ip and new_ip != current:
|
|
config.set('upnp', 'internal_ip', new_ip)
|
|
config.save()
|
|
print_status(f"Internal IP set to {new_ip}", "success")
|
|
elif not new_ip:
|
|
print_status("Unchanged", "info")
|
|
except KeyboardInterrupt:
|
|
print()
|
|
|
|
|
|
def edit_mappings_config(upnp):
|
|
"""Edit configured port mappings."""
|
|
mappings = upnp.load_mappings_from_config()
|
|
|
|
print(f"\n{Colors.BOLD}Configured Port Mappings{Colors.RESET}")
|
|
print(f"{Colors.DIM}{'─' * 40}{Colors.RESET}")
|
|
|
|
if mappings:
|
|
for i, m in enumerate(mappings, 1):
|
|
print(f" {Colors.BLUE}[{i}]{Colors.RESET} {m['port']}/{m['protocol']}")
|
|
else:
|
|
print(f" {Colors.DIM}(none configured){Colors.RESET}")
|
|
|
|
print()
|
|
print(f" {Colors.GREEN}[a]{Colors.RESET} Add mapping to config")
|
|
if mappings:
|
|
print(f" {Colors.RED}[d]{Colors.RESET} Delete mapping from config")
|
|
print(f" {Colors.DIM}[0]{Colors.RESET} Back")
|
|
print()
|
|
|
|
try:
|
|
choice = input(f" {Colors.BOLD}>{Colors.RESET} ").strip().lower()
|
|
|
|
if choice == '0':
|
|
return
|
|
elif choice == 'a':
|
|
port = input(f" Port: ").strip()
|
|
if not port:
|
|
return
|
|
port = int(port)
|
|
proto = input(f" Protocol (TCP/UDP) [TCP]: ").strip().upper()
|
|
if not proto:
|
|
proto = 'TCP'
|
|
if proto not in ('TCP', 'UDP'):
|
|
print_status("Invalid protocol", "error")
|
|
return
|
|
exists = any(m['port'] == port and m['protocol'] == proto for m in mappings)
|
|
if exists:
|
|
print_status("Already in config", "info")
|
|
return
|
|
mappings.append({'port': port, 'protocol': proto})
|
|
upnp.save_mappings_to_config(mappings)
|
|
print_status(f"Added {port}/{proto}", "success")
|
|
elif choice == 'd' and mappings:
|
|
idx = input(f" Number to delete: ").strip()
|
|
idx = int(idx) - 1
|
|
if 0 <= idx < len(mappings):
|
|
removed = mappings.pop(idx)
|
|
upnp.save_mappings_to_config(mappings)
|
|
print_status(f"Removed {removed['port']}/{removed['protocol']}", "success")
|
|
else:
|
|
print_status("Invalid selection", "error")
|
|
except (ValueError, KeyboardInterrupt):
|
|
print()
|
|
|
|
|
|
def run():
|
|
"""Main entry point for the UPnP manager module."""
|
|
config = get_config()
|
|
upnp = get_upnp_manager(config)
|
|
|
|
if not upnp.is_available():
|
|
print_status("upnpc (miniupnpc) is not installed!", "error")
|
|
print(f" {Colors.DIM}Install with: sudo apt install miniupnpc{Colors.RESET}")
|
|
input(f"\n{Colors.DIM}Press Enter to go back...{Colors.RESET}")
|
|
return
|
|
|
|
while True:
|
|
try:
|
|
clear_screen()
|
|
show_menu(upnp)
|
|
choice = input(f" {Colors.BOLD}>{Colors.RESET} ").strip()
|
|
|
|
if choice == '0':
|
|
break
|
|
elif choice == '1':
|
|
show_mappings(upnp)
|
|
elif choice == '2':
|
|
add_mapping(upnp)
|
|
elif choice == '3':
|
|
remove_mapping(upnp)
|
|
elif choice == '4':
|
|
refresh_all(upnp)
|
|
elif choice == '5':
|
|
show_external_ip(upnp)
|
|
elif choice == '6':
|
|
cron_settings(upnp)
|
|
elif choice == '7':
|
|
edit_internal_ip(upnp)
|
|
elif choice == '8':
|
|
edit_mappings_config(upnp)
|
|
except KeyboardInterrupt:
|
|
break
|