From cf33a2464af873fef94cd024d5b39641333abbf2 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Fri, 31 Jul 2020 21:02:25 -0700 Subject: [PATCH] Code cleanup for event listening and publishing Co-Authored-By: Jakob --- server/events.go | 87 ++++++++++++++++++------------------------------ 1 file changed, 33 insertions(+), 54 deletions(-) diff --git a/server/events.go b/server/events.go index 703179d..24976d3 100644 --- a/server/events.go +++ b/server/events.go @@ -27,7 +27,7 @@ type Event struct { type EventBus struct { sync.RWMutex - subscribers map[string][]chan Event + subscribers map[string]map[chan Event]struct{} } // Returns the server's emitter instance. @@ -37,7 +37,7 @@ func (s *Server) Events() *EventBus { if s.emitter == nil { s.emitter = &EventBus{ - subscribers: map[string][]chan Event{}, + subscribers: make(map[string]map[chan Event]struct{}), } } @@ -46,29 +46,30 @@ func (s *Server) Events() *EventBus { // Publish data to a given topic. func (e *EventBus) Publish(topic string, data string) { + t := topic + // Some of our topics for the socket support passing a more specific namespace, + // such as "backup completed:1234" to indicate which specific backup was completed. + // + // In these cases, we still need to the send the event using the standard listener + // name of "backup completed". + if strings.Contains(topic, ":") { + parts := strings.SplitN(topic, ":", 2) + + if len(parts) == 2 { + t = parts[0] + } + } + + // Acquire a read lock and loop over all of the channels registered for the topic. This + // avoids a panic crash if the process tries to unregister the channel while this routine + // is running. go func() { e.RLock() defer e.RUnlock() - t := topic - // Some of our topics for the socket support passing a more specific namespace, - // such as "backup completed:1234" to indicate which specific backup was completed. - // - // In these cases, we still need to the send the event using the standard listener - // name of "backup completed". - if strings.Contains(topic, ":") { - parts := strings.SplitN(topic, ":", 2) - - if len(parts) == 2 { - t = parts[0] - } - } - if ch, ok := e.subscribers[t]; ok { - data := Event{Data: data, Topic: topic} - - for _, channel := range ch { - channel <- data + for channel := range ch { + channel <- Event{Data: data, Topic: topic} } } }() @@ -90,44 +91,25 @@ func (e *EventBus) Subscribe(topic string, ch chan Event) { e.Lock() defer e.Unlock() - p, ok := e.subscribers[topic] - - // If there is nothing currently subscribed to this topic just go ahead and create - // the item and then return. - if !ok { - e.subscribers[topic] = append([]chan Event{}, ch) - return + if _, exists := e.subscribers[topic]; !exists { + e.subscribers[topic] = make(map[chan Event]struct{}) } - // If this topic is already setup, first iterate over the event channels currently in there - // and confirm there is not a match. If there _is_ a match do nothing since that means this - // channel is already being tracked. This avoids registering two identical handlers for the - // same topic, and means the Unsubscribe function can safely assume there will only be a - // single match for an event. - for i := range e.subscribers[topic] { - if ch == e.subscribers[topic][i] { - return - } + // Only set the channel if there is not currently a matching one for this topic. This + // avoids registering two identical listeners for the same topic and causing pain in + // the unsubscribe functionality as well. + if _, exists := e.subscribers[topic][ch]; !exists { + e.subscribers[topic][ch] = struct{}{} } - - e.subscribers[topic] = append(p, ch) } -// Unsubscribe a channel from a topic. +// Unsubscribe a channel from a given topic. func (e *EventBus) Unsubscribe(topic string, ch chan Event) { e.Lock() defer e.Unlock() - if _, ok := e.subscribers[topic]; ok { - for i := range e.subscribers[topic] { - if ch == e.subscribers[topic][i] { - e.subscribers[topic] = append(e.subscribers[topic][:i], e.subscribers[topic][i+1:]...) - // Subscribe enforces a unique event channel for the topic, so we can safely exit - // this loop once matched since there should not be any additional matches after - // this point. - break - } - } + if _, exists := e.subscribers[topic][ch]; exists { + delete(e.subscribers[topic], ch) } } @@ -139,9 +121,6 @@ func (e *EventBus) UnsubscribeAll() { e.Lock() defer e.Unlock() - // Loop over all of the subscribers and just remove all of the events - // for them. - for t := range e.subscribers { - e.subscribers[t] = make([]chan Event, 0) - } + // Reset the entire struct into an empty map. + e.subscribers = make(map[string]map[chan Event]struct{}) }