diff --git a/events/events.go b/events/events.go index c0d8213..d87a0d7 100644 --- a/events/events.go +++ b/events/events.go @@ -104,17 +104,13 @@ func (b *Bus) Publish(topic string, data interface{}) { return } - var wg sync.WaitGroup event := Event{Topic: topic, Data: data} for _, listener := range listeners { l := listener - wg.Add(1) go func(l Listener, event Event) { - defer wg.Done() l <- event }(l, event) } - wg.Wait() } // Destroy destroys the Event Bus by unregistering and closing all listeners. diff --git a/events/events_test.go b/events/events_test.go index 542a8e2..22949d4 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -168,3 +168,58 @@ func TestBus_Publish(t *testing.T) { }) }) } + +func TestBus_Destroy(t *testing.T) { + g := Goblin(t) + + g.Describe("Destroy", func() { + g.It("unsubscribes and closes all listeners", func() { + bus := NewBus() + + listener := make(chan Event) + bus.On(listener, "test") + + done := make(chan struct{}, 1) + go func() { + select { + case m := <-listener: + g.Assert(m).IsZero() + case <-time.After(1 * time.Second): + g.Fail("listener did not receive message in time") + } + done <- struct{}{} + }() + bus.Destroy() + <-done + + g.Assert(bus.listeners).Equal(map[string][]Listener{}) + }) + + // This is a check that ensures Destroy only closes each listener + // channel once, even if it is subscribed to multiple topics. + // + // Closing a channel multiple times will cause a runtime panic, which + // I'm pretty sure we don't want. + g.It("unsubscribes and closes channel only once", func() { + bus := NewBus() + + listener := make(chan Event) + bus.On(listener, "test", "test2", "test3", "test4", "test5") + + done := make(chan struct{}, 1) + go func() { + select { + case m := <-listener: + g.Assert(m).IsZero() + case <-time.After(1 * time.Second): + g.Fail("listener did not receive message in time") + } + done <- struct{}{} + }() + bus.Destroy() + <-done + + g.Assert(bus.listeners).Equal(map[string][]Listener{}) + }) + }) +}