2022-01-18 03:23:29 +00:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
2022-01-30 15:55:45 +00:00
|
|
|
"time"
|
2022-01-23 14:57:25 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// SinkName represents one of the registered sinks for a server.
|
|
|
|
type SinkName string
|
|
|
|
|
|
|
|
const (
|
|
|
|
// LogSink handles console output for game servers, including messages being
|
|
|
|
// sent via Wings to the console instance.
|
|
|
|
LogSink SinkName = "log"
|
|
|
|
// InstallSink handles installation output for a server.
|
|
|
|
InstallSink SinkName = "install"
|
2022-01-18 03:23:29 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// sinkPool represents a pool with sinks.
|
|
|
|
type sinkPool struct {
|
2022-01-23 14:57:25 +00:00
|
|
|
mu sync.RWMutex
|
2022-01-18 03:23:29 +00:00
|
|
|
sinks []chan []byte
|
|
|
|
}
|
|
|
|
|
2022-01-23 14:57:25 +00:00
|
|
|
// newSinkPool returns a new empty sinkPool. A sink pool generally lives with a
|
|
|
|
// server instance for it's full lifetime.
|
2022-01-18 03:23:29 +00:00
|
|
|
func newSinkPool() *sinkPool {
|
|
|
|
return &sinkPool{}
|
|
|
|
}
|
|
|
|
|
2022-01-23 14:57:25 +00:00
|
|
|
// On adds a channel to the sink pool instance.
|
|
|
|
func (p *sinkPool) On(c chan []byte) {
|
|
|
|
p.mu.Lock()
|
|
|
|
p.sinks = append(p.sinks, c)
|
|
|
|
p.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Off removes a given channel from the sink pool. If no matching sink is found
|
|
|
|
// this function is a no-op. If a matching channel is found, it will be removed.
|
2022-01-18 03:23:29 +00:00
|
|
|
func (p *sinkPool) Off(c chan []byte) {
|
2022-01-23 14:57:25 +00:00
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
2022-01-18 03:23:29 +00:00
|
|
|
|
|
|
|
sinks := p.sinks
|
|
|
|
for i, sink := range sinks {
|
|
|
|
if c != sink {
|
|
|
|
continue
|
|
|
|
}
|
2022-01-23 14:57:25 +00:00
|
|
|
|
|
|
|
// We need to maintain the order of the sinks in the slice we're tracking,
|
|
|
|
// so shift everything to the left, rather than changing the order of the
|
|
|
|
// elements.
|
2022-01-18 03:23:29 +00:00
|
|
|
copy(sinks[i:], sinks[i+1:])
|
|
|
|
sinks[len(sinks)-1] = nil
|
|
|
|
sinks = sinks[:len(sinks)-1]
|
|
|
|
p.sinks = sinks
|
2022-01-23 15:41:12 +00:00
|
|
|
|
|
|
|
// Avoid a panic if the sink channel is nil at this point.
|
|
|
|
if c != nil {
|
|
|
|
close(c)
|
|
|
|
}
|
2022-01-23 14:57:25 +00:00
|
|
|
|
2022-01-18 03:23:29 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-23 14:57:25 +00:00
|
|
|
// Destroy destroys the pool by removing and closing all sinks and destroying
|
|
|
|
// all of the channels that are present.
|
2022-01-18 03:23:29 +00:00
|
|
|
func (p *sinkPool) Destroy() {
|
2022-01-23 14:57:25 +00:00
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
2022-01-18 03:23:29 +00:00
|
|
|
|
|
|
|
for _, c := range p.sinks {
|
2022-01-23 15:41:12 +00:00
|
|
|
if c != nil {
|
|
|
|
close(c)
|
|
|
|
}
|
2022-01-18 03:23:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
p.sinks = nil
|
|
|
|
}
|
|
|
|
|
2022-01-23 14:57:25 +00:00
|
|
|
// Push sends a given message to each of the channels registered in the pool.
|
2022-01-30 15:55:45 +00:00
|
|
|
// This will use a Ring Buffer channel in order to avoid blocking the channel
|
|
|
|
// sends, and attempt to push though the most recent messages in the queue in
|
|
|
|
// favor of the oldest messages.
|
|
|
|
//
|
|
|
|
// If the channel becomes full and isn't being drained fast enough, this
|
|
|
|
// function will remove the oldest message in the channel, and then push the
|
|
|
|
// message that it got onto the end, effectively making the channel a rolling
|
|
|
|
// buffer.
|
|
|
|
//
|
|
|
|
// There is a potential for data to be lost when passing it through this
|
|
|
|
// function, but only in instances where the channel buffer is full and the
|
|
|
|
// channel is not drained fast enough, in which case dropping messages is most
|
|
|
|
// likely the best option anyways. This uses waitgroups to allow every channel
|
|
|
|
// to attempt its send concurrently thus making the total blocking time of this
|
|
|
|
// function "O(1)" instead of "O(n)".
|
2022-01-23 14:57:25 +00:00
|
|
|
func (p *sinkPool) Push(data []byte) {
|
|
|
|
p.mu.RLock()
|
2022-01-30 15:55:45 +00:00
|
|
|
defer p.mu.RUnlock()
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(len(p.sinks))
|
2022-01-18 03:23:29 +00:00
|
|
|
for _, c := range p.sinks {
|
2022-01-30 15:55:45 +00:00
|
|
|
go func(c chan []byte) {
|
|
|
|
defer wg.Done()
|
|
|
|
select {
|
|
|
|
case c <- data:
|
|
|
|
case <-time.After(time.Millisecond * 10):
|
|
|
|
// If there is nothing in the channel to read, but we also cannot write
|
|
|
|
// to the channel, just skip over sending data. If we don't do this you'll
|
|
|
|
// end up blocking the application on the channel read below.
|
|
|
|
if len(c) == 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
<-c
|
|
|
|
c <- data
|
|
|
|
}
|
|
|
|
}(c)
|
2022-01-18 03:23:29 +00:00
|
|
|
}
|
2022-01-30 15:55:45 +00:00
|
|
|
wg.Wait()
|
2022-01-23 14:57:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Sink returns the instantiated and named sink for a server. If the sink has
|
|
|
|
// not been configured yet this function will cause a panic condition.
|
|
|
|
func (s *Server) Sink(name SinkName) *sinkPool {
|
|
|
|
sink, ok := s.sinks[name]
|
|
|
|
if !ok {
|
|
|
|
s.Log().Fatalf("attempt to access nil sink: %s", name)
|
|
|
|
}
|
|
|
|
return sink
|
|
|
|
}
|
|
|
|
|
|
|
|
// DestroyAllSinks iterates over all of the sinks configured for the server and
|
|
|
|
// destroys their instances. Note that this will cause a panic if you attempt
|
|
|
|
// to call Server.Sink() again after. This function is only used when a server
|
|
|
|
// is being deleted from the system.
|
|
|
|
func (s *Server) DestroyAllSinks() {
|
|
|
|
s.Log().Info("destroying all registered sinks for server instance")
|
|
|
|
for _, sink := range s.sinks {
|
|
|
|
sink.Destroy()
|
|
|
|
}
|
2022-01-18 03:23:29 +00:00
|
|
|
}
|