Minimize blocking in Filesystem.getCachedDiskUsage (#53)

This commit is contained in:
CyberKitsune
2020-08-31 20:27:41 -07:00
committed by GitHub
parent 5f1d9ff151
commit d742acf308
8 changed files with 45 additions and 25 deletions

View File

@@ -41,10 +41,11 @@ func IsPathResolutionError(err error) bool {
}
type Filesystem struct {
mu sync.RWMutex
mu sync.Mutex
lastLookupTime time.Time
diskUsage int64
lastLookupTime time.Time
lookupInProgress int32
diskUsage int64
Server *Server
}
@@ -211,8 +212,10 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) {
//
// Because determining the amount of space being used by a server is a taxing operation we
// will load it all up into a cache and pull from that as long as the key is not expired.
func (fs *Filesystem) HasSpaceAvailable() bool {
size, err := fs.getCachedDiskUsage()
//
// This operation will potentially block unless nonBlocking is true
func (fs *Filesystem) HasSpaceAvailable(nonBlocking bool) bool {
size, err := fs.getCachedDiskUsage(nonBlocking)
if err != nil {
fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
}
@@ -238,20 +241,40 @@ func (fs *Filesystem) HasSpaceAvailable() bool {
// as needed without overly taxing the system. This will prioritize the value from the cache to avoid
// excessive IO usage. We will only walk the filesystem and determine the size of the directory if there
// is no longer a cached value.
func (fs *Filesystem) getCachedDiskUsage() (int64, error) {
// This will potentially block unless nonBlocking is true.
func (fs *Filesystem) getCachedDiskUsage(nonBlocking bool) (int64, error) {
// Check if cache is expired...
if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) {
// We're OK with blocking, so go ahead and block
if !nonBlocking {
return fs.updateCachedDiskUsage()
}
// Otherwise, we're fine with not blocking, but still need to update the cache. (If it isn't being done already)
if atomic.LoadInt32(&fs.lookupInProgress) != 1 {
go fs.updateCachedDiskUsage()
}
}
// Go ahead and return the cached value
return atomic.LoadInt64(&fs.diskUsage), nil
}
func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
// Obtain an exclusive lock on this process so that we don't unintentionally run it at the same
// time as another running process. Once the lock is available it'll read from the cache for the
// second call rather than hitting the disk in parallel.
//
// This effectively the same speed as running this call in parallel since this cache will return
// instantly on the second call.
fs.mu.Lock()
defer fs.mu.Unlock()
// Expire the cache after 2.5 minutes.
if fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) {
return fs.diskUsage, nil
}
// Always clear the in progress flag
defer atomic.StoreInt32(&fs.lookupInProgress, 0)
// Signal that we're currently updating the disk size, to prevent other routines to block on this.
atomic.StoreInt32(&fs.lookupInProgress, 1)
// If there is no size its either because there is no data (in which case running this function
// will have effectively no impact), or there is nothing in the cache, in which case we need to

View File

@@ -38,7 +38,7 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
wg.Add(1)
defer wg.Done()
dirSize, cErr = fs.getCachedDiskUsage()
dirSize, cErr = fs.getCachedDiskUsage(true)
}()
var size int64

View File

@@ -46,10 +46,7 @@ func (s *Server) StartEventListeners() {
s.resources.Stats = *st
s.resources.mu.Unlock()
// TODO: we'll need to handle this better since calling it in rapid succession will
// cause it to block until the first call is done calculating disk usage, which will
// case stat events to pile up for the server.
s.Filesystem.HasSpaceAvailable()
s.Filesystem.HasSpaceAvailable(true)
s.emitProcUsage()
}

View File

@@ -111,7 +111,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
envCfg := environment.NewConfiguration(settings, s.GetEnvironmentVariables())
meta := docker.Metadata{
Image: s.Config().Container.Image,
Image: s.Config().Container.Image,
}
if env, err := docker.New(s.Id(), &meta, envCfg); err != nil {
@@ -128,7 +128,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
// If the server's data directory exists, force disk usage calculation.
if _, err := os.Stat(s.Filesystem.Path()); err == nil {
go s.Filesystem.HasSpaceAvailable()
s.Filesystem.HasSpaceAvailable(true)
}
return s, nil

View File

@@ -142,7 +142,7 @@ func (s *Server) onBeforeStart() error {
s.SyncWithEnvironment()
s.PublishConsoleOutputFromDaemon("Checking server disk space usage, this could take a few seconds...")
if !s.Filesystem.HasSpaceAvailable() {
if !s.Filesystem.HasSpaceAvailable(false) {
return errors.New("cannot start server, not enough disk space available")
}