Additional code cleanup for #53

This commit is contained in:
Dane Everitt 2020-08-31 20:45:51 -07:00
parent d742acf308
commit cbf914e7a1
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
2 changed files with 38 additions and 48 deletions

View File

@ -45,7 +45,7 @@ type Filesystem struct {
lastLookupTime time.Time lastLookupTime time.Time
lookupInProgress int32 lookupInProgress int32
diskUsage int64 disk int64
Server *Server Server *Server
} }
@ -207,15 +207,20 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) {
return cleaned, g.Wait() return cleaned, g.Wait()
} }
type SpaceCheckingOpts struct {
AllowStaleResponse bool
}
// Determines if the directory a file is trying to be added to has enough space available // Determines if the directory a file is trying to be added to has enough space available
// for the file to be written to. // for the file to be written to.
// //
// Because determining the amount of space being used by a server is a taxing operation we // Because determining the amount of space being used by a server is a taxing operation we
// will load it all up into a cache and pull from that as long as the key is not expired. // will load it all up into a cache and pull from that as long as the key is not expired.
// //
// This operation will potentially block unless nonBlocking is true // This operation will potentially block unless allowStaleValue is set to true. See the
func (fs *Filesystem) HasSpaceAvailable(nonBlocking bool) bool { // documentation on DiskUsage for how this affects the call.
size, err := fs.getCachedDiskUsage(nonBlocking) func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
size, err := fs.DiskUsage(allowStaleValue)
if err != nil { if err != nil {
fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size") fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
} }
@ -241,36 +246,40 @@ func (fs *Filesystem) HasSpaceAvailable(nonBlocking bool) bool {
// as needed without overly taxing the system. This will prioritize the value from the cache to avoid // as needed without overly taxing the system. This will prioritize the value from the cache to avoid
// excessive IO usage. We will only walk the filesystem and determine the size of the directory if there // excessive IO usage. We will only walk the filesystem and determine the size of the directory if there
// is no longer a cached value. // is no longer a cached value.
// This will potentially block unless nonBlocking is true. //
func (fs *Filesystem) getCachedDiskUsage(nonBlocking bool) (int64, error) { // If "allowStaleValue" is set to true, a stale value MAY be returned to the caller if there is an
// expired cache value AND there is currently another lookup in progress. If there is no cached value but
// no other lookup is in progress, a fresh disk space response will be returned to the caller.
//
// This is primarily to avoid a bunch of I/O operations from piling up on the server, especially on servers
// with a large amount of files.
func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
// Check if cache is expired... // Check if cache is expired...
if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) { if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) {
// We're OK with blocking, so go ahead and block // If we are now allowing a stale response, or there is no lookup currently in progress, go ahead
if !nonBlocking { // and perform the lookup and return the fresh value. This is a blocking operation to the calling
// process.
if !allowStaleValue || atomic.LoadInt32(&fs.lookupInProgress) == 0 {
return fs.updateCachedDiskUsage() return fs.updateCachedDiskUsage()
} }
// Otherwise, we're fine with not blocking, but still need to update the cache. (If it isn't being done already) // Otherwise, just go ahead and perform the cached disk usage update.
if atomic.LoadInt32(&fs.lookupInProgress) != 1 {
go fs.updateCachedDiskUsage() go fs.updateCachedDiskUsage()
} }
} // Return the currently cached value back to the calling function.
return atomic.LoadInt64(&fs.disk), nil
// Go ahead and return the cached value
return atomic.LoadInt64(&fs.diskUsage), nil
} }
// Updates the currently used disk space for a server.
func (fs *Filesystem) updateCachedDiskUsage() (int64, error) { func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
// Obtain an exclusive lock on this process so that we don't unintentionally run it at the same // Obtain an exclusive lock on this process so that we don't unintentionally run it at the same
// time as another running process. Once the lock is available it'll read from the cache for the // time as another running process. Once the lock is available it'll read from the cache for the
// second call rather than hitting the disk in parallel. // second call rather than hitting the disk in parallel.
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
// Always clear the in progress flag // Always clear the in progress flag when this process finishes.
defer atomic.StoreInt32(&fs.lookupInProgress, 0) defer atomic.StoreInt32(&fs.lookupInProgress, 0)
// Signal that we're currently updating the disk size, to prevent other routines to block on this. // Signal that we're currently updating the disk size, to prevent other routines to block on this.
@ -286,7 +295,7 @@ func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
// so that we don't cause an endless loop of determining the disk size if there is a temporary // so that we don't cause an endless loop of determining the disk size if there is a temporary
// error encountered. // error encountered.
fs.lastLookupTime = time.Now() fs.lastLookupTime = time.Now()
atomic.StoreInt64(&fs.diskUsage, size) atomic.StoreInt64(&fs.disk, size)
return size, err return size, err
} }
@ -391,7 +400,7 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
sz, err := io.CopyBuffer(file, r, buf) sz, err := io.CopyBuffer(file, r, buf)
// Adjust the disk usage to account for the old size and the new size of the file. // Adjust the disk usage to account for the old size and the new size of the file.
atomic.AddInt64(&fs.diskUsage, sz-currentSize) atomic.AddInt64(&fs.disk, sz-currentSize)
// Finally, chown the file to ensure the permissions don't end up out-of-whack // Finally, chown the file to ensure the permissions don't end up out-of-whack
// if we had just created it. // if we had just created it.
@ -658,11 +667,11 @@ func (fs *Filesystem) Delete(p string) error {
} }
} else { } else {
if !st.IsDir() { if !st.IsDir() {
atomic.SwapInt64(&fs.diskUsage, -st.Size()) atomic.SwapInt64(&fs.disk, -st.Size())
} else { } else {
go func(st os.FileInfo, resolved string) { go func(st os.FileInfo, resolved string) {
if s, err := fs.DirectorySize(resolved); err == nil { if s, err := fs.DirectorySize(resolved); err == nil {
atomic.AddInt64(&fs.diskUsage, -s) atomic.AddInt64(&fs.disk, -s)
} }
}(st, resolved) }(st, resolved)
} }

View File

@ -10,7 +10,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"sync"
"sync/atomic" "sync/atomic"
) )
@ -28,37 +27,19 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
return false, err return false, err
} }
wg := new(sync.WaitGroup)
var dirSize int64
var cErr error
// Get the cached size in a parallel process so that if it is not cached we are not // Get the cached size in a parallel process so that if it is not cached we are not
// waiting an unnecessary amount of time on this call. // waiting an unnecessary amount of time on this call.
go func() { dirSize, err := fs.DiskUsage(false)
wg.Add(1)
defer wg.Done()
dirSize, cErr = fs.getCachedDiskUsage(true)
}()
var size int64 var size int64
// In a seperate thread, walk over the archive and figure out just how large the final // Walk over the archive and figure out just how large the final output would be from unarchiving it.
// output would be from dearchiving it.
go func() {
wg.Add(1)
defer wg.Done()
// Walk all of the files and calculate the total decompressed size of this archive.
archiver.Walk(source, func(f archiver.File) error { archiver.Walk(source, func(f archiver.File) error {
atomic.AddInt64(&size, f.Size()) atomic.AddInt64(&size, f.Size())
return nil return nil
}) })
}()
wg.Wait() return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), errors.WithStack(err)
return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), cErr
} }
// Decompress a file in a given directory by using the archiver tool to infer the file // Decompress a file in a given directory by using the archiver tool to infer the file