wings/events/events.go

152 lines
3.1 KiB
Go
Raw Normal View History

package events
import (
"strings"
2020-09-12 06:03:35 +00:00
"sync"
)
2022-01-18 03:23:29 +00:00
// Listener is a channel of Events. It is the value registered on a Bus with
// On, and the channel Publish sends each Event to.
type Listener chan Event
// Event represents an Event sent over a Bus.
type Event struct {
	// Topic is the topic the Event was delivered on. Publish strips
	// everything from the first ":" onwards before building the Event, so
	// this is the bare listener topic rather than a namespaced one.
	Topic string

	// Data is the payload that was handed to Publish, passed through as-is.
	Data interface{}
}
2022-01-18 03:23:29 +00:00
// Bus represents an Event Bus.
type Bus struct {
// listenersMx guards listeners. Off, On, Publish and Destroy all lock it
// before touching the map.
listenersMx sync.Mutex
// listeners maps a topic name to the listeners registered on that topic.
listeners map[string][]Listener
}
2022-01-18 03:23:29 +00:00
// NewBus constructs an Event Bus that has no listeners registered on any
// topic.
func NewBus() *Bus {
	bus := new(Bus)
	bus.listeners = map[string][]Listener{}
	return bus
}
2022-01-18 03:23:29 +00:00
// Off unregisters a listener from the specified topics on the Bus.
func (b *Bus) Off(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
var closed bool
2022-01-18 03:23:29 +00:00
for _, topic := range topics {
ok := b.off(topic, listener)
if !closed && ok {
close(listener)
closed = true
}
}
2022-01-18 03:23:29 +00:00
}
2022-01-18 03:23:29 +00:00
func (b *Bus) off(topic string, listener Listener) bool {
listeners, ok := b.listeners[topic]
if !ok {
return false
}
for i, l := range listeners {
if l != listener {
continue
}
2022-01-18 03:23:29 +00:00
listeners = append(listeners[:i], listeners[i+1:]...)
b.listeners[topic] = listeners
return true
}
2022-01-18 03:23:29 +00:00
return false
}
2022-01-18 03:23:29 +00:00
// On registers a listener to the specified topics on the Bus.
func (b *Bus) On(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
2022-01-18 03:23:29 +00:00
for _, topic := range topics {
b.on(topic, listener)
}
}
2022-01-18 03:23:29 +00:00
// on appends listener to the list registered for topic, creating the list if
// the topic has none yet. No de-duplication is performed. The caller must
// already hold b.listenersMx.
func (b *Bus) on(topic string, listener Listener) {
	// Appending to the nil slice of an unknown topic allocates a fresh list.
	b.listeners[topic] = append(b.listeners[topic], listener)
}
2022-01-18 03:23:29 +00:00
// Publish publishes a message to the Bus.
func (b *Bus) Publish(topic string, data interface{}) {
// Some of our topics for the socket support passing a more specific namespace,
// such as "backup completed:1234" to indicate which specific backup was completed.
//
// In these cases, we still need to send the event using the standard listener
// name of "backup completed".
if strings.Contains(topic, ":") {
parts := strings.SplitN(topic, ":", 2)
if len(parts) == 2 {
topic = parts[0]
}
}
2022-01-18 03:23:29 +00:00
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
2022-01-18 03:23:29 +00:00
listeners, ok := b.listeners[topic]
if !ok {
return
}
if len(listeners) < 1 {
return
}
2022-01-18 03:23:29 +00:00
var wg sync.WaitGroup
event := Event{Topic: topic, Data: data}
for _, listener := range listeners {
l := listener
wg.Add(1)
go func(l Listener, event Event) {
defer wg.Done()
l <- event
}(l, event)
}
2022-01-18 03:23:29 +00:00
wg.Wait()
}
2022-01-18 03:23:29 +00:00
// Destroy destroys the Event Bus by unregistering and closing all listeners.
func (b *Bus) Destroy() {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
// Track what listeners have already been closed. Because the same listener
// can be listening on multiple topics, we need a way to essentially
// "de-duplicate" all the listeners across all the topics.
var closed []Listener
2022-01-18 03:23:29 +00:00
for _, listeners := range b.listeners {
for _, listener := range listeners {
if contains(closed, listener) {
continue
}
2022-01-18 03:23:29 +00:00
close(listener)
closed = append(closed, listener)
2022-01-18 03:23:29 +00:00
}
}
2022-01-18 03:23:29 +00:00
b.listeners = make(map[string][]Listener)
}
// contains reports whether listener is one of the entries in closed.
func contains(closed []Listener, listener Listener) bool {
	for i := 0; i < len(closed); i++ {
		if closed[i] == listener {
			return true
		}
	}
	return false
}