Code cleanup for event listening and publishing
Co-Authored-By: Jakob <schrej@users.noreply.github.com>
This commit is contained in:
parent
da26b4c5c7
commit
cf33a2464a
|
@ -27,7 +27,7 @@ type Event struct {
|
||||||
type EventBus struct {
|
type EventBus struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
|
||||||
subscribers map[string][]chan Event
|
subscribers map[string]map[chan Event]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the server's emitter instance.
|
// Returns the server's emitter instance.
|
||||||
|
@ -37,7 +37,7 @@ func (s *Server) Events() *EventBus {
|
||||||
|
|
||||||
if s.emitter == nil {
|
if s.emitter == nil {
|
||||||
s.emitter = &EventBus{
|
s.emitter = &EventBus{
|
||||||
subscribers: map[string][]chan Event{},
|
subscribers: make(map[string]map[chan Event]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,29 +46,30 @@ func (s *Server) Events() *EventBus {
|
||||||
|
|
||||||
// Publish data to a given topic.
|
// Publish data to a given topic.
|
||||||
func (e *EventBus) Publish(topic string, data string) {
|
func (e *EventBus) Publish(topic string, data string) {
|
||||||
|
t := topic
|
||||||
|
// Some of our topics for the socket support passing a more specific namespace,
|
||||||
|
// such as "backup completed:1234" to indicate which specific backup was completed.
|
||||||
|
//
|
||||||
|
// In these cases, we still need to the send the event using the standard listener
|
||||||
|
// name of "backup completed".
|
||||||
|
if strings.Contains(topic, ":") {
|
||||||
|
parts := strings.SplitN(topic, ":", 2)
|
||||||
|
|
||||||
|
if len(parts) == 2 {
|
||||||
|
t = parts[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acquire a read lock and loop over all of the channels registered for the topic. This
|
||||||
|
// avoids a panic crash if the process tries to unregister the channel while this routine
|
||||||
|
// is running.
|
||||||
go func() {
|
go func() {
|
||||||
e.RLock()
|
e.RLock()
|
||||||
defer e.RUnlock()
|
defer e.RUnlock()
|
||||||
|
|
||||||
t := topic
|
|
||||||
// Some of our topics for the socket support passing a more specific namespace,
|
|
||||||
// such as "backup completed:1234" to indicate which specific backup was completed.
|
|
||||||
//
|
|
||||||
// In these cases, we still need to the send the event using the standard listener
|
|
||||||
// name of "backup completed".
|
|
||||||
if strings.Contains(topic, ":") {
|
|
||||||
parts := strings.SplitN(topic, ":", 2)
|
|
||||||
|
|
||||||
if len(parts) == 2 {
|
|
||||||
t = parts[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ch, ok := e.subscribers[t]; ok {
|
if ch, ok := e.subscribers[t]; ok {
|
||||||
data := Event{Data: data, Topic: topic}
|
for channel := range ch {
|
||||||
|
channel <- Event{Data: data, Topic: topic}
|
||||||
for _, channel := range ch {
|
|
||||||
channel <- data
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -90,44 +91,25 @@ func (e *EventBus) Subscribe(topic string, ch chan Event) {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
defer e.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
p, ok := e.subscribers[topic]
|
if _, exists := e.subscribers[topic]; !exists {
|
||||||
|
e.subscribers[topic] = make(map[chan Event]struct{})
|
||||||
// If there is nothing currently subscribed to this topic just go ahead and create
|
|
||||||
// the item and then return.
|
|
||||||
if !ok {
|
|
||||||
e.subscribers[topic] = append([]chan Event{}, ch)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this topic is already setup, first iterate over the event channels currently in there
|
// Only set the channel if there is not currently a matching one for this topic. This
|
||||||
// and confirm there is not a match. If there _is_ a match do nothing since that means this
|
// avoids registering two identical listeners for the same topic and causing pain in
|
||||||
// channel is already being tracked. This avoids registering two identical handlers for the
|
// the unsubscribe functionality as well.
|
||||||
// same topic, and means the Unsubscribe function can safely assume there will only be a
|
if _, exists := e.subscribers[topic][ch]; !exists {
|
||||||
// single match for an event.
|
e.subscribers[topic][ch] = struct{}{}
|
||||||
for i := range e.subscribers[topic] {
|
|
||||||
if ch == e.subscribers[topic][i] {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
e.subscribers[topic] = append(p, ch)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe a channel from a topic.
|
// Unsubscribe a channel from a given topic.
|
||||||
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
defer e.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
if _, ok := e.subscribers[topic]; ok {
|
if _, exists := e.subscribers[topic][ch]; exists {
|
||||||
for i := range e.subscribers[topic] {
|
delete(e.subscribers[topic], ch)
|
||||||
if ch == e.subscribers[topic][i] {
|
|
||||||
e.subscribers[topic] = append(e.subscribers[topic][:i], e.subscribers[topic][i+1:]...)
|
|
||||||
// Subscribe enforces a unique event channel for the topic, so we can safely exit
|
|
||||||
// this loop once matched since there should not be any additional matches after
|
|
||||||
// this point.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,9 +121,6 @@ func (e *EventBus) UnsubscribeAll() {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
defer e.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
// Loop over all of the subscribers and just remove all of the events
|
// Reset the entire struct into an empty map.
|
||||||
// for them.
|
e.subscribers = make(map[string]map[chan Event]struct{})
|
||||||
for t := range e.subscribers {
|
|
||||||
e.subscribers[t] = make([]chan Event, 0)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user