From a4904365c9e6bd5391cea4e386d7127ac33dcec1 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sun, 23 Jan 2022 09:57:25 -0500 Subject: [PATCH] Sink pool cleanup and organization; better future support when we add more sinks --- router/router_server.go | 3 +- router/websocket/listeners.go | 8 ++-- server/install.go | 8 ++-- server/listeners.go | 2 +- server/server.go | 17 +++---- server/sink.go | 90 ++++++++++++++++++++++++----------- 6 files changed, 79 insertions(+), 49 deletions(-) diff --git a/router/router_server.go b/router/router_server.go index c5b7b8d..2ae2030 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -188,8 +188,7 @@ func deleteServer(c *gin.Context) { // as well. s.CtxCancel() s.Events().Destroy() - s.LogSink().Destroy() - s.InstallSink().Destroy() + s.DestroyAllSinks() s.Websockets().CancelAll() // Remove any pending remote file downloads for the server. diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index aae8855..e6b1db4 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -91,8 +91,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { logOutput := make(chan []byte) installOutput := make(chan []byte) h.server.Events().On(eventChan, e...) - h.server.LogSink().On(logOutput) - h.server.InstallSink().On(installOutput) + h.server.Sink(server.LogSink).On(logOutput) + h.server.Sink(server.InstallSink).On(installOutput) onError := func(evt string, err2 error) { h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket") @@ -148,8 +148,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { // These functions will automatically close the channel if it hasn't been already. h.server.Events().Off(eventChan, e...) - h.server.LogSink().Off(logOutput) - h.server.InstallSink().Off(installOutput) + h.server.Sink(server.LogSink).Off(logOutput) + h.server.Sink(server.InstallSink).Off(installOutput) // If the internal context is stopped it is either because the parent context // got canceled or because we ran into an error. If the "err" variable is nil diff --git a/server/install.go b/server/install.go index 42c3bcf..d4c6dd6 100644 --- a/server/install.go +++ b/server/install.go @@ -507,9 +507,9 @@ func (ip *InstallationProcess) Execute() (string, error) { return r.ID, nil } -// Streams the output of the installation process to a log file in the server configuration -// directory, as well as to a websocket listener so that the process can be viewed in -// the panel by administrators. +// StreamOutput streams the output of the installation process to a log file in +// the server configuration directory, as well as to a websocket listener so +// that the process can be viewed in the panel by administrators. func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error { reader, err := ip.client.ContainerLogs(ctx, id, types.ContainerLogsOptions{ ShowStdout: true, @@ -521,7 +521,7 @@ func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) erro } defer reader.Close() - err = system.ScanReader(reader, ip.Server.InstallSink().Push) + err = system.ScanReader(reader, ip.Server.Sink(InstallSink).Push) if err != nil { ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines") } diff --git a/server/listeners.go b/server/listeners.go index a8077a9..bc08c26 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -83,7 +83,7 @@ func (s *Server) processConsoleOutputEvent(v []byte) { // If we are not throttled, go ahead and output the data. if !t.Throttled() { - s.LogSink().Push(v) + s.Sink(LogSink).Push(v) } // Also pass the data along to the console output channel. diff --git a/server/server.go b/server/server.go index 3b50684..7008bd4 100644 --- a/server/server.go +++ b/server/server.go @@ -71,6 +71,8 @@ type Server struct { wsBag *WebsocketBag wsBagLocker sync.Mutex + sinks map[SinkName]*sinkPool + logSink *sinkPool installSink *sinkPool } @@ -86,9 +88,10 @@ func New(client remote.Client) (*Server, error) { installing: system.NewAtomicBool(false), transferring: system.NewAtomicBool(false), restoring: system.NewAtomicBool(false), - - logSink: newSinkPool(), - installSink: newSinkPool(), + sinks: map[SinkName]*sinkPool{ + LogSink: newSinkPool(), + InstallSink: newSinkPool(), + }, } if err := defaults.Set(&s); err != nil { return nil, errors.Wrap(err, "server: could not set default values for struct") @@ -355,11 +358,3 @@ func (s *Server) ToAPIResponse() APIResponse { Configuration: *s.Config(), } } - -func (s *Server) LogSink() *sinkPool { - return s.logSink -} - -func (s *Server) InstallSink() *sinkPool { - return s.installSink -} diff --git a/server/sink.go b/server/sink.go index 908748e..a6f5db9 100644 --- a/server/sink.go +++ b/server/sink.go @@ -2,52 +2,70 @@ package server import ( "sync" - "time" +) + +// SinkName represents one of the registered sinks for a server. +type SinkName string + +const ( + // LogSink handles console output for game servers, including messages being + // sent via Wings to the console instance. + LogSink SinkName = "log" + // InstallSink handles installation output for a server. + InstallSink SinkName = "install" ) // sinkPool represents a pool with sinks. type sinkPool struct { - mx sync.RWMutex + mu sync.RWMutex sinks []chan []byte } -// newSinkPool returns a new empty sinkPool. +// newSinkPool returns a new empty sinkPool. A sink pool generally lives with a +// server instance for it's full lifetime. func newSinkPool() *sinkPool { return &sinkPool{} } -// Off removes a sink from the pool. +// On adds a channel to the sink pool instance. +func (p *sinkPool) On(c chan []byte) { + p.mu.Lock() + p.sinks = append(p.sinks, c) + p.mu.Unlock() +} + +// Off removes a given channel from the sink pool. If no matching sink is found +// this function is a no-op. If a matching channel is found, it will be removed. func (p *sinkPool) Off(c chan []byte) { - p.mx.Lock() - defer p.mx.Unlock() + p.mu.Lock() + defer p.mu.Unlock() sinks := p.sinks - for i, sink := range sinks { if c != sink { continue } + + // We need to maintain the order of the sinks in the slice we're tracking, + // so shift everything to the left, rather than changing the order of the + // elements. copy(sinks[i:], sinks[i+1:]) sinks[len(sinks)-1] = nil sinks = sinks[:len(sinks)-1] + + // Update our tracked sinks, and close the matched channel. p.sinks = sinks close(c) + return } } -// On adds a sink on the pool. -func (p *sinkPool) On(c chan []byte) { - p.mx.Lock() - defer p.mx.Unlock() - - p.sinks = append(p.sinks, c) -} - -// Destroy destroys the pool by removing and closing all sinks. +// Destroy destroys the pool by removing and closing all sinks and destroying +// all of the channels that are present. func (p *sinkPool) Destroy() { - p.mx.Lock() - defer p.mx.Unlock() + p.mu.Lock() + defer p.mu.Unlock() for _, c := range p.sinks { close(c) @@ -56,17 +74,35 @@ func (p *sinkPool) Destroy() { p.sinks = nil } -// Push pushes a message to all registered sinks. -func (p *sinkPool) Push(v []byte) { - p.mx.RLock() +// Push sends a given message to each of the channels registered in the pool. +func (p *sinkPool) Push(data []byte) { + p.mu.RLock() for _, c := range p.sinks { - // TODO: should this be done in parallel? select { - // Send the log output to the channel - case c <- v: - // Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled. - case <-time.After(100 * time.Millisecond): + // Send the event data over to the channels. + case c <- data: } } - p.mx.RUnlock() + p.mu.RUnlock() +} + +// Sink returns the instantiated and named sink for a server. If the sink has +// not been configured yet this function will cause a panic condition. +func (s *Server) Sink(name SinkName) *sinkPool { + sink, ok := s.sinks[name] + if !ok { + s.Log().Fatalf("attempt to access nil sink: %s", name) + } + return sink +} + +// DestroyAllSinks iterates over all of the sinks configured for the server and +// destroys their instances. Note that this will cause a panic if you attempt +// to call Server.Sink() again after. This function is only used when a server +// is being deleted from the system. +func (s *Server) DestroyAllSinks() { + s.Log().Info("destroying all registered sinks for server instance") + for _, sink := range s.sinks { + sink.Destroy() + } }