From f3c8220bd9bad82c3a9626406f4ea1bcf557ed12 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Thu, 16 Jul 2020 21:51:31 -0700 Subject: [PATCH] Change filewalker implementation to use a pool --- go.mod | 1 + go.sum | 4 ++ router/error.go | 2 +- server/filesystem.go | 65 ++++++++------------- server/filesystem_walker.go | 111 +++++++++++++++++++++++------------- 5 files changed, 102 insertions(+), 81 deletions(-) diff --git a/go.mod b/go.mod index 8f61d89..0de71c2 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/docker/go-units v0.3.3 // indirect github.com/fatih/color v1.9.0 github.com/gabriel-vasile/mimetype v0.1.4 + github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753 github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0 github.com/gin-gonic/gin v1.6.3 github.com/golang/protobuf v1.3.5 // indirect diff --git a/go.sum b/go.sum index 889a55f..2ac6433 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,10 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v0.1.4 h1:5mcsq3+DXypREUkW+1juhjeKmE/XnWgs+paHMJn7lf8= github.com/gabriel-vasile/mimetype v0.1.4/go.mod h1:kMJbg3SlWZCsj4R73F1WDzbT9AyGCOVmUtIxxwO5pmI= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= +github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753 h1:oSQ61LxZkz3Z4La0O5cbyVDvLWEfbNgiD43cSPdjPQQ= +github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0 h1:7KeiSrO5puFH1+vdAdbpiie2TrNnkvFc/eOQzT60Z2k= github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0/go.mod h1:D1+3UtCYAJ1os1PI+zhTVEj6Tb+IHJvXjXKz83OstmM= github.com/ghodss/yaml v1.0.0 
h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/router/error.go b/router/error.go index 5755b7f..0ad58b8 100644 --- a/router/error.go +++ b/router/error.go @@ -33,7 +33,7 @@ func TrackedError(err error) *RequestError { // generated this server for the purposes of logging. func TrackedServerError(err error, s *server.Server) *RequestError { return &RequestError{ - Err: err, + Err: errors.WithStack(err), Uuid: uuid.Must(uuid.NewRandom()).String(), Message: "", server: s, diff --git a/server/filesystem.go b/server/filesystem.go index 63d7986..0e8c8b0 100644 --- a/server/filesystem.go +++ b/server/filesystem.go @@ -29,9 +29,9 @@ import ( var InvalidPathResolution = errors.New("invalid path resolution") type Filesystem struct { - Server *Server + Server *Server Configuration *config.SystemConfiguration - cacheDiskMu sync.Mutex + cacheDiskMu sync.Mutex } // Returns the root path that contains all of a server's data. @@ -222,17 +222,13 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) { // through all of the folders. Returns the size in bytes. This can be a fairly taxing operation // on locations with tons of files, so it is recommended that you cache the output. func (fs *Filesystem) DirectorySize(dir string) (int64, error) { - w := fs.NewWalker() - ctx := context.Background() - var size int64 - err := w.Walk(dir, ctx, func(f os.FileInfo, _ string) bool { - // Only increment the size when we're dealing with a file specifically, otherwise - // just continue digging deeper until there are no more directories to iterate over. + err := fs.Walk(dir, func(_ string, f os.FileInfo, err error) error { if !f.IsDir() { atomic.AddInt64(&size, f.Size()) } - return true + + return nil }) return size, err @@ -657,9 +653,6 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In return nil, err } - w := fs.NewWalker() - ctx := context.Background() - i, err := ignore.CompileIgnoreLines(ignored...) 
if err != nil { return nil, err @@ -668,7 +661,8 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In // Walk through all of the files and directories on a server. This callback only returns // files found, and will keep walking deeper and deeper into directories. inc := new(backup.IncludedFiles) - if err := w.Walk(cleaned, ctx, func(f os.FileInfo, p string) bool { + + if err := fs.Walk(cleaned, func(p string, f os.FileInfo, err error) error { // Avoid unnecessary parsing if there are no ignored files, nothing will match anyways // so no reason to call the function. if len(ignored) == 0 || !i.MatchesPath(strings.TrimPrefix(p, fs.Path()+"/")) { @@ -678,7 +672,7 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In // We can't just abort if the path is technically ignored. It is possible there is a nested // file or folder that should not be excluded, so in this case we need to just keep going // until we get to a final state. - return true + return nil }); err != nil { return nil, err } @@ -709,43 +703,34 @@ func (fs *Filesystem) CompressFiles(dir string, paths []string) (os.FileInfo, er return nil, err } - w := fs.NewWalker() - wg := new(sync.WaitGroup) - inc := new(backup.IncludedFiles) // Iterate over all of the cleaned paths and merge them into a large object of final file // paths to pass into the archiver. As directories are encountered this will drop into them // and look for all of the files. 
for _, p := range cleaned { - wg.Add(1) + f, err := os.Stat(p) + if err != nil { + fs.Server.Log().WithField("error", err).WithField("path", p).Debug("failed to stat file or directory for compression") + continue + } - go func(pa string) { - defer wg.Done() + if f.IsDir() { + err := fs.Walk(p, func(s string, info os.FileInfo, err error) error { + if !info.IsDir() { + inc.Push(&info, s) + } + + return nil + }) - f, err := os.Stat(pa) if err != nil { - fs.Server.Log().WithField("error", err).WithField("path", pa).Warn("failed to stat file or directory for compression") - return + return nil, err } - - if f.IsDir() { - // Recursively drop into directory and get all of the additional files and directories within - // it that should be included in this backup. - w.Walk(pa, context.Background(), func(info os.FileInfo, s string) bool { - if !info.IsDir() { - inc.Push(&info, s) - } - - return true - }) - } else { - inc.Push(&f, pa) - } - }(p) + } else { + inc.Push(&f, p) + } } - wg.Wait() - a := &backup.Archive{TrimPrefix: fs.Path(), Files: inc} d := path.Join(cleanedRootDir, fmt.Sprintf("archive-%s.tar.gz", strings.ReplaceAll(time.Now().Format(time.RFC3339), ":", ""))) diff --git a/server/filesystem_walker.go b/server/filesystem_walker.go index 0db761a..bb3bf6b 100644 --- a/server/filesystem_walker.go +++ b/server/filesystem_walker.go @@ -1,70 +1,101 @@ package server import ( - "context" - "golang.org/x/sync/errgroup" + "github.com/gammazero/workerpool" "io/ioutil" "os" "path/filepath" + "runtime" + "sync" ) type FileWalker struct { *Filesystem } +type PooledFileWalker struct { + wg sync.WaitGroup + pool *workerpool.WorkerPool + callback filepath.WalkFunc + + Filesystem *Filesystem +} + // Returns a new walker instance. func (fs *Filesystem) NewWalker() *FileWalker { return &FileWalker{fs} } -// Iterate over all of the files and directories within a given directory. When a file is -// found the callback will be called with the file information. 
If a directory is encountered -// it will be recursively passed back through to this function. -func (fw *FileWalker) Walk(dir string, ctx context.Context, callback func (os.FileInfo, string) bool) error { - cleaned, err := fw.SafePath(dir) +// Creates a new pooled file walker that will concurrently walk over a given directory but limit itself +// to a worker pool so as not to completely flood the system or cause a process crash. +func newPooledWalker(fs *Filesystem) *PooledFileWalker { + return &PooledFileWalker{ + Filesystem: fs, + // Create a worker pool that is the same size as the number of processors available on the + // system. Going much higher doesn't provide much of a performance boost, and is only more + // likely to lead to resource overloading anyways. + pool: workerpool.New(runtime.GOMAXPROCS(0)), + } +} + +// Process a given path by calling the callback function for all of the files and directories within +// the path, and then dropping into any directories that we come across. +func (w *PooledFileWalker) process(path string) error { + defer w.wg.Done() + + p, err := w.Filesystem.SafePath(path) if err != nil { return err } - // Get all of the files from this directory. - files, err := ioutil.ReadDir(cleaned) + files, err := ioutil.ReadDir(p) if err != nil { return err } - // Create an error group that we can use to run processes in parallel while retaining - // the ability to cancel the entire process immediately should any of it fail. - g, ctx := errgroup.WithContext(ctx) - + // Loop over all of the files and directories in the given directory and call the provided + // callback function. If we encounter a directory, push that directory onto the worker queue + // to be processed. for _, f := range files { - if f.IsDir() { - fi := f - p := filepath.Join(cleaned, f.Name()) - // Recursively call this function to continue digging through the directory tree within - // a seperate goroutine. If the context is canceled abort this process. 
- g.Go(func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // If the callback returns true, go ahead and keep walking deeper. This allows - // us to programatically continue deeper into directories, or stop digging - // if that pathway knows it needs nothing else. - if callback(fi, p) { - return fw.Walk(p, ctx, callback) - } + sp := filepath.Join(p, f.Name()) + i, err := os.Stat(sp) - return nil - } - }) - } else { - // If this isn't a directory, go ahead and pass the file information into the - // callback. We don't care about the response since we won't be stepping into - // anything from here. - callback(f, filepath.Join(cleaned, f.Name())) + if err = w.callback(sp, i, err); err != nil { + if err == filepath.SkipDir { + return nil + } + + return err + } + + if i.IsDir() { + w.push(sp) } } - // Block until all of the routines finish and have returned a value. - return g.Wait() -} \ No newline at end of file + return nil +} + +// Push a new path into the worker pool. +// +// @todo errors returned by process() are discarded here, so Walk always returns nil; collect and surface them. +func (w *PooledFileWalker) push(path string) { + w.wg.Add(1) + w.pool.Submit(func() { + w.process(path) + }) +} + +// Walks the given directory and executes the callback function for all of the files and directories +// that are encountered. +func (fs *Filesystem) Walk(dir string, callback filepath.WalkFunc) error { + w := newPooledWalker(fs) + w.callback = callback + + w.push(dir) + + w.wg.Wait() + w.pool.StopWait() + + return nil +}