Change filewalker implementation to use a pool

parent 7e1b7e7f36
commit f3c8220bd9
go.mod (+1)
@@ -31,6 +31,7 @@ require (
 	github.com/docker/go-units v0.3.3 // indirect
 	github.com/fatih/color v1.9.0
 	github.com/gabriel-vasile/mimetype v0.1.4
+	github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753
 	github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0
 	github.com/gin-gonic/gin v1.6.3
 	github.com/golang/protobuf v1.3.5 // indirect
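The new dependency, github.com/gammazero/workerpool, is a fixed-size goroutine pool: tasks are queued with Submit and executed by a bounded number of workers. A minimal standalone sketch of that library's API (not part of this commit; the task bodies are placeholders):

package main

import (
	"fmt"
	"runtime"

	"github.com/gammazero/workerpool"
)

func main() {
	// Create a pool sized to the number of usable CPUs, the same sizing the walker below uses.
	wp := workerpool.New(runtime.GOMAXPROCS(0))

	for i := 0; i < 10; i++ {
		n := i // capture the loop variable for the closure
		wp.Submit(func() {
			fmt.Println("task", n)
		})
	}

	// StopWait blocks until every queued task has finished, then shuts the pool down.
	wp.StopWait()
}

StopWait is what lets the new walker block until its queue drains before returning.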
							
								
								
									
go.sum (+4)
@@ -79,6 +79,10 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/gabriel-vasile/mimetype v0.1.4 h1:5mcsq3+DXypREUkW+1juhjeKmE/XnWgs+paHMJn7lf8=
 github.com/gabriel-vasile/mimetype v0.1.4/go.mod h1:kMJbg3SlWZCsj4R73F1WDzbT9AyGCOVmUtIxxwO5pmI=
+github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8=
+github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w=
+github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753 h1:oSQ61LxZkz3Z4La0O5cbyVDvLWEfbNgiD43cSPdjPQQ=
+github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs=
 github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0 h1:7KeiSrO5puFH1+vdAdbpiie2TrNnkvFc/eOQzT60Z2k=
 github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0/go.mod h1:D1+3UtCYAJ1os1PI+zhTVEj6Tb+IHJvXjXKz83OstmM=
 github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
@@ -33,7 +33,7 @@ func TrackedError(err error) *RequestError {
 // generated this server for the purposes of logging.
 func TrackedServerError(err error, s *server.Server) *RequestError {
 	return &RequestError{
-		Err:     err,
+		Err:     errors.WithStack(err),
 		Uuid:    uuid.Must(uuid.NewRandom()).String(),
 		Message: "",
 		server:  s,
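Wrapping the error with errors.WithStack records a stack trace at the point the RequestError is built rather than keeping only the raw message. The import is not visible in this hunk; assuming it is github.com/pkg/errors, whose WithStack has this shape, a small illustration of the difference:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func open() error {
	// Pretend a lower-level call failed without any stack information attached.
	return errors.WithStack(fmt.Errorf("connection refused"))
}

func main() {
	err := open()

	// %v prints only the message; %+v also prints the stack recorded by WithStack.
	fmt.Printf("%v\n", err)
	fmt.Printf("%+v\n", err)
}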
@@ -222,17 +222,13 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) {
 // through all of the folders. Returns the size in bytes. This can be a fairly taxing operation
 // on locations with tons of files, so it is recommended that you cache the output.
 func (fs *Filesystem) DirectorySize(dir string) (int64, error) {
-	w := fs.NewWalker()
-	ctx := context.Background()
-
 	var size int64
-	err := w.Walk(dir, ctx, func(f os.FileInfo, _ string) bool {
-		// Only increment the size when we're dealing with a file specifically, otherwise
-		// just continue digging deeper until there are no more directories to iterate over.
+	err := fs.Walk(dir, func(_ string, f os.FileInfo, err error) error {
 		if !f.IsDir() {
 			atomic.AddInt64(&size, f.Size())
 		}
-		return true
+
+		return nil
 	})
 
 	return size, err
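The callback now uses the standard filepath.WalkFunc signature (path, os.FileInfo, error) instead of the custom bool-returning form, and returning nil rather than true means "keep walking". For comparison, the same size calculation written against the standard library's sequential filepath.Walk (a sketch with a hypothetical /tmp root, not part of the commit):

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync/atomic"
)

func main() {
	var size int64

	// filepath.WalkFunc receives the path, its FileInfo, and any error from reading it.
	err := filepath.Walk("/tmp", func(p string, f os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !f.IsDir() {
			// Atomic add mirrors the pooled walker, which runs callbacks concurrently;
			// with this sequential stdlib walker a plain addition would also be safe.
			atomic.AddInt64(&size, f.Size())
		}
		return nil
	})
	if err != nil {
		fmt.Println("walk error:", err)
		return
	}

	fmt.Println("total bytes:", size)
}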
@@ -657,9 +653,6 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
 		return nil, err
 	}
 
-	w := fs.NewWalker()
-	ctx := context.Background()
-
 	i, err := ignore.CompileIgnoreLines(ignored...)
 	if err != nil {
 		return nil, err
@@ -668,7 +661,8 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
 	// Walk through all of the files and directories on a server. This callback only returns
 	// files found, and will keep walking deeper and deeper into directories.
 	inc := new(backup.IncludedFiles)
-	if err := w.Walk(cleaned, ctx, func(f os.FileInfo, p string) bool {
+
+	if err := fs.Walk(cleaned, func(p string, f os.FileInfo, err error) error {
 		// Avoid unnecessary parsing if there are no ignored files, nothing will match anyways
 		// so no reason to call the function.
 		if len(ignored) == 0 || !i.MatchesPath(strings.TrimPrefix(p, fs.Path()+"/")) {

@@ -678,7 +672,7 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
 		// We can't just abort if the path is technically ignored. It is possible there is a nested
 		// file or folder that should not be excluded, so in this case we need to just keep going
 		// until we get to a final state.
-		return true
+		return nil
 	}); err != nil {
 		return nil, err
 	}
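The pattern matching here relies on a gitignore-style matcher compiled from the ignored lines. The import path is not shown in this diff; assuming the ignore package is github.com/sabhiram/go-gitignore, whose older releases expose exactly this two-value CompileIgnoreLines plus MatchesPath API, a short sketch with made-up patterns:

package main

import (
	"fmt"

	ignore "github.com/sabhiram/go-gitignore"
)

func main() {
	// Compile a set of .gitignore-style patterns, the way GetIncludedFiles does
	// with the user-supplied "ignored" slice.
	i, err := ignore.CompileIgnoreLines("*.log", "cache/")
	if err != nil {
		panic(err)
	}

	// Paths are matched relative to the server root, hence the TrimPrefix in the diff.
	fmt.Println(i.MatchesPath("logs/latest.log")) // true
	fmt.Println(i.MatchesPath("world/level.dat")) // false
}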
@@ -709,42 +703,33 @@ func (fs *Filesystem) CompressFiles(dir string, paths []string) (os.FileInfo, er
 		return nil, err
 	}
 
-	w := fs.NewWalker()
-	wg := new(sync.WaitGroup)
-
 	inc := new(backup.IncludedFiles)
 	// Iterate over all of the cleaned paths and merge them into a large object of final file
 	// paths to pass into the archiver. As directories are encountered this will drop into them
 	// and look for all of the files.
 	for _, p := range cleaned {
-		wg.Add(1)
-
-		go func(pa string) {
-			defer wg.Done()
-
-			f, err := os.Stat(pa)
-			if err != nil {
-				fs.Server.Log().WithField("error", err).WithField("path", pa).Warn("failed to stat file or directory for compression")
-				return
-			}
-
-			if f.IsDir() {
-				// Recursively drop into directory and get all of the additional files and directories within
-				// it that should be included in this backup.
-				w.Walk(pa, context.Background(), func(info os.FileInfo, s string) bool {
-					if !info.IsDir() {
-						inc.Push(&info, s)
-					}
-
-					return true
-				})
-			} else {
-				inc.Push(&f, pa)
-			}
-		}(p)
-	}
+		f, err := os.Stat(p)
+		if err != nil {
+			fs.Server.Log().WithField("error", err).WithField("path", p).Debug("failed to stat file or directory for compression")
+			continue
+		}
 
-	wg.Wait()
+		if f.IsDir() {
+			err := fs.Walk(p, func(s string, info os.FileInfo, err error) error {
+				if !info.IsDir() {
+					inc.Push(&info, s)
+				}
+
+				return nil
+			})
+
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			inc.Push(&f, p)
+		}
+	}
 
 	a := &backup.Archive{TrimPrefix: fs.Path(), Files: inc}
 
@@ -1,70 +1,101 @@
 package server
 
 import (
-	"context"
-	"golang.org/x/sync/errgroup"
+	"github.com/gammazero/workerpool"
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"runtime"
+	"sync"
 )
 
 type FileWalker struct {
 	*Filesystem
 }
 
+type PooledFileWalker struct {
+	wg       sync.WaitGroup
+	pool     *workerpool.WorkerPool
+	callback filepath.WalkFunc
+
+	Filesystem *Filesystem
+}
+
 // Returns a new walker instance.
 func (fs *Filesystem) NewWalker() *FileWalker {
 	return &FileWalker{fs}
 }
 
-// Iterate over all of the files and directories within a given directory. When a file is
-// found the callback will be called with the file information. If a directory is encountered
-// it will be recursively passed back through to this function.
-func (fw *FileWalker) Walk(dir string, ctx context.Context, callback func (os.FileInfo, string) bool) error {
-	cleaned, err := fw.SafePath(dir)
+// Creates a new pooled file walker that will concurrently walk over a given directory but limit itself
+// to a worker pool as to not completely flood out the system or cause a process crash.
+func newPooledWalker(fs *Filesystem) *PooledFileWalker {
+	return &PooledFileWalker{
+		Filesystem: fs,
+		// Create a worker pool that is the same size as the number of processors available on the
+		// system. Going much higher doesn't provide much of a performance boost, and is only more
+		// likely to lead to resource overloading anyways.
+		pool: workerpool.New(runtime.GOMAXPROCS(0)),
+	}
+}
+
+// Process a given path by calling the callback function for all of the files and directories within
+// the path, and then dropping into any directories that we come across.
+func (w *PooledFileWalker) process(path string) error {
+	defer w.wg.Done()
+
+	p, err := w.Filesystem.SafePath(path)
 	if err != nil {
 		return err
 	}
 
-	// Get all of the files from this directory.
-	files, err := ioutil.ReadDir(cleaned)
+	files, err := ioutil.ReadDir(p)
 	if err != nil {
 		return err
 	}
 
-	// Create an error group that we can use to run processes in parallel while retaining
-	// the ability to cancel the entire process immediately should any of it fail.
-	g, ctx := errgroup.WithContext(ctx)
-
+	// Loop over all of the files and directories in the given directory and call the provided
+	// callback function. If we encounter a directory, push that directory onto the worker queue
+	// to be processed.
 	for _, f := range files {
-		if f.IsDir() {
-			fi := f
-			p := filepath.Join(cleaned, f.Name())
-			// Recursively call this function to continue digging through the directory tree within
-			// a seperate goroutine. If the context is canceled abort this process.
-			g.Go(func() error {
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				default:
-					// If the callback returns true, go ahead and keep walking deeper. This allows
-					// us to programatically continue deeper into directories, or stop digging
-					// if that pathway knows it needs nothing else.
-					if callback(fi, p) {
-						return fw.Walk(p, ctx, callback)
-					}
-
-					return nil
-				}
-			})
-		} else {
-			// If this isn't a directory, go ahead and pass the file information into the
-			// callback. We don't care about the response since we won't be stepping into
-			// anything from here.
-			callback(f, filepath.Join(cleaned, f.Name()))
-		}
-	}
+		sp := filepath.Join(p, f.Name())
+		i, err := os.Stat(sp)
+
+		if err = w.callback(sp, i, err); err != nil {
+			if err == filepath.SkipDir {
+				return nil
+			}
+
+			return err
+		}
+
+		if i.IsDir() {
+			w.push(sp)
+		}
+	}
+
+	return nil
+}
 
-	// Block until all of the routines finish and have returned a value.
-	return g.Wait()
+// Push a new path into the worker pool.
+//
+// @todo probably helps to handle errors.
+func (w *PooledFileWalker) push(path string) {
+	w.wg.Add(1)
+	w.pool.Submit(func() {
+		w.process(path)
+	})
+}
+
+// Walks the given directory and executes the callback function for all of the files and directories
+// that are encountered.
+func (fs *Filesystem) Walk(dir string, callback filepath.WalkFunc) error {
+	w := newPooledWalker(fs)
+	w.callback = callback
+
+	w.push(dir)
+
+	w.wg.Wait()
+	w.pool.StopWait()
+
+	return nil
 }
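Taken together, the rewrite replaces the recursive errgroup walker with a pool of GOMAXPROCS workers: Walk seeds the pool with the root directory, process lists one directory and queues any subdirectories it finds, and wg.Wait plus pool.StopWait block until the queue drains. A self-contained sketch of that pattern outside the Filesystem/SafePath plumbing (a hypothetical standalone program, not the commit's code):

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"runtime"
	"sync"

	"github.com/gammazero/workerpool"
)

// walker mirrors the shape of the commit's PooledFileWalker: every directory
// becomes one task on a fixed-size pool rather than one unbounded goroutine.
type walker struct {
	wg       sync.WaitGroup
	pool     *workerpool.WorkerPool
	callback filepath.WalkFunc
}

func (w *walker) push(path string) {
	w.wg.Add(1)
	w.pool.Submit(func() {
		defer w.wg.Done()

		files, err := ioutil.ReadDir(path)
		if err != nil {
			return // as the commit's @todo notes, errors are not propagated upward
		}
		for _, f := range files {
			p := filepath.Join(path, f.Name())
			info, err := os.Stat(p)
			if cbErr := w.callback(p, info, err); cbErr != nil {
				continue // skip this entry; filepath.SkipDir handling omitted for brevity
			}
			if info != nil && info.IsDir() {
				w.push(p) // queue subdirectories instead of recursing inline
			}
		}
	})
}

func main() {
	w := &walker{
		pool: workerpool.New(runtime.GOMAXPROCS(0)), // same sizing as the commit
		callback: func(p string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.IsDir() {
				fmt.Println(p)
			}
			return nil
		},
	}

	w.push(".")
	w.wg.Wait()
	w.pool.StopWait()
}

As the @todo in the commit points out, errors returned while processing individual directories are swallowed by push, so the top-level Walk always returns nil.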