367 lines
9.7 KiB
Go
367 lines
9.7 KiB
Go
package float
|
|
|
|
import (
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"setec-manager/internal/db"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// Bridge manages WebSocket connections for USB passthrough in Float Mode.
|
|
type Bridge struct {
|
|
db *db.DB
|
|
sessions map[string]*bridgeConn
|
|
mu sync.RWMutex
|
|
upgrader websocket.Upgrader
|
|
}
|
|
|
|
// bridgeConn tracks a single active WebSocket connection and its associated session.
|
|
type bridgeConn struct {
|
|
sessionID string
|
|
conn *websocket.Conn
|
|
devices []USBDevice
|
|
mu sync.Mutex
|
|
done chan struct{}
|
|
}
|
|
|
|
const (
|
|
writeWait = 10 * time.Second
|
|
pongWait = 60 * time.Second
|
|
pingInterval = 30 * time.Second
|
|
maxMessageSize = 64 * 1024 // 64 KB max frame payload
|
|
)
|
|
|
|
// NewBridge creates a new Bridge with the given database reference.
|
|
func NewBridge(database *db.DB) *Bridge {
|
|
return &Bridge{
|
|
db: database,
|
|
sessions: make(map[string]*bridgeConn),
|
|
upgrader: websocket.Upgrader{
|
|
ReadBufferSize: 4096,
|
|
WriteBufferSize: 4096,
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true // Accept all origins; auth is handled via session token
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// HandleWebSocket upgrades an HTTP connection to WebSocket and manages the
|
|
// binary frame protocol for USB passthrough. The session ID must be provided
|
|
// as a "session" query parameter.
|
|
func (b *Bridge) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|
sessionID := r.URL.Query().Get("session")
|
|
if sessionID == "" {
|
|
http.Error(w, "missing session parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Validate session exists and is not expired
|
|
sess, err := b.db.GetFloatSession(sessionID)
|
|
if err != nil {
|
|
http.Error(w, "invalid session", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
if time.Now().After(sess.ExpiresAt) {
|
|
http.Error(w, "session expired", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
// Upgrade to WebSocket
|
|
conn, err := b.upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Printf("[float/bridge] upgrade failed for session %s: %v", sessionID, err)
|
|
return
|
|
}
|
|
|
|
bc := &bridgeConn{
|
|
sessionID: sessionID,
|
|
conn: conn,
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
// Register active connection
|
|
b.mu.Lock()
|
|
// Close any existing connection for this session
|
|
if existing, ok := b.sessions[sessionID]; ok {
|
|
close(existing.done)
|
|
existing.conn.Close()
|
|
}
|
|
b.sessions[sessionID] = bc
|
|
b.mu.Unlock()
|
|
|
|
log.Printf("[float/bridge] session %s connected from %s", sessionID, r.RemoteAddr)
|
|
|
|
// Start read/write loops
|
|
go b.writePump(bc)
|
|
b.readPump(bc)
|
|
}
|
|
|
|
// readPump reads binary frames from the WebSocket and dispatches them.
|
|
func (b *Bridge) readPump(bc *bridgeConn) {
|
|
defer b.cleanup(bc)
|
|
|
|
bc.conn.SetReadLimit(maxMessageSize)
|
|
bc.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
bc.conn.SetPongHandler(func(string) error {
|
|
bc.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
messageType, data, err := bc.conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
|
log.Printf("[float/bridge] session %s read error: %v", bc.sessionID, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
if messageType != websocket.BinaryMessage {
|
|
b.sendError(bc, 0x0001, "expected binary message")
|
|
continue
|
|
}
|
|
|
|
frameType, payload, err := DecodeFrame(data)
|
|
if err != nil {
|
|
b.sendError(bc, 0x0002, "malformed frame: "+err.Error())
|
|
continue
|
|
}
|
|
|
|
// Update session ping in DB
|
|
b.db.PingFloatSession(bc.sessionID)
|
|
|
|
switch frameType {
|
|
case FrameEnumerate:
|
|
b.handleEnumerate(bc)
|
|
case FrameOpen:
|
|
b.handleOpen(bc, payload)
|
|
case FrameClose:
|
|
b.handleClose(bc, payload)
|
|
case FrameTransferOut:
|
|
b.handleTransfer(bc, payload)
|
|
case FrameInterrupt:
|
|
b.handleInterrupt(bc, payload)
|
|
case FramePong:
|
|
// Client responded to our ping; no action needed
|
|
default:
|
|
b.sendError(bc, 0x0003, "unknown frame type")
|
|
}
|
|
}
|
|
}
|
|
|
|
// writePump sends periodic pings to keep the connection alive.
|
|
func (b *Bridge) writePump(bc *bridgeConn) {
|
|
ticker := time.NewTicker(pingInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
bc.mu.Lock()
|
|
bc.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
err := bc.conn.WriteMessage(websocket.BinaryMessage, EncodeFrame(FramePing, nil))
|
|
bc.mu.Unlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
case <-bc.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEnumerate responds with the current list of USB devices known to this
|
|
// session. In a full implementation, this would forward the enumerate request
|
|
// to the client-side USB agent and await its response. Here we return the
|
|
// cached device list.
|
|
func (b *Bridge) handleEnumerate(bc *bridgeConn) {
|
|
bc.mu.Lock()
|
|
devices := bc.devices
|
|
bc.mu.Unlock()
|
|
|
|
if devices == nil {
|
|
devices = []USBDevice{}
|
|
}
|
|
|
|
payload := EncodeDeviceList(devices)
|
|
b.sendFrame(bc, FrameEnumResult, payload)
|
|
}
|
|
|
|
// handleOpen processes a device open request. The payload contains
|
|
// [deviceID:2] identifying which device to claim.
|
|
func (b *Bridge) handleOpen(bc *bridgeConn, payload []byte) {
|
|
if len(payload) < 2 {
|
|
b.sendError(bc, 0x0010, "open: payload too short")
|
|
return
|
|
}
|
|
|
|
deviceID := uint16(payload[0])<<8 | uint16(payload[1])
|
|
|
|
// Verify the device exists in our known list
|
|
bc.mu.Lock()
|
|
found := false
|
|
for _, dev := range bc.devices {
|
|
if dev.DeviceID == deviceID {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
bc.mu.Unlock()
|
|
|
|
if !found {
|
|
b.sendError(bc, 0x0011, "open: device not found")
|
|
return
|
|
}
|
|
|
|
// In a real implementation, this would claim the USB device via the host agent.
|
|
// For now, acknowledge the open request.
|
|
result := make([]byte, 3)
|
|
result[0] = payload[0]
|
|
result[1] = payload[1]
|
|
result[2] = 0x00 // success
|
|
b.sendFrame(bc, FrameOpenResult, result)
|
|
|
|
log.Printf("[float/bridge] session %s opened device 0x%04X", bc.sessionID, deviceID)
|
|
}
|
|
|
|
// handleClose processes a device close request. Payload: [deviceID:2].
|
|
func (b *Bridge) handleClose(bc *bridgeConn, payload []byte) {
|
|
if len(payload) < 2 {
|
|
b.sendError(bc, 0x0020, "close: payload too short")
|
|
return
|
|
}
|
|
|
|
deviceID := uint16(payload[0])<<8 | uint16(payload[1])
|
|
|
|
// Acknowledge close
|
|
result := make([]byte, 3)
|
|
result[0] = payload[0]
|
|
result[1] = payload[1]
|
|
result[2] = 0x00 // success
|
|
b.sendFrame(bc, FrameCloseResult, result)
|
|
|
|
log.Printf("[float/bridge] session %s closed device 0x%04X", bc.sessionID, deviceID)
|
|
}
|
|
|
|
// handleTransfer forwards a bulk/interrupt OUT transfer to the USB device.
|
|
func (b *Bridge) handleTransfer(bc *bridgeConn, payload []byte) {
|
|
deviceID, endpoint, transferData, err := DecodeTransfer(payload)
|
|
if err != nil {
|
|
b.sendError(bc, 0x0030, "transfer: "+err.Error())
|
|
return
|
|
}
|
|
|
|
// In a real implementation, the transfer data would be sent to the USB device
|
|
// via the host agent, and the response would be sent back. Here we acknowledge
|
|
// receipt of the transfer request.
|
|
log.Printf("[float/bridge] session %s transfer to device 0x%04X endpoint 0x%02X: %d bytes",
|
|
bc.sessionID, deviceID, endpoint, len(transferData))
|
|
|
|
// Build transfer result: [deviceID:2][endpoint:1][status:1]
|
|
result := make([]byte, 4)
|
|
result[0] = byte(deviceID >> 8)
|
|
result[1] = byte(deviceID)
|
|
result[2] = endpoint
|
|
result[3] = 0x00 // success
|
|
b.sendFrame(bc, FrameTransferResult, result)
|
|
}
|
|
|
|
// handleInterrupt processes an interrupt transfer request.
|
|
func (b *Bridge) handleInterrupt(bc *bridgeConn, payload []byte) {
|
|
if len(payload) < 3 {
|
|
b.sendError(bc, 0x0040, "interrupt: payload too short")
|
|
return
|
|
}
|
|
|
|
deviceID := uint16(payload[0])<<8 | uint16(payload[1])
|
|
endpoint := payload[2]
|
|
|
|
log.Printf("[float/bridge] session %s interrupt on device 0x%04X endpoint 0x%02X",
|
|
bc.sessionID, deviceID, endpoint)
|
|
|
|
// Acknowledge interrupt request
|
|
result := make([]byte, 4)
|
|
result[0] = payload[0]
|
|
result[1] = payload[1]
|
|
result[2] = endpoint
|
|
result[3] = 0x00 // success
|
|
b.sendFrame(bc, FrameInterruptResult, result)
|
|
}
|
|
|
|
// sendFrame writes a binary frame to the WebSocket connection.
|
|
func (b *Bridge) sendFrame(bc *bridgeConn, frameType byte, payload []byte) {
|
|
bc.mu.Lock()
|
|
defer bc.mu.Unlock()
|
|
|
|
bc.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := bc.conn.WriteMessage(websocket.BinaryMessage, EncodeFrame(frameType, payload)); err != nil {
|
|
log.Printf("[float/bridge] session %s write error: %v", bc.sessionID, err)
|
|
}
|
|
}
|
|
|
|
// sendError writes an error frame to the WebSocket connection.
|
|
func (b *Bridge) sendError(bc *bridgeConn, code uint16, message string) {
|
|
b.sendFrame(bc, FrameError, EncodeError(code, message))
|
|
}
|
|
|
|
// cleanup removes a connection from the active sessions and cleans up resources.
|
|
func (b *Bridge) cleanup(bc *bridgeConn) {
|
|
b.mu.Lock()
|
|
if current, ok := b.sessions[bc.sessionID]; ok && current == bc {
|
|
delete(b.sessions, bc.sessionID)
|
|
}
|
|
b.mu.Unlock()
|
|
|
|
close(bc.done)
|
|
bc.conn.Close()
|
|
|
|
log.Printf("[float/bridge] session %s disconnected", bc.sessionID)
|
|
}
|
|
|
|
// ActiveSessions returns the number of currently connected WebSocket sessions.
|
|
func (b *Bridge) ActiveSessions() int {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
return len(b.sessions)
|
|
}
|
|
|
|
// DisconnectSession forcibly closes the WebSocket connection for a given session.
|
|
func (b *Bridge) DisconnectSession(sessionID string) {
|
|
b.mu.Lock()
|
|
bc, ok := b.sessions[sessionID]
|
|
if ok {
|
|
delete(b.sessions, sessionID)
|
|
}
|
|
b.mu.Unlock()
|
|
|
|
if ok {
|
|
close(bc.done)
|
|
bc.conn.WriteControl(
|
|
websocket.CloseMessage,
|
|
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "session terminated"),
|
|
time.Now().Add(writeWait),
|
|
)
|
|
bc.conn.Close()
|
|
log.Printf("[float/bridge] session %s forcibly disconnected", sessionID)
|
|
}
|
|
}
|
|
|
|
// UpdateDeviceList sets the known device list for a session (called when the
|
|
// client-side USB agent reports its attached devices).
|
|
func (b *Bridge) UpdateDeviceList(sessionID string, devices []USBDevice) {
|
|
b.mu.RLock()
|
|
bc, ok := b.sessions[sessionID]
|
|
b.mu.RUnlock()
|
|
|
|
if ok {
|
|
bc.mu.Lock()
|
|
bc.devices = devices
|
|
bc.mu.Unlock()
|
|
}
|
|
}
|