diff --git a/events/events.go b/events/events.go index c0d8213..1bb4327 100644 --- a/events/events.go +++ b/events/events.go @@ -2,10 +2,11 @@ package events import ( "strings" - "sync" -) -type Listener chan Event + "emperror.dev/errors" + "github.com/goccy/go-json" + "github.com/pterodactyl/wings/system" +) // Event represents an Event sent over a Bus. type Event struct { @@ -15,137 +16,51 @@ type Event struct { // Bus represents an Event Bus. type Bus struct { - listenersMx sync.Mutex - listeners map[string][]Listener + *system.SinkPool } -// NewBus returns a new empty Event Bus. +// NewBus returns a new empty Bus. This is simply a nicer wrapper around the +// system.SinkPool implementation that allows for more simplistic usage within +// the codebase. +// +// All of the events emitted out of this bus are byte slices that can be decoded +// back into an events.Event interface. func NewBus() *Bus { return &Bus{ - listeners: make(map[string][]Listener), - } -} - -// Off unregisters a listener from the specified topics on the Bus. -func (b *Bus) Off(listener Listener, topics ...string) { - b.listenersMx.Lock() - defer b.listenersMx.Unlock() - - var closed bool - - for _, topic := range topics { - ok := b.off(topic, listener) - if !closed && ok { - close(listener) - closed = true - } - } -} - -func (b *Bus) off(topic string, listener Listener) bool { - listeners, ok := b.listeners[topic] - if !ok { - return false - } - for i, l := range listeners { - if l != listener { - continue - } - - listeners = append(listeners[:i], listeners[i+1:]...) - b.listeners[topic] = listeners - return true - } - return false -} - -// On registers a listener to the specified topics on the Bus. -func (b *Bus) On(listener Listener, topics ...string) { - b.listenersMx.Lock() - defer b.listenersMx.Unlock() - - for _, topic := range topics { - b.on(topic, listener) - } -} - -func (b *Bus) on(topic string, listener Listener) { - listeners, ok := b.listeners[topic] - if !ok { - b.listeners[topic] = []Listener{listener} - } else { - b.listeners[topic] = append(listeners, listener) + system.NewSinkPool(), } } // Publish publishes a message to the Bus. func (b *Bus) Publish(topic string, data interface{}) { - // Some of our topics for the socket support passing a more specific namespace, + // Some of our actions for the socket support passing a more specific namespace, // such as "backup completed:1234" to indicate which specific backup was completed. // // In these cases, we still need to send the event using the standard listener // name of "backup completed". if strings.Contains(topic, ":") { parts := strings.SplitN(topic, ":", 2) - if len(parts) == 2 { topic = parts[0] } } - b.listenersMx.Lock() - defer b.listenersMx.Unlock() - - listeners, ok := b.listeners[topic] - if !ok { - return + enc, err := json.Marshal(Event{Topic: topic, Data: data}) + if err != nil { + panic(errors.WithStack(err)) } - if len(listeners) < 1 { - return - } - - var wg sync.WaitGroup - event := Event{Topic: topic, Data: data} - for _, listener := range listeners { - l := listener - wg.Add(1) - go func(l Listener, event Event) { - defer wg.Done() - l <- event - }(l, event) - } - wg.Wait() + b.Push(enc) } -// Destroy destroys the Event Bus by unregistering and closing all listeners. -func (b *Bus) Destroy() { - b.listenersMx.Lock() - defer b.listenersMx.Unlock() - - // Track what listeners have already been closed. Because the same listener - // can be listening on multiple topics, we need a way to essentially - // "de-duplicate" all the listeners across all the topics. - var closed []Listener - - for _, listeners := range b.listeners { - for _, listener := range listeners { - if contains(closed, listener) { - continue - } - - close(listener) - closed = append(closed, listener) - } - } - - b.listeners = make(map[string][]Listener) +// MustDecode decodes the event byte slice back into an events.Event struct or +// panics if an error is encountered during this process. +func MustDecode(data []byte) (e Event) { + MustDecodeTo(data, &e) + return } -func contains(closed []Listener, listener Listener) bool { - for _, c := range closed { - if c == listener { - return true - } +func MustDecodeTo(data []byte, v interface{}) { + if err := json.Unmarshal(data, &v); err != nil { + panic(errors.Wrap(err, "events: failed to decode event data into interface")) } - return false } diff --git a/events/events_test.go b/events/events_test.go index 542a8e2..00e1083 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -9,162 +9,90 @@ import ( func TestNewBus(t *testing.T) { g := Goblin(t) - bus := NewBus() - g.Describe("NewBus", func() { - g.It("is not nil", func() { - g.Assert(bus).IsNotNil("Bus expected to not be nil") - g.Assert(bus.listeners).IsNotNil("Bus#listeners expected to not be nil") - }) - }) -} - -func TestBus_Off(t *testing.T) { - g := Goblin(t) - - const topic = "test" - - g.Describe("Off", func() { - g.It("unregisters listener", func() { - bus := NewBus() - - g.Assert(bus.listeners[topic]).IsNotNil() - g.Assert(len(bus.listeners[topic])).IsZero() - listener := make(chan Event) - bus.On(listener, topic) - g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered") - - bus.Off(listener, topic) - g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners") + g.Describe("Events", func() { + var bus *Bus + g.BeforeEach(func() { + bus = NewBus() }) - g.It("unregisters correct listener", func() { - bus := NewBus() - - listener := make(chan Event) - listener2 := make(chan Event) - listener3 := make(chan Event) - bus.On(listener, topic) - bus.On(listener2, topic) - bus.On(listener3, topic) - g.Assert(len(bus.listeners[topic])).Equal(3, "Listeners were not registered") - - bus.Off(listener, topic) - bus.Off(listener3, topic) - g.Assert(len(bus.listeners[topic])).Equal(1, "Expected 1 listener to remain") - - if bus.listeners[topic][0] != listener2 { - // A normal Assert does not properly compare channels. - g.Fail("wrong listener unregistered") - } - - // Cleanup - bus.Off(listener2, topic) - }) - }) -} - -func TestBus_On(t *testing.T) { - g := Goblin(t) - - const topic = "test" - - g.Describe("On", func() { - g.It("registers listener", func() { - bus := NewBus() - - g.Assert(bus.listeners[topic]).IsNotNil() - g.Assert(len(bus.listeners[topic])).IsZero() - listener := make(chan Event) - bus.On(listener, topic) - g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered") - - if bus.listeners[topic][0] != listener { - // A normal Assert does not properly compare channels. - g.Fail("wrong listener registered") - } - - // Cleanup - bus.Off(listener, topic) - }) - }) -} - -func TestBus_Publish(t *testing.T) { - g := Goblin(t) - - const topic = "test" - const message = "this is a test message!" - - g.Describe("Publish", func() { - g.It("publishes message", func() { - bus := NewBus() - - g.Assert(bus.listeners[topic]).IsNotNil() - g.Assert(len(bus.listeners[topic])).IsZero() - listener := make(chan Event) - bus.On(listener, topic) - g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered") - - done := make(chan struct{}, 1) - go func() { - select { - case m := <-listener: - g.Assert(m.Topic).Equal(topic) - g.Assert(m.Data).Equal(message) - case <-time.After(1 * time.Second): - g.Fail("listener did not receive message in time") - } - done <- struct{}{} - }() - bus.Publish(topic, message) - <-done - - // Cleanup - bus.Off(listener, topic) + g.Describe("NewBus", func() { + g.It("is not nil", func() { + g.Assert(bus).IsNotNil("Bus expected to not be nil") + }) }) - g.It("publishes message to all listeners", func() { - bus := NewBus() + g.Describe("Publish", func() { + const topic = "test" + const message = "this is a test message!" - g.Assert(bus.listeners[topic]).IsNotNil() - g.Assert(len(bus.listeners[topic])).IsZero() - listener := make(chan Event) - listener2 := make(chan Event) - listener3 := make(chan Event) - bus.On(listener, topic) - bus.On(listener2, topic) - bus.On(listener3, topic) - g.Assert(len(bus.listeners[topic])).Equal(3, "Listener was not registered") + g.It("publishes message", func() { + bus := NewBus() - done := make(chan struct{}, 1) - go func() { - for i := 0; i < 3; i++ { + listener := make(chan []byte) + bus.On(listener) + + done := make(chan struct{}, 1) + go func() { select { - case m := <-listener: - g.Assert(m.Topic).Equal(topic) - g.Assert(m.Data).Equal(message) - case m := <-listener2: - g.Assert(m.Topic).Equal(topic) - g.Assert(m.Data).Equal(message) - case m := <-listener3: + case v := <-listener: + m := MustDecode(v) g.Assert(m.Topic).Equal(topic) g.Assert(m.Data).Equal(message) case <-time.After(1 * time.Second): - g.Fail("all listeners did not receive the message in time") - i = 3 + g.Fail("listener did not receive message in time") } - } + done <- struct{}{} + }() + bus.Publish(topic, message) + <-done - done <- struct{}{} - }() - bus.Publish(topic, message) - <-done + // Cleanup + bus.Off(listener) + }) - // Cleanup - bus.Off(listener, topic) - bus.Off(listener2, topic) - bus.Off(listener3, topic) + g.It("publishes message to all listeners", func() { + bus := NewBus() + + listener := make(chan []byte) + listener2 := make(chan []byte) + listener3 := make(chan []byte) + bus.On(listener) + bus.On(listener2) + bus.On(listener3) + + done := make(chan struct{}, 1) + go func() { + for i := 0; i < 3; i++ { + select { + case v := <-listener: + m := MustDecode(v) + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case v := <-listener2: + m := MustDecode(v) + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case v := <-listener3: + m := MustDecode(v) + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case <-time.After(1 * time.Second): + g.Fail("all listeners did not receive the message in time") + i = 3 + } + } + + done <- struct{}{} + }() + bus.Publish(topic, message) + <-done + + // Cleanup + bus.Off(listener) + bus.Off(listener2) + bus.Off(listener3) + }) }) }) } diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index 204e3e6..ecb557e 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -7,9 +7,9 @@ import ( "emperror.dev/errors" "github.com/goccy/go-json" + "github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/system" - "github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/server" ) @@ -89,10 +89,11 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - eventChan := make(chan events.Event) + eventChan := make(chan []byte) logOutput := make(chan []byte, 8) installOutput := make(chan []byte, 4) - h.server.Events().On(eventChan, e...) + + h.server.Events().On(eventChan) // TODO: make a sinky h.server.Sink(system.LogSink).On(logOutput) h.server.Sink(system.InstallSink).On(installOutput) @@ -111,19 +112,20 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { select { case <-ctx.Done(): break - case e := <-logOutput: - sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(e)}}) + case b := <-logOutput: + sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(b)}}) if sendErr == nil { continue } onError(server.ConsoleOutputEvent, sendErr) - case e := <-installOutput: - sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(e)}}) + case b := <-installOutput: + sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(b)}}) if sendErr == nil { continue } onError(server.InstallOutputEvent, sendErr) - case e := <-eventChan: + case b := <-eventChan: + e := events.MustDecode(b) var sendErr error message := Message{Event: e.Topic} if str, ok := e.Data.(string); ok { @@ -149,7 +151,7 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { } // These functions will automatically close the channel if it hasn't been already. - h.server.Events().Off(eventChan, e...) + h.server.Events().Off(eventChan) h.server.Sink(system.LogSink).Off(logOutput) h.server.Sink(system.InstallSink).Off(installOutput) diff --git a/server/listeners.go b/server/listeners.go index e5a5e29..7546638 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -8,10 +8,10 @@ import ( "time" "github.com/apex/log" + "github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/environment" - "github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/remote" ) @@ -81,40 +81,44 @@ func (s *Server) processConsoleOutputEvent(v []byte) { // a server. These listeners can only be removed by deleting the server as they // should last for the duration of the process' lifetime. func (s *Server) StartEventListeners() { - state := make(chan events.Event) - stats := make(chan events.Event) - docker := make(chan events.Event) + c := make(chan []byte, 8) + limit := newDiskLimiter(s) + + s.Log().Debug("registering event listeners: console, state, resources...") + s.Environment.Events().On(c) + s.Environment.SetLogCallback(s.processConsoleOutputEvent) go func() { - l := newDiskLimiter(s) - for { select { - case e := <-state: - go func() { - // Reset the throttler when the process is started. - if e.Data == environment.ProcessStartingState { - l.Reset() - s.Throttler().Reset() - } - - s.OnStateChange() - }() - case e := <-stats: - go func() { - s.resources.UpdateStats(e.Data.(environment.Stats)) - - // If there is no disk space available at this point, trigger the server - // disk limiter logic which will start to stop the running instance. - if !s.Filesystem().HasSpaceAvailable(true) { - l.Trigger() - } - - s.Events().Publish(StatsEvent, s.Proc()) - }() - case e := <-docker: - go func() { + case v := <-c: + go func(v []byte, limit *diskSpaceLimiter) { + e := events.MustDecode(v) switch e.Topic { + case environment.ResourceEvent: + { + var stats struct { + Topic string + Data environment.Stats + } + events.MustDecodeTo(v, &stats) + s.resources.UpdateStats(stats.Data) + // If there is no disk space available at this point, trigger the server + // disk limiter logic which will start to stop the running instance. + if !s.Filesystem().HasSpaceAvailable(true) { + limit.Trigger() + } + s.Events().Publish(StatsEvent, s.Proc()) + } + case environment.StateChangeEvent: + { + // Reset the throttler when the process is started. + if e.Data == environment.ProcessStartingState { + limit.Reset() + s.Throttler().Reset() + } + s.OnStateChange() + } case environment.DockerImagePullStatus: s.Events().Publish(InstallOutputEvent, e.Data) case environment.DockerImagePullStarted: @@ -122,18 +126,13 @@ func (s *Server) StartEventListeners() { case environment.DockerImagePullCompleted: s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image") default: - s.Log().WithField("topic", e.Topic).Error("unhandled docker event topic") } - }() + }(v, limit) + case <-s.Context().Done(): + return } } }() - - s.Log().Debug("registering event listeners: console, state, resources...") - s.Environment.SetLogCallback(s.processConsoleOutputEvent) - s.Environment.Events().On(state, environment.StateChangeEvent) - s.Environment.Events().On(stats, environment.ResourceEvent) - s.Environment.Events().On(docker, dockerEvents...) } var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")