Use buffered channels and ring-buffer logic when processing console data
This change fixes pterodactyl/panel#3921 by implementing logic to drop the oldest message in a channel and push the newest message onto the channel when the channel buffer is full. This is distinctly different than the previous implementation which just dropped the newest messages, leading to confusing behavior on the client side when a large amount of data was sent over the connection. Up to 10ms per channel is allowed for blocking before falling back to the drop logic.
This commit is contained in:
parent
68d4fb454f
commit
fab88a380e
|
@ -89,8 +89,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
eventChan := make(chan events.Event)
|
eventChan := make(chan events.Event)
|
||||||
logOutput := make(chan []byte)
|
logOutput := make(chan []byte, 8)
|
||||||
installOutput := make(chan []byte)
|
installOutput := make(chan []byte, 4)
|
||||||
h.server.Events().On(eventChan, e...)
|
h.server.Events().On(eventChan, e...)
|
||||||
h.server.Sink(server.LogSink).On(logOutput)
|
h.server.Sink(server.LogSink).On(logOutput)
|
||||||
h.server.Sink(server.InstallSink).On(installOutput)
|
h.server.Sink(server.InstallSink).On(installOutput)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SinkName represents one of the registered sinks for a server.
|
// SinkName represents one of the registered sinks for a server.
|
||||||
|
@ -79,20 +80,44 @@ func (p *sinkPool) Destroy() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Push sends a given message to each of the channels registered in the pool.
|
// Push sends a given message to each of the channels registered in the pool.
|
||||||
|
// This will use a Ring Buffer channel in order to avoid blocking the channel
|
||||||
|
// sends, and attempt to push though the most recent messages in the queue in
|
||||||
|
// favor of the oldest messages.
|
||||||
|
//
|
||||||
|
// If the channel becomes full and isn't being drained fast enough, this
|
||||||
|
// function will remove the oldest message in the channel, and then push the
|
||||||
|
// message that it got onto the end, effectively making the channel a rolling
|
||||||
|
// buffer.
|
||||||
|
//
|
||||||
|
// There is a potential for data to be lost when passing it through this
|
||||||
|
// function, but only in instances where the channel buffer is full and the
|
||||||
|
// channel is not drained fast enough, in which case dropping messages is most
|
||||||
|
// likely the best option anyways. This uses waitgroups to allow every channel
|
||||||
|
// to attempt its send concurrently thus making the total blocking time of this
|
||||||
|
// function "O(1)" instead of "O(n)".
|
||||||
func (p *sinkPool) Push(data []byte) {
|
func (p *sinkPool) Push(data []byte) {
|
||||||
p.mu.RLock()
|
p.mu.RLock()
|
||||||
// Attempt to send the data over to the channels. If the channel buffer is full,
|
defer p.mu.RUnlock()
|
||||||
// or otherwise blocked for some reason (such as being a nil channel), just discard
|
var wg sync.WaitGroup
|
||||||
// the event data and move on to the next channel in the slice. If you don't
|
wg.Add(len(p.sinks))
|
||||||
// implement the "default" on the select you'll block execution until the channel
|
|
||||||
// becomes unblocked, which is not what we want to do here.
|
|
||||||
for _, c := range p.sinks {
|
for _, c := range p.sinks {
|
||||||
|
go func(c chan []byte) {
|
||||||
|
defer wg.Done()
|
||||||
select {
|
select {
|
||||||
case c <- data:
|
case c <- data:
|
||||||
default:
|
case <-time.After(time.Millisecond * 10):
|
||||||
|
// If there is nothing in the channel to read, but we also cannot write
|
||||||
|
// to the channel, just skip over sending data. If we don't do this you'll
|
||||||
|
// end up blocking the application on the channel read below.
|
||||||
|
if len(c) == 0 {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
<-c
|
||||||
|
c <- data
|
||||||
}
|
}
|
||||||
p.mu.RUnlock()
|
}(c)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sink returns the instantiated and named sink for a server. If the sink has
|
// Sink returns the instantiated and named sink for a server. If the sink has
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/franela/goblin"
|
. "github.com/franela/goblin"
|
||||||
)
|
)
|
||||||
|
@ -123,22 +125,67 @@ func TestSink(t *testing.T) {
|
||||||
g.Assert(len(pool.sinks)).Equal(2)
|
g.Assert(len(pool.sinks)).Equal(2)
|
||||||
})
|
})
|
||||||
|
|
||||||
g.It("does not block if a channel is nil or otherwise full", func() {
|
g.It("uses a ring-buffer to avoid blocking when the channel is full", func() {
|
||||||
ch := make([]chan []byte, 2)
|
ch1 := make(chan []byte, 1)
|
||||||
ch[1] = make(chan []byte, 1)
|
ch2 := make(chan []byte, 2)
|
||||||
ch[1] <- []byte("test")
|
ch3 := make(chan []byte)
|
||||||
|
|
||||||
pool.On(ch[0])
|
// ch1 and ch2 are now full, and would block if the code doesn't account
|
||||||
pool.On(ch[1])
|
// for a full buffer.
|
||||||
|
ch1 <- []byte("pre-test")
|
||||||
|
ch2 <- []byte("pre-test")
|
||||||
|
ch2 <- []byte("pre-test 2")
|
||||||
|
|
||||||
|
pool.On(ch1)
|
||||||
|
pool.On(ch2)
|
||||||
|
pool.On(ch3)
|
||||||
|
|
||||||
pool.Push([]byte("testing"))
|
pool.Push([]byte("testing"))
|
||||||
|
time.Sleep(time.Millisecond * 20)
|
||||||
|
|
||||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||||
g.Assert(<-ch[1]).Equal([]byte("test"))
|
// We expect that value previously in the channel to have been dumped
|
||||||
|
// and therefore only the value we pushed will be present. For ch2 we
|
||||||
|
// expect only the first message was dropped, and the second one is now
|
||||||
|
// the first in the out queue.
|
||||||
|
g.Assert(<-ch1).Equal([]byte("testing"))
|
||||||
|
g.Assert(<-ch2).Equal([]byte("pre-test 2"))
|
||||||
|
g.Assert(<-ch2).Equal([]byte("testing"))
|
||||||
|
|
||||||
|
// Because nothing in this test was listening for ch3, it would have
|
||||||
|
// blocked for the 10ms duration, and then been skipped over entirely
|
||||||
|
// because it had no length to try and push onto.
|
||||||
|
g.Assert(len(ch3)).Equal(0)
|
||||||
|
|
||||||
|
// Now, push again and expect similar results.
|
||||||
|
pool.Push([]byte("testing 2"))
|
||||||
|
time.Sleep(time.Millisecond * 20)
|
||||||
|
|
||||||
pool.Push([]byte("test2"))
|
|
||||||
g.Assert(<-ch[1]).Equal([]byte("test2"))
|
|
||||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||||
|
g.Assert(<-ch1).Equal([]byte("testing 2"))
|
||||||
|
g.Assert(<-ch2).Equal([]byte("testing 2"))
|
||||||
|
})
|
||||||
|
|
||||||
|
g.It("can handle concurrent pushes FIFO", func() {
|
||||||
|
ch := make(chan []byte, 4)
|
||||||
|
|
||||||
|
pool.On(ch)
|
||||||
|
pool.On(make(chan []byte))
|
||||||
|
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
pool.Push([]byte(fmt.Sprintf("iteration %d", i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 20)
|
||||||
|
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||||
|
g.Assert(len(ch)).Equal(4)
|
||||||
|
|
||||||
|
g.Timeout(time.Millisecond * 500)
|
||||||
|
g.Assert(<-ch).Equal([]byte("iteration 96"))
|
||||||
|
g.Assert(<-ch).Equal([]byte("iteration 97"))
|
||||||
|
g.Assert(<-ch).Equal([]byte("iteration 98"))
|
||||||
|
g.Assert(<-ch).Equal([]byte("iteration 99"))
|
||||||
|
g.Assert(len(ch)).Equal(0)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user