Refactor confusing & fragile event bus logic to use callbacks and not channels; ref pterodactyl/panel#2298
This commit is contained in:
parent
8407ea21da
commit
4ac19bd29d
119
events/events.go
119
events/events.go
|
@ -2,6 +2,8 @@ package events
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
@ -12,14 +14,13 @@ type Event struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventBus struct {
|
type EventBus struct {
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
callbacks map[string][]*func(Event)
|
||||||
subscribers map[string]map[chan Event]struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *EventBus {
|
func New() *EventBus {
|
||||||
return &EventBus{
|
return &EventBus{
|
||||||
subscribers: make(map[string]map[chan Event]struct{}),
|
callbacks: make(map[string][]*func(Event)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,29 +40,27 @@ func (e *EventBus) Publish(topic string, data string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.mu.RLock()
|
||||||
|
defer e.mu.RUnlock()
|
||||||
|
|
||||||
// Acquire a read lock and loop over all of the channels registered for the topic. This
|
// Acquire a read lock and loop over all of the channels registered for the topic. This
|
||||||
// avoids a panic crash if the process tries to unregister the channel while this routine
|
// avoids a panic crash if the process tries to unregister the channel while this routine
|
||||||
// is running.
|
// is running.
|
||||||
go func() {
|
if _, ok := e.callbacks[t]; ok {
|
||||||
e.RLock()
|
evt := Event{Data: data, Topic: topic}
|
||||||
defer e.RUnlock()
|
for _, callback := range e.callbacks[t] {
|
||||||
|
go func(evt Event, callback func(Event)) {
|
||||||
if ch, ok := e.subscribers[t]; ok {
|
callback(evt)
|
||||||
e := Event{Data: data, Topic: topic}
|
}(evt, *callback)
|
||||||
|
|
||||||
for channel := range ch {
|
|
||||||
go func(channel chan Event, e Event) {
|
|
||||||
channel <- e
|
|
||||||
}(channel, e)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publishes a JSON message to a given topic.
|
||||||
func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
||||||
b, err := json.Marshal(data)
|
b, err := json.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.Publish(topic, string(b))
|
e.Publish(topic, string(b))
|
||||||
|
@ -69,45 +68,65 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to an emitter topic using a channel.
|
// Register a callback function that will be executed each time one of the events using the topic
|
||||||
func (e *EventBus) Subscribe(topics []string, ch chan Event) {
|
// name is called.
|
||||||
e.Lock()
|
func (e *EventBus) On(topic string, callback *func(Event)) {
|
||||||
defer e.Unlock()
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
for _, topic := range topics {
|
// Check if this topic has been registered at least once for the event listener, and if
|
||||||
if _, exists := e.subscribers[topic]; !exists {
|
// not create an empty struct for the topic.
|
||||||
e.subscribers[topic] = make(map[chan Event]struct{})
|
if _, exists := e.callbacks[topic]; !exists {
|
||||||
}
|
e.callbacks[topic] = make([]*func(Event), 0)
|
||||||
|
}
|
||||||
|
|
||||||
// Only set the channel if there is not currently a matching one for this topic. This
|
// If this callback is not already registered as an event listener, go ahead and append
|
||||||
// avoids registering two identical listeners for the same topic and causing pain in
|
// it to the array of callbacks for this topic.
|
||||||
// the unsubscribe functionality as well.
|
if e.index(topic, reflect.ValueOf(callback)) < 0 {
|
||||||
if _, exists := e.subscribers[topic][ch]; !exists {
|
e.callbacks[topic] = append(e.callbacks[topic], callback)
|
||||||
e.subscribers[topic][ch] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe a channel from a given topic.
|
// Removes an event listener from the bus.
|
||||||
func (e *EventBus) Unsubscribe(topics []string, ch chan Event) {
|
func (e *EventBus) Off(topic string, callback *func(Event)) {
|
||||||
e.Lock()
|
e.mu.Lock()
|
||||||
defer e.Unlock()
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
for _, topic := range topics {
|
i := e.index(topic, reflect.ValueOf(callback))
|
||||||
if _, exists := e.subscribers[topic][ch]; exists {
|
|
||||||
delete(e.subscribers[topic], ch)
|
// If i < 0 it means there was no index found for the given callback, meaning it was
|
||||||
|
// never registered or was already unregistered from the listeners. Also double check
|
||||||
|
// that we didn't somehow escape the length of the topic callback (not sure how that
|
||||||
|
// would happen, but lets avoid a panic condition).
|
||||||
|
if i < 0 || i >= len(e.callbacks[topic]) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can assume that the topic still exists at this point since we acquire an exclusive
|
||||||
|
// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
|
||||||
|
// no topic already existing.
|
||||||
|
e.callbacks[topic] = append(e.callbacks[topic][:i], e.callbacks[topic][i+1:]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Removes all of the event listeners that have been registered for any topic.
|
||||||
|
func (e *EventBus) RemoveAll() {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
e.callbacks = make(map[string][]*func(Event))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finds the index of a given callback in the topic by comparing all of the registered callback
|
||||||
|
// pointers to the passed function. This function does not aquire a lock as it should only be called
|
||||||
|
// within the confines of a function that has already acquired a lock for the duration of the lookup.
|
||||||
|
func (e *EventBus) index(topic string, v reflect.Value) int {
|
||||||
|
if _, ok := e.callbacks[topic]; ok {
|
||||||
|
for i, handler := range e.callbacks[topic] {
|
||||||
|
if reflect.ValueOf(handler).Pointer() == v.Pointer() {
|
||||||
|
return i
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Removes all of the event listeners for the server. This is used when a server
|
return -1
|
||||||
// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously
|
|
||||||
// should also check elsewhere and handle a server reference going nil, but this
|
|
||||||
// won't hurt.
|
|
||||||
func (e *EventBus) UnsubscribeAll() {
|
|
||||||
e.Lock()
|
|
||||||
defer e.Unlock()
|
|
||||||
|
|
||||||
// Reset the entire struct into an empty map.
|
|
||||||
e.subscribers = make(map[string]map[chan Event]struct{})
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -198,7 +198,7 @@ func deleteServer(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe all of the event listeners.
|
// Unsubscribe all of the event listeners.
|
||||||
s.Events().UnsubscribeAll()
|
s.Events().RemoveAll()
|
||||||
|
|
||||||
// Destroy the environment; in Docker this will handle a running container and
|
// Destroy the environment; in Docker this will handle a running container and
|
||||||
// forcibly terminate it before removing the container, so we do not need to handle
|
// forcibly terminate it before removing the container, so we do not need to handle
|
||||||
|
|
|
@ -51,22 +51,25 @@ var e = []string{
|
||||||
// to the connected websocket.
|
// to the connected websocket.
|
||||||
func (h *Handler) ListenForServerEvents(ctx context.Context) {
|
func (h *Handler) ListenForServerEvents(ctx context.Context) {
|
||||||
h.server.Log().Debug("listening for server events over websocket")
|
h.server.Log().Debug("listening for server events over websocket")
|
||||||
|
callback := func(e events.Event) {
|
||||||
|
if err := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); err != nil {
|
||||||
|
h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
eventChannel := make(chan events.Event)
|
// Subscribe to all of the events with the same callback that will push the data out over the
|
||||||
h.server.Events().Subscribe(e, eventChannel)
|
// websocket for the server.
|
||||||
|
for _, evt := range e {
|
||||||
|
h.server.Events().On(evt, &callback)
|
||||||
|
}
|
||||||
|
|
||||||
go func(ctx context.Context) {
|
go func(ctx context.Context) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
h.server.Events().Unsubscribe(e, eventChannel)
|
// Once this context is stopped, de-register all of the listeners that have been registered.
|
||||||
|
for _, evt := range e {
|
||||||
close(eventChannel)
|
h.server.Events().Off(evt, &callback)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}(ctx)
|
}(ctx)
|
||||||
|
|
||||||
for d := range eventChannel {
|
|
||||||
if err := h.SendJson(&Message{Event: d.Topic, Args: []string{d.Data}}); err != nil {
|
|
||||||
h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,59 +11,44 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Adds all of the internal event listeners we want to use for a server.
|
// Adds all of the internal event listeners we want to use for a server. These listeners can only be
|
||||||
|
// removed by deleting the server as they should last for the duration of the process' lifetime.
|
||||||
func (s *Server) StartEventListeners() {
|
func (s *Server) StartEventListeners() {
|
||||||
console := make(chan events.Event)
|
console := func(e events.Event) {
|
||||||
state := make(chan events.Event)
|
// Immediately emit this event back over the server event stream since it is
|
||||||
stats := make(chan events.Event)
|
// being called from the environment event stream and things probably aren't
|
||||||
|
// listening to that event.
|
||||||
|
s.Events().Publish(ConsoleOutputEvent, e.Data)
|
||||||
|
|
||||||
go func(console chan events.Event) {
|
// Also pass the data along to the console output channel.
|
||||||
for data := range console {
|
s.onConsoleOutput(e.Data)
|
||||||
// Immediately emit this event back over the server event stream since it is
|
}
|
||||||
// being called from the environment event stream and things probably aren't
|
|
||||||
// listening to that event.
|
|
||||||
s.Events().Publish(ConsoleOutputEvent, data.Data)
|
|
||||||
|
|
||||||
// Also pass the data along to the console output channel.
|
state := func(e events.Event) {
|
||||||
s.onConsoleOutput(data.Data)
|
s.SetState(e.Data)
|
||||||
|
}
|
||||||
|
|
||||||
|
stats := func(e events.Event) {
|
||||||
|
st := new(environment.Stats)
|
||||||
|
if err := json.Unmarshal([]byte(e.Data), st); err != nil {
|
||||||
|
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Log().Fatal("unexpected end-of-range for server console channel")
|
// Update the server resource tracking object with the resources we got here.
|
||||||
}(console)
|
s.resources.mu.Lock()
|
||||||
|
s.resources.Stats = *st
|
||||||
|
s.resources.mu.Unlock()
|
||||||
|
|
||||||
go func(state chan events.Event) {
|
s.Filesystem.HasSpaceAvailable(true)
|
||||||
for data := range state {
|
|
||||||
s.SetState(data.Data)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Log().Fatal("unexpected end-of-range for server state channel")
|
s.emitProcUsage()
|
||||||
}(state)
|
}
|
||||||
|
|
||||||
go func(stats chan events.Event) {
|
|
||||||
for data := range stats {
|
|
||||||
st := new(environment.Stats)
|
|
||||||
if err := json.Unmarshal([]byte(data.Data), st); err != nil {
|
|
||||||
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the server resource tracking object with the resources we got here.
|
|
||||||
s.resources.mu.Lock()
|
|
||||||
s.resources.Stats = *st
|
|
||||||
s.resources.mu.Unlock()
|
|
||||||
|
|
||||||
s.Filesystem.HasSpaceAvailable(true)
|
|
||||||
|
|
||||||
s.emitProcUsage()
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Log().Fatal("unexpected end-of-range for server stats channel")
|
|
||||||
}(stats)
|
|
||||||
|
|
||||||
s.Log().Info("registering event listeners: console, state, resources...")
|
s.Log().Info("registering event listeners: console, state, resources...")
|
||||||
s.Environment.Events().Subscribe([]string{environment.ConsoleOutputEvent}, console)
|
s.Environment.Events().On(environment.ConsoleOutputEvent, &console)
|
||||||
s.Environment.Events().Subscribe([]string{environment.StateChangeEvent}, state)
|
s.Environment.Events().On(environment.StateChangeEvent, &state)
|
||||||
s.Environment.Events().Subscribe([]string{environment.ResourceEvent}, stats)
|
s.Environment.Events().On(environment.ResourceEvent, &stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
|
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user