diff --git a/events/events.go b/events/events.go index c757a07..66d66af 100644 --- a/events/events.go +++ b/events/events.go @@ -2,6 +2,8 @@ package events import ( "encoding/json" + "github.com/pkg/errors" + "reflect" "strings" "sync" ) @@ -12,14 +14,13 @@ type Event struct { } type EventBus struct { - sync.RWMutex - - subscribers map[string]map[chan Event]struct{} + mu sync.RWMutex + callbacks map[string][]*func(Event) } func New() *EventBus { return &EventBus{ - subscribers: make(map[string]map[chan Event]struct{}), + callbacks: make(map[string][]*func(Event)), } } @@ -39,29 +40,27 @@ func (e *EventBus) Publish(topic string, data string) { } } + e.mu.RLock() + defer e.mu.RUnlock() + // Acquire a read lock and loop over all of the channels registered for the topic. This // avoids a panic crash if the process tries to unregister the channel while this routine // is running. - go func() { - e.RLock() - defer e.RUnlock() - - if ch, ok := e.subscribers[t]; ok { - e := Event{Data: data, Topic: topic} - - for channel := range ch { - go func(channel chan Event, e Event) { - channel <- e - }(channel, e) - } + if _, ok := e.callbacks[t]; ok { + evt := Event{Data: data, Topic: topic} + for _, callback := range e.callbacks[t] { + go func(evt Event, callback func(Event)) { + callback(evt) + }(evt, *callback) } - }() + } } +// Publishes a JSON message to a given topic. func (e *EventBus) PublishJson(topic string, data interface{}) error { b, err := json.Marshal(data) if err != nil { - return err + return errors.WithStack(err) } e.Publish(topic, string(b)) @@ -69,45 +68,65 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error { return nil } -// Subscribe to an emitter topic using a channel. -func (e *EventBus) Subscribe(topics []string, ch chan Event) { - e.Lock() - defer e.Unlock() +// Register a callback function that will be executed each time one of the events using the topic +// name is called. +func (e *EventBus) On(topic string, callback *func(Event)) { + e.mu.Lock() + defer e.mu.Unlock() - for _, topic := range topics { - if _, exists := e.subscribers[topic]; !exists { - e.subscribers[topic] = make(map[chan Event]struct{}) - } + // Check if this topic has been registered at least once for the event listener, and if + // not create an empty struct for the topic. + if _, exists := e.callbacks[topic]; !exists { + e.callbacks[topic] = make([]*func(Event), 0) + } - // Only set the channel if there is not currently a matching one for this topic. This - // avoids registering two identical listeners for the same topic and causing pain in - // the unsubscribe functionality as well. - if _, exists := e.subscribers[topic][ch]; !exists { - e.subscribers[topic][ch] = struct{}{} - } + // If this callback is not already registered as an event listener, go ahead and append + // it to the array of callbacks for this topic. + if e.index(topic, reflect.ValueOf(callback)) < 0 { + e.callbacks[topic] = append(e.callbacks[topic], callback) } } -// Unsubscribe a channel from a given topic. -func (e *EventBus) Unsubscribe(topics []string, ch chan Event) { - e.Lock() - defer e.Unlock() +// Removes an event listener from the bus. +func (e *EventBus) Off(topic string, callback *func(Event)) { + e.mu.Lock() + defer e.mu.Unlock() - for _, topic := range topics { - if _, exists := e.subscribers[topic][ch]; exists { - delete(e.subscribers[topic], ch) + i := e.index(topic, reflect.ValueOf(callback)) + + // If i < 0 it means there was no index found for the given callback, meaning it was + // never registered or was already unregistered from the listeners. Also double check + // that we didn't somehow escape the length of the topic callback (not sure how that + // would happen, but lets avoid a panic condition). + if i < 0 || i >= len(e.callbacks[topic]) { + return + } + + // We can assume that the topic still exists at this point since we acquire an exclusive + // lock on the process, and the "e.index" function cannot return a value >= 0 if there is + // no topic already existing. + e.callbacks[topic] = append(e.callbacks[topic][:i], e.callbacks[topic][i+1:]...) +} + +// Removes all of the event listeners that have been registered for any topic. +func (e *EventBus) RemoveAll() { + e.mu.Lock() + defer e.mu.Unlock() + + e.callbacks = make(map[string][]*func(Event)) +} + +// Finds the index of a given callback in the topic by comparing all of the registered callback +// pointers to the passed function. This function does not aquire a lock as it should only be called +// within the confines of a function that has already acquired a lock for the duration of the lookup. +func (e *EventBus) index(topic string, v reflect.Value) int { + if _, ok := e.callbacks[topic]; ok { + for i, handler := range e.callbacks[topic] { + if reflect.ValueOf(handler).Pointer() == v.Pointer() { + return i + } } } -} -// Removes all of the event listeners for the server. This is used when a server -// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously -// should also check elsewhere and handle a server reference going nil, but this -// won't hurt. -func (e *EventBus) UnsubscribeAll() { - e.Lock() - defer e.Unlock() - - // Reset the entire struct into an empty map. - e.subscribers = make(map[string]map[chan Event]struct{}) + return -1 } diff --git a/router/router_server.go b/router/router_server.go index c697694..49f0c3c 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -198,7 +198,7 @@ func deleteServer(c *gin.Context) { } // Unsubscribe all of the event listeners. - s.Events().UnsubscribeAll() + s.Events().RemoveAll() // Destroy the environment; in Docker this will handle a running container and // forcibly terminate it before removing the container, so we do not need to handle diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index b7330ef..3856714 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -51,22 +51,25 @@ var e = []string{ // to the connected websocket. func (h *Handler) ListenForServerEvents(ctx context.Context) { h.server.Log().Debug("listening for server events over websocket") + callback := func(e events.Event) { + if err := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); err != nil { + h.server.Log().WithField("error", err).Warn("error while sending server data over websocket") + } + } - eventChannel := make(chan events.Event) - h.server.Events().Subscribe(e, eventChannel) + // Subscribe to all of the events with the same callback that will push the data out over the + // websocket for the server. + for _, evt := range e { + h.server.Events().On(evt, &callback) + } go func(ctx context.Context) { select { case <-ctx.Done(): - h.server.Events().Unsubscribe(e, eventChannel) - - close(eventChannel) + // Once this context is stopped, de-register all of the listeners that have been registered. + for _, evt := range e { + h.server.Events().Off(evt, &callback) + } } }(ctx) - - for d := range eventChannel { - if err := h.SendJson(&Message{Event: d.Topic, Args: []string{d.Data}}); err != nil { - h.server.Log().WithField("error", err).Warn("error while sending server data over websocket") - } - } } diff --git a/server/listeners.go b/server/listeners.go index bb0d278..02ef352 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -11,59 +11,44 @@ import ( "strconv" ) -// Adds all of the internal event listeners we want to use for a server. +// Adds all of the internal event listeners we want to use for a server. These listeners can only be +// removed by deleting the server as they should last for the duration of the process' lifetime. func (s *Server) StartEventListeners() { - console := make(chan events.Event) - state := make(chan events.Event) - stats := make(chan events.Event) + console := func(e events.Event) { + // Immediately emit this event back over the server event stream since it is + // being called from the environment event stream and things probably aren't + // listening to that event. + s.Events().Publish(ConsoleOutputEvent, e.Data) - go func(console chan events.Event) { - for data := range console { - // Immediately emit this event back over the server event stream since it is - // being called from the environment event stream and things probably aren't - // listening to that event. - s.Events().Publish(ConsoleOutputEvent, data.Data) + // Also pass the data along to the console output channel. + s.onConsoleOutput(e.Data) + } - // Also pass the data along to the console output channel. - s.onConsoleOutput(data.Data) + state := func(e events.Event) { + s.SetState(e.Data) + } + + stats := func(e events.Event) { + st := new(environment.Stats) + if err := json.Unmarshal([]byte(e.Data), st); err != nil { + s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats") + return } - s.Log().Fatal("unexpected end-of-range for server console channel") - }(console) + // Update the server resource tracking object with the resources we got here. + s.resources.mu.Lock() + s.resources.Stats = *st + s.resources.mu.Unlock() - go func(state chan events.Event) { - for data := range state { - s.SetState(data.Data) - } + s.Filesystem.HasSpaceAvailable(true) - s.Log().Fatal("unexpected end-of-range for server state channel") - }(state) - - go func(stats chan events.Event) { - for data := range stats { - st := new(environment.Stats) - if err := json.Unmarshal([]byte(data.Data), st); err != nil { - s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats") - continue - } - - // Update the server resource tracking object with the resources we got here. - s.resources.mu.Lock() - s.resources.Stats = *st - s.resources.mu.Unlock() - - s.Filesystem.HasSpaceAvailable(true) - - s.emitProcUsage() - } - - s.Log().Fatal("unexpected end-of-range for server stats channel") - }(stats) + s.emitProcUsage() + } s.Log().Info("registering event listeners: console, state, resources...") - s.Environment.Events().Subscribe([]string{environment.ConsoleOutputEvent}, console) - s.Environment.Events().Subscribe([]string{environment.StateChangeEvent}, state) - s.Environment.Events().Subscribe([]string{environment.ResourceEvent}, stats) + s.Environment.Events().On(environment.ConsoleOutputEvent, &console) + s.Environment.Events().On(environment.StateChangeEvent, &state) + s.Environment.Events().On(environment.ResourceEvent, &stats) } var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")