events: remove waitgroup on Publish, add Destroy test

This commit is contained in:
Matthew Penner 2022-01-22 10:14:54 -07:00
parent 45418c86dd
commit 764aed89ae
No known key found for this signature in database
GPG Key ID: 31311906AD4CF6D6
2 changed files with 55 additions and 4 deletions

View File

@ -104,17 +104,13 @@ func (b *Bus) Publish(topic string, data interface{}) {
return return
} }
var wg sync.WaitGroup
event := Event{Topic: topic, Data: data} event := Event{Topic: topic, Data: data}
for _, listener := range listeners { for _, listener := range listeners {
l := listener l := listener
wg.Add(1)
go func(l Listener, event Event) { go func(l Listener, event Event) {
defer wg.Done()
l <- event l <- event
}(l, event) }(l, event)
} }
wg.Wait()
} }
// Destroy destroys the Event Bus by unregistering and closing all listeners. // Destroy destroys the Event Bus by unregistering and closing all listeners.

View File

@ -168,3 +168,58 @@ func TestBus_Publish(t *testing.T) {
}) })
}) })
} }
func TestBus_Destroy(t *testing.T) {
g := Goblin(t)
g.Describe("Destroy", func() {
g.It("unsubscribes and closes all listeners", func() {
bus := NewBus()
listener := make(chan Event)
bus.On(listener, "test")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m).IsZero()
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Destroy()
<-done
g.Assert(bus.listeners).Equal(map[string][]Listener{})
})
// This is a check that ensures Destroy only closes each listener
// channel once, even if it is subscribed to multiple topics.
//
// Closing a channel multiple times will cause a runtime panic, which
// I'm pretty sure we don't want.
g.It("unsubscribes and closes channel only once", func() {
bus := NewBus()
listener := make(chan Event)
bus.On(listener, "test", "test2", "test3", "test4", "test5")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m).IsZero()
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Destroy()
<-done
g.Assert(bus.listeners).Equal(map[string][]Listener{})
})
})
}