From cbf914e7a1a1c4ea5d7b9eaf10650b7f199ca10c Mon Sep 17 00:00:00 2001
From: Dane Everitt
Date: Mon, 31 Aug 2020 20:45:51 -0700
Subject: [PATCH] Additional code cleanup for #53

---
 server/filesystem.go           | 53 ++++++++++++++++++++--------------
 server/filesystem_unarchive.go | 33 +++++----------------
 2 files changed, 38 insertions(+), 48 deletions(-)

diff --git a/server/filesystem.go b/server/filesystem.go
index 2946647..cc439bf 100644
--- a/server/filesystem.go
+++ b/server/filesystem.go
@@ -45,7 +45,7 @@ type Filesystem struct {
     lastLookupTime   time.Time
     lookupInProgress int32
-    diskUsage        int64
+    disk             int64
 
     Server *Server
 }
 
@@ -207,15 +207,20 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) {
     return cleaned, g.Wait()
 }
 
+type SpaceCheckingOpts struct {
+    AllowStaleResponse bool
+}
+
 // Determines if the directory a file is trying to be added to has enough space available
 // for the file to be written to.
 //
 // Because determining the amount of space being used by a server is a taxing operation we
 // will load it all up into a cache and pull from that as long as the key is not expired.
 //
-// This operation will potentially block unless nonBlocking is true
-func (fs *Filesystem) HasSpaceAvailable(nonBlocking bool) bool {
-    size, err := fs.getCachedDiskUsage(nonBlocking)
+// This operation will potentially block unless allowStaleValue is set to true. See the
+// documentation on DiskUsage for how this affects the call.
+func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
+    size, err := fs.DiskUsage(allowStaleValue)
     if err != nil {
         fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
     }
@@ -241,36 +246,40 @@ func (fs *Filesystem) HasSpaceAvailable(nonBlocking bool) bool {
 // as needed without overly taxing the system. This will prioritize the value from the cache to avoid
 // excessive IO usage. We will only walk the filesystem and determine the size of the directory if there
 // is no longer a cached value.
-// This will potentially block unless nonBlocking is true.
-func (fs *Filesystem) getCachedDiskUsage(nonBlocking bool) (int64, error) {
-
+//
+// If "allowStaleValue" is set to true, a stale value MAY be returned to the caller if there is an
+// expired cache value AND there is currently another lookup in progress. If there is no cached value but
+// no other lookup is in progress, a fresh disk space response will be returned to the caller.
+//
+// This is primarily to avoid a bunch of I/O operations from piling up on the server, especially on servers
+// with a large amount of files.
+func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
     // Check if cache is expired...
     if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) {
-        // We're OK with blocking, so go ahead and block
-        if !nonBlocking {
+        // If we are now allowing a stale response, or there is no lookup currently in progress, go ahead
+        // and perform the lookup and return the fresh value. This is a blocking operation to the calling
+        // process.
+        if !allowStaleValue || atomic.LoadInt32(&fs.lookupInProgress) == 0 {
             return fs.updateCachedDiskUsage()
         }
 
-        // Otherwise, we're fine with not blocking, but still need to update the cache. (If it isn't being done already)
-        if atomic.LoadInt32(&fs.lookupInProgress) != 1 {
-            go fs.updateCachedDiskUsage()
-        }
-
+        // Otherwise, just go ahead and perform the cached disk usage update.
+        go fs.updateCachedDiskUsage()
     }
 
-    // Go ahead and return the cached value
-    return atomic.LoadInt64(&fs.diskUsage), nil
+    // Return the currently cached value back to the calling function.
+    return atomic.LoadInt64(&fs.disk), nil
 }
 
+// Updates the currently used disk space for a server.
 func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
-
     // Obtain an exclusive lock on this process so that we don't unintentionally run it at the same
     // time as another running process. Once the lock is available it'll read from the cache for the
     // second call rather than hitting the disk in parallel.
     fs.mu.Lock()
     defer fs.mu.Unlock()
 
-    // Always clear the in progress flag
+    // Always clear the in progress flag when this process finishes.
     defer atomic.StoreInt32(&fs.lookupInProgress, 0)
 
     // Signal that we're currently updating the disk size, to prevent other routines to block on this.
@@ -286,7 +295,7 @@ func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
     // so that we don't cause an endless loop of determining the disk size if there is a temporary
     // error encountered.
     fs.lastLookupTime = time.Now()
-    atomic.StoreInt64(&fs.diskUsage, size)
+    atomic.StoreInt64(&fs.disk, size)
 
     return size, err
 }
@@ -391,7 +400,7 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
     sz, err := io.CopyBuffer(file, r, buf)
 
     // Adjust the disk usage to account for the old size and the new size of the file.
-    atomic.AddInt64(&fs.diskUsage, sz-currentSize)
+    atomic.AddInt64(&fs.disk, sz-currentSize)
 
     // Finally, chown the file to ensure the permissions don't end up out-of-whack
     // if we had just created it.
@@ -658,11 +667,11 @@ func (fs *Filesystem) Delete(p string) error {
         }
     } else {
         if !st.IsDir() {
-            atomic.SwapInt64(&fs.diskUsage, -st.Size())
+            atomic.SwapInt64(&fs.disk, -st.Size())
         } else {
             go func(st os.FileInfo, resolved string) {
                 if s, err := fs.DirectorySize(resolved); err == nil {
-                    atomic.AddInt64(&fs.diskUsage, -s)
+                    atomic.AddInt64(&fs.disk, -s)
                 }
             }(st, resolved)
         }
diff --git a/server/filesystem_unarchive.go b/server/filesystem_unarchive.go
index ef55bf4..70ff01a 100644
--- a/server/filesystem_unarchive.go
+++ b/server/filesystem_unarchive.go
@@ -10,7 +10,6 @@ import (
     "os"
     "path/filepath"
     "reflect"
-    "sync"
     "sync/atomic"
 )
 
@@ -28,37 +27,19 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
         return false, err
     }
 
-    wg := new(sync.WaitGroup)
-
-    var dirSize int64
-    var cErr error
     // Get the cached size in a parallel process so that if it is not cached we are not
     // waiting an unnecessary amount of time on this call.
-    go func() {
-        wg.Add(1)
-        defer wg.Done()
-
-        dirSize, cErr = fs.getCachedDiskUsage(true)
-    }()
+    dirSize, err := fs.DiskUsage(false)
 
     var size int64
-    // In a seperate thread, walk over the archive and figure out just how large the final
-    // output would be from dearchiving it.
-    go func() {
-        wg.Add(1)
-        defer wg.Done()
+    // Walk over the archive and figure out just how large the final output would be from unarchiving it.
+    archiver.Walk(source, func(f archiver.File) error {
+        atomic.AddInt64(&size, f.Size())
 
-        // Walk all of the files and calculate the total decompressed size of this archive.
-        archiver.Walk(source, func(f archiver.File) error {
-            atomic.AddInt64(&size, f.Size())
+        return nil
+    })
 
-            return nil
-        })
-    }()
-
-    wg.Wait()
-
-    return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), cErr
+    return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), errors.WithStack(err)
 }
 
 // Decompress a file in a given directory by using the archiver tool to infer the file
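
As a quick orientation for reviewers, a minimal sketch of a hypothetical caller of the renamed
helper follows. The function name reportUsage and its wiring are illustrative assumptions and not
part of the patch; the sketch only relies on what the DiskUsage doc comment above describes (the
150 second cache window and the stale-value shortcut) plus the fs.Server.Log() call already
visible in the diff, and it assumes placement inside the same server package as Filesystem.

    // reportUsage is a hypothetical example of calling DiskUsage with both flag values,
    // assuming it lives in the server package alongside Filesystem.
    func reportUsage(fs *Filesystem) {
        // Blocking form: with allowStaleValue set to false the call waits for a fresh
        // directory walk whenever the cached value is older than the 150 second window.
        fresh, err := fs.DiskUsage(false)
        if err != nil {
            fs.Server.Log().WithField("error", err).Warn("failed to determine disk usage")
        }

        // Stale-tolerant form: with allowStaleValue set to true an expired cache entry is
        // returned immediately if another lookup is already in progress, and the refresh
        // completes in a background goroutine. HasSpaceAvailable(true) uses this path to
        // avoid stacking expensive filesystem walks on busy servers.
        approximate, _ := fs.DiskUsage(true)

        fs.Server.Log().
            WithField("fresh_bytes", fresh).
            WithField("approximate_bytes", approximate).
            Debug("disk usage report")
    }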