Files

210 lines
7.4 KiB
Python
Raw Permalink Normal View History

# iptables firewall management commands
# Each function returns a bash command string that app.py executes via ssh_run()
import ipaddress
import re
import shlex
# Chain names that are matched case-insensitively and normalised to upper case.
BUILTIN_CHAINS = {
    "INPUT",
    "OUTPUT",
    "FORWARD",
    "PREROUTING",
    "POSTROUTING",
}
# Target names recognised for default-policy requests.
# NOTE(review): iptables(8) documents only ACCEPT and DROP as valid "-P"
# targets - confirm whether REJECT needs to stay in this set.
POLICY_TARGETS = {
    "ACCEPT",
    "DROP",
    "REJECT",
}
def _validate_chain(chain):
    """Return a normalised chain name, or raise ``ValueError``.

    Surrounding whitespace is stripped. Names in ``BUILTIN_CHAINS`` are
    matched case-insensitively and returned in upper case. Any other name is
    returned unchanged if it is 1-30 characters of ASCII letters, digits,
    ``_`` or ``-`` and does not start with ``-``.

    Raises:
        ValueError: if ``chain`` is not a string or is not an acceptable name.
    """
    if not isinstance(chain, str):
        raise ValueError(f"Invalid chain name: {chain!r}")
    chain = chain.strip()
    upper = chain.upper()
    if upper in BUILTIN_CHAINS:
        return upper
    # The name is pasted into a shell command right after an iptables flag.
    # A leading '-' (e.g. "-w") would be parsed as another iptables option
    # rather than as a chain, so '-' is only allowed after the first character.
    if len(chain) <= 30 and re.fullmatch(r'[A-Za-z0-9_][A-Za-z0-9_-]*', chain):
        return chain
    raise ValueError(f"Invalid chain name: {chain}")
def _validate_ip(ip):
"""Validate an IPv4 or IPv6 address (no CIDR for block/unblock)."""
ip = ip.strip()
# IPv4
if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip):
parts = ip.split('.')
if all(0 <= int(p) <= 255 for p in parts):
return ip
# IPv4 CIDR
if re.match(r'^(\d{1,3}\.){3}\d{1,3}/\d{1,2}$', ip):
addr, prefix = ip.rsplit('/', 1)
parts = addr.split('.')
if all(0 <= int(p) <= 255 for p in parts) and 0 <= int(prefix) <= 32:
return ip
# IPv6 (basic check)
if re.match(r'^[0-9a-fA-F:]+$', ip) or re.match(r'^[0-9a-fA-F:]+/\d{1,3}$', ip):
return ip
raise ValueError(f"Invalid IP address: {ip}")
def status_cmd():
    """Return bash cmd reporting iptables, ip6tables and iptables-persistent.

    For each of ``iptables`` and ``ip6tables`` the command prints the version
    (``-V``) or a "not installed" line. For ``iptables-persistent`` it prints
    the name and version columns of matching ``dpkg -l`` lines, or
    "iptables-persistent not installed" when there is no match.
    """
    return (
        "echo '=== iptables Installation ===' && "
        "which iptables >/dev/null 2>&1 && iptables -V 2>&1 || echo 'iptables not installed' && "
        "echo '' && echo '=== ip6tables ===' && "
        "which ip6tables >/dev/null 2>&1 && ip6tables -V 2>&1 || echo 'ip6tables not installed' && "
        "echo '' && echo '=== iptables-persistent ===' && "
        # awk does the matching itself and exits non-zero when nothing
        # matched; with a separate grep the pipeline status was always awk's
        # 0, so the fallback echo could never run.
        "dpkg -l 2>/dev/null | "
        "awk '/iptables-persistent/ {print $2, $3; found=1} END {exit !found}' "
        "|| echo 'iptables-persistent not installed'"
    )
def list_cmd():
    """Build the bash command that prints a banner and lists rules with line numbers."""
    banner = "echo '=== iptables -L (filter) ==='"
    listing = "iptables -L -v -n --line-numbers 2>&1"
    return f"{banner} && {listing}"
def list_nat_cmd():
    """Build the bash command that prints a banner and lists the nat table with line numbers."""
    steps = [
        "echo '=== iptables -t nat ==='",
        "iptables -t nat -L -v -n --line-numbers 2>&1",
    ]
    return " && ".join(steps)
def list_mangle_cmd():
    """Build the bash command that prints a banner and lists the mangle table (no line numbers)."""
    banner = "echo '=== iptables -t mangle ==='"
    listing = "iptables -t mangle -L -v -n 2>&1"
    return banner + " && " + listing
def add_rule_cmd(chain, rule):
    """Return bash cmd to append a rule to a chain.

    Args:
        chain: chain name; validated by ``_validate_chain``.
        rule: rule specification as typed on an iptables command line,
            e.g. ``"-p tcp --dport 22 -j ACCEPT"``. Shell-style quoting is
            honoured for arguments containing spaces.

    Raises:
        ValueError: if the chain is invalid, or ``rule`` is not a string, is
            empty, or has unbalanced quotes.
    """
    chain = _validate_chain(chain)
    if not isinstance(rule, str):
        raise ValueError(f"Invalid rule: {rule!r}")
    # Split into arguments and re-quote each one so shell metacharacters in
    # ``rule`` (;, |, &&, $(...), backticks) reach iptables as literal text
    # instead of being executed by bash. shlex.split raises ValueError on
    # unbalanced quotes.
    tokens = shlex.split(rule)
    if not tokens:
        raise ValueError("Rule specification must not be empty")
    safe_rule = " ".join(shlex.quote(token) for token in tokens)
    return f"iptables -A {chain} {safe_rule} 2>&1 && echo 'Rule appended to {chain}'"
def insert_rule_cmd(chain, position, rule):
    """Return bash cmd to insert a rule at a position in a chain.

    Args:
        chain: chain name; validated by ``_validate_chain``.
        position: 1-based rule position; anything ``int()`` accepts.
        rule: rule specification as typed on an iptables command line.
            Shell-style quoting is honoured for arguments containing spaces.

    Raises:
        ValueError: if the chain is invalid, ``position`` is below 1 or not
            numeric, or ``rule`` is not a string, is empty, or has unbalanced
            quotes.
    """
    chain = _validate_chain(chain)
    position = int(position)
    if position < 1:
        raise ValueError(f"Invalid position: {position}")
    if not isinstance(rule, str):
        raise ValueError(f"Invalid rule: {rule!r}")
    # Split into arguments and re-quote each one so shell metacharacters in
    # ``rule`` are passed to iptables literally instead of being run by bash.
    tokens = shlex.split(rule)
    if not tokens:
        raise ValueError("Rule specification must not be empty")
    safe_rule = " ".join(shlex.quote(token) for token in tokens)
    return (
        f"iptables -I {chain} {position} {safe_rule} 2>&1 && "
        f"echo 'Rule inserted in {chain} at position {position}'"
    )
def delete_rule_cmd(chain, rule_num):
    """Build the bash command that deletes rule number ``rule_num`` from ``chain``.

    The chain goes through ``_validate_chain`` and ``rule_num`` through
    ``int()``; a number below 1 raises ``ValueError``.
    """
    target_chain = _validate_chain(chain)
    number = int(rule_num)
    if not number >= 1:
        raise ValueError(f"Invalid rule number: {number}")
    delete = f"iptables -D {target_chain} {number} 2>&1"
    notice = f"echo 'Rule {number} deleted from {target_chain}'"
    return delete + " && " + notice
def flush_cmd(chain=None):
    """Build the bash command that flushes one chain, or every chain when ``chain`` is None.

    A given chain name is checked with ``_validate_chain`` first.
    """
    if chain is None:
        return "iptables -F 2>&1 && echo 'All chains flushed'"
    name = _validate_chain(chain)
    return f"iptables -F {name} 2>&1 && echo 'Flushed chain {name}'"
def save_cmd():
    """Return bash cmd to save current rules to /etc/iptables/rules.v4.

    The command installs ``iptables-persistent`` via apt-get when ``dpkg -l``
    does not list it, creates ``/etc/iptables``, dumps the rules with
    ``iptables-save`` into a temporary file and renames it over
    ``rules.v4``. If any step fails the temporary file is removed and the
    command exits non-zero, leaving an existing ``rules.v4`` untouched.
    """
    return (
        "if ! dpkg -l | grep -q iptables-persistent; then "
        " DEBIAN_FRONTEND=noninteractive apt-get update -qq && "
        " DEBIAN_FRONTEND=noninteractive apt-get install -y iptables-persistent 2>&1; "
        "fi && "
        "mkdir -p /etc/iptables && "
        # "2>&1 > file" (in this order) sends stderr to the caller's output
        # and only stdout to the file, so diagnostics never end up inside the
        # saved rules. Writing to a temp file keeps the old rules.v4 intact
        # when iptables-save fails.
        "{ iptables-save 2>&1 > /etc/iptables/rules.v4.tmp && "
        "mv -f /etc/iptables/rules.v4.tmp /etc/iptables/rules.v4 2>&1; } && "
        "echo 'Rules saved to /etc/iptables/rules.v4' || "
        "{ rm -f /etc/iptables/rules.v4.tmp; false; }"
    )
def restore_cmd():
    """Build the bash command that loads /etc/iptables/rules.v4 with iptables-restore.

    When the file does not exist the command only prints a notice.
    """
    rules_file = "/etc/iptables/rules.v4"
    pieces = [
        f"if [ -f {rules_file} ]; then ",
        f" iptables-restore < {rules_file} 2>&1 && ",
        f" echo 'Rules restored from {rules_file}'; ",
        "else ",
        f" echo 'No saved rules found at {rules_file}'; ",
        "fi",
    ]
    return "".join(pieces)
def policy_cmd(chain, target):
    """Return bash cmd to set default policy for a chain.

    Args:
        chain: INPUT, OUTPUT or FORWARD (case-insensitive); validated by
            ``_validate_chain``.
        target: ACCEPT or DROP (case-insensitive, whitespace stripped).

    Raises:
        ValueError: if the chain is invalid or not one of the three chains
            above, or the target is not ACCEPT or DROP.
    """
    chain = _validate_chain(chain)
    target = target.strip().upper()
    # iptables(8): "The policy target must be either ACCEPT or DROP."
    # REJECT is an extension target, so "iptables -P <chain> REJECT" would
    # only fail on the remote host; reject it here instead.
    if target not in ("ACCEPT", "DROP"):
        raise ValueError(f"Invalid policy target: {target} (must be ACCEPT or DROP)")
    if chain not in {"INPUT", "OUTPUT", "FORWARD"}:
        raise ValueError(f"Policy can only be set on INPUT, OUTPUT, or FORWARD (got {chain})")
    return f"iptables -P {chain} {target} 2>&1 && echo 'Policy for {chain} set to {target}'"
def block_ip_cmd(ip):
    """Return bash cmd to block an IP on INPUT and FORWARD chains.

    Appends a ``-j DROP`` rule for source ``ip`` to both chains, using
    ``ip6tables`` for IPv6 addresses and ``iptables`` otherwise.

    Raises:
        ValueError: if ``_validate_ip`` rejects ``ip``.
    """
    ip = _validate_ip(ip)
    # Only IPv6 addresses contain ':' after validation; iptables itself
    # handles IPv4 only, so IPv6 sources must go through ip6tables.
    tool = "ip6tables" if ":" in ip else "iptables"
    return (
        f"{tool} -A INPUT -s {ip} -j DROP 2>&1 && "
        f"{tool} -A FORWARD -s {ip} -j DROP 2>&1 && "
        f"echo 'Blocked {ip} on INPUT and FORWARD'"
    )
def unblock_ip_cmd(ip):
    """Return bash cmd to remove all block rules for an IP.

    For source ``ip`` the command repeatedly deletes ``-j DROP`` and then
    ``-j REJECT`` rules from INPUT and FORWARD until none is left, using
    ``ip6tables`` for IPv6 addresses and ``iptables`` otherwise.

    Raises:
        ValueError: if ``_validate_ip`` rejects ``ip``.
    """
    ip = _validate_ip(ip)
    # Only IPv6 addresses contain ':' after validation; iptables itself
    # handles IPv4 only, so IPv6 sources must go through ip6tables.
    tool = "ip6tables" if ":" in ip else "iptables"
    # Each "while ... -D" loop deletes one matching rule per iteration and
    # stops when -D fails, i.e. when no such rule remains.
    removals = [
        f"while {tool} -D {chain} -s {ip} -j {verdict} 2>/dev/null; do :; done"
        for verdict in ("DROP", "REJECT")
        for chain in ("INPUT", "FORWARD")
    ]
    removals.append(f"echo 'Unblocked {ip} from INPUT and FORWARD'")
    return " && ".join(removals)
def list_blocked_cmd():
    """Build the bash command that greps the rule listing for DROP or REJECT lines.

    Prints 'No blocked rules found' when grep matches nothing.
    """
    banner = "echo '=== Blocked (DROP/REJECT) Rules ==='"
    search = "iptables -L -v -n --line-numbers 2>&1 | grep -E 'DROP|REJECT'"
    fallback = "echo 'No blocked rules found'"
    return f"{banner} && {search} || {fallback}"
def log_cmd(lines=50):
    """Return bash cmd to grep iptables entries from kernel/syslog.

    The command reads the first of /var/log/kern.log, /var/log/syslog and
    /var/log/messages that exists (falling back to ``dmesg``), keeps the last
    ``lines`` matching entries, and prints 'No iptables log entries found'
    when nothing matched.

    Args:
        lines: maximum number of entries to show; anything ``int()`` accepts.

    Raises:
        ValueError: if ``lines`` is not numeric or is below 1.
    """
    count = int(lines)
    if count < 1:
        # A zero or negative count would yield an invalid or useless tail option.
        raise ValueError(f"Invalid line count: {count}")
    with_ufw = r"grep -i 'iptables\|netfilter\|\[UFW'"
    plain = r"grep -i 'iptables\|netfilter'"
    tail = f"tail -n {count}"
    return (
        "echo '=== iptables Log Entries ===' && "
        "{ if [ -f /var/log/kern.log ]; then "
        f" {with_ufw} /var/log/kern.log 2>/dev/null | {tail}; "
        "elif [ -f /var/log/syslog ]; then "
        f" {with_ufw} /var/log/syslog 2>/dev/null | {tail}; "
        "elif [ -f /var/log/messages ]; then "
        f" {plain} /var/log/messages 2>/dev/null | {tail}; "
        "else "
        f" dmesg 2>/dev/null | {plain} | {tail}; "
        # tail always exits 0, so "grep ." is what detects empty output and
        # lets the fallback message run for whichever branch was taken.
        "fi; } | grep . || echo 'No iptables log entries found'"
    )
def ip6_list_cmd():
    """Build the bash command that prints a banner and lists ip6tables rules with line numbers."""
    banner = "echo '=== ip6tables -L (filter) ==='"
    listing = "ip6tables -L -v -n --line-numbers 2>&1"
    return f"{banner} && {listing}"
def counters_cmd():
    """Build the bash command that prints a banner and the per-rule packet/byte counters."""
    steps = (
        "echo '=== Packet/Byte Counters ==='",
        "iptables -L -v -n -x 2>&1",
    )
    return " && ".join(steps)
def zero_counters_cmd():
    """Build the bash command that resets all packet and byte counters and confirms it."""
    reset = "iptables -Z 2>&1"
    confirm = "echo 'All counters zeroed'"
    return " && ".join((reset, confirm))