events: don't explode when destroying a bus

Only attempt to close channels once, rather than per topic
they are subscribed to.
This commit is contained in:
Matthew Penner 2022-01-20 09:48:18 -07:00
parent 6d8c1d2225
commit 4ba5fe2866
No known key found for this signature in database
GPG Key ID: 31311906AD4CF6D6
4 changed files with 39 additions and 22 deletions

View File

@ -3,6 +3,8 @@ package events
import ( import (
"strings" "strings"
"sync" "sync"
"github.com/apex/log"
) )
type Listener chan Event type Listener chan Event
@ -31,8 +33,15 @@ func (b *Bus) Off(listener Listener, topics ...string) {
b.listenersMx.Lock() b.listenersMx.Lock()
defer b.listenersMx.Unlock() defer b.listenersMx.Unlock()
var closed bool
for _, topic := range topics { for _, topic := range topics {
b.off(topic, listener) ok := b.off(topic, listener)
if !closed && ok {
log.Debug("closing event channel!")
close(listener)
closed = true
}
} }
} }
@ -116,11 +125,30 @@ func (b *Bus) Destroy() {
b.listenersMx.Lock() b.listenersMx.Lock()
defer b.listenersMx.Unlock() defer b.listenersMx.Unlock()
// Track what listeners have already been closed. Because the same listener
// can be listening on multiple topics, we need a way to essentially
// "de-duplicate" all the listeners across all the topics.
var closed []Listener
for _, listeners := range b.listeners { for _, listeners := range b.listeners {
for _, listener := range listeners { for _, listener := range listeners {
if contains(closed, listener) {
continue
}
close(listener) close(listener)
closed = append(closed, listener)
} }
} }
b.listeners = make(map[string][]Listener) b.listeners = make(map[string][]Listener)
} }
func contains(closed []Listener, listener Listener) bool {
for _, c := range closed {
if c == listener {
return true
}
}
return false
}

View File

@ -36,8 +36,6 @@ func TestBus_Off(t *testing.T) {
bus.Off(listener, topic) bus.Off(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners") g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners")
close(listener)
}) })
g.It("unregisters correct listener", func() { g.It("unregisters correct listener", func() {
@ -62,9 +60,6 @@ func TestBus_Off(t *testing.T) {
// Cleanup // Cleanup
bus.Off(listener2, topic) bus.Off(listener2, topic)
close(listener)
close(listener2)
close(listener3)
}) })
}) })
} }
@ -91,7 +86,6 @@ func TestBus_On(t *testing.T) {
// Cleanup // Cleanup
bus.Off(listener, topic) bus.Off(listener, topic)
close(listener)
}) })
}) })
} }
@ -127,7 +121,6 @@ func TestBus_Publish(t *testing.T) {
<-done <-done
// Cleanup // Cleanup
close(listener)
bus.Off(listener, topic) bus.Off(listener, topic)
}) })
@ -172,9 +165,6 @@ func TestBus_Publish(t *testing.T) {
bus.Off(listener, topic) bus.Off(listener, topic)
bus.Off(listener2, topic) bus.Off(listener2, topic)
bus.Off(listener3, topic) bus.Off(listener3, topic)
close(listener)
close(listener2)
close(listener3)
}) })
}) })
} }

View File

@ -146,12 +146,10 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
break break
} }
// These functions will automatically close the channel if it hasn't been already.
h.server.Events().Off(eventChan, e...) h.server.Events().Off(eventChan, e...)
h.server.LogSink().Off(logOutput) h.server.LogSink().Off(logOutput)
h.server.InstallSink().Off(installOutput) h.server.InstallSink().Off(installOutput)
close(eventChan)
close(logOutput)
close(installOutput)
// If the internal context is stopped it is either because the parent context // If the internal context is stopped it is either because the parent context
// got canceled or because we ran into an error. If the "err" variable is nil // got canceled or because we ran into an error. If the "err" variable is nil

View File

@ -16,14 +16,6 @@ func newSinkPool() *sinkPool {
return &sinkPool{} return &sinkPool{}
} }
// On adds a sink on the pool.
func (p *sinkPool) On(c chan []byte) {
p.mx.Lock()
defer p.mx.Unlock()
p.sinks = append(p.sinks, c)
}
// Off removes a sink from the pool. // Off removes a sink from the pool.
func (p *sinkPool) Off(c chan []byte) { func (p *sinkPool) Off(c chan []byte) {
p.mx.Lock() p.mx.Lock()
@ -39,10 +31,19 @@ func (p *sinkPool) Off(c chan []byte) {
sinks[len(sinks)-1] = nil sinks[len(sinks)-1] = nil
sinks = sinks[:len(sinks)-1] sinks = sinks[:len(sinks)-1]
p.sinks = sinks p.sinks = sinks
close(c)
return return
} }
} }
// On adds a sink on the pool.
func (p *sinkPool) On(c chan []byte) {
p.mx.Lock()
defer p.mx.Unlock()
p.sinks = append(p.sinks, c)
}
// Destroy destroys the pool by removing and closing all sinks. // Destroy destroys the pool by removing and closing all sinks.
func (p *sinkPool) Destroy() { func (p *sinkPool) Destroy() {
p.mx.Lock() p.mx.Lock()