"""Command builders for managing a Cowrie SSH/Telnet honeypot on a Linux VPS.

Every public function returns a bash command string; nothing is executed by
this module. Cowrie installs to /opt/cowrie/cowrie-git, runs as user
'cowrie', and is managed through a systemd service.
"""

COWRIE_DIR = "/opt/cowrie/cowrie-git"
COWRIE_USER = "cowrie"
COWRIE_SERVICE = "cowrie"
COWRIE_LOG = f"{COWRIE_DIR}/var/log/cowrie/cowrie.log"
COWRIE_JSON = f"{COWRIE_DIR}/var/log/cowrie/cowrie.json"
COWRIE_CFG = f"{COWRIE_DIR}/etc/cowrie.cfg"
COWRIE_DOWNLOADS = f"{COWRIE_DIR}/var/lib/cowrie/downloads"
LISTEN_PORT = 2222

# Text of the systemd unit file written by install_cmd().
SYSTEMD_UNIT = f"""\
[Unit]
Description=Cowrie SSH/Telnet Honeypot
After=network.target

[Service]
Type=simple
User={COWRIE_USER}
Group={COWRIE_USER}
WorkingDirectory={COWRIE_DIR}
ExecStart={COWRIE_DIR}/cowrie-env/bin/python3 {COWRIE_DIR}/src/cowrie/scripts/cowrie -n
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
"""

# Preferred heredoc terminator used by config_save_cmd().
_CFG_HEREDOC_DELIMITER = "COWRIE_CFG_EOF"


def _tail_cmd(lines: int, path: str) -> str:
    """Return a ``tail -n`` command for *path*.

    *lines* is coerced with ``int()`` so that only a number can reach the
    shell string; a value that is not integer-like raises ``ValueError`` or
    ``TypeError``.
    """
    return f"tail -n {int(lines)} {path}"


def _pick_heredoc_delimiter(content: str) -> str:
    """Return a heredoc terminator that is not a line of *content*.

    A line equal to the terminator would end the heredoc early and the rest
    of the content would be interpreted as shell commands.
    """
    existing = set(content.splitlines())
    delimiter = _CFG_HEREDOC_DELIMITER
    suffix = 0
    while delimiter in existing:
        suffix += 1
        delimiter = f"{_CFG_HEREDOC_DELIMITER}_{suffix}"
    return delimiter


def status_cmd() -> str:
    """Return a command that prints service status, the listener on
    LISTEN_PORT, and the service's ActiveEnterTimestamp property."""
    return (
        f"systemctl status {COWRIE_SERVICE} --no-pager; "
        f"echo '---'; "
        f"ss -tlnp | grep :{LISTEN_PORT}; "
        f"echo '---'; "
        f"systemctl show {COWRIE_SERVICE} --property=ActiveEnterTimestamp --no-pager"
    )


def install_cmd() -> str:
    """Return a single-line command chain that installs and starts Cowrie.

    Steps, each gated on the previous one with ``&&``: install apt packages,
    ensure the service user exists, clone the repository, build the virtualenv,
    create the config from the ``.dist`` template and set the listen endpoint,
    write the systemd unit, then enable and start the service.
    """
    # The unit is passed to printf on one line; %b expands the "\n" sequences.
    unit_content = SYSTEMD_UNIT.replace("\n", "\\n")
    return (
        # Install dependencies
        "apt-get update && "
        "apt-get install -y git python3 python3-venv python3-dev "
        "libssl-dev libffi-dev build-essential libpython3-dev "
        "python3-minimal authbind virtualenv && "
        # Create cowrie user. The subshell keeps the `||` local: without it a
        # failed apt-get would fall through to useradd and the chain would
        # continue as if the packages had been installed.
        f"(id -u {COWRIE_USER} &>/dev/null || "
        f"useradd -r -m -d /opt/cowrie -s /bin/false {COWRIE_USER}) && "
        # Clone cowrie
        f"mkdir -p /opt/cowrie && "
        f"git clone https://github.com/cowrie/cowrie.git {COWRIE_DIR} && "
        f"chown -R {COWRIE_USER}:{COWRIE_USER} /opt/cowrie && "
        # Create venv and install
        f"cd {COWRIE_DIR} && "
        f"python3 -m venv {COWRIE_DIR}/cowrie-env && "
        f"{COWRIE_DIR}/cowrie-env/bin/pip install --upgrade pip && "
        f"{COWRIE_DIR}/cowrie-env/bin/pip install -r {COWRIE_DIR}/requirements.txt && "
        # Set listen port in config
        # NOTE(review): the pattern is not limited to one config section and
        # also matches commented-out `listen_endpoints` lines -- confirm this
        # is the intended result against the shipped cowrie.cfg.dist.
        f"cp {COWRIE_CFG}.dist {COWRIE_CFG} && "
        f"sed -i 's/^#\\?\\s*listen_endpoints\\s*=.*/listen_endpoints = tcp:{LISTEN_PORT}:interface=0.0.0.0/' {COWRIE_CFG} && "
        f"chown -R {COWRIE_USER}:{COWRIE_USER} /opt/cowrie && "
        # Create systemd service. The unit text is an argument to %b rather
        # than the printf format itself, so a literal '%' in it stays literal.
        f"printf '%b' '{unit_content}' > /etc/systemd/system/{COWRIE_SERVICE}.service && "
        f"systemctl daemon-reload && "
        f"systemctl enable {COWRIE_SERVICE} && "
        f"systemctl start {COWRIE_SERVICE}"
    )


def start_cmd() -> str:
    """Return the command that starts the Cowrie service."""
    return f"systemctl start {COWRIE_SERVICE}"


def stop_cmd() -> str:
    """Return the command that stops the Cowrie service."""
    return f"systemctl stop {COWRIE_SERVICE}"


def restart_cmd() -> str:
    """Return the command that restarts the Cowrie service."""
    return f"systemctl restart {COWRIE_SERVICE}"


def log_cmd(lines: int = 100) -> str:
    """Return a command printing the last *lines* lines of the text log."""
    return _tail_cmd(lines, COWRIE_LOG)


def log_json_cmd(lines: int = 50) -> str:
    """Return a command printing the last *lines* lines of the JSON log."""
    return _tail_cmd(lines, COWRIE_JSON)


def sessions_cmd() -> str:
    """Return a command listing the last 200 connect / command-input events
    from the JSON log as an aligned table (timestamp, source IP, input,
    session)."""
    return (
        f"cat {COWRIE_JSON} | "
        "jq -r 'select(.eventid == \"cowrie.session.connect\" or "
        ".eventid == \"cowrie.command.input\") | "
        "[.timestamp, .src_ip // empty, .input // \"[connect]\", .session] | @tsv' | "
        "tail -n 200 | column -t -s $'\\t'"
    )


def top_attackers_cmd() -> str:
    """Return a command printing the 25 source IPs with the most connect
    events, most frequent first."""
    return (
        f"cat {COWRIE_JSON} | "
        "jq -r 'select(.eventid == \"cowrie.session.connect\") | .src_ip' | "
        "sort | uniq -c | sort -rn | head -25"
    )


def credentials_cmd() -> str:
    """Return a command listing the last 200 login attempts (successful and
    failed) as an aligned table."""
    return (
        f"cat {COWRIE_JSON} | "
        "jq -r 'select(.eventid == \"cowrie.login.success\" or "
        ".eventid == \"cowrie.login.failed\") | "
        "[.timestamp, .username, .password, .src_ip, .eventid] | @tsv' | "
        "tail -n 200 | column -t -s $'\\t'"
    )


def downloads_cmd() -> str:
    """Return a command that lists the downloads directory, then the last 100
    file-download events from the JSON log."""
    return (
        f"echo '=== Downloaded files ===' && "
        f"ls -lhtr {COWRIE_DOWNLOADS}/ 2>/dev/null || echo 'No downloads directory'; "
        f"echo '---'; "
        f"cat {COWRIE_JSON} | "
        "jq -r 'select(.eventid == \"cowrie.session.file_download\") | "
        "[.timestamp, .url, .shasum, .src_ip] | @tsv' | "
        "tail -n 100 | column -t -s $'\\t'"
    )


def config_cmd() -> str:
    """Return the command that prints the Cowrie configuration file."""
    return f"cat {COWRIE_CFG}"


def config_save_cmd(content: str) -> str:
    """Return a command that backs up the config, replaces it with *content*,
    fixes ownership, and restarts the service.

    The content travels in a quoted heredoc, so the shell performs no
    expansion on it. The heredoc always terminates the written file with a
    newline after *content*.
    """
    delimiter = _pick_heredoc_delimiter(content)
    return (
        # The whole && chain sits on the heredoc's command line, so chown and
        # the restart only run when both the backup and the write succeeded.
        f"cp {COWRIE_CFG} {COWRIE_CFG}.bak.$(date +%Y%m%d%H%M%S) && "
        f"cat > {COWRIE_CFG} << '{delimiter}' && "
        f"chown {COWRIE_USER}:{COWRIE_USER} {COWRIE_CFG} && "
        f"systemctl restart {COWRIE_SERVICE}\n"
        f"{content}\n{delimiter}\n"
    )


def port_redirect_cmd(enable: bool = True) -> str:
    """Return a command that adds (or, with ``enable=False``, removes) the
    NAT rule redirecting TCP port 22 to LISTEN_PORT.

    Enabling checks for the rule first (``iptables -C``) so that repeated
    calls do not stack duplicate rules.
    """
    rule = f"PREROUTING -p tcp --dport 22 -j REDIRECT --to-port {LISTEN_PORT}"
    if enable:
        return (
            f"(iptables -t nat -C {rule} 2>/dev/null || "
            f"iptables -t nat -A {rule}) && "
            f"echo 'Port 22 -> {LISTEN_PORT} redirect enabled'"
        )
    return (
        f"iptables -t nat -D {rule} && "
        f"echo 'Port 22 -> {LISTEN_PORT} redirect removed'"
    )


def uninstall_cmd() -> str:
    """Return a command that stops and disables the service, removes the unit
    file and /opt/cowrie, and deletes the service user.

    Stop, disable and userdel are best-effort (joined with ``;``), so the
    command still finishes when parts of the install are already gone.
    """
    return (
        f"systemctl stop {COWRIE_SERVICE}; "
        f"systemctl disable {COWRIE_SERVICE}; "
        f"rm -f /etc/systemd/system/{COWRIE_SERVICE}.service && "
        f"systemctl daemon-reload && "
        f"rm -rf /opt/cowrie && "
        f"userdel -r {COWRIE_USER} 2>/dev/null; "
        "echo 'Cowrie uninstalled'"
    )