diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index 6956551..af66b88 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -89,8 +89,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { defer cancel() eventChan := make(chan events.Event) - logOutput := make(chan []byte) - installOutput := make(chan []byte) + logOutput := make(chan []byte, 8) + installOutput := make(chan []byte, 4) h.server.Events().On(eventChan, e...) h.server.Sink(server.LogSink).On(logOutput) h.server.Sink(server.InstallSink).On(installOutput) diff --git a/server/sink.go b/server/sink.go index 41a5e93..4d056ae 100644 --- a/server/sink.go +++ b/server/sink.go @@ -2,6 +2,7 @@ package server import ( "sync" + "time" ) // SinkName represents one of the registered sinks for a server. @@ -79,20 +80,44 @@ func (p *sinkPool) Destroy() { } // Push sends a given message to each of the channels registered in the pool. +// This will use a Ring Buffer channel in order to avoid blocking the channel +// sends, and attempt to push though the most recent messages in the queue in +// favor of the oldest messages. +// +// If the channel becomes full and isn't being drained fast enough, this +// function will remove the oldest message in the channel, and then push the +// message that it got onto the end, effectively making the channel a rolling +// buffer. +// +// There is a potential for data to be lost when passing it through this +// function, but only in instances where the channel buffer is full and the +// channel is not drained fast enough, in which case dropping messages is most +// likely the best option anyways. This uses waitgroups to allow every channel +// to attempt its send concurrently thus making the total blocking time of this +// function "O(1)" instead of "O(n)". func (p *sinkPool) Push(data []byte) { p.mu.RLock() - // Attempt to send the data over to the channels. If the channel buffer is full, - // or otherwise blocked for some reason (such as being a nil channel), just discard - // the event data and move on to the next channel in the slice. If you don't - // implement the "default" on the select you'll block execution until the channel - // becomes unblocked, which is not what we want to do here. + defer p.mu.RUnlock() + var wg sync.WaitGroup + wg.Add(len(p.sinks)) for _, c := range p.sinks { - select { - case c <- data: - default: - } + go func(c chan []byte) { + defer wg.Done() + select { + case c <- data: + case <-time.After(time.Millisecond * 10): + // If there is nothing in the channel to read, but we also cannot write + // to the channel, just skip over sending data. If we don't do this you'll + // end up blocking the application on the channel read below. + if len(c) == 0 { + break + } + <-c + c <- data + } + }(c) } - p.mu.RUnlock() + wg.Wait() } // Sink returns the instantiated and named sink for a server. If the sink has diff --git a/server/sink_test.go b/server/sink_test.go index 97c763f..0d6c495 100644 --- a/server/sink_test.go +++ b/server/sink_test.go @@ -1,9 +1,11 @@ package server import ( + "fmt" "reflect" "sync" "testing" + "time" . "github.com/franela/goblin" ) @@ -123,22 +125,67 @@ func TestSink(t *testing.T) { g.Assert(len(pool.sinks)).Equal(2) }) - g.It("does not block if a channel is nil or otherwise full", func() { - ch := make([]chan []byte, 2) - ch[1] = make(chan []byte, 1) - ch[1] <- []byte("test") + g.It("uses a ring-buffer to avoid blocking when the channel is full", func() { + ch1 := make(chan []byte, 1) + ch2 := make(chan []byte, 2) + ch3 := make(chan []byte) - pool.On(ch[0]) - pool.On(ch[1]) + // ch1 and ch2 are now full, and would block if the code doesn't account + // for a full buffer. + ch1 <- []byte("pre-test") + ch2 <- []byte("pre-test") + ch2 <- []byte("pre-test 2") + + pool.On(ch1) + pool.On(ch2) + pool.On(ch3) pool.Push([]byte("testing")) + time.Sleep(time.Millisecond * 20) g.Assert(MutexLocked(&pool.mu)).IsFalse() - g.Assert(<-ch[1]).Equal([]byte("test")) + // We expect that value previously in the channel to have been dumped + // and therefore only the value we pushed will be present. For ch2 we + // expect only the first message was dropped, and the second one is now + // the first in the out queue. + g.Assert(<-ch1).Equal([]byte("testing")) + g.Assert(<-ch2).Equal([]byte("pre-test 2")) + g.Assert(<-ch2).Equal([]byte("testing")) + + // Because nothing in this test was listening for ch3, it would have + // blocked for the 10ms duration, and then been skipped over entirely + // because it had no length to try and push onto. + g.Assert(len(ch3)).Equal(0) + + // Now, push again and expect similar results. + pool.Push([]byte("testing 2")) + time.Sleep(time.Millisecond * 20) - pool.Push([]byte("test2")) - g.Assert(<-ch[1]).Equal([]byte("test2")) g.Assert(MutexLocked(&pool.mu)).IsFalse() + g.Assert(<-ch1).Equal([]byte("testing 2")) + g.Assert(<-ch2).Equal([]byte("testing 2")) + }) + + g.It("can handle concurrent pushes FIFO", func() { + ch := make(chan []byte, 4) + + pool.On(ch) + pool.On(make(chan []byte)) + + for i := 0; i < 100; i++ { + pool.Push([]byte(fmt.Sprintf("iteration %d", i))) + } + + time.Sleep(time.Millisecond * 20) + g.Assert(MutexLocked(&pool.mu)).IsFalse() + g.Assert(len(ch)).Equal(4) + + g.Timeout(time.Millisecond * 500) + g.Assert(<-ch).Equal([]byte("iteration 96")) + g.Assert(<-ch).Equal([]byte("iteration 97")) + g.Assert(<-ch).Equal([]byte("iteration 98")) + g.Assert(<-ch).Equal([]byte("iteration 99")) + g.Assert(len(ch)).Equal(0) }) })