Improve event emitter/subscription abilities

This commit is contained in:
Dane Everitt 2020-01-18 14:04:26 -08:00
parent c6fcd8cabb
commit 99a11f81c3
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
8 changed files with 131 additions and 103 deletions

View File

@ -1,6 +1,10 @@
package server package server
import "io" import (
"fmt"
"github.com/mitchellh/colorstring"
"io"
)
type Console struct { type Console struct {
Server *Server Server *Server
@ -19,3 +23,12 @@ func (c Console) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
// Sends output to the server console formatted to appear correctly as being sent
// from Wings.
func (s *Server) PublishConsoleOutputFromDaemon(data string) {
s.Events().Publish(
ConsoleOutputEvent,
colorstring.Color(fmt.Sprintf("[yellow][bold][Pterodactyl Daemon]:[default] %s", data)),
)
}

View File

@ -37,7 +37,7 @@ func (s *Server) handleServerCrash() error {
if !s.CrashDetection.Enabled { if !s.CrashDetection.Enabled {
zap.S().Debugw("server triggered crash detection but handler is disabled for server process", zap.String("server", s.Uuid)) zap.S().Debugw("server triggered crash detection but handler is disabled for server process", zap.String("server", s.Uuid))
s.SendConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.") s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.")
} }
return nil return nil
@ -56,15 +56,15 @@ func (s *Server) handleServerCrash() error {
return nil return nil
} }
s.SendConsoleOutputFromDaemon("---------- Detected server process in a crashed state! ----------") s.PublishConsoleOutputFromDaemon("---------- Detected server process in a crashed state! ----------")
s.SendConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode)) s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode))
s.SendConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled)) s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled))
c := s.CrashDetection.lastCrash c := s.CrashDetection.lastCrash
// If the last crash time was within the last 60 seconds we do not want to perform // If the last crash time was within the last 60 seconds we do not want to perform
// an automatic reboot of the process. Return an error that can be handled. // an automatic reboot of the process. Return an error that can be handled.
if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) { if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) {
s.SendConsoleOutputFromDaemon("Aborting automatic reboot: last crash occurred less than 60 seconds ago.") s.PublishConsoleOutputFromDaemon("Aborting automatic reboot: last crash occurred less than 60 seconds ago.")
return &crashTooFrequent{} return &crashTooFrequent{}
} }

View File

@ -388,7 +388,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
s := bufio.NewScanner(r) s := bufio.NewScanner(r)
for s.Scan() { for s.Scan() {
d.Server.Emit(ConsoleOutputEvent, s.Text()) d.Server.Events().Publish(ConsoleOutputEvent, s.Text())
} }
if err := s.Err(); err != nil { if err := s.Err(); err != nil {
@ -450,7 +450,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error {
} }
b, _ := json.Marshal(s.Resources) b, _ := json.Marshal(s.Resources)
s.Emit(StatsEvent, string(b)) s.Events().Publish(StatsEvent, string(b))
} }
}(d.Server) }(d.Server)

View File

@ -1,14 +1,9 @@
package server package server
import ( import (
"fmt" "sync"
"github.com/mitchellh/colorstring"
) )
type EventListeners map[string][]EventListenerFunction
type EventListenerFunction *func(string)
// Defines all of the possible output events for a server. // Defines all of the possible output events for a server.
// noinspection GoNameStartsWithPackageName // noinspection GoNameStartsWithPackageName
const ( const (
@ -19,47 +14,63 @@ const (
StatsEvent = "stats" StatsEvent = "stats"
) )
// Adds an event listener for the server instance. type Event struct {
func (s *Server) AddListener(event string, f EventListenerFunction) { Data string
if s.listeners == nil { Topic string
s.listeners = make(map[string][]EventListenerFunction) }
type EventBus struct {
subscribers map[string][]chan Event
mu sync.Mutex
}
// Returns the server's emitter instance.
func (s *Server) Events() *EventBus {
if s.emitter == nil {
s.emitter = &EventBus{
subscribers: map[string][]chan Event{},
}
} }
if _, ok := s.listeners[event]; ok { return s.emitter
s.listeners[event] = append(s.listeners[event], f) }
// Publish data to a given topic.
func (e *EventBus) Publish(topic string, data string) {
e.mu.Lock()
defer e.mu.Unlock()
if ch, ok := e.subscribers[topic]; ok {
go func(data Event, cs []chan Event) {
for _, channel := range cs {
channel <- data
}
}(Event{Data: data, Topic: topic}, ch)
}
}
// Subscribe to an emitter topic using a channel.
func (e *EventBus) Subscribe(topic string, ch chan Event) {
e.mu.Lock()
defer e.mu.Unlock()
if p, ok := e.subscribers[topic]; ok {
e.subscribers[topic] = append(p, ch)
} else { } else {
s.listeners[event] = []EventListenerFunction{f} e.subscribers[topic] = append([]chan Event{}, ch)
} }
} }
// Removes the event listener for the server instance. // Unsubscribe a channel from a topic.
func (s *Server) RemoveListener(event string, f EventListenerFunction) { func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
if _, ok := s.listeners[event]; ok { e.mu.Lock()
for i := range s.listeners[event] { defer e.mu.Unlock()
if s.listeners[event][i] == f {
s.listeners[event] = append(s.listeners[event][:i], s.listeners[event][i+1:]...)
break
}
}
}
}
// Emits an event to all of the active listeners for a server. if _, ok := e.subscribers[topic]; ok {
func (s *Server) Emit(event string, data string) { for i := range e.subscribers[topic] {
if _, ok := s.listeners[event]; ok { if ch == e.subscribers[topic][i] {
for _, handler := range s.listeners[event] { e.subscribers[topic] = append(e.subscribers[topic][:i], e.subscribers[topic][i+1:]...)
go func(f EventListenerFunction, d string) { }
(*f)(d)
}(handler, data)
} }
} }
} }
// Sends output to the server console formatted to appear correctly as being sent
// from Wings.
func (s *Server) SendConsoleOutputFromDaemon(data string) {
s.Emit(
ConsoleOutputEvent,
colorstring.Color(fmt.Sprintf("[yellow][bold][Pterodactyl Daemon]:[default] %s", data)),
)
}

View File

@ -268,7 +268,7 @@ func (ip *InstallationProcess) Execute(installPath string) (string, error) {
} }
go func(id string) { go func(id string) {
ip.Server.Emit(DaemonMessageEvent, "Starting installation process, this could take a few minutes...") ip.Server.Events().Publish(DaemonMessageEvent, "Starting installation process, this could take a few minutes...")
if err := ip.StreamOutput(id); err != nil { if err := ip.StreamOutput(id); err != nil {
zap.S().Errorw( zap.S().Errorw(
"error handling streaming output for server install process", "error handling streaming output for server install process",
@ -276,7 +276,7 @@ func (ip *InstallationProcess) Execute(installPath string) (string, error) {
zap.Error(err), zap.Error(err),
) )
} }
ip.Server.Emit(DaemonMessageEvent, "Installation process completed.") ip.Server.Events().Publish(DaemonMessageEvent, "Installation process completed.")
}(r.ID) }(r.ID)
sChann, eChann := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning) sChann, eChann := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning)
@ -309,7 +309,7 @@ func (ip *InstallationProcess) StreamOutput(id string) error {
s := bufio.NewScanner(reader) s := bufio.NewScanner(reader)
for s.Scan() { for s.Scan() {
ip.Server.Emit(InstallOutputEvent, s.Text()) ip.Server.Events().Publish(InstallOutputEvent, s.Text())
} }
if err := s.Err(); err != nil { if err := s.Err(); err != nil {

View File

@ -6,19 +6,24 @@ import (
"strings" "strings"
) )
// Adds all of the internal event listeners we want to use for a server. // Adds all of the internal event listeners we want to use for a server.
func (s *Server) AddEventListeners() { func (s *Server) AddEventListeners() {
s.AddListener(ConsoleOutputEvent, s.onConsoleOutput()) consoleChannel := make(chan Event)
} s.Events().Subscribe(ConsoleOutputEvent, consoleChannel)
var onConsoleOutputListener func(string) go func() {
for {
select {
case data := <-consoleChannel:
s.onConsoleOutput(data.Data)
}
}
}()
}
// Custom listener for console output events that will check if the given line // Custom listener for console output events that will check if the given line
// of output matches one that should mark the server as started or not. // of output matches one that should mark the server as started or not.
func (s *Server) onConsoleOutput() *func(string) { func (s *Server) onConsoleOutput(data string) {
if onConsoleOutputListener == nil {
onConsoleOutputListener = func (data string) {
// If the specific line of output is one that would mark the server as started, // If the specific line of output is one that would mark the server as started,
// set the server to that state. Only do this if the server is not currently stopped // set the server to that state. Only do this if the server is not currently stopped
// or stopping. // or stopping.
@ -38,8 +43,4 @@ func (s *Server) onConsoleOutput() *func(string) {
s.SetState(ProcessStoppingState) s.SetState(ProcessStoppingState)
} }
} }
}
}
return &onConsoleOutputListener
} }

View File

@ -64,8 +64,8 @@ type Server struct {
// certain long operations return faster. For example, FS disk space usage. // certain long operations return faster. For example, FS disk space usage.
Cache *cache.Cache `json:"-" yaml:"-"` Cache *cache.Cache `json:"-" yaml:"-"`
// All of the registered event listeners for this server instance. // Events emitted by the server instance.
listeners EventListeners emitter *EventBus
// Defines the process configuration for the server instance. This is dynamically // Defines the process configuration for the server instance. This is dynamically
// fetched from the Pterodactyl Server instance each time the server process is // fetched from the Pterodactyl Server instance each time the server process is
@ -199,7 +199,6 @@ func LoadDirectory(dir string, cfg *config.SystemConfiguration) error {
// Initializes the default required internal struct components for a Server. // Initializes the default required internal struct components for a Server.
func (s *Server) Init() { func (s *Server) Init() {
s.listeners = make(map[string][]EventListenerFunction)
s.mutex = &sync.Mutex{} s.mutex = &sync.Mutex{}
} }
@ -357,7 +356,7 @@ func (s *Server) SetState(state string) error {
zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State)) zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State))
// Emit the event to any listeners that are currently registered. // Emit the event to any listeners that are currently registered.
s.Emit(StatusEvent, s.State) s.Events().Publish(StatusEvent, s.State)
// If server was in an online state, and is now in an offline state we should handle // If server was in an online state, and is now in an offline state we should handle
// that as a crash event. In that scenario, check the last crash time, and the crash // that as a crash event. In that scenario, check the last crash time, and the crash

View File

@ -164,7 +164,6 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http
JWT: nil, JWT: nil,
} }
// Register all of the event handlers.
events := []string{ events := []string{
server.StatsEvent, server.StatsEvent,
server.StatusEvent, server.StatusEvent,
@ -173,24 +172,29 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http
server.DaemonMessageEvent, server.DaemonMessageEvent,
} }
var eventFuncs = make(map[string]*func(string)) eventChannel := make(chan server.Event)
for _, event := range events { for _, event := range events {
var e = event s.Events().Subscribe(event, eventChannel)
var fn = func(data string) { }
defer func() {
for _, event := range events {
s.Events().Unsubscribe(event, eventChannel)
}
close(eventChannel)
}()
// Listen for different events emitted by the server and respond to them appropriately.
go func() {
for {
select {
case d := <-eventChannel:
handler.SendJson(&WebsocketMessage{ handler.SendJson(&WebsocketMessage{
Event: e, Event: d.Topic,
Args: []string{data}, Args: []string{d.Data},
}) })
} }
eventFuncs[event] = &fn
s.AddListener(event, &fn)
}
// When done with the socket, remove all of the event handlers we had registered.
defer func() {
for event, action := range eventFuncs {
s.RemoveListener(event, action)
} }
}() }()
@ -354,7 +358,7 @@ func (wsh *WebsocketHandler) HandleInbound(m WebsocketMessage) error {
// On every authentication event, send the current server status back // On every authentication event, send the current server status back
// to the client. :) // to the client. :)
wsh.Server.Emit(server.StatusEvent, wsh.Server.State) wsh.Server.Events().Publish(server.StatusEvent, wsh.Server.State)
wsh.unsafeSendJson(WebsocketMessage{ wsh.unsafeSendJson(WebsocketMessage{
Event: AuthenticationSuccessEvent, Event: AuthenticationSuccessEvent,