Get working console websocket implementation and fix race condition

This commit is contained in:
Dane Everitt 2019-04-20 17:38:12 -07:00
parent 49ca2e2404
commit bed30d9229
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
4 changed files with 99 additions and 25 deletions

View File

@ -228,7 +228,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
s := bufio.NewScanner(r) s := bufio.NewScanner(r)
for s.Scan() { for s.Scan() {
fmt.Println(s.Text()) d.Server.Emit(ConsoleOutputEvent, s.Text())
} }
if err := s.Err(); err != nil { if err := s.Err(); err != nil {

46
server/events.go Normal file
View File

@ -0,0 +1,46 @@
package server
type EventListeners map[string][]EventListenerFunction
type EventListenerFunction *func(string)
// Defines all of the possible output events for a server.
const (
ConsoleOutputEvent = "console"
)
// Adds an event listener for the server instance.
func (s *Server) AddListener(event string, f EventListenerFunction) {
if s.listeners == nil {
s.listeners = make(map[string][]EventListenerFunction)
}
if _, ok := s.listeners[event]; ok {
s.listeners[event] = append(s.listeners[event], f)
} else {
s.listeners[event] = []EventListenerFunction{f}
}
}
// Removes the event listener for the server instance.
func (s *Server) RemoveListener(event string, f EventListenerFunction) {
if _, ok := s.listeners[event]; ok {
for i := range s.listeners[event] {
if s.listeners[event][i] == f {
s.listeners[event] = append(s.listeners[event][:i], s.listeners[event][i+1:]...)
break
}
}
}
}
// Emits an event to all of the active listeners for a server.
func (s *Server) Emit(event string, data string) {
if _, ok := s.listeners[event]; ok {
for _, handler := range s.listeners[event] {
go func (f EventListenerFunction, d string) {
(*f)(d)
}(handler, data)
}
}
}

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"github.com/olebedev/emitter"
"github.com/patrickmn/go-cache" "github.com/patrickmn/go-cache"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
@ -62,7 +61,8 @@ type Server struct {
// certain long operations return faster. For example, FS disk space usage. // certain long operations return faster. For example, FS disk space usage.
Cache *cache.Cache `json:"-"` Cache *cache.Cache `json:"-"`
Emitter *emitter.Emitter `json:"-"` // All of the registered event listeners for this server instance.
listeners EventListeners
} }
// The build settings for a given server that impact docker container creation and // The build settings for a given server that impact docker container creation and
@ -206,7 +206,6 @@ func FromConfiguration(data []byte, cfg *config.SystemConfiguration) (*Server, e
return nil, err return nil, err
} }
s.Emitter = &emitter.Emitter{}
s.Environment = env s.Environment = env
s.Cache = cache.New(time.Minute * 10, time.Minute * 15) s.Cache = cache.New(time.Minute * 10, time.Minute * 15)
s.Filesystem = &Filesystem{ s.Filesystem = &Filesystem{

View File

@ -2,7 +2,6 @@ package main
import ( import (
"errors" "errors"
"fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
@ -10,6 +9,7 @@ import (
"net/http" "net/http"
"os" "os"
"strings" "strings"
"sync"
) )
type WebsocketMessage struct { type WebsocketMessage struct {
@ -25,11 +25,17 @@ type WebsocketMessage struct {
// should either omit the field or pass an empty value as it is ignored. // should either omit the field or pass an empty value as it is ignored.
Args []string `json:"args,omitempty"` Args []string `json:"args,omitempty"`
server *server.Server
inbound bool inbound bool
} }
type WebsocketHandler struct {
Server *server.Server
Mutex sync.Mutex
Connection *websocket.Conn
}
// Handle a request for a specific server websocket. This will handle inbound requests as well
// as ensure that any console output is also passed down the wire on the socket.
func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
c, err := rt.upgrader.Upgrade(w, r, nil) c, err := rt.upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
@ -39,12 +45,27 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http
defer c.Close() defer c.Close()
s := rt.Servers.Get(ps.ByName("server")) s := rt.Servers.Get(ps.ByName("server"))
handler := WebsocketHandler{
Server: s,
Mutex: sync.Mutex{},
Connection: c,
}
handleOutput := func(data string) {
handler.SendJson(&WebsocketMessage{
Event: "console output",
Args: []string{data},
})
}
s.AddListener(server.ConsoleOutputEvent, &handleOutput)
defer s.RemoveListener(server.ConsoleOutputEvent, &handleOutput)
for { for {
j := WebsocketMessage{server: s, inbound: true} j := WebsocketMessage{inbound: true}
if _, _, err := c.ReadMessage(); err != nil { if _, _, err := c.ReadMessage(); err != nil {
if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseServiceRestart) { if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseServiceRestart) {
zap.S().Errorw("error handling websocket message", zap.Error(err)) zap.S().Errorw("error handling websocket message", zap.Error(err))
} }
break break
@ -57,34 +78,42 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http
continue continue
} }
fmt.Printf("%s received: %s = %s\n", s.Uuid, j.Event, strings.Join(j.Args, " ")) if err := handler.HandleInbound(j); err != nil {
if err := j.HandleInbound(c); err != nil {
zap.S().Warnw("error handling inbound websocket request", zap.Error(err)) zap.S().Warnw("error handling inbound websocket request", zap.Error(err))
break break
} }
} }
zap.S().Debugw("disconnected from instance", zap.String("ip", c.RemoteAddr().String()))
} }
func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error { // Perform a blocking send operation on the websocket since we want to avoid any
if !wsm.inbound { // concurrent writes to the connection, which would cause a runtime panic and cause
// the program to crash out.
func (wsh *WebsocketHandler) SendJson(v interface{}) error {
wsh.Mutex.Lock()
defer wsh.Mutex.Unlock()
return wsh.Connection.WriteJSON(v)
}
// Handle the inbound socket request and route it to the proper server action.
func (wsh *WebsocketHandler) HandleInbound(m WebsocketMessage) error {
if !m.inbound {
return errors.New("cannot handle websocket message, not an inbound connection") return errors.New("cannot handle websocket message, not an inbound connection")
} }
switch wsm.Event { switch m.Event {
case "set state": case "set state":
{ {
var err error var err error
switch strings.Join(wsm.Args, "") { switch strings.Join(m.Args, "") {
case "start": case "start":
err = wsm.server.Environment.Start() err = wsh.Server.Environment.Start()
break break
case "stop": case "stop":
err = wsm.server.Environment.Stop() err = wsh.Server.Environment.Stop()
break break
case "restart": case "restart":
err = wsm.server.Environment.Terminate(os.Kill) err = wsh.Server.Environment.Terminate(os.Kill)
break break
} }
@ -94,13 +123,13 @@ func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error {
} }
case "send logs": case "send logs":
{ {
logs, err := wsm.server.Environment.Readlog(1024 * 5) logs, err := wsh.Server.Environment.Readlog(1024 * 5)
if err != nil { if err != nil {
return err return err
} }
for _, line := range logs { for _, line := range logs {
c.WriteJSON(&WebsocketMessage{ wsh.SendJson(&WebsocketMessage{
Event: "console output", Event: "console output",
Args: []string{line}, Args: []string{line},
}) })
@ -110,7 +139,7 @@ func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error {
} }
case "send command": case "send command":
{ {
return wsm.server.Environment.SendCommand(strings.Join(wsm.Args, "")) return wsh.Server.Environment.SendCommand(strings.Join(m.Args, ""))
} }
} }