Sink pool cleanup and organization; better future support when we add more sinks

Dane Everitt 2022-01-23 09:57:25 -05:00
parent 2a9c9e893e
commit a4904365c9
6 changed files with 79 additions and 49 deletions

View File

@@ -188,8 +188,7 @@ func deleteServer(c *gin.Context) {
 	// as well.
 	s.CtxCancel()
 	s.Events().Destroy()
-	s.LogSink().Destroy()
-	s.InstallSink().Destroy()
+	s.DestroyAllSinks()
 	s.Websockets().CancelAll()
 	// Remove any pending remote file downloads for the server.
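Side note on the consumer side of this change: once teardown is consolidated into DestroyAllSinks(), a subscriber attached via On() just sees its channel closed and its read loop end. A minimal runnable sketch of that behavior (consume and the channel names are hypothetical, not from the commit):

package main

import "fmt"

// Sketch only: a subscriber attached to a sink via On() observes its channel
// close when DestroyAllSinks() runs during deletion, so a range loop exits
// cleanly without any extra signaling.
func consume(out chan []byte, done chan struct{}) {
	for line := range out {
		fmt.Printf("console: %s\n", line)
	}
	close(done) // the pool closed `out`, e.g. via sinkPool.Destroy()
}

func main() {
	out := make(chan []byte)
	done := make(chan struct{})
	go consume(out, done)
	out <- []byte("hello")
	close(out) // stands in for s.DestroyAllSinks() during server deletion
	<-done
}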

View File

@@ -91,8 +91,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
 	logOutput := make(chan []byte)
 	installOutput := make(chan []byte)
 	h.server.Events().On(eventChan, e...)
-	h.server.LogSink().On(logOutput)
-	h.server.InstallSink().On(installOutput)
+	h.server.Sink(server.LogSink).On(logOutput)
+	h.server.Sink(server.InstallSink).On(installOutput)
 	onError := func(evt string, err2 error) {
 		h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket")
@@ -148,8 +148,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
 	// These functions will automatically close the channel if it hasn't been already.
 	h.server.Events().Off(eventChan, e...)
-	h.server.LogSink().Off(logOutput)
-	h.server.InstallSink().Off(installOutput)
+	h.server.Sink(server.LogSink).Off(logOutput)
+	h.server.Sink(server.InstallSink).Off(installOutput)
 	// If the internal context is stopped it is either because the parent context
 	// got canceled or because we ran into an error. If the "err" variable is nil
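For orientation, this is the subscribe, drain, unsubscribe pattern the handler follows. Below is a hedged, self-contained sketch with a stand-in pool type rather than the actual wings sinkPool; all names in it are illustrative:

package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-in pool; the real wings sinkPool adds locking and closes the channel
// inside Off().
type pool struct{ subs []chan []byte }

func (p *pool) On(c chan []byte) { p.subs = append(p.subs, c) }

func (p *pool) Off(c chan []byte) {
	for i, s := range p.subs {
		if s == c {
			p.subs = append(p.subs[:i], p.subs[i+1:]...)
			close(c)
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	p := &pool{}
	logOutput := make(chan []byte, 8)
	p.On(logOutput)
	defer p.Off(logOutput)

	logOutput <- []byte("server output line") // normally sent by Push()

	for {
		select {
		case <-ctx.Done():
			return // handler exits; Off() runs and closes the channel
		case line := <-logOutput:
			fmt.Printf("ws send: %s\n", line)
		}
	}
}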

View File

@@ -507,9 +507,9 @@ func (ip *InstallationProcess) Execute() (string, error) {
 	return r.ID, nil
 }

-// Streams the output of the installation process to a log file in the server configuration
-// directory, as well as to a websocket listener so that the process can be viewed in
-// the panel by administrators.
+// StreamOutput streams the output of the installation process to a log file in
+// the server configuration directory, as well as to a websocket listener so
+// that the process can be viewed in the panel by administrators.
 func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error {
 	reader, err := ip.client.ContainerLogs(ctx, id, types.ContainerLogsOptions{
 		ShowStdout: true,
@@ -521,7 +521,7 @@ func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error {
 	}
 	defer reader.Close()

-	err = system.ScanReader(reader, ip.Server.InstallSink().Push)
+	err = system.ScanReader(reader, ip.Server.Sink(InstallSink).Push)
 	if err != nil {
 		ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines")
 	}
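The full system.ScanReader signature is not visible in this hunk; assuming it feeds each scanned line to a []byte callback, which is how Sink(InstallSink).Push is wired in above, a line-scanning sketch could look like this (scanLines is a hypothetical helper):

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// scanLines is a hypothetical helper approximating the ScanReader -> Push
// pipeline: read output line by line and hand each line to a callback that
// takes []byte.
func scanLines(r io.Reader, push func([]byte)) error {
	s := bufio.NewScanner(r)
	for s.Scan() {
		// Copy the line; bufio.Scanner reuses its internal buffer.
		line := append([]byte(nil), s.Bytes()...)
		push(line)
	}
	return s.Err()
}

func main() {
	logs := strings.NewReader("step 1: download assets\nstep 2: extract archive\n")
	if err := scanLines(logs, func(b []byte) { fmt.Printf("install: %s\n", b) }); err != nil {
		fmt.Println("scan error:", err)
	}
}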

View File

@@ -83,7 +83,7 @@ func (s *Server) processConsoleOutputEvent(v []byte) {
 	// If we are not throttled, go ahead and output the data.
 	if !t.Throttled() {
-		s.LogSink().Push(v)
+		s.Sink(LogSink).Push(v)
 	}

 	// Also pass the data along to the console output channel.
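A rough illustration of the gate around the push; throttle below is a hypothetical stand-in, not the wings console throttler that t refers to:

package main

import (
	"fmt"
	"time"
)

// throttle is a minimal rate gate, only here to illustrate how output is
// dropped from the sink while the throttle is tripped.
type throttle struct {
	last time.Time
	min  time.Duration
}

func (t *throttle) Throttled() bool {
	if time.Since(t.last) < t.min {
		return true // too soon: skip pushing this line to the sink
	}
	t.last = time.Now()
	return false
}

func main() {
	t := &throttle{min: 10 * time.Millisecond}
	for i := 0; i < 3; i++ {
		if !t.Throttled() {
			fmt.Println("pushed to log sink:", i)
		} else {
			fmt.Println("throttled, skipped:", i)
		}
	}
}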

View File

@@ -71,6 +71,8 @@ type Server struct {
 	wsBag       *WebsocketBag
 	wsBagLocker sync.Mutex

+	sinks map[SinkName]*sinkPool
+
 	logSink     *sinkPool
 	installSink *sinkPool
 }
@@ -86,9 +88,10 @@ func New(client remote.Client) (*Server, error) {
 		installing:   system.NewAtomicBool(false),
 		transferring: system.NewAtomicBool(false),
 		restoring:    system.NewAtomicBool(false),
-		logSink:      newSinkPool(),
-		installSink:  newSinkPool(),
+		sinks: map[SinkName]*sinkPool{
+			LogSink:     newSinkPool(),
+			InstallSink: newSinkPool(),
+		},
 	}
 	if err := defaults.Set(&s); err != nil {
 		return nil, errors.Wrap(err, "server: could not set default values for struct")
@@ -355,11 +358,3 @@ func (s *Server) ToAPIResponse() APIResponse {
 		Configuration: *s.Config(),
 	}
 }
-
-func (s *Server) LogSink() *sinkPool {
-	return s.logSink
-}
-
-func (s *Server) InstallSink() *sinkPool {
-	return s.installSink
-}
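The design choice here, a map keyed by SinkName instead of one struct field plus one accessor per sink, is what the commit message means by better future support: adding a sink becomes a constant plus a map entry. A self-contained sketch of the registry shape, with the types reduced to stubs:

package main

import "fmt"

// Stub types reduced from the diff; only the registry shape matters here.
type SinkName string

const (
	LogSink     SinkName = "log"
	InstallSink SinkName = "install"
)

type sinkPool struct{}

func newSinkPool() *sinkPool { return &sinkPool{} }

type Server struct {
	sinks map[SinkName]*sinkPool
}

// New registers every named sink up front; supporting another sink later is
// one new constant plus one map entry, with no new accessor methods.
func New() *Server {
	return &Server{sinks: map[SinkName]*sinkPool{
		LogSink:     newSinkPool(),
		InstallSink: newSinkPool(),
	}}
}

func main() {
	s := New()
	fmt.Println(len(s.sinks), "sinks registered")
}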

View File

@@ -2,52 +2,70 @@ package server
 import (
 	"sync"
-	"time"
 )

+// SinkName represents one of the registered sinks for a server.
+type SinkName string
+
+const (
+	// LogSink handles console output for game servers, including messages being
+	// sent via Wings to the console instance.
+	LogSink SinkName = "log"
+	// InstallSink handles installation output for a server.
+	InstallSink SinkName = "install"
+)
+
 // sinkPool represents a pool with sinks.
 type sinkPool struct {
-	mx    sync.RWMutex
+	mu    sync.RWMutex
 	sinks []chan []byte
 }

-// newSinkPool returns a new empty sinkPool.
+// newSinkPool returns a new empty sinkPool. A sink pool generally lives with a
+// server instance for its full lifetime.
 func newSinkPool() *sinkPool {
 	return &sinkPool{}
 }

-// Off removes a sink from the pool.
+// On adds a channel to the sink pool instance.
+func (p *sinkPool) On(c chan []byte) {
+	p.mu.Lock()
+	p.sinks = append(p.sinks, c)
+	p.mu.Unlock()
+}
+
+// Off removes a given channel from the sink pool. If no matching sink is found
+// this function is a no-op. If a matching channel is found, it will be removed.
 func (p *sinkPool) Off(c chan []byte) {
-	p.mx.Lock()
-	defer p.mx.Unlock()
+	p.mu.Lock()
+	defer p.mu.Unlock()

 	sinks := p.sinks
 	for i, sink := range sinks {
 		if c != sink {
 			continue
 		}

+		// We need to maintain the order of the sinks in the slice we're tracking,
+		// so shift everything to the left, rather than changing the order of the
+		// elements.
 		copy(sinks[i:], sinks[i+1:])
 		sinks[len(sinks)-1] = nil
 		sinks = sinks[:len(sinks)-1]

+		// Update our tracked sinks, and close the matched channel.
 		p.sinks = sinks
 		close(c)
 		return
 	}
 }
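The copy, nil, truncate sequence in Off() is the standard order-preserving slice removal idiom in Go; nilling the vacated slot matters because the backing array would otherwise keep the removed channel reachable. A standalone demonstration (remove is a hypothetical helper):

package main

import "fmt"

// remove shows the same idiom Off() uses: shift the tail left with copy, nil
// the vacated last slot so the backing array no longer holds the channel
// (letting it be garbage collected), then truncate.
func remove(sinks []chan []byte, i int) []chan []byte {
	copy(sinks[i:], sinks[i+1:])
	sinks[len(sinks)-1] = nil
	return sinks[:len(sinks)-1]
}

func main() {
	a, b, c := make(chan []byte), make(chan []byte), make(chan []byte)
	sinks := []chan []byte{a, b, c}
	sinks = remove(sinks, 1) // drops b; a and c keep their relative order
	fmt.Println(len(sinks), sinks[0] == a, sinks[1] == c)
}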
-// On adds a sink on the pool.
-func (p *sinkPool) On(c chan []byte) {
-	p.mx.Lock()
-	defer p.mx.Unlock()
-	p.sinks = append(p.sinks, c)
-}
-
-// Destroy destroys the pool by removing and closing all sinks.
+// Destroy destroys the pool by removing and closing all sinks and destroying
+// all of the channels that are present.
 func (p *sinkPool) Destroy() {
-	p.mx.Lock()
-	defer p.mx.Unlock()
+	p.mu.Lock()
+	defer p.mu.Unlock()

 	for _, c := range p.sinks {
 		close(c)
@@ -56,17 +74,35 @@ func (p *sinkPool) Destroy() {
 	p.sinks = nil
 }

-// Push pushes a message to all registered sinks.
-func (p *sinkPool) Push(v []byte) {
-	p.mx.RLock()
+// Push sends a given message to each of the channels registered in the pool.
+func (p *sinkPool) Push(data []byte) {
+	p.mu.RLock()
 	for _, c := range p.sinks {
+		// TODO: should this be done in parallel?
 		select {
-		// Send the log output to the channel
-		case c <- v:
-		// Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled.
-		case <-time.After(100 * time.Millisecond):
+		// Send the event data over to the channels.
+		case c <- data:
 		}
 	}
-	p.mx.RUnlock()
+	p.mu.RUnlock()
 }
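Worth noting about the Push() change: with the 100ms time.After case gone, the select has a single case and no default, so Push now blocks until every subscriber receives, and one slow consumer stalls delivery to the rest. Buffered subscriber channels, or a default case on the send, are the usual escape hatches; a small sketch of both send behaviors:

package main

import "fmt"

func main() {
	unbuffered := make(chan []byte)
	buffered := make(chan []byte, 1)

	// Non-blocking send: a default case drops the message when no receiver
	// is ready, instead of blocking the way the new Push() now can.
	select {
	case unbuffered <- []byte("dropped"):
	default:
		fmt.Println("no receiver ready, message dropped")
	}

	// Buffered send: succeeds immediately while capacity remains, so a
	// subscriber that registers a buffered channel will not stall Push().
	buffered <- []byte("queued")
	fmt.Printf("buffered: %s\n", <-buffered)
}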
+
+// Sink returns the instantiated and named sink for a server. If the sink has
+// not been configured yet this function will cause a panic condition.
+func (s *Server) Sink(name SinkName) *sinkPool {
+	sink, ok := s.sinks[name]
+	if !ok {
+		s.Log().Fatalf("attempt to access nil sink: %s", name)
+	}
+	return sink
+}
+
+// DestroyAllSinks iterates over all of the sinks configured for the server and
+// destroys their instances. Note that this will cause a panic if you attempt
+// to call Server.Sink() again after. This function is only used when a server
+// is being deleted from the system.
+func (s *Server) DestroyAllSinks() {
+	s.Log().Info("destroying all registered sinks for server instance")
+
+	for _, sink := range s.sinks {
+		sink.Destroy()
+	}
+}
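Finally, a hedged end-to-end sketch of the lifecycle these helpers expose: look up a named pool, attach a subscriber, push data, destroy everything on deletion. The types are minimal stand-ins, not the wings implementation:

package main

import (
	"fmt"
	"sync"
)

type SinkName string

const LogSink SinkName = "log"

// Minimal stand-in pool; the wings version also implements Off().
type sinkPool struct {
	mu    sync.RWMutex
	sinks []chan []byte
}

func (p *sinkPool) On(c chan []byte) {
	p.mu.Lock()
	p.sinks = append(p.sinks, c)
	p.mu.Unlock()
}

func (p *sinkPool) Push(data []byte) {
	p.mu.RLock()
	for _, c := range p.sinks {
		c <- data
	}
	p.mu.RUnlock()
}

func (p *sinkPool) Destroy() {
	p.mu.Lock()
	for _, c := range p.sinks {
		close(c)
	}
	p.sinks = nil
	p.mu.Unlock()
}

func main() {
	sinks := map[SinkName]*sinkPool{LogSink: {}}

	out := make(chan []byte, 1) // buffered so Push() does not block here
	sinks[LogSink].On(out)
	sinks[LogSink].Push([]byte("server started"))
	fmt.Printf("log: %s\n", <-out)

	// Equivalent of DestroyAllSinks() when the server is deleted.
	for _, p := range sinks {
		p.Destroy()
	}
}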