diff --git a/events/events.go b/events/events.go index 66d66af..884f8a8 100644 --- a/events/events.go +++ b/events/events.go @@ -2,8 +2,9 @@ package events import ( "encoding/json" + "fmt" + "github.com/gammazero/workerpool" "github.com/pkg/errors" - "reflect" "strings" "sync" ) @@ -14,13 +15,13 @@ type Event struct { } type EventBus struct { - mu sync.RWMutex - callbacks map[string][]*func(Event) + mu sync.RWMutex + pools map[string]*CallbackPool } func New() *EventBus { return &EventBus{ - callbacks: make(map[string][]*func(Event)), + pools: make(map[string]*CallbackPool), } } @@ -46,12 +47,25 @@ func (e *EventBus) Publish(topic string, data string) { // Acquire a read lock and loop over all of the channels registered for the topic. This // avoids a panic crash if the process tries to unregister the channel while this routine // is running. - if _, ok := e.callbacks[t]; ok { + if cp, ok := e.pools[t]; ok { evt := Event{Data: data, Topic: topic} - for _, callback := range e.callbacks[t] { - go func(evt Event, callback func(Event)) { - callback(evt) - }(evt, *callback) + + for _, callback := range cp.callbacks { + c := *callback + evt := evt + // Using the workerpool with one worker allows us to execute events in a FIFO manner. Running + // this using goroutines would cause things such as console output to just output in random order + // if more than one event is fired at the same time. + // + // However, the pool submission does not block the execution of this function itself, allowing + // us to call publish without blocking any of the other pathways. + // + // @see https://github.com/pterodactyl/panel/issues/2303 + fmt.Println("pre-submit for topic:", evt.Topic) + cp.pool.Submit(func() { + fmt.Println("executing callback for event:", evt.Topic) + c(evt) + }) } } } @@ -76,15 +90,17 @@ func (e *EventBus) On(topic string, callback *func(Event)) { // Check if this topic has been registered at least once for the event listener, and if // not create an empty struct for the topic. - if _, exists := e.callbacks[topic]; !exists { - e.callbacks[topic] = make([]*func(Event), 0) + if _, exists := e.pools[topic]; !exists { + fmt.Println("no pool for topic, creating:", topic) + e.pools[topic] = &CallbackPool{ + callbacks: make([]*func(Event), 0), + pool: workerpool.New(1), + } } // If this callback is not already registered as an event listener, go ahead and append // it to the array of callbacks for this topic. - if e.index(topic, reflect.ValueOf(callback)) < 0 { - e.callbacks[topic] = append(e.callbacks[topic], callback) - } + e.pools[topic].Add(callback) } // Removes an event listener from the bus. @@ -92,41 +108,22 @@ func (e *EventBus) Off(topic string, callback *func(Event)) { e.mu.Lock() defer e.mu.Unlock() - i := e.index(topic, reflect.ValueOf(callback)) - - // If i < 0 it means there was no index found for the given callback, meaning it was - // never registered or was already unregistered from the listeners. Also double check - // that we didn't somehow escape the length of the topic callback (not sure how that - // would happen, but lets avoid a panic condition). - if i < 0 || i >= len(e.callbacks[topic]) { - return + if cp, ok := e.pools[topic]; ok { + cp.Remove(callback) } - - // We can assume that the topic still exists at this point since we acquire an exclusive - // lock on the process, and the "e.index" function cannot return a value >= 0 if there is - // no topic already existing. - e.callbacks[topic] = append(e.callbacks[topic][:i], e.callbacks[topic][i+1:]...) } -// Removes all of the event listeners that have been registered for any topic. -func (e *EventBus) RemoveAll() { +// Removes all of the event listeners that have been registered for any topic. Also stops the worker +// pool to close that routine. +func (e *EventBus) Destroy() { e.mu.Lock() defer e.mu.Unlock() - e.callbacks = make(map[string][]*func(Event)) -} - -// Finds the index of a given callback in the topic by comparing all of the registered callback -// pointers to the passed function. This function does not aquire a lock as it should only be called -// within the confines of a function that has already acquired a lock for the duration of the lookup. -func (e *EventBus) index(topic string, v reflect.Value) int { - if _, ok := e.callbacks[topic]; ok { - for i, handler := range e.callbacks[topic] { - if reflect.ValueOf(handler).Pointer() == v.Pointer() { - return i - } - } + fmt.Println("destroying pool for event bus") + // Stop every pool that exists for a given callback topic. + for _, cp := range e.pools { + cp.pool.Stop() } - return -1 + e.pools = make(map[string]*CallbackPool) } diff --git a/events/pool.go b/events/pool.go new file mode 100644 index 0000000..881d3c7 --- /dev/null +++ b/events/pool.go @@ -0,0 +1,49 @@ +package events + +import ( + "github.com/gammazero/workerpool" + "reflect" +) + +type CallbackPool struct { + callbacks []*func(Event) + pool *workerpool.WorkerPool +} + +// Pushes a new callback into the array of listeners for the pool. +func (cp *CallbackPool) Add(callback *func(Event)) { + if cp.index(reflect.ValueOf(callback)) < 0 { + cp.callbacks = append(cp.callbacks, callback) + } +} + +// Removes a callback from the array of registered callbacks if it exists. +func (cp *CallbackPool) Remove(callback *func(Event)) { + i := cp.index(reflect.ValueOf(callback)) + + // If i < 0 it means there was no index found for the given callback, meaning it was + // never registered or was already unregistered from the listeners. Also double check + // that we didn't somehow escape the length of the topic callback (not sure how that + // would happen, but lets avoid a panic condition). + if i < 0 || i >= len(cp.callbacks) { + return + } + + // We can assume that the topic still exists at this point since we acquire an exclusive + // lock on the process, and the "e.index" function cannot return a value >= 0 if there is + // no topic already existing. + cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...) +} + +// Finds the index of a given callback in the topic by comparing all of the registered callback +// pointers to the passed function. This function does not aquire a lock as it should only be called +// within the confines of a function that has already acquired a lock for the duration of the lookup. +func (cp *CallbackPool) index(v reflect.Value) int { + for i, handler := range cp.callbacks { + if reflect.ValueOf(handler).Pointer() == v.Pointer() { + return i + } + } + + return -1 +} \ No newline at end of file diff --git a/router/router_server.go b/router/router_server.go index 49f0c3c..74b74db 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -198,7 +198,7 @@ func deleteServer(c *gin.Context) { } // Unsubscribe all of the event listeners. - s.Events().RemoveAll() + s.Events().Destroy() // Destroy the environment; in Docker this will handle a running container and // forcibly terminate it before removing the container, so we do not need to handle