Use workerpools to enforce FIFO without blocking other topics
This commit is contained in:
parent
53bd0d57ad
commit
d02e37620d
|
@ -2,8 +2,9 @@ package events
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/gammazero/workerpool"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"reflect"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
@ -15,12 +16,12 @@ type Event struct {
|
||||||
|
|
||||||
type EventBus struct {
|
type EventBus struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
callbacks map[string][]*func(Event)
|
pools map[string]*CallbackPool
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *EventBus {
|
func New() *EventBus {
|
||||||
return &EventBus{
|
return &EventBus{
|
||||||
callbacks: make(map[string][]*func(Event)),
|
pools: make(map[string]*CallbackPool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,12 +47,25 @@ func (e *EventBus) Publish(topic string, data string) {
|
||||||
// Acquire a read lock and loop over all of the channels registered for the topic. This
|
// Acquire a read lock and loop over all of the channels registered for the topic. This
|
||||||
// avoids a panic crash if the process tries to unregister the channel while this routine
|
// avoids a panic crash if the process tries to unregister the channel while this routine
|
||||||
// is running.
|
// is running.
|
||||||
if _, ok := e.callbacks[t]; ok {
|
if cp, ok := e.pools[t]; ok {
|
||||||
evt := Event{Data: data, Topic: topic}
|
evt := Event{Data: data, Topic: topic}
|
||||||
for _, callback := range e.callbacks[t] {
|
|
||||||
go func(evt Event, callback func(Event)) {
|
for _, callback := range cp.callbacks {
|
||||||
callback(evt)
|
c := *callback
|
||||||
}(evt, *callback)
|
evt := evt
|
||||||
|
// Using the workerpool with one worker allows us to execute events in a FIFO manner. Running
|
||||||
|
// this using goroutines would cause things such as console output to just output in random order
|
||||||
|
// if more than one event is fired at the same time.
|
||||||
|
//
|
||||||
|
// However, the pool submission does not block the execution of this function itself, allowing
|
||||||
|
// us to call publish without blocking any of the other pathways.
|
||||||
|
//
|
||||||
|
// @see https://github.com/pterodactyl/panel/issues/2303
|
||||||
|
fmt.Println("pre-submit for topic:", evt.Topic)
|
||||||
|
cp.pool.Submit(func() {
|
||||||
|
fmt.Println("executing callback for event:", evt.Topic)
|
||||||
|
c(evt)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -76,15 +90,17 @@ func (e *EventBus) On(topic string, callback *func(Event)) {
|
||||||
|
|
||||||
// Check if this topic has been registered at least once for the event listener, and if
|
// Check if this topic has been registered at least once for the event listener, and if
|
||||||
// not create an empty struct for the topic.
|
// not create an empty struct for the topic.
|
||||||
if _, exists := e.callbacks[topic]; !exists {
|
if _, exists := e.pools[topic]; !exists {
|
||||||
e.callbacks[topic] = make([]*func(Event), 0)
|
fmt.Println("no pool for topic, creating:", topic)
|
||||||
|
e.pools[topic] = &CallbackPool{
|
||||||
|
callbacks: make([]*func(Event), 0),
|
||||||
|
pool: workerpool.New(1),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this callback is not already registered as an event listener, go ahead and append
|
// If this callback is not already registered as an event listener, go ahead and append
|
||||||
// it to the array of callbacks for this topic.
|
// it to the array of callbacks for this topic.
|
||||||
if e.index(topic, reflect.ValueOf(callback)) < 0 {
|
e.pools[topic].Add(callback)
|
||||||
e.callbacks[topic] = append(e.callbacks[topic], callback)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removes an event listener from the bus.
|
// Removes an event listener from the bus.
|
||||||
|
@ -92,41 +108,22 @@ func (e *EventBus) Off(topic string, callback *func(Event)) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
i := e.index(topic, reflect.ValueOf(callback))
|
if cp, ok := e.pools[topic]; ok {
|
||||||
|
cp.Remove(callback)
|
||||||
// If i < 0 it means there was no index found for the given callback, meaning it was
|
}
|
||||||
// never registered or was already unregistered from the listeners. Also double check
|
|
||||||
// that we didn't somehow escape the length of the topic callback (not sure how that
|
|
||||||
// would happen, but lets avoid a panic condition).
|
|
||||||
if i < 0 || i >= len(e.callbacks[topic]) {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can assume that the topic still exists at this point since we acquire an exclusive
|
// Removes all of the event listeners that have been registered for any topic. Also stops the worker
|
||||||
// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
|
// pool to close that routine.
|
||||||
// no topic already existing.
|
func (e *EventBus) Destroy() {
|
||||||
e.callbacks[topic] = append(e.callbacks[topic][:i], e.callbacks[topic][i+1:]...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Removes all of the event listeners that have been registered for any topic.
|
|
||||||
func (e *EventBus) RemoveAll() {
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
e.callbacks = make(map[string][]*func(Event))
|
fmt.Println("destroying pool for event bus")
|
||||||
|
// Stop every pool that exists for a given callback topic.
|
||||||
|
for _, cp := range e.pools {
|
||||||
|
cp.pool.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finds the index of a given callback in the topic by comparing all of the registered callback
|
e.pools = make(map[string]*CallbackPool)
|
||||||
// pointers to the passed function. This function does not aquire a lock as it should only be called
|
|
||||||
// within the confines of a function that has already acquired a lock for the duration of the lookup.
|
|
||||||
func (e *EventBus) index(topic string, v reflect.Value) int {
|
|
||||||
if _, ok := e.callbacks[topic]; ok {
|
|
||||||
for i, handler := range e.callbacks[topic] {
|
|
||||||
if reflect.ValueOf(handler).Pointer() == v.Pointer() {
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1
|
|
||||||
}
|
}
|
||||||
|
|
49
events/pool.go
Normal file
49
events/pool.go
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/gammazero/workerpool"
|
||||||
|
"reflect"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CallbackPool struct {
|
||||||
|
callbacks []*func(Event)
|
||||||
|
pool *workerpool.WorkerPool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pushes a new callback into the array of listeners for the pool.
|
||||||
|
func (cp *CallbackPool) Add(callback *func(Event)) {
|
||||||
|
if cp.index(reflect.ValueOf(callback)) < 0 {
|
||||||
|
cp.callbacks = append(cp.callbacks, callback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Removes a callback from the array of registered callbacks if it exists.
|
||||||
|
func (cp *CallbackPool) Remove(callback *func(Event)) {
|
||||||
|
i := cp.index(reflect.ValueOf(callback))
|
||||||
|
|
||||||
|
// If i < 0 it means there was no index found for the given callback, meaning it was
|
||||||
|
// never registered or was already unregistered from the listeners. Also double check
|
||||||
|
// that we didn't somehow escape the length of the topic callback (not sure how that
|
||||||
|
// would happen, but lets avoid a panic condition).
|
||||||
|
if i < 0 || i >= len(cp.callbacks) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can assume that the topic still exists at this point since we acquire an exclusive
|
||||||
|
// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
|
||||||
|
// no topic already existing.
|
||||||
|
cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finds the index of a given callback in the topic by comparing all of the registered callback
|
||||||
|
// pointers to the passed function. This function does not aquire a lock as it should only be called
|
||||||
|
// within the confines of a function that has already acquired a lock for the duration of the lookup.
|
||||||
|
func (cp *CallbackPool) index(v reflect.Value) int {
|
||||||
|
for i, handler := range cp.callbacks {
|
||||||
|
if reflect.ValueOf(handler).Pointer() == v.Pointer() {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1
|
||||||
|
}
|
|
@ -198,7 +198,7 @@ func deleteServer(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe all of the event listeners.
|
// Unsubscribe all of the event listeners.
|
||||||
s.Events().RemoveAll()
|
s.Events().Destroy()
|
||||||
|
|
||||||
// Destroy the environment; in Docker this will handle a running container and
|
// Destroy the environment; in Docker this will handle a running container and
|
||||||
// forcibly terminate it before removing the container, so we do not need to handle
|
// forcibly terminate it before removing the container, so we do not need to handle
|
||||||
|
|
Loading…
Reference in New Issue
Block a user