From d742acf308058ee601952e0321b2d3fb4d6e38ca Mon Sep 17 00:00:00 2001 From: CyberKitsune <920550+cyberkitsune@users.noreply.github.com> Date: Mon, 31 Aug 2020 20:27:41 -0700 Subject: [PATCH] Minimize blocking in Filesystem.getCachedDiskUsage (#53) --- router/router_server_files.go | 4 +-- router/websocket/websocket.go | 2 +- server/filesystem.go | 49 +++++++++++++++++++++++++--------- server/filesystem_unarchive.go | 2 +- server/listeners.go | 5 +--- server/loader.go | 4 +-- server/power.go | 2 +- sftp/sftp.go | 2 +- 8 files changed, 45 insertions(+), 25 deletions(-) diff --git a/router/router_server_files.go b/router/router_server_files.go index 2ae8477..a8ab478 100644 --- a/router/router_server_files.go +++ b/router/router_server_files.go @@ -287,7 +287,7 @@ func postServerCompressFiles(c *gin.Context) { return } - if !s.Filesystem.HasSpaceAvailable() { + if !s.Filesystem.HasSpaceAvailable(true) { c.AbortWithStatusJSON(http.StatusConflict, gin.H{ "error": "This server does not have enough available disk space to generate a compressed archive.", }) @@ -361,7 +361,7 @@ func postServerUploadFiles(c *gin.Context) { return } - if !s.Filesystem.HasSpaceAvailable() { + if !s.Filesystem.HasSpaceAvailable(true) { c.AbortWithStatusJSON(http.StatusConflict, gin.H{ "error": "This server does not have enough available disk space to accept any file uploads.", }) diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index 3ed2fa0..e45aa14 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -261,7 +261,7 @@ func (h *Handler) HandleInbound(m Message) error { // Only send the current disk usage if the server is offline, if docker container is running, // Environment#EnableResourcePolling() will send this data to all clients. if state == environment.ProcessOfflineState { - _ = h.server.Filesystem.HasSpaceAvailable() + _ = h.server.Filesystem.HasSpaceAvailable(false) b, _ := json.Marshal(h.server.Proc()) h.SendJson(&Message{ diff --git a/server/filesystem.go b/server/filesystem.go index 911b702..2946647 100644 --- a/server/filesystem.go +++ b/server/filesystem.go @@ -41,10 +41,11 @@ func IsPathResolutionError(err error) bool { } type Filesystem struct { - mu sync.RWMutex + mu sync.Mutex - lastLookupTime time.Time - diskUsage int64 + lastLookupTime time.Time + lookupInProgress int32 + diskUsage int64 Server *Server } @@ -211,8 +212,10 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) { // // Because determining the amount of space being used by a server is a taxing operation we // will load it all up into a cache and pull from that as long as the key is not expired. -func (fs *Filesystem) HasSpaceAvailable() bool { - size, err := fs.getCachedDiskUsage() +// +// This operation will potentially block unless nonBlocking is true +func (fs *Filesystem) HasSpaceAvailable(nonBlocking bool) bool { + size, err := fs.getCachedDiskUsage(nonBlocking) if err != nil { fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size") } @@ -238,20 +241,40 @@ func (fs *Filesystem) HasSpaceAvailable() bool { // as needed without overly taxing the system. This will prioritize the value from the cache to avoid // excessive IO usage. We will only walk the filesystem and determine the size of the directory if there // is no longer a cached value. -func (fs *Filesystem) getCachedDiskUsage() (int64, error) { +// This will potentially block unless nonBlocking is true. +func (fs *Filesystem) getCachedDiskUsage(nonBlocking bool) (int64, error) { + + // Check if cache is expired... + if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) { + // We're OK with blocking, so go ahead and block + if !nonBlocking { + return fs.updateCachedDiskUsage() + } + + // Otherwise, we're fine with not blocking, but still need to update the cache. (If it isn't being done already) + if atomic.LoadInt32(&fs.lookupInProgress) != 1 { + go fs.updateCachedDiskUsage() + } + + } + + // Go ahead and return the cached value + return atomic.LoadInt64(&fs.diskUsage), nil +} + +func (fs *Filesystem) updateCachedDiskUsage() (int64, error) { + // Obtain an exclusive lock on this process so that we don't unintentionally run it at the same // time as another running process. Once the lock is available it'll read from the cache for the // second call rather than hitting the disk in parallel. - // - // This effectively the same speed as running this call in parallel since this cache will return - // instantly on the second call. fs.mu.Lock() defer fs.mu.Unlock() - // Expire the cache after 2.5 minutes. - if fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) { - return fs.diskUsage, nil - } + // Always clear the in progress flag + defer atomic.StoreInt32(&fs.lookupInProgress, 0) + + // Signal that we're currently updating the disk size, to prevent other routines to block on this. + atomic.StoreInt32(&fs.lookupInProgress, 1) // If there is no size its either because there is no data (in which case running this function // will have effectively no impact), or there is nothing in the cache, in which case we need to diff --git a/server/filesystem_unarchive.go b/server/filesystem_unarchive.go index c5cd06f..ef55bf4 100644 --- a/server/filesystem_unarchive.go +++ b/server/filesystem_unarchive.go @@ -38,7 +38,7 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b wg.Add(1) defer wg.Done() - dirSize, cErr = fs.getCachedDiskUsage() + dirSize, cErr = fs.getCachedDiskUsage(true) }() var size int64 diff --git a/server/listeners.go b/server/listeners.go index 675af39..a6bf84b 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -46,10 +46,7 @@ func (s *Server) StartEventListeners() { s.resources.Stats = *st s.resources.mu.Unlock() - // TODO: we'll need to handle this better since calling it in rapid succession will - // cause it to block until the first call is done calculating disk usage, which will - // case stat events to pile up for the server. - s.Filesystem.HasSpaceAvailable() + s.Filesystem.HasSpaceAvailable(true) s.emitProcUsage() } diff --git a/server/loader.go b/server/loader.go index 8250655..45de2ab 100644 --- a/server/loader.go +++ b/server/loader.go @@ -111,7 +111,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { envCfg := environment.NewConfiguration(settings, s.GetEnvironmentVariables()) meta := docker.Metadata{ - Image: s.Config().Container.Image, + Image: s.Config().Container.Image, } if env, err := docker.New(s.Id(), &meta, envCfg); err != nil { @@ -128,7 +128,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { // If the server's data directory exists, force disk usage calculation. if _, err := os.Stat(s.Filesystem.Path()); err == nil { - go s.Filesystem.HasSpaceAvailable() + s.Filesystem.HasSpaceAvailable(true) } return s, nil diff --git a/server/power.go b/server/power.go index bb4210a..36e802c 100644 --- a/server/power.go +++ b/server/power.go @@ -142,7 +142,7 @@ func (s *Server) onBeforeStart() error { s.SyncWithEnvironment() s.PublishConsoleOutputFromDaemon("Checking server disk space usage, this could take a few seconds...") - if !s.Filesystem.HasSpaceAvailable() { + if !s.Filesystem.HasSpaceAvailable(false) { return errors.New("cannot start server, not enough disk space available") } diff --git a/sftp/sftp.go b/sftp/sftp.go index 9833dd9..6a77f13 100644 --- a/sftp/sftp.go +++ b/sftp/sftp.go @@ -63,7 +63,7 @@ func validateDiskSpace(fs FileSystem) bool { return false } - return s.Filesystem.HasSpaceAvailable() + return s.Filesystem.HasSpaceAvailable(true) } // Validates a set of credentials for a SFTP login aganist Pterodactyl Panel and returns