From 0f2e9fcc0bd92a7265824ccf6b0a7cba7e7fa94b Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Wed, 2 Feb 2022 19:16:34 -0500 Subject: [PATCH] Move the sink pool to be a shared tool --- router/websocket/listeners.go | 9 ++-- server/events.go | 24 ++++++++++- server/install.go | 2 +- server/listeners.go | 3 +- server/server.go | 12 +++--- server/sink.go => system/sink_pool.go | 41 +++++-------------- .../sink_test.go => system/sink_pool_test.go | 16 ++++---- 7 files changed, 55 insertions(+), 52 deletions(-) rename server/sink.go => system/sink_pool.go (73%) rename server/sink_test.go => system/sink_pool_test.go (97%) diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index af66b88..204e3e6 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -7,6 +7,7 @@ import ( "emperror.dev/errors" "github.com/goccy/go-json" + "github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/server" @@ -92,8 +93,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { logOutput := make(chan []byte, 8) installOutput := make(chan []byte, 4) h.server.Events().On(eventChan, e...) - h.server.Sink(server.LogSink).On(logOutput) - h.server.Sink(server.InstallSink).On(installOutput) + h.server.Sink(system.LogSink).On(logOutput) + h.server.Sink(system.InstallSink).On(installOutput) onError := func(evt string, err2 error) { h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket") @@ -149,8 +150,8 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error { // These functions will automatically close the channel if it hasn't been already. h.server.Events().Off(eventChan, e...) - h.server.Sink(server.LogSink).Off(logOutput) - h.server.Sink(server.InstallSink).Off(installOutput) + h.server.Sink(system.LogSink).Off(logOutput) + h.server.Sink(system.InstallSink).Off(installOutput) // If the internal context is stopped it is either because the parent context // got canceled or because we ran into an error. If the "err" variable is nil diff --git a/server/events.go b/server/events.go index f83c17b..a7d8418 100644 --- a/server/events.go +++ b/server/events.go @@ -2,6 +2,7 @@ package server import ( "github.com/pterodactyl/wings/events" + "github.com/pterodactyl/wings/system" ) // Defines all of the possible output events for a server. @@ -20,7 +21,7 @@ const ( TransferStatusEvent = "transfer status" ) -// Returns the server's emitter instance. +// Events returns the server's emitter instance. func (s *Server) Events() *events.Bus { s.emitterLock.Lock() defer s.emitterLock.Unlock() @@ -31,3 +32,24 @@ func (s *Server) Events() *events.Bus { return s.emitter } + +// Sink returns the instantiated and named sink for a server. If the sink has +// not been configured yet this function will cause a panic condition. +func (s *Server) Sink(name system.SinkName) *system.SinkPool { + sink, ok := s.sinks[name] + if !ok { + s.Log().Fatalf("attempt to access nil sink: %s", name) + } + return sink +} + +// DestroyAllSinks iterates over all of the sinks configured for the server and +// destroys their instances. Note that this will cause a panic if you attempt +// to call Server.Sink() again after. This function is only used when a server +// is being deleted from the system. +func (s *Server) DestroyAllSinks() { + s.Log().Info("destroying all registered sinks for server instance") + for _, sink := range s.sinks { + sink.Destroy() + } +} \ No newline at end of file diff --git a/server/install.go b/server/install.go index bb9a878..789661e 100644 --- a/server/install.go +++ b/server/install.go @@ -522,7 +522,7 @@ func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) erro } defer reader.Close() - err = system.ScanReader(reader, ip.Server.Sink(InstallSink).Push) + err = system.ScanReader(reader, ip.Server.Sink(system.InstallSink).Push) if err != nil { ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines") } diff --git a/server/listeners.go b/server/listeners.go index 54e8801..e5a5e29 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -8,6 +8,7 @@ import ( "time" "github.com/apex/log" + "github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/events" @@ -73,7 +74,7 @@ func (s *Server) processConsoleOutputEvent(v []byte) { return } - s.Sink(LogSink).Push(v) + s.Sink(system.LogSink).Push(v) } // StartEventListeners adds all the internal event listeners we want to use for diff --git a/server/server.go b/server/server.go index 7027431..010fd9c 100644 --- a/server/server.go +++ b/server/server.go @@ -70,10 +70,10 @@ type Server struct { wsBag *WebsocketBag wsBagLocker sync.Mutex - sinks map[SinkName]*sinkPool + sinks map[system.SinkName]*system.SinkPool - logSink *sinkPool - installSink *sinkPool + logSink *system.SinkPool + installSink *system.SinkPool } // New returns a new server instance with a context and all of the default @@ -88,9 +88,9 @@ func New(client remote.Client) (*Server, error) { transferring: system.NewAtomicBool(false), restoring: system.NewAtomicBool(false), powerLock: system.NewLocker(), - sinks: map[SinkName]*sinkPool{ - LogSink: newSinkPool(), - InstallSink: newSinkPool(), + sinks: map[system.SinkName]*system.SinkPool{ + system.LogSink: system.NewSinkPool(), + system.InstallSink: system.NewSinkPool(), }, } if err := defaults.Set(&s); err != nil { diff --git a/server/sink.go b/system/sink_pool.go similarity index 73% rename from server/sink.go rename to system/sink_pool.go index 4d056ae..1c27cd7 100644 --- a/server/sink.go +++ b/system/sink_pool.go @@ -1,4 +1,4 @@ -package server +package system import ( "sync" @@ -16,20 +16,20 @@ const ( InstallSink SinkName = "install" ) -// sinkPool represents a pool with sinks. -type sinkPool struct { +// SinkPool represents a pool with sinks. +type SinkPool struct { mu sync.RWMutex sinks []chan []byte } -// newSinkPool returns a new empty sinkPool. A sink pool generally lives with a +// NewSinkPool returns a new empty SinkPool. A sink pool generally lives with a // server instance for it's full lifetime. -func newSinkPool() *sinkPool { - return &sinkPool{} +func NewSinkPool() *SinkPool { + return &SinkPool{} } // On adds a channel to the sink pool instance. -func (p *sinkPool) On(c chan []byte) { +func (p *SinkPool) On(c chan []byte) { p.mu.Lock() p.sinks = append(p.sinks, c) p.mu.Unlock() @@ -37,7 +37,7 @@ func (p *sinkPool) On(c chan []byte) { // Off removes a given channel from the sink pool. If no matching sink is found // this function is a no-op. If a matching channel is found, it will be removed. -func (p *sinkPool) Off(c chan []byte) { +func (p *SinkPool) Off(c chan []byte) { p.mu.Lock() defer p.mu.Unlock() @@ -66,7 +66,7 @@ func (p *sinkPool) Off(c chan []byte) { // Destroy destroys the pool by removing and closing all sinks and destroying // all of the channels that are present. -func (p *sinkPool) Destroy() { +func (p *SinkPool) Destroy() { p.mu.Lock() defer p.mu.Unlock() @@ -95,7 +95,7 @@ func (p *sinkPool) Destroy() { // likely the best option anyways. This uses waitgroups to allow every channel // to attempt its send concurrently thus making the total blocking time of this // function "O(1)" instead of "O(n)". -func (p *sinkPool) Push(data []byte) { +func (p *SinkPool) Push(data []byte) { p.mu.RLock() defer p.mu.RUnlock() var wg sync.WaitGroup @@ -119,24 +119,3 @@ func (p *sinkPool) Push(data []byte) { } wg.Wait() } - -// Sink returns the instantiated and named sink for a server. If the sink has -// not been configured yet this function will cause a panic condition. -func (s *Server) Sink(name SinkName) *sinkPool { - sink, ok := s.sinks[name] - if !ok { - s.Log().Fatalf("attempt to access nil sink: %s", name) - } - return sink -} - -// DestroyAllSinks iterates over all of the sinks configured for the server and -// destroys their instances. Note that this will cause a panic if you attempt -// to call Server.Sink() again after. This function is only used when a server -// is being deleted from the system. -func (s *Server) DestroyAllSinks() { - s.Log().Info("destroying all registered sinks for server instance") - for _, sink := range s.sinks { - sink.Destroy() - } -} diff --git a/server/sink_test.go b/system/sink_pool_test.go similarity index 97% rename from server/sink_test.go rename to system/sink_pool_test.go index d309244..1f5ea70 100644 --- a/server/sink_test.go +++ b/system/sink_pool_test.go @@ -1,4 +1,4 @@ -package server +package system import ( "fmt" @@ -23,7 +23,7 @@ func TestSink(t *testing.T) { g.Describe("SinkPool#On", func() { g.It("pushes additional channels to a sink", func() { - pool := &sinkPool{} + pool := &SinkPool{} g.Assert(pool.sinks).IsZero() @@ -36,9 +36,9 @@ func TestSink(t *testing.T) { }) g.Describe("SinkPool#Off", func() { - var pool *sinkPool + var pool *SinkPool g.BeforeEach(func() { - pool = &sinkPool{} + pool = &SinkPool{} }) g.It("works when no sinks are registered", func() { @@ -97,9 +97,9 @@ func TestSink(t *testing.T) { }) g.Describe("SinkPool#Push", func() { - var pool *sinkPool + var pool *SinkPool g.BeforeEach(func() { - pool = &sinkPool{} + pool = &SinkPool{} }) g.It("works when no sinks are registered", func() { @@ -190,9 +190,9 @@ func TestSink(t *testing.T) { }) g.Describe("SinkPool#Destroy", func() { - var pool *sinkPool + var pool *SinkPool g.BeforeEach(func() { - pool = &sinkPool{} + pool = &SinkPool{} }) g.It("works if no sinks are registered", func() {