Move the sink pool to be a shared tool
This commit is contained in:
121
system/sink_pool.go
Normal file
121
system/sink_pool.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package system
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SinkName represents one of the registered sinks for a server.
|
||||
type SinkName string
|
||||
|
||||
const (
|
||||
// LogSink handles console output for game servers, including messages being
|
||||
// sent via Wings to the console instance.
|
||||
LogSink SinkName = "log"
|
||||
// InstallSink handles installation output for a server.
|
||||
InstallSink SinkName = "install"
|
||||
)
|
||||
|
||||
// SinkPool represents a pool with sinks.
|
||||
type SinkPool struct {
|
||||
mu sync.RWMutex
|
||||
sinks []chan []byte
|
||||
}
|
||||
|
||||
// NewSinkPool returns a new empty SinkPool. A sink pool generally lives with a
|
||||
// server instance for it's full lifetime.
|
||||
func NewSinkPool() *SinkPool {
|
||||
return &SinkPool{}
|
||||
}
|
||||
|
||||
// On adds a channel to the sink pool instance.
|
||||
func (p *SinkPool) On(c chan []byte) {
|
||||
p.mu.Lock()
|
||||
p.sinks = append(p.sinks, c)
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// Off removes a given channel from the sink pool. If no matching sink is found
|
||||
// this function is a no-op. If a matching channel is found, it will be removed.
|
||||
func (p *SinkPool) Off(c chan []byte) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
sinks := p.sinks
|
||||
for i, sink := range sinks {
|
||||
if c != sink {
|
||||
continue
|
||||
}
|
||||
|
||||
// We need to maintain the order of the sinks in the slice we're tracking,
|
||||
// so shift everything to the left, rather than changing the order of the
|
||||
// elements.
|
||||
copy(sinks[i:], sinks[i+1:])
|
||||
sinks[len(sinks)-1] = nil
|
||||
sinks = sinks[:len(sinks)-1]
|
||||
p.sinks = sinks
|
||||
|
||||
// Avoid a panic if the sink channel is nil at this point.
|
||||
if c != nil {
|
||||
close(c)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Destroy destroys the pool by removing and closing all sinks and destroying
|
||||
// all of the channels that are present.
|
||||
func (p *SinkPool) Destroy() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
for _, c := range p.sinks {
|
||||
if c != nil {
|
||||
close(c)
|
||||
}
|
||||
}
|
||||
|
||||
p.sinks = nil
|
||||
}
|
||||
|
||||
// Push sends a given message to each of the channels registered in the pool.
|
||||
// This will use a Ring Buffer channel in order to avoid blocking the channel
|
||||
// sends, and attempt to push though the most recent messages in the queue in
|
||||
// favor of the oldest messages.
|
||||
//
|
||||
// If the channel becomes full and isn't being drained fast enough, this
|
||||
// function will remove the oldest message in the channel, and then push the
|
||||
// message that it got onto the end, effectively making the channel a rolling
|
||||
// buffer.
|
||||
//
|
||||
// There is a potential for data to be lost when passing it through this
|
||||
// function, but only in instances where the channel buffer is full and the
|
||||
// channel is not drained fast enough, in which case dropping messages is most
|
||||
// likely the best option anyways. This uses waitgroups to allow every channel
|
||||
// to attempt its send concurrently thus making the total blocking time of this
|
||||
// function "O(1)" instead of "O(n)".
|
||||
func (p *SinkPool) Push(data []byte) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(p.sinks))
|
||||
for _, c := range p.sinks {
|
||||
go func(c chan []byte) {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case c <- data:
|
||||
case <-time.After(time.Millisecond * 10):
|
||||
// If there is nothing in the channel to read, but we also cannot write
|
||||
// to the channel, just skip over sending data. If we don't do this you'll
|
||||
// end up blocking the application on the channel read below.
|
||||
if len(c) == 0 {
|
||||
break
|
||||
}
|
||||
<-c
|
||||
c <- data
|
||||
}
|
||||
}(c)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
236
system/sink_pool_test.go
Normal file
236
system/sink_pool_test.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package system
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
. "github.com/franela/goblin"
|
||||
)
|
||||
|
||||
func MutexLocked(m *sync.RWMutex) bool {
|
||||
v := reflect.ValueOf(m).Elem()
|
||||
|
||||
state := v.FieldByName("w").FieldByName("state")
|
||||
|
||||
return state.Int()&1 == 1 || v.FieldByName("readerCount").Int() > 0
|
||||
}
|
||||
|
||||
func TestSink(t *testing.T) {
|
||||
g := Goblin(t)
|
||||
|
||||
g.Describe("SinkPool#On", func() {
|
||||
g.It("pushes additional channels to a sink", func() {
|
||||
pool := &SinkPool{}
|
||||
|
||||
g.Assert(pool.sinks).IsZero()
|
||||
|
||||
c1 := make(chan []byte, 1)
|
||||
pool.On(c1)
|
||||
|
||||
g.Assert(len(pool.sinks)).Equal(1)
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
})
|
||||
|
||||
g.Describe("SinkPool#Off", func() {
|
||||
var pool *SinkPool
|
||||
g.BeforeEach(func() {
|
||||
pool = &SinkPool{}
|
||||
})
|
||||
|
||||
g.It("works when no sinks are registered", func() {
|
||||
ch := make(chan []byte, 1)
|
||||
|
||||
g.Assert(pool.sinks).IsZero()
|
||||
pool.Off(ch)
|
||||
|
||||
g.Assert(pool.sinks).IsZero()
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
|
||||
g.It("does not remove any sinks when the channel does not match", func() {
|
||||
ch := make(chan []byte, 1)
|
||||
ch2 := make(chan []byte, 1)
|
||||
|
||||
pool.On(ch)
|
||||
g.Assert(len(pool.sinks)).Equal(1)
|
||||
|
||||
pool.Off(ch2)
|
||||
g.Assert(len(pool.sinks)).Equal(1)
|
||||
g.Assert(pool.sinks[0]).Equal(ch)
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
|
||||
g.It("removes a channel and maintains the order", func() {
|
||||
channels := make([]chan []byte, 8)
|
||||
for i := 0; i < len(channels); i++ {
|
||||
channels[i] = make(chan []byte, 1)
|
||||
pool.On(channels[i])
|
||||
}
|
||||
|
||||
g.Assert(len(pool.sinks)).Equal(8)
|
||||
|
||||
pool.Off(channels[2])
|
||||
g.Assert(len(pool.sinks)).Equal(7)
|
||||
g.Assert(pool.sinks[1]).Equal(channels[1])
|
||||
g.Assert(pool.sinks[2]).Equal(channels[3])
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
|
||||
g.It("does not panic if a nil channel is provided", func() {
|
||||
ch := make([]chan []byte, 1)
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
g.Fail("removing a nil channel should not cause a panic")
|
||||
}
|
||||
}()
|
||||
|
||||
pool.On(ch[0])
|
||||
pool.Off(ch[0])
|
||||
|
||||
g.Assert(len(pool.sinks)).Equal(0)
|
||||
})
|
||||
})
|
||||
|
||||
g.Describe("SinkPool#Push", func() {
|
||||
var pool *SinkPool
|
||||
g.BeforeEach(func() {
|
||||
pool = &SinkPool{}
|
||||
})
|
||||
|
||||
g.It("works when no sinks are registered", func() {
|
||||
g.Assert(len(pool.sinks)).IsZero()
|
||||
pool.Push([]byte("test"))
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
|
||||
g.It("sends data to every registered sink", func() {
|
||||
ch1 := make(chan []byte, 1)
|
||||
ch2 := make(chan []byte, 1)
|
||||
|
||||
pool.On(ch1)
|
||||
pool.On(ch2)
|
||||
|
||||
g.Assert(len(pool.sinks)).Equal(2)
|
||||
b := []byte("test")
|
||||
pool.Push(b)
|
||||
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
g.Assert(<-ch1).Equal(b)
|
||||
g.Assert(<-ch2).Equal(b)
|
||||
g.Assert(len(pool.sinks)).Equal(2)
|
||||
})
|
||||
|
||||
g.It("uses a ring-buffer to avoid blocking when the channel is full", func() {
|
||||
ch1 := make(chan []byte, 1)
|
||||
ch2 := make(chan []byte, 2)
|
||||
ch3 := make(chan []byte)
|
||||
|
||||
// ch1 and ch2 are now full, and would block if the code doesn't account
|
||||
// for a full buffer.
|
||||
ch1 <- []byte("pre-test")
|
||||
ch2 <- []byte("pre-test")
|
||||
ch2 <- []byte("pre-test 2")
|
||||
|
||||
pool.On(ch1)
|
||||
pool.On(ch2)
|
||||
pool.On(ch3)
|
||||
|
||||
pool.Push([]byte("testing"))
|
||||
time.Sleep(time.Millisecond * 20)
|
||||
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
// We expect that value previously in the channel to have been dumped
|
||||
// and therefore only the value we pushed will be present. For ch2 we
|
||||
// expect only the first message was dropped, and the second one is now
|
||||
// the first in the out queue.
|
||||
g.Assert(<-ch1).Equal([]byte("testing"))
|
||||
g.Assert(<-ch2).Equal([]byte("pre-test 2"))
|
||||
g.Assert(<-ch2).Equal([]byte("testing"))
|
||||
|
||||
// Because nothing in this test was listening for ch3, it would have
|
||||
// blocked for the 10ms duration, and then been skipped over entirely
|
||||
// because it had no length to try and push onto.
|
||||
g.Assert(len(ch3)).Equal(0)
|
||||
|
||||
// Now, push again and expect similar results.
|
||||
pool.Push([]byte("testing 2"))
|
||||
time.Sleep(time.Millisecond * 20)
|
||||
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
g.Assert(<-ch1).Equal([]byte("testing 2"))
|
||||
g.Assert(<-ch2).Equal([]byte("testing 2"))
|
||||
})
|
||||
|
||||
g.It("can handle concurrent pushes FIFO", func() {
|
||||
ch := make(chan []byte, 4)
|
||||
|
||||
pool.On(ch)
|
||||
pool.On(make(chan []byte))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
pool.Push([]byte(fmt.Sprintf("iteration %d", i)))
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 20)
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
g.Assert(len(ch)).Equal(4)
|
||||
|
||||
g.Timeout(time.Millisecond * 500)
|
||||
g.Assert(<-ch).Equal([]byte("iteration 96"))
|
||||
g.Assert(<-ch).Equal([]byte("iteration 97"))
|
||||
g.Assert(<-ch).Equal([]byte("iteration 98"))
|
||||
g.Assert(<-ch).Equal([]byte("iteration 99"))
|
||||
g.Assert(len(ch)).Equal(0)
|
||||
})
|
||||
})
|
||||
|
||||
g.Describe("SinkPool#Destroy", func() {
|
||||
var pool *SinkPool
|
||||
g.BeforeEach(func() {
|
||||
pool = &SinkPool{}
|
||||
})
|
||||
|
||||
g.It("works if no sinks are registered", func() {
|
||||
pool.Destroy()
|
||||
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
|
||||
g.It("closes all channels fully", func() {
|
||||
ch1 := make(chan []byte, 1)
|
||||
ch2 := make(chan []byte, 1)
|
||||
|
||||
pool.On(ch1)
|
||||
pool.On(ch2)
|
||||
|
||||
g.Assert(len(pool.sinks)).Equal(2)
|
||||
pool.Destroy()
|
||||
g.Assert(pool.sinks).IsZero()
|
||||
|
||||
defer func() {
|
||||
r := recover()
|
||||
|
||||
g.Assert(r).IsNotNil()
|
||||
g.Assert(r.(error).Error()).Equal("send on closed channel")
|
||||
}()
|
||||
|
||||
ch1 <- []byte("test")
|
||||
})
|
||||
|
||||
g.It("works when a sink channel is nil", func() {
|
||||
ch := make([]chan []byte, 2)
|
||||
|
||||
pool.On(ch[0])
|
||||
pool.On(ch[1])
|
||||
|
||||
pool.Destroy()
|
||||
|
||||
g.Assert(MutexLocked(&pool.mu)).IsFalse()
|
||||
})
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user