From fab88a380e82a91c72daa6acf930a88516166810 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sun, 30 Jan 2022 10:55:45 -0500 Subject: [PATCH] Use buffered channels and ring-buffer logic when processing console data This change fixes pterodactyl/panel#3921 by implementing logic to drop the oldest message in a channel and push the newest message onto the channel when the channel buffer is full. This is distinctly different than the previous implementation which just dropped the newest messages, leading to confusing behavior on the client side when a large amount of data was sent over the connection. Up to 10ms per channel is allowed for blocking before falling back to the drop logic. --- router/websocket/listeners.go | 4 +-- server/sink.go | 45 ++++++++++++++++++------ server/sink_test.go | 65 ++++++++++++++++++++++++++++++----- 3 files changed, 93 insertions(+), 21 deletions(-) diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index 6956551..af66b88 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -89,8 +89,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { defer cancel() eventChan := make(chan events.Event) - logOutput := make(chan []byte) - installOutput := make(chan []byte) + logOutput := make(chan []byte, 8) + installOutput := make(chan []byte, 4) h.server.Events().On(eventChan, e...) h.server.Sink(server.LogSink).On(logOutput) h.server.Sink(server.InstallSink).On(installOutput) diff --git a/server/sink.go b/server/sink.go index 41a5e93..4d056ae 100644 --- a/server/sink.go +++ b/server/sink.go @@ -2,6 +2,7 @@ package server import ( "sync" + "time" ) // SinkName represents one of the registered sinks for a server. @@ -79,20 +80,44 @@ func (p *sinkPool) Destroy() { } // Push sends a given message to each of the channels registered in the pool. +// This will use a Ring Buffer channel in order to avoid blocking the channel +// sends, and attempt to push though the most recent messages in the queue in +// favor of the oldest messages. +// +// If the channel becomes full and isn't being drained fast enough, this +// function will remove the oldest message in the channel, and then push the +// message that it got onto the end, effectively making the channel a rolling +// buffer. +// +// There is a potential for data to be lost when passing it through this +// function, but only in instances where the channel buffer is full and the +// channel is not drained fast enough, in which case dropping messages is most +// likely the best option anyways. This uses waitgroups to allow every channel +// to attempt its send concurrently thus making the total blocking time of this +// function "O(1)" instead of "O(n)". func (p *sinkPool) Push(data []byte) { p.mu.RLock() - // Attempt to send the data over to the channels. If the channel buffer is full, - // or otherwise blocked for some reason (such as being a nil channel), just discard - // the event data and move on to the next channel in the slice. If you don't - // implement the "default" on the select you'll block execution until the channel - // becomes unblocked, which is not what we want to do here. + defer p.mu.RUnlock() + var wg sync.WaitGroup + wg.Add(len(p.sinks)) for _, c := range p.sinks { - select { - case c <- data: - default: - } + go func(c chan []byte) { + defer wg.Done() + select { + case c <- data: + case <-time.After(time.Millisecond * 10): + // If there is nothing in the channel to read, but we also cannot write + // to the channel, just skip over sending data. If we don't do this you'll + // end up blocking the application on the channel read below. + if len(c) == 0 { + break + } + <-c + c <- data + } + }(c) } - p.mu.RUnlock() + wg.Wait() } // Sink returns the instantiated and named sink for a server. If the sink has diff --git a/server/sink_test.go b/server/sink_test.go index 97c763f..0d6c495 100644 --- a/server/sink_test.go +++ b/server/sink_test.go @@ -1,9 +1,11 @@ package server import ( + "fmt" "reflect" "sync" "testing" + "time" . "github.com/franela/goblin" ) @@ -123,22 +125,67 @@ func TestSink(t *testing.T) { g.Assert(len(pool.sinks)).Equal(2) }) - g.It("does not block if a channel is nil or otherwise full", func() { - ch := make([]chan []byte, 2) - ch[1] = make(chan []byte, 1) - ch[1] <- []byte("test") + g.It("uses a ring-buffer to avoid blocking when the channel is full", func() { + ch1 := make(chan []byte, 1) + ch2 := make(chan []byte, 2) + ch3 := make(chan []byte) - pool.On(ch[0]) - pool.On(ch[1]) + // ch1 and ch2 are now full, and would block if the code doesn't account + // for a full buffer. + ch1 <- []byte("pre-test") + ch2 <- []byte("pre-test") + ch2 <- []byte("pre-test 2") + + pool.On(ch1) + pool.On(ch2) + pool.On(ch3) pool.Push([]byte("testing")) + time.Sleep(time.Millisecond * 20) g.Assert(MutexLocked(&pool.mu)).IsFalse() - g.Assert(<-ch[1]).Equal([]byte("test")) + // We expect that value previously in the channel to have been dumped + // and therefore only the value we pushed will be present. For ch2 we + // expect only the first message was dropped, and the second one is now + // the first in the out queue. + g.Assert(<-ch1).Equal([]byte("testing")) + g.Assert(<-ch2).Equal([]byte("pre-test 2")) + g.Assert(<-ch2).Equal([]byte("testing")) + + // Because nothing in this test was listening for ch3, it would have + // blocked for the 10ms duration, and then been skipped over entirely + // because it had no length to try and push onto. + g.Assert(len(ch3)).Equal(0) + + // Now, push again and expect similar results. + pool.Push([]byte("testing 2")) + time.Sleep(time.Millisecond * 20) - pool.Push([]byte("test2")) - g.Assert(<-ch[1]).Equal([]byte("test2")) g.Assert(MutexLocked(&pool.mu)).IsFalse() + g.Assert(<-ch1).Equal([]byte("testing 2")) + g.Assert(<-ch2).Equal([]byte("testing 2")) + }) + + g.It("can handle concurrent pushes FIFO", func() { + ch := make(chan []byte, 4) + + pool.On(ch) + pool.On(make(chan []byte)) + + for i := 0; i < 100; i++ { + pool.Push([]byte(fmt.Sprintf("iteration %d", i))) + } + + time.Sleep(time.Millisecond * 20) + g.Assert(MutexLocked(&pool.mu)).IsFalse() + g.Assert(len(ch)).Equal(4) + + g.Timeout(time.Millisecond * 500) + g.Assert(<-ch).Equal([]byte("iteration 96")) + g.Assert(<-ch).Equal([]byte("iteration 97")) + g.Assert(<-ch).Equal([]byte("iteration 98")) + g.Assert(<-ch).Equal([]byte("iteration 99")) + g.Assert(len(ch)).Equal(0) }) })