Files

176 lines
5.5 KiB
Python
Raw Permalink Normal View History

"""
Command-builder module for managing Cowrie SSH/Telnet honeypot on a Linux VPS.
Each function returns a bash command string. Cowrie installs to /opt/cowrie/cowrie-git,
runs as user 'cowrie', uses a systemd service.
"""
COWRIE_DIR = "/opt/cowrie/cowrie-git"
COWRIE_USER = "cowrie"
COWRIE_SERVICE = "cowrie"
COWRIE_LOG = f"{COWRIE_DIR}/var/log/cowrie/cowrie.log"
COWRIE_JSON = f"{COWRIE_DIR}/var/log/cowrie/cowrie.json"
COWRIE_CFG = f"{COWRIE_DIR}/etc/cowrie.cfg"
COWRIE_DOWNLOADS = f"{COWRIE_DIR}/var/lib/cowrie/downloads"
LISTEN_PORT = 2222
SYSTEMD_UNIT = f"""\
[Unit]
Description=Cowrie SSH/Telnet Honeypot
After=network.target
[Service]
Type=simple
User={COWRIE_USER}
Group={COWRIE_USER}
WorkingDirectory={COWRIE_DIR}
ExecStart={COWRIE_DIR}/cowrie-env/bin/python3 {COWRIE_DIR}/src/cowrie/scripts/cowrie -n
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
"""
def status_cmd() -> str:
return (
f"systemctl status {COWRIE_SERVICE} --no-pager; "
f"echo '---'; "
f"ss -tlnp | grep :{LISTEN_PORT}; "
f"echo '---'; "
f"systemctl show {COWRIE_SERVICE} --property=ActiveEnterTimestamp --no-pager"
)
def install_cmd() -> str:
unit_content = SYSTEMD_UNIT.replace("\n", "\\n")
return (
# Install dependencies
"apt-get update && "
"apt-get install -y git python3 python3-venv python3-dev "
"libssl-dev libffi-dev build-essential libpython3-dev "
"python3-minimal authbind virtualenv && "
# Create cowrie user
f"id -u {COWRIE_USER} &>/dev/null || "
f"useradd -r -m -d /opt/cowrie -s /bin/false {COWRIE_USER} && "
# Clone cowrie
f"mkdir -p /opt/cowrie && "
f"git clone https://github.com/cowrie/cowrie.git {COWRIE_DIR} && "
f"chown -R {COWRIE_USER}:{COWRIE_USER} /opt/cowrie && "
# Create venv and install
f"cd {COWRIE_DIR} && "
f"python3 -m venv {COWRIE_DIR}/cowrie-env && "
f"{COWRIE_DIR}/cowrie-env/bin/pip install --upgrade pip && "
f"{COWRIE_DIR}/cowrie-env/bin/pip install -r {COWRIE_DIR}/requirements.txt && "
# Set listen port in config
f"cp {COWRIE_CFG}.dist {COWRIE_CFG} && "
f"sed -i 's/^#\\?\\s*listen_endpoints\\s*=.*/listen_endpoints = tcp:{LISTEN_PORT}:interface=0.0.0.0/' {COWRIE_CFG} && "
f"chown -R {COWRIE_USER}:{COWRIE_USER} /opt/cowrie && "
# Create systemd service
f"printf '{unit_content}' > /etc/systemd/system/{COWRIE_SERVICE}.service && "
f"systemctl daemon-reload && "
f"systemctl enable {COWRIE_SERVICE} && "
f"systemctl start {COWRIE_SERVICE}"
)
def start_cmd() -> str:
return f"systemctl start {COWRIE_SERVICE}"
def stop_cmd() -> str:
return f"systemctl stop {COWRIE_SERVICE}"
def restart_cmd() -> str:
return f"systemctl restart {COWRIE_SERVICE}"
def log_cmd(lines: int = 100) -> str:
return f"tail -n {lines} {COWRIE_LOG}"
def log_json_cmd(lines: int = 50) -> str:
return f"tail -n {lines} {COWRIE_JSON}"
def sessions_cmd() -> str:
return (
f"cat {COWRIE_JSON} | "
"jq -r 'select(.eventid == \"cowrie.session.connect\" or "
".eventid == \"cowrie.command.input\") | "
"[.timestamp, .src_ip // empty, .input // \"[connect]\", .session] | @tsv' | "
"tail -n 200 | column -t -s $'\\t'"
)
def top_attackers_cmd() -> str:
return (
f"cat {COWRIE_JSON} | "
"jq -r 'select(.eventid == \"cowrie.session.connect\") | .src_ip' | "
"sort | uniq -c | sort -rn | head -25"
)
def credentials_cmd() -> str:
return (
f"cat {COWRIE_JSON} | "
"jq -r 'select(.eventid == \"cowrie.login.success\" or "
".eventid == \"cowrie.login.failed\") | "
"[.timestamp, .username, .password, .src_ip, .eventid] | @tsv' | "
"tail -n 200 | column -t -s $'\\t'"
)
def downloads_cmd() -> str:
return (
f"echo '=== Downloaded files ===' && "
f"ls -lhtr {COWRIE_DOWNLOADS}/ 2>/dev/null || echo 'No downloads directory'; "
f"echo '---'; "
f"cat {COWRIE_JSON} | "
"jq -r 'select(.eventid == \"cowrie.session.file_download\") | "
"[.timestamp, .url, .shasum, .src_ip] | @tsv' | "
"tail -n 100 | column -t -s $'\\t'"
)
def config_cmd() -> str:
return f"cat {COWRIE_CFG}"
def config_save_cmd(content: str) -> str:
escaped = content.replace("'", "'\\''")
return (
f"cp {COWRIE_CFG} {COWRIE_CFG}.bak.$(date +%Y%m%d%H%M%S) && "
f"cat > {COWRIE_CFG} << 'COWRIE_CFG_EOF'\n{content}\nCOWRIE_CFG_EOF\n"
f"chown {COWRIE_USER}:{COWRIE_USER} {COWRIE_CFG} && "
f"systemctl restart {COWRIE_SERVICE}"
)
def port_redirect_cmd(enable: bool = True) -> str:
if enable:
return (
"iptables -t nat -A PREROUTING -p tcp --dport 22 "
f"-j REDIRECT --to-port {LISTEN_PORT} && "
"echo 'Port 22 -> 2222 redirect enabled'"
)
else:
return (
"iptables -t nat -D PREROUTING -p tcp --dport 22 "
f"-j REDIRECT --to-port {LISTEN_PORT} && "
"echo 'Port 22 -> 2222 redirect removed'"
)
def uninstall_cmd() -> str:
return (
f"systemctl stop {COWRIE_SERVICE}; "
f"systemctl disable {COWRIE_SERVICE}; "
f"rm -f /etc/systemd/system/{COWRIE_SERVICE}.service && "
f"systemctl daemon-reload && "
f"rm -rf /opt/cowrie && "
f"userdel -r {COWRIE_USER} 2>/dev/null; "
"echo 'Cowrie uninstalled'"
)