diff --git a/api/websockets/client.go b/api/websockets/client.go deleted file mode 100644 index 43147c4..0000000 --- a/api/websockets/client.go +++ /dev/null @@ -1,76 +0,0 @@ -package websockets - -import "github.com/gorilla/websocket" -import ( - "time" - - log "github.com/sirupsen/logrus" -) - -type Client struct { - hub *Hub - - socket *websocket.Conn - - send chan []byte -} - -func (c *Client) readPump() { - defer func() { - c.hub.unregister <- c - c.socket.Close() - }() - c.socket.SetReadLimit(maxMessageSize) - c.socket.SetReadDeadline(time.Now().Add(pongWait)) - c.socket.SetPongHandler(func(string) error { - c.socket.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) - for { - _, _, err := c.socket.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - log.WithError(err).Debug("Websocket closed unexpectedly.") - } - return - } - } -} - -func (c *Client) writePump() { - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - c.socket.Close() - }() - for { - select { - case m, ok := <-c.send: - c.socket.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - // The hub closed the channel - c.socket.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - w, err := c.socket.NextWriter(websocket.TextMessage) - if err != nil { - return - } - w.Write([]byte{'['}) - w.Write(m) - for i := 0; i < len(c.send)+1; i++ { - w.Write([]byte{','}) - w.Write(<-c.send) - } - w.Write([]byte{']'}) - if err := w.Close(); err != nil { - return - } - case <-ticker.C: - c.socket.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.socket.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - return - } - } - } -} diff --git a/api/websockets/collection.go b/api/websockets/collection.go new file mode 100644 index 0000000..dc5425b --- /dev/null +++ b/api/websockets/collection.go @@ -0,0 +1,120 @@ +package websockets + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" +) + +const ( + writeWait = 10 * time.Second + pongWait = 60 * time.Second + + pingPeriod = pongWait * 9 / 10 + + maxMessageSize = 512 +) + +var wsupgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type websocketMap map[*Socket]bool + +type Collection struct { + sockets websocketMap + + Broadcast chan Message + + register chan *Socket + unregister chan *Socket + close chan bool +} + +//var _ io.Writer = &Collection{} + +func NewCollection() *Collection { + return &Collection{ + Broadcast: make(chan Message), + register: make(chan *Socket), + unregister: make(chan *Socket), + close: make(chan bool), + sockets: make(websocketMap), + } +} + +func (c *Collection) Upgrade(w http.ResponseWriter, r *http.Request) { + socket, err := wsupgrader.Upgrade(w, r, nil) + if err != nil { + log.WithError(err).Error("Failed to upgrade to websocket") + return + } + s := &Socket{ + collection: c, + conn: socket, + send: make(chan []byte, 256), + } + c.register <- s + + go s.readPump() + go s.writePump() +} + +func (c *Collection) Add(s *Socket) { + c.register <- s +} + +func (c *Collection) Remove(s *Socket) { + c.unregister <- s +} + +func (c *Collection) Run() { + defer func() { + for s := range c.sockets { + close(s.send) + delete(c.sockets, s) + } + close(c.register) + close(c.unregister) + close(c.Broadcast) + close(c.close) + }() + for { + select { + case s := <-c.register: + c.sockets[s] = true + case s := <-c.unregister: + if _, ok := c.sockets[s]; ok { + delete(c.sockets, s) + close(s.send) + } + case m := <-c.Broadcast: + b, err := json.Marshal(m) + if err != nil { + log.WithError(err).Error("Failed to encode websocket message.") + continue + } + for s := range c.sockets { + select { + case s.send <- b: + default: + close(s.send) + delete(c.sockets, s) + } + } + case <-c.close: + return + } + } +} + +func (c *Collection) Close() { + c.close <- true +} diff --git a/api/websockets/consolewriter.go b/api/websockets/consolewriter.go deleted file mode 100644 index 13df333..0000000 --- a/api/websockets/consolewriter.go +++ /dev/null @@ -1,28 +0,0 @@ -package websockets - -import "io" - -type ConsoleWriter struct { - Hub *Hub - HandlerFunc *func(string) -} - -var _ io.Writer = ConsoleWriter{} - -func (c ConsoleWriter) Write(b []byte) (n int, e error) { - line := make([]byte, len(b)) - copy(line, b) - m := Message{ - Type: MessageTypeConsole, - Payload: ConsolePayload{ - Line: string(line), - Level: ConsoleLevelPlain, - Source: ConsoleSourceServer, - }, - } - c.Hub.Broadcast <- m - if c.HandlerFunc != nil { - (*c.HandlerFunc)(string(line)) - } - return len(b), nil -} diff --git a/api/websockets/hub.go b/api/websockets/hub.go deleted file mode 100644 index 11d42d1..0000000 --- a/api/websockets/hub.go +++ /dev/null @@ -1,120 +0,0 @@ -package websockets - -import ( - "encoding/json" - "net/http" - "time" - - "github.com/gorilla/websocket" - log "github.com/sirupsen/logrus" -) - -const ( - writeWait = 10 * time.Second - pongWait = 60 * time.Second - - pingPeriod = pongWait * 9 / 10 - - maxMessageSize = 512 -) - -var wsupgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -type websocketMap map[*Client]bool - -type Hub struct { - clients websocketMap - - Broadcast chan Message - - register chan *Client - unregister chan *Client - close chan bool -} - -//var _ io.Writer = &Hub{} - -func NewHub() *Hub { - return &Hub{ - Broadcast: make(chan Message), - register: make(chan *Client), - unregister: make(chan *Client), - close: make(chan bool), - clients: make(websocketMap), - } -} - -func (h *Hub) Upgrade(w http.ResponseWriter, r *http.Request) { - socket, err := wsupgrader.Upgrade(w, r, nil) - if err != nil { - log.WithError(err).Error("Failed to upgrade to websocket") - return - } - c := &Client{ - hub: h, - socket: socket, - send: make(chan []byte, 256), - } - h.register <- c - - go c.readPump() - go c.writePump() -} - -func (h *Hub) Subscribe(c *Client) { - h.register <- c -} - -func (h *Hub) Unsubscribe(c *Client) { - h.unregister <- c -} - -func (h *Hub) Run() { - defer func() { - for s := range h.clients { - close(s.send) - delete(h.clients, s) - } - close(h.register) - close(h.unregister) - close(h.Broadcast) - close(h.close) - }() - for { - select { - case s := <-h.register: - h.clients[s] = true - case s := <-h.unregister: - if _, ok := h.clients[s]; ok { - delete(h.clients, s) - close(s.send) - } - case m := <-h.Broadcast: - b, err := json.Marshal(m) - if err != nil { - log.WithError(err).Error("Failed to encode websocket message.") - continue - } - for s := range h.clients { - select { - case s.send <- b: - default: - close(s.send) - delete(h.clients, s) - } - } - case <-h.close: - return - } - } -} - -func (h *Hub) Close() { - h.close <- true -} diff --git a/api/websockets/message.go b/api/websockets/message.go index 52b31e9..8ce2230 100644 --- a/api/websockets/message.go +++ b/api/websockets/message.go @@ -47,8 +47,8 @@ type ConsolePayload struct { Line string `json:"line"` } -func (h *Hub) Log(l ConsoleLevel, m string) { - h.Broadcast <- Message{ +func (c *Collection) Log(l ConsoleLevel, m string) { + c.Broadcast <- Message{ Type: MessageTypeConsole, Payload: ConsolePayload{ Source: ConsoleSourceWings, diff --git a/api/websockets/socket.go b/api/websockets/socket.go new file mode 100644 index 0000000..fe977cd --- /dev/null +++ b/api/websockets/socket.go @@ -0,0 +1,81 @@ +package websockets + +import "github.com/gorilla/websocket" +import ( + "time" + + log "github.com/sirupsen/logrus" +) + +type Socket struct { + collection *Collection + + conn *websocket.Conn + + send chan []byte +} + +func (s *Socket) readPump() { + defer func() { + s.collection.unregister <- s + s.conn.Close() + }() + s.conn.SetReadLimit(maxMessageSize) + s.conn.SetReadDeadline(time.Now().Add(pongWait)) + s.conn.SetPongHandler(func(string) error { + s.conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + for { + t, m, err := s.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + log.WithError(err).Debug("Websocket closed unexpectedly.") + } + return + } + // Handle websocket responses somehow + if t == websocket.TextMessage { + log.Debug("Received websocket message: " + string(m)) + } + } +} + +func (s *Socket) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + s.conn.Close() + }() + for { + select { + case m, ok := <-s.send: + s.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The collection closed the channel + s.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + w, err := s.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write([]byte{'['}) + w.Write(m) + n := len(s.send) - 1 + for i := 0; i < n; i++ { + w.Write([]byte{','}) + w.Write(<-s.send) + } + w.Write([]byte{']'}) + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + s.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} diff --git a/control/docker_environment.go b/control/docker_environment.go index 9c0b2b1..08b1bcb 100644 --- a/control/docker_environment.go +++ b/control/docker_environment.go @@ -67,7 +67,34 @@ func (env *dockerEnvironment) attach() error { if env.attached { return nil } + + cw := ConsoleHandler{ + Websockets: env.server.websockets, + } + + var err error + env.hires, err = env.client.ContainerAttach(context.TODO(), env.server.DockerContainer.ID, + types.ContainerAttachOptions{ + Stdin: true, + Stdout: true, + Stderr: true, + Stream: true, + }) + + if err != nil { + log.WithField("server", env.server.ID).WithError(err).Error("Failed to attach to docker container.") + return err + } env.attached = true + + go func() { + defer env.hires.Close() + defer func() { + env.attached = false + }() + io.Copy(cw, env.hires.Reader) + }() + return nil } diff --git a/control/server.go b/control/server.go index bedc721..c21fcd3 100644 --- a/control/server.go +++ b/control/server.go @@ -37,7 +37,7 @@ type Server interface { Save() error Environment() (Environment, error) - Websockets() *websockets.Hub + Websockets() *websockets.Collection HasPermission(string, string) bool } @@ -73,7 +73,7 @@ type ServerStruct struct { // TODO remove Keys map[string][]string `json:"keys"` - websockets *websockets.Hub + websockets *websockets.Collection status Status } @@ -161,7 +161,7 @@ func (s *ServerStruct) init() error { } s.status = StatusStopped - s.websockets = websockets.NewHub() + s.websockets = websockets.NewCollection() go s.websockets.Run() var err error diff --git a/control/server_util.go b/control/server_util.go index 03d20e5..d5d6ada 100644 --- a/control/server_util.go +++ b/control/server_util.go @@ -29,7 +29,7 @@ func (s *ServerStruct) Environment() (Environment, error) { return s.environment, nil } -func (s *ServerStruct) Websockets() *websockets.Hub { +func (s *ServerStruct) Websockets() *websockets.Collection { return s.websockets }