// Package websocket fournit le hub central WebSocket de ProxmoxPanel. // Les clients s'abonnent à des channels nommés et reçoivent les messages qui leur sont destinés. package websocket import ( "encoding/json" "sync" "time" "github.com/gorilla/websocket" ) const ( writeWait = 10 * time.Second pongWait = 60 * time.Second pingPeriod = (pongWait * 9) / 10 maxMessageSize = 8192 ) // Message représente un message WebSocket avec un type et un payload JSON. type Message struct { Type string `json:"type"` Channel string `json:"channel"` Payload json.RawMessage `json:"payload,omitempty"` } // Client représente un client WebSocket connecté. type Client struct { hub *Hub conn *websocket.Conn send chan []byte channels map[string]bool mu sync.RWMutex userID int64 } // Hub gère toutes les connexions WebSocket actives et le routage des messages par channel. type Hub struct { mu sync.RWMutex clients map[*Client]bool register chan *Client unregister chan *Client broadcast chan broadcastMsg } type broadcastMsg struct { channel string data []byte } // NewHub crée un nouveau hub WebSocket et le démarre. func NewHub() *Hub { h := &Hub{ clients: make(map[*Client]bool), register: make(chan *Client, 64), unregister: make(chan *Client, 64), broadcast: make(chan broadcastMsg, 256), } go h.run() return h } // run est la boucle principale du hub (goroutine unique pour éviter les races). func (h *Hub) run() { for { select { case client := <-h.register: h.mu.Lock() h.clients[client] = true h.mu.Unlock() case client := <-h.unregister: h.mu.Lock() if h.clients[client] { delete(h.clients, client) close(client.send) } h.mu.Unlock() case msg := <-h.broadcast: h.mu.RLock() for client := range h.clients { client.mu.RLock() subscribed := client.channels[msg.channel] || client.channels["*"] client.mu.RUnlock() if subscribed { select { case client.send <- msg.data: default: // Client lent ou déconnecté — on le supprime h.mu.RUnlock() h.mu.Lock() if h.clients[client] { delete(h.clients, client) close(client.send) } h.mu.Unlock() h.mu.RLock() } } } h.mu.RUnlock() } } } // Publish envoie un message sur un channel donné à tous les clients abonnés. func (h *Hub) Publish(channel, msgType string, payload any) { data, err := marshalMessage(msgType, channel, payload) if err != nil { return } h.broadcast <- broadcastMsg{channel: channel, data: data} } // PublishRaw envoie des données brutes sur un channel. func (h *Hub) PublishRaw(channel string, data []byte) { h.broadcast <- broadcastMsg{channel: channel, data: data} } // NewClient crée et enregistre un nouveau client WebSocket. func (h *Hub) NewClient(conn *websocket.Conn, userID int64) *Client { c := &Client{ hub: h, conn: conn, send: make(chan []byte, 256), channels: make(map[string]bool), userID: userID, } h.register <- c go c.writePump() go c.readPump() return c } // Subscribe abonne le client à un channel. func (c *Client) Subscribe(channel string) { c.mu.Lock() c.channels[channel] = true c.mu.Unlock() } // Unsubscribe désabonne le client d'un channel. func (c *Client) Unsubscribe(channel string) { c.mu.Lock() delete(c.channels, channel) c.mu.Unlock() } // writePump envoie les messages en attente au client WebSocket. func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case msg, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // Hub a fermé le channel c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil { return } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // readPump lit les messages entrants du client (abonnements, ping, etc.) func (c *Client) readPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for { _, rawMsg, err := c.conn.ReadMessage() if err != nil { break } // Traiter les messages d'abonnement entrants var msg Message if json.Unmarshal(rawMsg, &msg) == nil { switch msg.Type { case "subscribe": c.Subscribe(msg.Channel) case "unsubscribe": c.Unsubscribe(msg.Channel) } } } } // marshalMessage sérialise un message WebSocket en JSON. func marshalMessage(msgType, channel string, payload any) ([]byte, error) { var rawPayload json.RawMessage if payload != nil { data, err := json.Marshal(payload) if err != nil { return nil, err } rawPayload = data } return json.Marshal(Message{ Type: msgType, Channel: channel, Payload: rawPayload, }) }