Simplify the event bus system; address pterodactyl/panel#3903

If my debugging is correct, this should address pterodactyl/panel#3903 in its entirety by addressing a few areas where it was possible for a channel to lock up and cause everything to block
This commit is contained in:
Dane Everitt 2022-02-02 21:02:10 -05:00
parent 0f2e9fcc0b
commit 72476c61ec
4 changed files with 143 additions and 299 deletions

View File

@ -2,10 +2,11 @@ package events
import ( import (
"strings" "strings"
"sync"
)
type Listener chan Event "emperror.dev/errors"
"github.com/goccy/go-json"
"github.com/pterodactyl/wings/system"
)
// Event represents an Event sent over a Bus. // Event represents an Event sent over a Bus.
type Event struct { type Event struct {
@ -15,137 +16,51 @@ type Event struct {
// Bus represents an Event Bus. // Bus represents an Event Bus.
type Bus struct { type Bus struct {
listenersMx sync.Mutex *system.SinkPool
listeners map[string][]Listener
} }
// NewBus returns a new empty Event Bus. // NewBus returns a new empty Bus. This is simply a nicer wrapper around the
// system.SinkPool implementation that allows for more simplistic usage within
// the codebase.
//
// All of the events emitted out of this bus are byte slices that can be decoded
// back into an events.Event interface.
func NewBus() *Bus { func NewBus() *Bus {
return &Bus{ return &Bus{
listeners: make(map[string][]Listener), system.NewSinkPool(),
}
}
// Off unregisters a listener from the specified topics on the Bus.
func (b *Bus) Off(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
var closed bool
for _, topic := range topics {
ok := b.off(topic, listener)
if !closed && ok {
close(listener)
closed = true
}
}
}
func (b *Bus) off(topic string, listener Listener) bool {
listeners, ok := b.listeners[topic]
if !ok {
return false
}
for i, l := range listeners {
if l != listener {
continue
}
listeners = append(listeners[:i], listeners[i+1:]...)
b.listeners[topic] = listeners
return true
}
return false
}
// On registers a listener to the specified topics on the Bus.
func (b *Bus) On(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
for _, topic := range topics {
b.on(topic, listener)
}
}
func (b *Bus) on(topic string, listener Listener) {
listeners, ok := b.listeners[topic]
if !ok {
b.listeners[topic] = []Listener{listener}
} else {
b.listeners[topic] = append(listeners, listener)
} }
} }
// Publish publishes a message to the Bus. // Publish publishes a message to the Bus.
func (b *Bus) Publish(topic string, data interface{}) { func (b *Bus) Publish(topic string, data interface{}) {
// Some of our topics for the socket support passing a more specific namespace, // Some of our actions for the socket support passing a more specific namespace,
// such as "backup completed:1234" to indicate which specific backup was completed. // such as "backup completed:1234" to indicate which specific backup was completed.
// //
// In these cases, we still need to send the event using the standard listener // In these cases, we still need to send the event using the standard listener
// name of "backup completed". // name of "backup completed".
if strings.Contains(topic, ":") { if strings.Contains(topic, ":") {
parts := strings.SplitN(topic, ":", 2) parts := strings.SplitN(topic, ":", 2)
if len(parts) == 2 { if len(parts) == 2 {
topic = parts[0] topic = parts[0]
} }
} }
b.listenersMx.Lock() enc, err := json.Marshal(Event{Topic: topic, Data: data})
defer b.listenersMx.Unlock() if err != nil {
panic(errors.WithStack(err))
listeners, ok := b.listeners[topic]
if !ok {
return
} }
if len(listeners) < 1 { b.Push(enc)
return
}
var wg sync.WaitGroup
event := Event{Topic: topic, Data: data}
for _, listener := range listeners {
l := listener
wg.Add(1)
go func(l Listener, event Event) {
defer wg.Done()
l <- event
}(l, event)
}
wg.Wait()
} }
// Destroy destroys the Event Bus by unregistering and closing all listeners. // MustDecode decodes the event byte slice back into an events.Event struct or
func (b *Bus) Destroy() { // panics if an error is encountered during this process.
b.listenersMx.Lock() func MustDecode(data []byte) (e Event) {
defer b.listenersMx.Unlock() MustDecodeTo(data, &e)
return
// Track what listeners have already been closed. Because the same listener
// can be listening on multiple topics, we need a way to essentially
// "de-duplicate" all the listeners across all the topics.
var closed []Listener
for _, listeners := range b.listeners {
for _, listener := range listeners {
if contains(closed, listener) {
continue
}
close(listener)
closed = append(closed, listener)
}
}
b.listeners = make(map[string][]Listener)
} }
func contains(closed []Listener, listener Listener) bool { func MustDecodeTo(data []byte, v interface{}) {
for _, c := range closed { if err := json.Unmarshal(data, &v); err != nil {
if c == listener { panic(errors.Wrap(err, "events: failed to decode event data into interface"))
return true
}
} }
return false
} }

View File

@ -9,162 +9,90 @@ import (
func TestNewBus(t *testing.T) { func TestNewBus(t *testing.T) {
g := Goblin(t) g := Goblin(t)
bus := NewBus()
g.Describe("NewBus", func() { g.Describe("Events", func() {
g.It("is not nil", func() { var bus *Bus
g.Assert(bus).IsNotNil("Bus expected to not be nil") g.BeforeEach(func() {
g.Assert(bus.listeners).IsNotNil("Bus#listeners expected to not be nil") bus = NewBus()
})
})
}
func TestBus_Off(t *testing.T) {
g := Goblin(t)
const topic = "test"
g.Describe("Off", func() {
g.It("unregisters listener", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
bus.Off(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners")
}) })
g.It("unregisters correct listener", func() { g.Describe("NewBus", func() {
bus := NewBus() g.It("is not nil", func() {
g.Assert(bus).IsNotNil("Bus expected to not be nil")
listener := make(chan Event) })
listener2 := make(chan Event)
listener3 := make(chan Event)
bus.On(listener, topic)
bus.On(listener2, topic)
bus.On(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(3, "Listeners were not registered")
bus.Off(listener, topic)
bus.Off(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Expected 1 listener to remain")
if bus.listeners[topic][0] != listener2 {
// A normal Assert does not properly compare channels.
g.Fail("wrong listener unregistered")
}
// Cleanup
bus.Off(listener2, topic)
})
})
}
func TestBus_On(t *testing.T) {
g := Goblin(t)
const topic = "test"
g.Describe("On", func() {
g.It("registers listener", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
if bus.listeners[topic][0] != listener {
// A normal Assert does not properly compare channels.
g.Fail("wrong listener registered")
}
// Cleanup
bus.Off(listener, topic)
})
})
}
func TestBus_Publish(t *testing.T) {
g := Goblin(t)
const topic = "test"
const message = "this is a test message!"
g.Describe("Publish", func() {
g.It("publishes message", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
// Cleanup
bus.Off(listener, topic)
}) })
g.It("publishes message to all listeners", func() { g.Describe("Publish", func() {
bus := NewBus() const topic = "test"
const message = "this is a test message!"
g.Assert(bus.listeners[topic]).IsNotNil() g.It("publishes message", func() {
g.Assert(len(bus.listeners[topic])).IsZero() bus := NewBus()
listener := make(chan Event)
listener2 := make(chan Event)
listener3 := make(chan Event)
bus.On(listener, topic)
bus.On(listener2, topic)
bus.On(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(3, "Listener was not registered")
done := make(chan struct{}, 1) listener := make(chan []byte)
go func() { bus.On(listener)
for i := 0; i < 3; i++ {
done := make(chan struct{}, 1)
go func() {
select { select {
case m := <-listener: case v := <-listener:
g.Assert(m.Topic).Equal(topic) m := MustDecode(v)
g.Assert(m.Data).Equal(message)
case m := <-listener2:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case m := <-listener3:
g.Assert(m.Topic).Equal(topic) g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message) g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
g.Fail("all listeners did not receive the message in time") g.Fail("listener did not receive message in time")
i = 3
} }
} done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
done <- struct{}{} // Cleanup
}() bus.Off(listener)
bus.Publish(topic, message) })
<-done
// Cleanup g.It("publishes message to all listeners", func() {
bus.Off(listener, topic) bus := NewBus()
bus.Off(listener2, topic)
bus.Off(listener3, topic) listener := make(chan []byte)
listener2 := make(chan []byte)
listener3 := make(chan []byte)
bus.On(listener)
bus.On(listener2)
bus.On(listener3)
done := make(chan struct{}, 1)
go func() {
for i := 0; i < 3; i++ {
select {
case v := <-listener:
m := MustDecode(v)
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case v := <-listener2:
m := MustDecode(v)
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case v := <-listener3:
m := MustDecode(v)
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second):
g.Fail("all listeners did not receive the message in time")
i = 3
}
}
done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
// Cleanup
bus.Off(listener)
bus.Off(listener2)
bus.Off(listener3)
})
}) })
}) })
} }

View File

@ -7,9 +7,9 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
"github.com/goccy/go-json" "github.com/goccy/go-json"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/system"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
) )
@ -89,10 +89,11 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
eventChan := make(chan events.Event) eventChan := make(chan []byte)
logOutput := make(chan []byte, 8) logOutput := make(chan []byte, 8)
installOutput := make(chan []byte, 4) installOutput := make(chan []byte, 4)
h.server.Events().On(eventChan, e...)
h.server.Events().On(eventChan) // TODO: make a sinky
h.server.Sink(system.LogSink).On(logOutput) h.server.Sink(system.LogSink).On(logOutput)
h.server.Sink(system.InstallSink).On(installOutput) h.server.Sink(system.InstallSink).On(installOutput)
@ -111,19 +112,20 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
break break
case e := <-logOutput: case b := <-logOutput:
sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(e)}}) sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(b)}})
if sendErr == nil { if sendErr == nil {
continue continue
} }
onError(server.ConsoleOutputEvent, sendErr) onError(server.ConsoleOutputEvent, sendErr)
case e := <-installOutput: case b := <-installOutput:
sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(e)}}) sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(b)}})
if sendErr == nil { if sendErr == nil {
continue continue
} }
onError(server.InstallOutputEvent, sendErr) onError(server.InstallOutputEvent, sendErr)
case e := <-eventChan: case b := <-eventChan:
e := events.MustDecode(b)
var sendErr error var sendErr error
message := Message{Event: e.Topic} message := Message{Event: e.Topic}
if str, ok := e.Data.(string); ok { if str, ok := e.Data.(string); ok {
@ -149,7 +151,7 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
} }
// These functions will automatically close the channel if it hasn't been already. // These functions will automatically close the channel if it hasn't been already.
h.server.Events().Off(eventChan, e...) h.server.Events().Off(eventChan)
h.server.Sink(system.LogSink).Off(logOutput) h.server.Sink(system.LogSink).Off(logOutput)
h.server.Sink(system.InstallSink).Off(installOutput) h.server.Sink(system.InstallSink).Off(installOutput)

View File

@ -8,10 +8,10 @@ import (
"time" "time"
"github.com/apex/log" "github.com/apex/log"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/system"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/remote" "github.com/pterodactyl/wings/remote"
) )
@ -81,40 +81,44 @@ func (s *Server) processConsoleOutputEvent(v []byte) {
// a server. These listeners can only be removed by deleting the server as they // a server. These listeners can only be removed by deleting the server as they
// should last for the duration of the process' lifetime. // should last for the duration of the process' lifetime.
func (s *Server) StartEventListeners() { func (s *Server) StartEventListeners() {
state := make(chan events.Event) c := make(chan []byte, 8)
stats := make(chan events.Event) limit := newDiskLimiter(s)
docker := make(chan events.Event)
s.Log().Debug("registering event listeners: console, state, resources...")
s.Environment.Events().On(c)
s.Environment.SetLogCallback(s.processConsoleOutputEvent)
go func() { go func() {
l := newDiskLimiter(s)
for { for {
select { select {
case e := <-state: case v := <-c:
go func() { go func(v []byte, limit *diskSpaceLimiter) {
// Reset the throttler when the process is started. e := events.MustDecode(v)
if e.Data == environment.ProcessStartingState {
l.Reset()
s.Throttler().Reset()
}
s.OnStateChange()
}()
case e := <-stats:
go func() {
s.resources.UpdateStats(e.Data.(environment.Stats))
// If there is no disk space available at this point, trigger the server
// disk limiter logic which will start to stop the running instance.
if !s.Filesystem().HasSpaceAvailable(true) {
l.Trigger()
}
s.Events().Publish(StatsEvent, s.Proc())
}()
case e := <-docker:
go func() {
switch e.Topic { switch e.Topic {
case environment.ResourceEvent:
{
var stats struct {
Topic string
Data environment.Stats
}
events.MustDecodeTo(v, &stats)
s.resources.UpdateStats(stats.Data)
// If there is no disk space available at this point, trigger the server
// disk limiter logic which will start to stop the running instance.
if !s.Filesystem().HasSpaceAvailable(true) {
limit.Trigger()
}
s.Events().Publish(StatsEvent, s.Proc())
}
case environment.StateChangeEvent:
{
// Reset the throttler when the process is started.
if e.Data == environment.ProcessStartingState {
limit.Reset()
s.Throttler().Reset()
}
s.OnStateChange()
}
case environment.DockerImagePullStatus: case environment.DockerImagePullStatus:
s.Events().Publish(InstallOutputEvent, e.Data) s.Events().Publish(InstallOutputEvent, e.Data)
case environment.DockerImagePullStarted: case environment.DockerImagePullStarted:
@ -122,18 +126,13 @@ func (s *Server) StartEventListeners() {
case environment.DockerImagePullCompleted: case environment.DockerImagePullCompleted:
s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image") s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
default: default:
s.Log().WithField("topic", e.Topic).Error("unhandled docker event topic")
} }
}() }(v, limit)
case <-s.Context().Done():
return
} }
} }
}() }()
s.Log().Debug("registering event listeners: console, state, resources...")
s.Environment.SetLogCallback(s.processConsoleOutputEvent)
s.Environment.Events().On(state, environment.StateChangeEvent)
s.Environment.Events().On(stats, environment.ResourceEvent)
s.Environment.Events().On(docker, dockerEvents...)
} }
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))") var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")