From 99a11f81c3df63c14c6d63cbb5486c446b1d2d29 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sat, 18 Jan 2020 14:04:26 -0800 Subject: [PATCH] Improve event emitter/subscription abilities --- server/console.go | 19 ++++++-- server/crash.go | 10 ++-- server/environment_docker.go | 4 +- server/events.go | 95 ++++++++++++++++++++---------------- server/install.go | 6 +-- server/listeners.go | 57 +++++++++++----------- server/server.go | 7 ++- websocket.go | 36 ++++++++------ 8 files changed, 131 insertions(+), 103 deletions(-) diff --git a/server/console.go b/server/console.go index 1832dc0..59a3b55 100644 --- a/server/console.go +++ b/server/console.go @@ -1,9 +1,13 @@ package server -import "io" +import ( + "fmt" + "github.com/mitchellh/colorstring" + "io" +) type Console struct { - Server *Server + Server *Server HandlerFunc *func(string) } @@ -18,4 +22,13 @@ func (c Console) Write(b []byte) (int, error) { } return len(b), nil -} \ No newline at end of file +} + +// Sends output to the server console formatted to appear correctly as being sent +// from Wings. +func (s *Server) PublishConsoleOutputFromDaemon(data string) { + s.Events().Publish( + ConsoleOutputEvent, + colorstring.Color(fmt.Sprintf("[yellow][bold][Pterodactyl Daemon]:[default] %s", data)), + ) +} diff --git a/server/crash.go b/server/crash.go index 7b7c53c..328193e 100644 --- a/server/crash.go +++ b/server/crash.go @@ -37,7 +37,7 @@ func (s *Server) handleServerCrash() error { if !s.CrashDetection.Enabled { zap.S().Debugw("server triggered crash detection but handler is disabled for server process", zap.String("server", s.Uuid)) - s.SendConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.") + s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.") } return nil @@ -56,15 +56,15 @@ func (s *Server) handleServerCrash() error { return nil } - s.SendConsoleOutputFromDaemon("---------- Detected server process in a crashed state! ----------") - s.SendConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode)) - s.SendConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled)) + s.PublishConsoleOutputFromDaemon("---------- Detected server process in a crashed state! ----------") + s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode)) + s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled)) c := s.CrashDetection.lastCrash // If the last crash time was within the last 60 seconds we do not want to perform // an automatic reboot of the process. Return an error that can be handled. if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) { - s.SendConsoleOutputFromDaemon("Aborting automatic reboot: last crash occurred less than 60 seconds ago.") + s.PublishConsoleOutputFromDaemon("Aborting automatic reboot: last crash occurred less than 60 seconds ago.") return &crashTooFrequent{} } diff --git a/server/environment_docker.go b/server/environment_docker.go index 76de4f6..e7a26f9 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -388,7 +388,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error { s := bufio.NewScanner(r) for s.Scan() { - d.Server.Emit(ConsoleOutputEvent, s.Text()) + d.Server.Events().Publish(ConsoleOutputEvent, s.Text()) } if err := s.Err(); err != nil { @@ -450,7 +450,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error { } b, _ := json.Marshal(s.Resources) - s.Emit(StatsEvent, string(b)) + s.Events().Publish(StatsEvent, string(b)) } }(d.Server) diff --git a/server/events.go b/server/events.go index 086432a..f3e19b0 100644 --- a/server/events.go +++ b/server/events.go @@ -1,14 +1,9 @@ package server import ( - "fmt" - "github.com/mitchellh/colorstring" + "sync" ) -type EventListeners map[string][]EventListenerFunction - -type EventListenerFunction *func(string) - // Defines all of the possible output events for a server. // noinspection GoNameStartsWithPackageName const ( @@ -19,47 +14,63 @@ const ( StatsEvent = "stats" ) -// Adds an event listener for the server instance. -func (s *Server) AddListener(event string, f EventListenerFunction) { - if s.listeners == nil { - s.listeners = make(map[string][]EventListenerFunction) +type Event struct { + Data string + Topic string +} + +type EventBus struct { + subscribers map[string][]chan Event + mu sync.Mutex +} + +// Returns the server's emitter instance. +func (s *Server) Events() *EventBus { + if s.emitter == nil { + s.emitter = &EventBus{ + subscribers: map[string][]chan Event{}, + } } - if _, ok := s.listeners[event]; ok { - s.listeners[event] = append(s.listeners[event], f) - } else { - s.listeners[event] = []EventListenerFunction{f} + return s.emitter +} + +// Publish data to a given topic. +func (e *EventBus) Publish(topic string, data string) { + e.mu.Lock() + defer e.mu.Unlock() + + if ch, ok := e.subscribers[topic]; ok { + go func(data Event, cs []chan Event) { + for _, channel := range cs { + channel <- data + } + }(Event{Data: data, Topic: topic}, ch) } } -// Removes the event listener for the server instance. -func (s *Server) RemoveListener(event string, f EventListenerFunction) { - if _, ok := s.listeners[event]; ok { - for i := range s.listeners[event] { - if s.listeners[event][i] == f { - s.listeners[event] = append(s.listeners[event][:i], s.listeners[event][i+1:]...) - break +// Subscribe to an emitter topic using a channel. +func (e *EventBus) Subscribe(topic string, ch chan Event) { + e.mu.Lock() + defer e.mu.Unlock() + + if p, ok := e.subscribers[topic]; ok { + e.subscribers[topic] = append(p, ch) + } else { + e.subscribers[topic] = append([]chan Event{}, ch) + } +} + +// Unsubscribe a channel from a topic. +func (e *EventBus) Unsubscribe(topic string, ch chan Event) { + e.mu.Lock() + defer e.mu.Unlock() + + if _, ok := e.subscribers[topic]; ok { + for i := range e.subscribers[topic] { + if ch == e.subscribers[topic][i] { + e.subscribers[topic] = append(e.subscribers[topic][:i], e.subscribers[topic][i+1:]...) } } } -} - -// Emits an event to all of the active listeners for a server. -func (s *Server) Emit(event string, data string) { - if _, ok := s.listeners[event]; ok { - for _, handler := range s.listeners[event] { - go func(f EventListenerFunction, d string) { - (*f)(d) - }(handler, data) - } - } -} - -// Sends output to the server console formatted to appear correctly as being sent -// from Wings. -func (s *Server) SendConsoleOutputFromDaemon(data string) { - s.Emit( - ConsoleOutputEvent, - colorstring.Color(fmt.Sprintf("[yellow][bold][Pterodactyl Daemon]:[default] %s", data)), - ) -} +} \ No newline at end of file diff --git a/server/install.go b/server/install.go index 3c85e5d..8f95186 100644 --- a/server/install.go +++ b/server/install.go @@ -268,7 +268,7 @@ func (ip *InstallationProcess) Execute(installPath string) (string, error) { } go func(id string) { - ip.Server.Emit(DaemonMessageEvent, "Starting installation process, this could take a few minutes...") + ip.Server.Events().Publish(DaemonMessageEvent, "Starting installation process, this could take a few minutes...") if err := ip.StreamOutput(id); err != nil { zap.S().Errorw( "error handling streaming output for server install process", @@ -276,7 +276,7 @@ func (ip *InstallationProcess) Execute(installPath string) (string, error) { zap.Error(err), ) } - ip.Server.Emit(DaemonMessageEvent, "Installation process completed.") + ip.Server.Events().Publish(DaemonMessageEvent, "Installation process completed.") }(r.ID) sChann, eChann := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning) @@ -309,7 +309,7 @@ func (ip *InstallationProcess) StreamOutput(id string) error { s := bufio.NewScanner(reader) for s.Scan() { - ip.Server.Emit(InstallOutputEvent, s.Text()) + ip.Server.Events().Publish(InstallOutputEvent, s.Text()) } if err := s.Err(); err != nil { diff --git a/server/listeners.go b/server/listeners.go index 8a9ff73..4fcc353 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -6,40 +6,41 @@ import ( "strings" ) - // Adds all of the internal event listeners we want to use for a server. func (s *Server) AddEventListeners() { - s.AddListener(ConsoleOutputEvent, s.onConsoleOutput()) -} + consoleChannel := make(chan Event) + s.Events().Subscribe(ConsoleOutputEvent, consoleChannel) -var onConsoleOutputListener func(string) + go func() { + for { + select { + case data := <-consoleChannel: + s.onConsoleOutput(data.Data) + } + } + }() +} // Custom listener for console output events that will check if the given line // of output matches one that should mark the server as started or not. -func (s *Server) onConsoleOutput() *func(string) { - if onConsoleOutputListener == nil { - onConsoleOutputListener = func (data string) { - // If the specific line of output is one that would mark the server as started, - // set the server to that state. Only do this if the server is not currently stopped - // or stopping. - if s.State == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) { - zap.S().Debugw( - "detected server in running state based on line output", zap.String("match", s.processConfiguration.Startup.Done), zap.String("against", data), - ) +func (s *Server) onConsoleOutput(data string) { + // If the specific line of output is one that would mark the server as started, + // set the server to that state. Only do this if the server is not currently stopped + // or stopping. + if s.State == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) { + zap.S().Debugw( + "detected server in running state based on line output", zap.String("match", s.processConfiguration.Startup.Done), zap.String("against", data), + ) - s.SetState(ProcessRunningState) - } - - // If the command sent to the server is one that should stop the server we will need to - // set the server to be in a stopping state, otherwise crash detection will kick in and - // cause the server to unexpectedly restart on the user. - if s.State == ProcessStartingState || s.State == ProcessRunningState { - if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value { - s.SetState(ProcessStoppingState) - } - } - } + s.SetState(ProcessRunningState) } - return &onConsoleOutputListener -} \ No newline at end of file + // If the command sent to the server is one that should stop the server we will need to + // set the server to be in a stopping state, otherwise crash detection will kick in and + // cause the server to unexpectedly restart on the user. + if s.State == ProcessStartingState || s.State == ProcessRunningState { + if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value { + s.SetState(ProcessStoppingState) + } + } +} diff --git a/server/server.go b/server/server.go index ff9df74..491ecf3 100644 --- a/server/server.go +++ b/server/server.go @@ -64,8 +64,8 @@ type Server struct { // certain long operations return faster. For example, FS disk space usage. Cache *cache.Cache `json:"-" yaml:"-"` - // All of the registered event listeners for this server instance. - listeners EventListeners + // Events emitted by the server instance. + emitter *EventBus // Defines the process configuration for the server instance. This is dynamically // fetched from the Pterodactyl Server instance each time the server process is @@ -199,7 +199,6 @@ func LoadDirectory(dir string, cfg *config.SystemConfiguration) error { // Initializes the default required internal struct components for a Server. func (s *Server) Init() { - s.listeners = make(map[string][]EventListenerFunction) s.mutex = &sync.Mutex{} } @@ -357,7 +356,7 @@ func (s *Server) SetState(state string) error { zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State)) // Emit the event to any listeners that are currently registered. - s.Emit(StatusEvent, s.State) + s.Events().Publish(StatusEvent, s.State) // If server was in an online state, and is now in an offline state we should handle // that as a crash event. In that scenario, check the last crash time, and the crash diff --git a/websocket.go b/websocket.go index 332dc1e..e9a9921 100644 --- a/websocket.go +++ b/websocket.go @@ -164,7 +164,6 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http JWT: nil, } - // Register all of the event handlers. events := []string{ server.StatsEvent, server.StatusEvent, @@ -173,24 +172,29 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http server.DaemonMessageEvent, } - var eventFuncs = make(map[string]*func(string)) + eventChannel := make(chan server.Event) for _, event := range events { - var e = event - var fn = func(data string) { - handler.SendJson(&WebsocketMessage{ - Event: e, - Args: []string{data}, - }) - } - - eventFuncs[event] = &fn - s.AddListener(event, &fn) + s.Events().Subscribe(event, eventChannel) } - // When done with the socket, remove all of the event handlers we had registered. defer func() { - for event, action := range eventFuncs { - s.RemoveListener(event, action) + for _, event := range events { + s.Events().Unsubscribe(event, eventChannel) + } + + close(eventChannel) + }() + + // Listen for different events emitted by the server and respond to them appropriately. + go func() { + for { + select { + case d := <-eventChannel: + handler.SendJson(&WebsocketMessage{ + Event: d.Topic, + Args: []string{d.Data}, + }) + } } }() @@ -354,7 +358,7 @@ func (wsh *WebsocketHandler) HandleInbound(m WebsocketMessage) error { // On every authentication event, send the current server status back // to the client. :) - wsh.Server.Emit(server.StatusEvent, wsh.Server.State) + wsh.Server.Events().Publish(server.StatusEvent, wsh.Server.State) wsh.unsafeSendJson(WebsocketMessage{ Event: AuthenticationSuccessEvent,