refactor websockets implementation

This commit is contained in:
Jakob Schrettenbrunner 2018-03-14 10:34:06 +01:00
parent b8b0702f84
commit 53704fb0d7
9 changed files with 234 additions and 230 deletions

View File

@ -1,76 +0,0 @@
package websockets
import "github.com/gorilla/websocket"
import (
"time"
log "github.com/sirupsen/logrus"
)
type Client struct {
hub *Hub
socket *websocket.Conn
send chan []byte
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.socket.Close()
}()
c.socket.SetReadLimit(maxMessageSize)
c.socket.SetReadDeadline(time.Now().Add(pongWait))
c.socket.SetPongHandler(func(string) error {
c.socket.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, _, err := c.socket.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.WithError(err).Debug("Websocket closed unexpectedly.")
}
return
}
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.socket.Close()
}()
for {
select {
case m, ok := <-c.send:
c.socket.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel
c.socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.socket.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write([]byte{'['})
w.Write(m)
for i := 0; i < len(c.send)+1; i++ {
w.Write([]byte{','})
w.Write(<-c.send)
}
w.Write([]byte{']'})
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.socket.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.socket.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}

View File

@ -0,0 +1,120 @@
package websockets
import (
"encoding/json"
"net/http"
"time"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = pongWait * 9 / 10
maxMessageSize = 512
)
var wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type websocketMap map[*Socket]bool
type Collection struct {
sockets websocketMap
Broadcast chan Message
register chan *Socket
unregister chan *Socket
close chan bool
}
//var _ io.Writer = &Collection{}
func NewCollection() *Collection {
return &Collection{
Broadcast: make(chan Message),
register: make(chan *Socket),
unregister: make(chan *Socket),
close: make(chan bool),
sockets: make(websocketMap),
}
}
func (c *Collection) Upgrade(w http.ResponseWriter, r *http.Request) {
socket, err := wsupgrader.Upgrade(w, r, nil)
if err != nil {
log.WithError(err).Error("Failed to upgrade to websocket")
return
}
s := &Socket{
collection: c,
conn: socket,
send: make(chan []byte, 256),
}
c.register <- s
go s.readPump()
go s.writePump()
}
func (c *Collection) Add(s *Socket) {
c.register <- s
}
func (c *Collection) Remove(s *Socket) {
c.unregister <- s
}
func (c *Collection) Run() {
defer func() {
for s := range c.sockets {
close(s.send)
delete(c.sockets, s)
}
close(c.register)
close(c.unregister)
close(c.Broadcast)
close(c.close)
}()
for {
select {
case s := <-c.register:
c.sockets[s] = true
case s := <-c.unregister:
if _, ok := c.sockets[s]; ok {
delete(c.sockets, s)
close(s.send)
}
case m := <-c.Broadcast:
b, err := json.Marshal(m)
if err != nil {
log.WithError(err).Error("Failed to encode websocket message.")
continue
}
for s := range c.sockets {
select {
case s.send <- b:
default:
close(s.send)
delete(c.sockets, s)
}
}
case <-c.close:
return
}
}
}
func (c *Collection) Close() {
c.close <- true
}

View File

@ -1,28 +0,0 @@
package websockets
import "io"
type ConsoleWriter struct {
Hub *Hub
HandlerFunc *func(string)
}
var _ io.Writer = ConsoleWriter{}
func (c ConsoleWriter) Write(b []byte) (n int, e error) {
line := make([]byte, len(b))
copy(line, b)
m := Message{
Type: MessageTypeConsole,
Payload: ConsolePayload{
Line: string(line),
Level: ConsoleLevelPlain,
Source: ConsoleSourceServer,
},
}
c.Hub.Broadcast <- m
if c.HandlerFunc != nil {
(*c.HandlerFunc)(string(line))
}
return len(b), nil
}

View File

@ -1,120 +0,0 @@
package websockets
import (
"encoding/json"
"net/http"
"time"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = pongWait * 9 / 10
maxMessageSize = 512
)
var wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type websocketMap map[*Client]bool
type Hub struct {
clients websocketMap
Broadcast chan Message
register chan *Client
unregister chan *Client
close chan bool
}
//var _ io.Writer = &Hub{}
func NewHub() *Hub {
return &Hub{
Broadcast: make(chan Message),
register: make(chan *Client),
unregister: make(chan *Client),
close: make(chan bool),
clients: make(websocketMap),
}
}
func (h *Hub) Upgrade(w http.ResponseWriter, r *http.Request) {
socket, err := wsupgrader.Upgrade(w, r, nil)
if err != nil {
log.WithError(err).Error("Failed to upgrade to websocket")
return
}
c := &Client{
hub: h,
socket: socket,
send: make(chan []byte, 256),
}
h.register <- c
go c.readPump()
go c.writePump()
}
func (h *Hub) Subscribe(c *Client) {
h.register <- c
}
func (h *Hub) Unsubscribe(c *Client) {
h.unregister <- c
}
func (h *Hub) Run() {
defer func() {
for s := range h.clients {
close(s.send)
delete(h.clients, s)
}
close(h.register)
close(h.unregister)
close(h.Broadcast)
close(h.close)
}()
for {
select {
case s := <-h.register:
h.clients[s] = true
case s := <-h.unregister:
if _, ok := h.clients[s]; ok {
delete(h.clients, s)
close(s.send)
}
case m := <-h.Broadcast:
b, err := json.Marshal(m)
if err != nil {
log.WithError(err).Error("Failed to encode websocket message.")
continue
}
for s := range h.clients {
select {
case s.send <- b:
default:
close(s.send)
delete(h.clients, s)
}
}
case <-h.close:
return
}
}
}
func (h *Hub) Close() {
h.close <- true
}

View File

@ -47,8 +47,8 @@ type ConsolePayload struct {
Line string `json:"line"`
}
func (h *Hub) Log(l ConsoleLevel, m string) {
h.Broadcast <- Message{
func (c *Collection) Log(l ConsoleLevel, m string) {
c.Broadcast <- Message{
Type: MessageTypeConsole,
Payload: ConsolePayload{
Source: ConsoleSourceWings,

81
api/websockets/socket.go Normal file
View File

@ -0,0 +1,81 @@
package websockets
import "github.com/gorilla/websocket"
import (
"time"
log "github.com/sirupsen/logrus"
)
type Socket struct {
collection *Collection
conn *websocket.Conn
send chan []byte
}
func (s *Socket) readPump() {
defer func() {
s.collection.unregister <- s
s.conn.Close()
}()
s.conn.SetReadLimit(maxMessageSize)
s.conn.SetReadDeadline(time.Now().Add(pongWait))
s.conn.SetPongHandler(func(string) error {
s.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
t, m, err := s.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.WithError(err).Debug("Websocket closed unexpectedly.")
}
return
}
// Handle websocket responses somehow
if t == websocket.TextMessage {
log.Debug("Received websocket message: " + string(m))
}
}
}
func (s *Socket) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
s.conn.Close()
}()
for {
select {
case m, ok := <-s.send:
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The collection closed the channel
s.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := s.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write([]byte{'['})
w.Write(m)
n := len(s.send) - 1
for i := 0; i < n; i++ {
w.Write([]byte{','})
w.Write(<-s.send)
}
w.Write([]byte{']'})
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}

View File

@ -67,7 +67,34 @@ func (env *dockerEnvironment) attach() error {
if env.attached {
return nil
}
cw := ConsoleHandler{
Websockets: env.server.websockets,
}
var err error
env.hires, err = env.client.ContainerAttach(context.TODO(), env.server.DockerContainer.ID,
types.ContainerAttachOptions{
Stdin: true,
Stdout: true,
Stderr: true,
Stream: true,
})
if err != nil {
log.WithField("server", env.server.ID).WithError(err).Error("Failed to attach to docker container.")
return err
}
env.attached = true
go func() {
defer env.hires.Close()
defer func() {
env.attached = false
}()
io.Copy(cw, env.hires.Reader)
}()
return nil
}

View File

@ -37,7 +37,7 @@ type Server interface {
Save() error
Environment() (Environment, error)
Websockets() *websockets.Hub
Websockets() *websockets.Collection
HasPermission(string, string) bool
}
@ -73,7 +73,7 @@ type ServerStruct struct {
// TODO remove
Keys map[string][]string `json:"keys"`
websockets *websockets.Hub
websockets *websockets.Collection
status Status
}
@ -161,7 +161,7 @@ func (s *ServerStruct) init() error {
}
s.status = StatusStopped
s.websockets = websockets.NewHub()
s.websockets = websockets.NewCollection()
go s.websockets.Run()
var err error

View File

@ -29,7 +29,7 @@ func (s *ServerStruct) Environment() (Environment, error) {
return s.environment, nil
}
func (s *ServerStruct) Websockets() *websockets.Hub {
func (s *ServerStruct) Websockets() *websockets.Collection {
return s.websockets
}