wings/api/websockets/hub.go

121 lines
2.0 KiB
Go
Raw Normal View History

package websockets
import (
"encoding/json"
"net/http"
"time"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
// Timing and size limits for websocket connections. None of these constants
// are referenced in this file; they are presumably consumed by the client
// read/write pumps defined elsewhere in the package — confirm there.
const (
	// writeWait is ten seconds; by its name, the time allowed for a single
	// write to a peer (use site not visible here).
	writeWait = 10 * time.Second

	// pongWait is one minute; by its name, how long to wait for the next
	// pong from a peer (use site not visible here).
	pongWait = time.Minute

	// pingPeriod is nine tenths of pongWait (54 seconds), so it is always
	// shorter than pongWait.
	pingPeriod = (pongWait * 9) / 10

	// maxMessageSize is 512; by its name, an upper bound on inbound message
	// size — units and use site are not visible here.
	maxMessageSize = 512
)
// wsupgrader is the shared upgrader used by Hub.Upgrade to turn an HTTP
// request into a websocket connection. Read and write buffers are 1024 bytes
// each.
//
// NOTE(review): CheckOrigin unconditionally returns true, so the Origin
// header is never validated and any origin may open a socket. Confirm that
// requests are authenticated by some other means before they reach Upgrade.
var wsupgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin:     func(*http.Request) bool { return true },
}
// websocketMap is the hub's set of clients, keyed by client pointer. The code
// in this file only ever stores true as the value and tests for key presence.
type websocketMap map[*Client]bool
// Hub keeps track of the connected websocket clients and fans messages out to
// all of them. All of its state is driven through channels that are serviced
// by Run.
type Hub struct {
	// clients is the set of currently registered clients. Within this file
	// it is only read and written from Run.
	clients websocketMap

	// Broadcast receives messages that Run JSON-encodes and queues on every
	// registered client's send channel.
	Broadcast chan Message

	// register and unregister carry clients to be added to, or removed
	// from, the client set.
	register, unregister chan *Client

	// close tells Run to stop; any value received on it ends the loop.
	close chan bool
}
//var _ io.Writer = &Hub{}
// NewHub allocates a Hub with an empty client set and freshly made channels.
// Every channel is unbuffered, so a send on any of them blocks until Run
// receives it. NewHub does not start Run; the caller must do that.
func NewHub() *Hub {
	hub := new(Hub)
	hub.clients = make(websocketMap)
	hub.Broadcast = make(chan Message)
	hub.register = make(chan *Client)
	hub.unregister = make(chan *Client)
	hub.close = make(chan bool)
	return hub
}
// Upgrade turns the HTTP request into a websocket connection using the
// package-level wsupgrader and attaches the resulting client to the hub.
//
// If the upgrade fails the error is logged and nothing else happens.
// Otherwise a Client with a 256-slot outbound queue is created, handed to the
// hub's register channel (this send blocks until Run picks it up), and its
// readPump and writePump are started in their own goroutines.
func (h *Hub) Upgrade(w http.ResponseWriter, r *http.Request) {
	conn, err := wsupgrader.Upgrade(w, r, nil)
	if err != nil {
		log.WithError(err).Error("Failed to upgrade to websocket")
		return
	}

	client := &Client{hub: h, socket: conn, send: make(chan []byte, 256)}

	// Register before the pumps start, as the original ordering does.
	h.register <- client

	go client.readPump()
	go client.writePump()
}
// Subscribe hands c to the hub's register channel so that Run adds it to the
// client set. The channel is unbuffered, so the call blocks until Run receives
// the client.
//
// NOTE(review): Run closes the register channel when it returns, so calling
// Subscribe after the hub has been shut down panics (send on closed channel).
func (h *Hub) Subscribe(c *Client) {
h.register <- c
}
// Unsubscribe hands c to the hub's unregister channel so that Run removes it
// from the client set and closes its send channel (Run ignores clients it does
// not know). The channel is unbuffered, so the call blocks until Run receives
// the client.
//
// NOTE(review): Run closes the unregister channel when it returns, so calling
// Unsubscribe after the hub has been shut down panics (send on closed channel).
func (h *Hub) Unsubscribe(c *Client) {
h.unregister <- c
}
// Run is the hub's event loop. It blocks, servicing the hub's channels, until
// a value arrives on the close channel:
//
//   - register:   the client is added to the client set.
//   - unregister: if the client is in the set it is removed and its send
//     channel is closed; unknown clients are ignored.
//   - Broadcast:  the message is JSON-encoded once and offered to every
//     client's send channel without blocking. A client whose send channel
//     cannot accept the payload immediately is dropped (send channel closed,
//     removed from the set). If encoding fails the error is logged and the
//     message is skipped.
//
// When Run returns, every remaining client's send channel is closed, the set
// is emptied, and the register, unregister, Broadcast and close channels are
// all closed.
//
// NOTE(review): because those four channels are closed on exit, any later
// Subscribe, Unsubscribe, Close, or send on Broadcast panics with "send on
// closed channel". Confirm that no goroutine outlives the hub.
func (h *Hub) Run() {
	// teardown releases every client and then closes the hub's own channels.
	teardown := func() {
		for client := range h.clients {
			close(client.send)
			delete(h.clients, client)
		}
		close(h.register)
		close(h.unregister)
		close(h.Broadcast)
		close(h.close)
	}
	defer teardown()

	// fanOut offers payload to every client, dropping those that cannot
	// take it right now so one slow consumer never stalls the loop.
	fanOut := func(payload []byte) {
		for client := range h.clients {
			select {
			case client.send <- payload:
			default:
				close(client.send)
				delete(h.clients, client)
			}
		}
	}

	for {
		select {
		case <-h.close:
			return

		case client := <-h.register:
			h.clients[client] = true

		case client := <-h.unregister:
			// Only close the send channel of a client we still track;
			// closing it a second time would panic.
			if _, known := h.clients[client]; !known {
				continue
			}
			delete(h.clients, client)
			close(client.send)

		case msg := <-h.Broadcast:
			payload, err := json.Marshal(msg)
			if err != nil {
				log.WithError(err).Error("Failed to encode websocket message.")
				continue
			}
			fanOut(payload)
		}
	}
}
// Close asks Run to stop by sending on the hub's close channel. The channel is
// unbuffered, so the call blocks until Run receives the value; if Run was
// never started it blocks indefinitely.
//
// NOTE(review): Run closes the close channel as it returns, so calling Close a
// second time panics (send on closed channel).
func (h *Hub) Close() {
h.close <- true
}