diff --git a/server/environment_docker.go b/server/environment_docker.go index dcaca7d..5383d3b 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -228,7 +228,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error { s := bufio.NewScanner(r) for s.Scan() { - fmt.Println(s.Text()) + d.Server.Emit(ConsoleOutputEvent, s.Text()) } if err := s.Err(); err != nil { diff --git a/server/events.go b/server/events.go new file mode 100644 index 0000000..75f906b --- /dev/null +++ b/server/events.go @@ -0,0 +1,46 @@ +package server + +type EventListeners map[string][]EventListenerFunction + +type EventListenerFunction *func(string) + +// Defines all of the possible output events for a server. +const ( + ConsoleOutputEvent = "console" +) + +// Adds an event listener for the server instance. +func (s *Server) AddListener(event string, f EventListenerFunction) { + if s.listeners == nil { + s.listeners = make(map[string][]EventListenerFunction) + } + + if _, ok := s.listeners[event]; ok { + s.listeners[event] = append(s.listeners[event], f) + } else { + s.listeners[event] = []EventListenerFunction{f} + } +} + +// Removes the event listener for the server instance. +func (s *Server) RemoveListener(event string, f EventListenerFunction) { + if _, ok := s.listeners[event]; ok { + for i := range s.listeners[event] { + if s.listeners[event][i] == f { + s.listeners[event] = append(s.listeners[event][:i], s.listeners[event][i+1:]...) + break + } + } + } +} + +// Emits an event to all of the active listeners for a server. +func (s *Server) Emit(event string, data string) { + if _, ok := s.listeners[event]; ok { + for _, handler := range s.listeners[event] { + go func (f EventListenerFunction, d string) { + (*f)(d) + }(handler, data) + } + } +} \ No newline at end of file diff --git a/server/server.go b/server/server.go index 19afac6..120ae98 100644 --- a/server/server.go +++ b/server/server.go @@ -1,7 +1,6 @@ package server import ( - "github.com/olebedev/emitter" "github.com/patrickmn/go-cache" "github.com/pkg/errors" "github.com/pterodactyl/wings/config" @@ -62,7 +61,8 @@ type Server struct { // certain long operations return faster. For example, FS disk space usage. Cache *cache.Cache `json:"-"` - Emitter *emitter.Emitter `json:"-"` + // All of the registered event listeners for this server instance. + listeners EventListeners } // The build settings for a given server that impact docker container creation and @@ -206,7 +206,6 @@ func FromConfiguration(data []byte, cfg *config.SystemConfiguration) (*Server, e return nil, err } - s.Emitter = &emitter.Emitter{} s.Environment = env s.Cache = cache.New(time.Minute * 10, time.Minute * 15) s.Filesystem = &Filesystem{ diff --git a/websocket.go b/websocket.go index b1bb60e..eeb5564 100644 --- a/websocket.go +++ b/websocket.go @@ -2,7 +2,6 @@ package main import ( "errors" - "fmt" "github.com/gorilla/websocket" "github.com/julienschmidt/httprouter" "github.com/pterodactyl/wings/server" @@ -10,6 +9,7 @@ import ( "net/http" "os" "strings" + "sync" ) type WebsocketMessage struct { @@ -25,11 +25,17 @@ type WebsocketMessage struct { // should either omit the field or pass an empty value as it is ignored. Args []string `json:"args,omitempty"` - server *server.Server - inbound bool } +type WebsocketHandler struct { + Server *server.Server + Mutex sync.Mutex + Connection *websocket.Conn +} + +// Handle a request for a specific server websocket. This will handle inbound requests as well +// as ensure that any console output is also passed down the wire on the socket. func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { c, err := rt.upgrader.Upgrade(w, r, nil) if err != nil { @@ -39,12 +45,27 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http defer c.Close() s := rt.Servers.Get(ps.ByName("server")) + handler := WebsocketHandler{ + Server: s, + Mutex: sync.Mutex{}, + Connection: c, + } + + handleOutput := func(data string) { + handler.SendJson(&WebsocketMessage{ + Event: "console output", + Args: []string{data}, + }) + } + + s.AddListener(server.ConsoleOutputEvent, &handleOutput) + defer s.RemoveListener(server.ConsoleOutputEvent, &handleOutput) for { - j := WebsocketMessage{server: s, inbound: true} + j := WebsocketMessage{inbound: true} if _, _, err := c.ReadMessage(); err != nil { - if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseServiceRestart) { + if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseServiceRestart) { zap.S().Errorw("error handling websocket message", zap.Error(err)) } break @@ -57,34 +78,42 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http continue } - fmt.Printf("%s received: %s = %s\n", s.Uuid, j.Event, strings.Join(j.Args, " ")) - if err := j.HandleInbound(c); err != nil { + if err := handler.HandleInbound(j); err != nil { zap.S().Warnw("error handling inbound websocket request", zap.Error(err)) break } } - - zap.S().Debugw("disconnected from instance", zap.String("ip", c.RemoteAddr().String())) } -func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error { - if !wsm.inbound { +// Perform a blocking send operation on the websocket since we want to avoid any +// concurrent writes to the connection, which would cause a runtime panic and cause +// the program to crash out. +func (wsh *WebsocketHandler) SendJson(v interface{}) error { + wsh.Mutex.Lock() + defer wsh.Mutex.Unlock() + + return wsh.Connection.WriteJSON(v) +} + +// Handle the inbound socket request and route it to the proper server action. +func (wsh *WebsocketHandler) HandleInbound(m WebsocketMessage) error { + if !m.inbound { return errors.New("cannot handle websocket message, not an inbound connection") } - switch wsm.Event { + switch m.Event { case "set state": { var err error - switch strings.Join(wsm.Args, "") { + switch strings.Join(m.Args, "") { case "start": - err = wsm.server.Environment.Start() + err = wsh.Server.Environment.Start() break case "stop": - err = wsm.server.Environment.Stop() + err = wsh.Server.Environment.Stop() break case "restart": - err = wsm.server.Environment.Terminate(os.Kill) + err = wsh.Server.Environment.Terminate(os.Kill) break } @@ -94,15 +123,15 @@ func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error { } case "send logs": { - logs, err := wsm.server.Environment.Readlog(1024 * 5) + logs, err := wsh.Server.Environment.Readlog(1024 * 5) if err != nil { return err } for _, line := range logs { - c.WriteJSON(&WebsocketMessage{ + wsh.SendJson(&WebsocketMessage{ Event: "console output", - Args: []string{line}, + Args: []string{line}, }) } @@ -110,9 +139,9 @@ func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error { } case "send command": { - return wsm.server.Environment.SendCommand(strings.Join(wsm.Args, "")) + return wsh.Server.Environment.SendCommand(strings.Join(m.Args, "")) } } return nil -} \ No newline at end of file +}