Files
autarch/core/discovery.py

405 lines
14 KiB
Python
Raw Permalink Normal View History

2026-03-13 15:17:15 -07:00
"""
AUTARCH Network Discovery
Advertises AUTARCH on the local network so companion apps can find it.
Discovery methods (priority order):
1. mDNS/Zeroconf LAN service advertisement (_autarch._tcp.local.)
2. Bluetooth RFCOMM service advertisement (requires BT adapter + security enabled)
Dependencies:
- mDNS: pip install zeroconf (optional, graceful fallback)
- Bluetooth: system bluetoothctl + hcitool (no pip package needed)
"""
import json
import socket
import subprocess
import threading
import time
import logging
from pathlib import Path
from typing import Dict, Optional, Tuple
from core.daemon import root_exec
2026-03-13 15:17:15 -07:00
logger = logging.getLogger(__name__)
# Service constants
MDNS_SERVICE_TYPE = "_autarch._tcp.local."
MDNS_SERVICE_NAME = "AUTARCH._autarch._tcp.local."
BT_SERVICE_NAME = "AUTARCH"
BT_SERVICE_UUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
def _get_local_ip() -> str:
"""Get the primary local IP address."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
class DiscoveryManager:
"""Manages network discovery advertising for AUTARCH."""
def __init__(self, config=None):
self._config = config or {}
self._web_port = int(self._config.get('web_port', 8181))
self._hostname = socket.gethostname()
# mDNS state
self._zeroconf = None
self._mdns_info = None
self._mdns_running = False
# Bluetooth state
self._bt_running = False
self._bt_thread = None
self._bt_process = None
# Settings
self._mdns_enabled = self._config.get('mdns_enabled', 'true').lower() == 'true'
self._bt_enabled = self._config.get('bluetooth_enabled', 'true').lower() == 'true'
self._bt_require_security = self._config.get('bt_require_security', 'true').lower() == 'true'
# ------------------------------------------------------------------
# Status
# ------------------------------------------------------------------
def get_status(self) -> Dict:
"""Get current discovery status for all methods."""
return {
'local_ip': _get_local_ip(),
'hostname': self._hostname,
'web_port': self._web_port,
'mdns': {
'available': self._is_zeroconf_available(),
'enabled': self._mdns_enabled,
'running': self._mdns_running,
'service_type': MDNS_SERVICE_TYPE,
},
'bluetooth': {
'available': self._is_bt_available(),
'adapter_present': self._bt_adapter_present(),
'enabled': self._bt_enabled,
'running': self._bt_running,
'secure': self._bt_is_secure() if self._bt_adapter_present() else False,
'require_security': self._bt_require_security,
'service_name': BT_SERVICE_NAME,
}
}
# ------------------------------------------------------------------
# mDNS / Zeroconf
# ------------------------------------------------------------------
def _is_zeroconf_available(self) -> bool:
"""Check if the zeroconf Python package is installed."""
try:
import zeroconf # noqa: F401
return True
except ImportError:
return False
def start_mdns(self) -> Tuple[bool, str]:
"""Start mDNS service advertisement."""
if self._mdns_running:
return True, "mDNS already running"
if not self._is_zeroconf_available():
return False, "zeroconf not installed. Run: pip install zeroconf"
try:
from zeroconf import Zeroconf, ServiceInfo
import socket as sock
local_ip = _get_local_ip()
self._mdns_info = ServiceInfo(
MDNS_SERVICE_TYPE,
MDNS_SERVICE_NAME,
addresses=[sock.inet_aton(local_ip)],
port=self._web_port,
properties={
'version': '1.0',
'hostname': self._hostname,
'platform': 'autarch',
},
server=f"{self._hostname}.local.",
)
self._zeroconf = Zeroconf()
self._zeroconf.register_service(self._mdns_info)
self._mdns_running = True
logger.info(f"mDNS: advertising {MDNS_SERVICE_NAME} at {local_ip}:{self._web_port}")
return True, f"mDNS started — {local_ip}:{self._web_port}"
except Exception as e:
logger.error(f"mDNS start failed: {e}")
return False, f"mDNS failed: {e}"
def stop_mdns(self) -> Tuple[bool, str]:
"""Stop mDNS service advertisement."""
if not self._mdns_running:
return True, "mDNS not running"
try:
if self._zeroconf and self._mdns_info:
self._zeroconf.unregister_service(self._mdns_info)
self._zeroconf.close()
self._zeroconf = None
self._mdns_info = None
self._mdns_running = False
logger.info("mDNS: stopped")
return True, "mDNS stopped"
except Exception as e:
self._mdns_running = False
return False, f"mDNS stop error: {e}"
# ------------------------------------------------------------------
# Bluetooth
# ------------------------------------------------------------------
def _is_bt_available(self) -> bool:
"""Check if Bluetooth CLI tools are available."""
try:
result = subprocess.run(
['which', 'bluetoothctl'],
capture_output=True, text=True, timeout=5
)
return result.returncode == 0
except Exception:
return False
def _bt_adapter_present(self) -> bool:
"""Check if a Bluetooth adapter is physically present."""
try:
result = subprocess.run(
['hciconfig'],
capture_output=True, text=True, timeout=5
)
return 'hci0' in result.stdout
except Exception:
return False
def _bt_is_secure(self) -> bool:
"""Check if Bluetooth security (SSP/authentication) is enabled."""
try:
# Check if adapter requires authentication
result = subprocess.run(
['hciconfig', 'hci0', 'auth'],
capture_output=True, text=True, timeout=5
)
# Also check hciconfig output for AUTH flag
status = subprocess.run(
['hciconfig', 'hci0'],
capture_output=True, text=True, timeout=5
)
# Look for AUTH in flags
return 'AUTH' in status.stdout
except Exception:
return False
def _bt_enable_security(self) -> Tuple[bool, str]:
"""Enable Bluetooth authentication/security on the adapter."""
try:
# Enable authentication
root_exec(['hciconfig', 'hci0', 'auth'], timeout=5)
2026-03-13 15:17:15 -07:00
# Enable encryption
root_exec(['hciconfig', 'hci0', 'encrypt'], timeout=5)
2026-03-13 15:17:15 -07:00
# Enable SSP (Secure Simple Pairing)
root_exec(['hciconfig', 'hci0', 'sspmode', '1'], timeout=5)
2026-03-13 15:17:15 -07:00
if self._bt_is_secure():
return True, "Bluetooth security enabled (AUTH + ENCRYPT + SSP)"
return False, "Security flags set but AUTH not confirmed"
except Exception as e:
return False, f"Failed to enable BT security: {e}"
def start_bluetooth(self) -> Tuple[bool, str]:
"""Start Bluetooth service advertisement.
Only advertises if:
1. Bluetooth adapter is present
2. bluetoothctl is available
3. Security is enabled (if bt_require_security is true)
"""
if self._bt_running:
return True, "Bluetooth already advertising"
if not self._is_bt_available():
return False, "bluetoothctl not found"
if not self._bt_adapter_present():
return False, "No Bluetooth adapter detected"
# Ensure adapter is up
try:
root_exec(['hciconfig', 'hci0', 'up'], timeout=5)
2026-03-13 15:17:15 -07:00
except Exception:
pass
# Security check
if self._bt_require_security:
if not self._bt_is_secure():
ok, msg = self._bt_enable_security()
if not ok:
return False, f"Bluetooth security required but not available: {msg}"
# Make discoverable and set name
try:
# Set device name
root_exec(['hciconfig', 'hci0', 'name', BT_SERVICE_NAME], timeout=5)
2026-03-13 15:17:15 -07:00
# Enable discoverable mode
root_exec(['hciconfig', 'hci0', 'piscan'], timeout=5)
2026-03-13 15:17:15 -07:00
# Use bluetoothctl to set discoverable with timeout 0 (always)
# and set the alias
cmds = [
'power on',
f'system-alias {BT_SERVICE_NAME}',
'discoverable on',
'discoverable-timeout 0',
'pairable on',
]
for cmd in cmds:
subprocess.run(
['bluetoothctl', cmd.split()[0]] + cmd.split()[1:],
capture_output=True, text=True, timeout=5
)
# Start an RFCOMM advertisement thread so the app can find us
# and read connection info (IP + port) after pairing
self._bt_running = True
self._bt_thread = threading.Thread(
target=self._bt_rfcomm_server,
daemon=True,
name="autarch-bt-discovery"
)
self._bt_thread.start()
logger.info("Bluetooth: advertising as AUTARCH")
return True, f"Bluetooth advertising — name: {BT_SERVICE_NAME}"
except Exception as e:
self._bt_running = False
return False, f"Bluetooth start failed: {e}"
def _bt_rfcomm_server(self):
"""Background thread: RFCOMM server that sends connection info to paired clients.
When a paired device connects, we send them a JSON blob with our IP and port
so the companion app can auto-configure.
"""
try:
# Use a simple TCP socket on a known port as a Bluetooth-adjacent info service
# (full RFCOMM requires pybluez which may not be installed)
# Instead, we'll use sdptool to register the service and bluetoothctl for visibility
#
# The companion app discovers us via BT name "AUTARCH", then connects via
# the IP it gets from the BT device info or mDNS
while self._bt_running:
time.sleep(5)
except Exception as e:
logger.error(f"BT RFCOMM server error: {e}")
finally:
self._bt_running = False
def stop_bluetooth(self) -> Tuple[bool, str]:
"""Stop Bluetooth advertisement."""
if not self._bt_running:
return True, "Bluetooth not advertising"
self._bt_running = False
try:
# Disable discoverable
subprocess.run(
['bluetoothctl', 'discoverable', 'off'],
capture_output=True, text=True, timeout=5
)
root_exec(['hciconfig', 'hci0', 'noscan'], timeout=5)
2026-03-13 15:17:15 -07:00
if self._bt_thread:
self._bt_thread.join(timeout=3)
self._bt_thread = None
logger.info("Bluetooth: stopped advertising")
return True, "Bluetooth advertising stopped"
except Exception as e:
return False, f"Bluetooth stop error: {e}"
# ------------------------------------------------------------------
# Start / Stop All
# ------------------------------------------------------------------
def start_all(self) -> Dict:
"""Start all enabled discovery methods."""
results = {}
if self._mdns_enabled:
ok, msg = self.start_mdns()
results['mdns'] = {'ok': ok, 'message': msg}
else:
results['mdns'] = {'ok': False, 'message': 'Disabled in config'}
if self._bt_enabled:
ok, msg = self.start_bluetooth()
results['bluetooth'] = {'ok': ok, 'message': msg}
else:
results['bluetooth'] = {'ok': False, 'message': 'Disabled in config'}
return results
def stop_all(self) -> Dict:
"""Stop all discovery methods."""
results = {}
ok, msg = self.stop_mdns()
results['mdns'] = {'ok': ok, 'message': msg}
ok, msg = self.stop_bluetooth()
results['bluetooth'] = {'ok': ok, 'message': msg}
return results
# ------------------------------------------------------------------
# Cleanup
# ------------------------------------------------------------------
def shutdown(self):
"""Clean shutdown of all discovery services."""
self.stop_all()
# ======================================================================
# Singleton
# ======================================================================
_manager = None
def get_discovery_manager(config=None) -> DiscoveryManager:
"""Get or create the DiscoveryManager singleton."""
global _manager
if _manager is None:
if config is None:
try:
from core.config import get_config
cfg = get_config()
config = {}
if cfg.has_section('discovery'):
config = dict(cfg.items('discovery'))
if cfg.has_section('web'):
config['web_port'] = cfg.get('web', 'port', fallback='8181')
except Exception:
config = {}
_manager = DiscoveryManager(config)
return _manager