From 0cd8dc2b5f576ccb9ccd943410172eb4d4c10b13 Mon Sep 17 00:00:00 2001
From: Dane Everitt
Date: Thu, 10 Sep 2020 20:05:01 -0700
Subject: [PATCH] avoid deadlocks while processing tons of data over server
 event listeners; closes pterodactyl/panel#2298

---
 environment/docker/stats.go | 18 +++++-----
 server/filesystem.go        | 42 ++++++++++++++---------
 server/listeners.go         | 66 +++++++++++++++++++------------------
 server/loader.go            |  2 +-
 4 files changed, 72 insertions(+), 56 deletions(-)

diff --git a/environment/docker/stats.go b/environment/docker/stats.go
index 7053057..84b0034 100644
--- a/environment/docker/stats.go
+++ b/environment/docker/stats.go
@@ -15,11 +15,13 @@ import (
 // Attach to the instance and then automatically emit an event whenever the resource usage for the
 // server process changes.
 func (e *Environment) pollResources(ctx context.Context) error {
-	log.WithField("container_id", e.Id).Debug("starting resource polling..")
-	defer log.WithField("container_id", e.Id).Debug("resource polling stopped")
+	l := log.WithField("container_id", e.Id)
+
+	l.Debug("starting resource polling for container")
+	defer l.Debug("stopped resource polling for container")
 
 	if e.State() == environment.ProcessOfflineState {
-		return errors.New("attempting to enable resource polling on a stopped server instance")
+		return errors.New("cannot enable resource polling on a stopped server")
 	}
 
 	stats, err := e.client.ContainerStats(context.Background(), e.Id, true)
@@ -34,22 +36,22 @@
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-
 		default:
 			var v *types.StatsJSON
 
 			if err := dec.Decode(&v); err != nil {
 				if err != io.EOF {
-					log.WithField("container_id", e.Id).Warn("encountered error processing docker stats output, stopping collection")
+					l.WithField("error", errors.WithStack(err)).Warn("error while processing Docker stats output for container")
+				} else {
+					l.Debug("io.EOF encountered during stats decode, stopping polling...")
 				}
-
-				log.WithField("container_id", e.Id).Debug("detected io.EOF, stopping resource polling")
 				return nil
 			}
 
 			// Disable collection if the server is in an offline state and this process is still running.
 			if e.State() == environment.ProcessOfflineState {
-				log.WithField("container_id", e.Id).Debug("process in offline state while resource polling is still active; stopping poll")
+				l.Debug("process in offline state while resource polling is still active; stopping poll")
 				return nil
 			}
@@ -74,7 +76,7 @@
 			}
 
 			if b, err := json.Marshal(st); err != nil {
-				log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while marshaling stats object for environment")
+				l.WithField("error", errors.WithStack(err)).Warn("error while marshaling stats object for environment")
 			} else {
 				e.Events().Publish(environment.ResourceEvent, string(b))
 			}
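
The stats.go hunks above reduce to two ideas: tag the logger once with the container ID instead of re-tagging it on every message, and treat io.EOF from the decoder as a normal end of stream rather than an error. For illustration only, here is a minimal, self-contained sketch of that streaming-decode loop; the sample type, the poll function, and the strings.Reader standing in for the Docker stats stream are all hypothetical:

    package main

    import (
        "context"
        "encoding/json"
        "fmt"
        "io"
        "strings"
    )

    // sample is a hypothetical stand-in for Docker's types.StatsJSON payload.
    type sample struct {
        CPU float64 `json:"cpu"`
    }

    // poll decodes a stream of JSON documents from r until the context is
    // cancelled, the stream ends (io.EOF), or a genuine decode error occurs.
    func poll(ctx context.Context, r io.Reader) error {
        dec := json.NewDecoder(r)
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                var v sample
                if err := dec.Decode(&v); err != nil {
                    if err != io.EOF {
                        return err // a real error, worth logging upstream
                    }
                    return nil // io.EOF just means the stream closed normally
                }
                fmt.Println("cpu:", v.CPU)
            }
        }
    }

    func main() {
        // Two JSON documents stand in for the live stats stream.
        r := strings.NewReader(`{"cpu": 0.5}` + "\n" + `{"cpu": 0.75}`)
        if err := poll(context.Background(), r); err != nil {
            fmt.Println("poll error:", err)
        }
    }

The select with an immediate default branch checks for cancellation once per iteration without ever blocking on the context channel; the loop otherwise blocks only inside Decode.
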
diff --git a/server/filesystem.go b/server/filesystem.go
index 346c6e9..0bd1c41 100644
--- a/server/filesystem.go
+++ b/server/filesystem.go
@@ -41,7 +41,8 @@ func IsPathResolutionError(err error) bool {
 }
 
 type Filesystem struct {
-	mu sync.Mutex
+	mu           sync.Mutex
+	lookupTimeMu sync.RWMutex
 
 	lastLookupTime   time.Time
 	lookupInProgress int32
@@ -254,17 +255,25 @@
 // This is primarily to avoid a bunch of I/O operations from piling up on the server, especially on servers
 // with a large amount of files.
 func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
-	// Check if cache is expired...
-	if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) {
-		// If we are now allowing a stale response, or there is no lookup currently in progress, go ahead
-		// and perform the lookup and return the fresh value. This is a blocking operation to the calling
-		// process.
-		if !allowStaleValue || atomic.LoadInt32(&fs.lookupInProgress) == 0 {
-			return fs.updateCachedDiskUsage()
-		}
+	// Check if cache is expired.
+	fs.lookupTimeMu.RLock()
+	isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * -10))
+	fs.lookupTimeMu.RUnlock()
 
-		// Otherwise, just go ahead and perform the cached disk usage update.
-		go fs.updateCachedDiskUsage()
+	if !isValidInCache {
+		// If we are not allowing a stale response, go ahead and perform the lookup and return the
+		// fresh value. This is a blocking operation to the calling process.
+		if !allowStaleValue {
+			return fs.updateCachedDiskUsage()
+		} else if atomic.LoadInt32(&fs.lookupInProgress) == 0 {
+			// Otherwise, if we allow a stale value, there isn't a valid item in the cache, and we
+			// aren't currently performing a lookup, just do the disk usage calculation in the background.
+			go func(fs *Filesystem) {
+				if _, err := fs.updateCachedDiskUsage(); err != nil {
+					fs.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to determine disk usage in go-routine")
+				}
+			}(fs)
+		}
 	}
 
 	// Return the currently cached value back to the calling function.
@@ -279,11 +288,11 @@
 	fs.mu.Lock()
 	defer fs.mu.Unlock()
 
-	// Always clear the in progress flag when this process finishes.
-	defer atomic.StoreInt32(&fs.lookupInProgress, 0)
-
-	// Signal that we're currently updating the disk size, to prevent other routines to block on this.
+	// Signal that we're currently updating the disk size so that other calls to the disk checking
+	// functions can determine if they should queue up additional calls to this function. Ensure that
+	// we always set this back to 0 when this process is done executing.
 	atomic.StoreInt32(&fs.lookupInProgress, 1)
+	defer atomic.StoreInt32(&fs.lookupInProgress, 0)
 
 	// If there is no size its either because there is no data (in which case running this function
 	// will have effectively no impact), or there is nothing in the cache, in which case we need to
@@ -294,7 +303,10 @@
 	// Always cache the size, even if there is an error. We want to always return that value
 	// so that we don't cause an endless loop of determining the disk size if there is a temporary
 	// error encountered.
+	fs.lookupTimeMu.Lock()
 	fs.lastLookupTime = time.Now()
+	fs.lookupTimeMu.Unlock()
+
 	atomic.StoreInt64(&fs.disk, size)
 
 	return size, err
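
The filesystem.go change separates three concerns: a cheap freshness check under a read lock, a blocking recalculation when the caller cannot accept a stale value, and at most one background recalculation, guarded by the atomic lookupInProgress flag, when it can. Here is a rough sketch of that shape under illustrative names; cache, Get, and refresh are not the wings API, and the ten-second window simply mirrors the hunk above:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    type cache struct {
        mu          sync.Mutex   // serializes refresh()
        timeMu      sync.RWMutex // guards lastRefresh
        lastRefresh time.Time
        inProgress  int32 // 1 while a refresh is running
        value       int64 // last computed value, read/written atomically
    }

    func (c *cache) Get(allowStale bool) (int64, error) {
        c.timeMu.RLock()
        fresh := c.lastRefresh.After(time.Now().Add(-10 * time.Second))
        c.timeMu.RUnlock()

        if !fresh {
            if !allowStale {
                // Caller needs a fresh value; block until we have one.
                return c.refresh()
            } else if atomic.LoadInt32(&c.inProgress) == 0 {
                // Stale is acceptable; kick off at most one background refresh.
                go func() {
                    if _, err := c.refresh(); err != nil {
                        fmt.Println("background refresh failed:", err)
                    }
                }()
            }
        }
        return atomic.LoadInt64(&c.value), nil
    }

    func (c *cache) refresh() (int64, error) {
        c.mu.Lock()
        defer c.mu.Unlock()

        atomic.StoreInt32(&c.inProgress, 1)
        defer atomic.StoreInt32(&c.inProgress, 0)

        v := time.Now().UnixNano() % 100 // stand-in for the expensive disk walk

        c.timeMu.Lock()
        c.lastRefresh = time.Now()
        c.timeMu.Unlock()

        atomic.StoreInt64(&c.value, v)
        return v, nil
    }

    func main() {
        c := &cache{}
        v, _ := c.Get(false) // blocking first fill
        fmt.Println("fresh:", v)
        v, _ = c.Get(true) // served from cache; any refresh runs in the background
        fmt.Println("possibly stale:", v)
    }

Note that the flag is advisory only: it suppresses duplicate background launches, while actual mutual exclusion over the expensive work comes from the mutex.
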
diff --git a/server/listeners.go b/server/listeners.go
index 439eab0..5f70027 100644
--- a/server/listeners.go
+++ b/server/listeners.go
@@ -21,40 +21,42 @@ func (s *Server) StartEventListeners() {
 	s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
 	s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
 
-	// TODO: this is leaky I imagine since the routines aren't destroyed when the server is?
-	go func() {
-		for {
-			select {
-			case data := <-console:
-				// Immediately emit this event back over the server event stream since it is
-				// being called from the environment event stream and things probably aren't
-				// listening to that event.
-				s.Events().Publish(ConsoleOutputEvent, data.Data)
+	go func(console chan events.Event) {
+		for data := range console {
+			// Immediately emit this event back over the server event stream since it is
+			// being called from the environment event stream and things probably aren't
+			// listening to that event.
+			s.Events().Publish(ConsoleOutputEvent, data.Data)
 
-				// Also pass the data along to the console output channel.
-				s.onConsoleOutput(data.Data)
-
-			case data := <-state:
-				s.SetState(data.Data)
-
-			case data := <-stats:
-				st := new(environment.Stats)
-				if err := json.Unmarshal([]byte(data.Data), st); err != nil {
-					s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
-					continue
-				}
-
-				// Update the server resource tracking object with the resources we got here.
-				s.resources.mu.Lock()
-				s.resources.Stats = *st
-				s.resources.mu.Unlock()
-
-				s.Filesystem.HasSpaceAvailable(true)
-
-				s.emitProcUsage()
-			}
+			// Also pass the data along to the console output channel.
+			s.onConsoleOutput(data.Data)
 		}
-	}()
+	}(console)
+
+	go func(state chan events.Event) {
+		for data := range state {
+			s.SetState(data.Data)
+		}
+	}(state)
+
+	go func(stats chan events.Event) {
+		for data := range stats {
+			st := new(environment.Stats)
+			if err := json.Unmarshal([]byte(data.Data), st); err != nil {
+				s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
+				continue
+			}
+
+			// Update the server resource tracking object with the resources we got here.
+			s.resources.mu.Lock()
+			s.resources.Stats = *st
+			s.resources.mu.Unlock()
+
+			s.Filesystem.HasSpaceAvailable(true)
+
+			s.emitProcUsage()
+		}
+	}(stats)
 }
 
 var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
diff --git a/server/loader.go b/server/loader.go
index 45de2ab..cd2bc78 100644
--- a/server/loader.go
+++ b/server/loader.go
@@ -118,7 +118,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
 		return nil, err
 	} else {
 		s.Environment = env
-		s.StartEventListeners()
+		go s.StartEventListeners()
 	}
 
 	// Forces the configuration to be synced with the panel.
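
The listeners.go rewrite is the core of the deadlock fix named in the subject line: the old single for/select loop processed console, state, and stats events serially, so a slow stats handler (which calls back into the filesystem's disk-usage check) could back up console and state delivery and, with enough data in flight, block against the publisher. Giving each channel its own range-loop consumer keeps the three streams independent, and ranging over a channel means each goroutine exits when its channel is closed instead of leaking the way the removed TODO worried about. A small, self-contained sketch of the pattern, with hypothetical channel names and handlers:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // consume drains a single subscription channel; it returns when the
    // publisher closes the channel, so the goroutine is reclaimed rather
    // than leaking like a bare for/select loop would.
    func consume(wg *sync.WaitGroup, ch <-chan string, handle func(string)) {
        defer wg.Done()
        for msg := range ch {
            handle(msg)
        }
    }

    func main() {
        console := make(chan string)
        state := make(chan string)

        var wg sync.WaitGroup
        wg.Add(2)
        go consume(&wg, console, func(m string) {
            time.Sleep(50 * time.Millisecond) // a deliberately slow console handler
            fmt.Println("console:", m)
        })
        go consume(&wg, state, func(m string) {
            fmt.Println("state:", m) // no longer queued behind console work
        })

        console <- "hello"  // console handler begins its slow work
        state <- "running"  // still delivered immediately; prints first

        close(console)
        close(state)
        wg.Wait()
    }

The loader.go hunk appears to follow the same instinct: running StartEventListeners in its own goroutine keeps FromConfiguration from being serialized behind event-bus setup.
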