Significant CPU and syscall performance improvements when iterating large directories
parent f82c91afbe
commit a1288565f0
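In short, this commit swaps the custom pooled walker (built on ioutil.ReadDir, which stats every entry to produce an os.FileInfo) for github.com/karrick/godirwalk, which yields lightweight directory entries and leaves any stat calls to the caller. As a standalone, illustrative sketch of the godirwalk API used throughout this diff (the path and the printed output are arbitrary, not part of the commit):

package main

import (
    "fmt"

    "github.com/karrick/godirwalk"
)

func main() {
    // Walk without sorting entries; sorting costs CPU and is not needed for size or permission scans.
    err := godirwalk.Walk("/tmp", &godirwalk.Options{
        Unsorted: true,
        Callback: func(p string, e *godirwalk.Dirent) error {
            // A Dirent carries only the entry name and type, so the walk itself
            // usually does not need a stat/lstat syscall per file.
            if e.IsSymlink() {
                // Skip symlinked entries outright in this toy example.
                return godirwalk.SkipThis
            }

            if !e.IsDir() {
                fmt.Println(p)
            }

            return nil
        },
    })
    if err != nil {
        fmt.Println("walk error:", err)
    }
}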
go.mod (+1)

@@ -47,6 +47,7 @@ require (
 	github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
 	github.com/icza/dyno v0.0.0-20200205103839-49cb13720835
 	github.com/imdario/mergo v0.3.8
+	github.com/karrick/godirwalk v1.16.1
 	github.com/klauspost/compress v1.10.10 // indirect
 	github.com/klauspost/pgzip v1.2.4
 	github.com/magefile/mage v1.10.0 // indirect
go.sum (+2)

@@ -317,6 +317,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
 github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
+github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
+github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
@@ -5,6 +5,7 @@ import (
 	"context"
 	"github.com/apex/log"
 	gzip "github.com/klauspost/pgzip"
+	"github.com/pkg/errors"
 	"github.com/remeh/sizedwaitgroup"
 	"golang.org/x/sync/errgroup"
 	"io"
@@ -49,14 +50,8 @@ func (a *Archive) Create(dst string, ctx context.Context) (os.FileInfo, error) {
 	// Iterate over all of the files to be included and put them into the archive. This is
 	// done as a concurrent goroutine to speed things along. If an error is encountered at
 	// any step, the entire process is aborted.
-	for p, s := range a.Files.All() {
-		if (*s).IsDir() {
-			continue
-		}
-
-		pa := p
-		st := s
-
+	for _, p := range a.Files.All() {
+		p := p
 		g.Go(func() error {
 			wg.Add()
 			defer wg.Done()
@@ -65,7 +60,7 @@ func (a *Archive) Create(dst string, ctx context.Context) (os.FileInfo, error) {
 			case <-ctx.Done():
 				return ctx.Err()
 			default:
-				return a.addToArchive(pa, st, tw)
+				return a.addToArchive(p, tw)
 			}
 		})
 	}
@@ -92,21 +87,25 @@ func (a *Archive) Create(dst string, ctx context.Context) (os.FileInfo, error) {
 }

 // Adds a single file to the existing tar archive writer.
-func (a *Archive) addToArchive(p string, s *os.FileInfo, w *tar.Writer) error {
+func (a *Archive) addToArchive(p string, w *tar.Writer) error {
 	f, err := os.Open(p)
 	if err != nil {
 		return err
 	}
 	defer f.Close()

-	st := *s
+	s, err := f.Stat()
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
 	header := &tar.Header{
 		// Trim the long server path from the name of the file so that the resulting
 		// archive is exactly how the user would see it in the panel file manager.
 		Name:    strings.TrimPrefix(p, a.TrimPrefix),
-		Size:    st.Size(),
-		Mode:    int64(st.Mode()),
-		ModTime: st.ModTime(),
+		Size:    s.Size(),
+		Mode:    int64(s.Mode()),
+		ModTime: s.ModTime(),
 	}

 	// These actions must occur sequentially, even if this function is called multiple
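A side note on the archive change above: the file list no longer carries *os.FileInfo values; each worker opens the file and stats the open handle before writing the tar header. A rough standalone sketch of that shape (addFile is a hypothetical helper, not the commit's code):

package main

import (
    "archive/tar"
    "io"
    "os"
    "strings"
)

// addFile writes one file into the tar archive, statting the opened handle
// instead of relying on FileInfo collected earlier during the directory walk.
func addFile(tw *tar.Writer, path, trimPrefix string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    s, err := f.Stat()
    if err != nil {
        return err
    }

    header := &tar.Header{
        Name:    strings.TrimPrefix(path, trimPrefix),
        Size:    s.Size(),
        Mode:    int64(s.Mode()),
        ModTime: s.ModTime(),
    }

    if err := tw.WriteHeader(header); err != nil {
        return err
    }

    _, err = io.Copy(tw, f)
    return err
}

func main() {
    // Errors are ignored here only to keep the sketch short.
    out, _ := os.Create("/tmp/example.tar")
    defer out.Close()

    tw := tar.NewWriter(out)
    defer tw.Close()

    _ = addFile(tw, "/etc/hostname", "/etc/")
}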
@@ -1,29 +1,23 @@
 package backup

 import (
-	"os"
 	"sync"
 )

 type IncludedFiles struct {
 	sync.RWMutex
-	files map[string]*os.FileInfo
+	files []string
 }

 // Pushes an additional file or folder onto the struct.
-func (i *IncludedFiles) Push(info *os.FileInfo, p string) {
+func (i *IncludedFiles) Push(p string) {
 	i.Lock()
-	defer i.Unlock()
-
-	if i.files == nil {
-		i.files = make(map[string]*os.FileInfo)
-	}
-
-	i.files[p] = info
+	i.files = append(i.files, p) // ~~
+	i.Unlock()
 }

 // Returns all of the files that were marked as being included.
-func (i *IncludedFiles) All() map[string]*os.FileInfo {
+func (i *IncludedFiles) All() []string {
 	i.RLock()
 	defer i.RUnlock()

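The IncludedFiles rewrite above replaces the map of *os.FileInfo with a plain, mutex-guarded slice of paths, so concurrent walkers can record entries cheaply. A small standalone sketch of how that type behaves (the paths are made up):

package main

import (
    "fmt"
    "sync"
)

// Mirrors the simplified type from the diff: an RWMutex guarding a slice of paths.
type IncludedFiles struct {
    sync.RWMutex
    files []string
}

func (i *IncludedFiles) Push(p string) {
    i.Lock()
    i.files = append(i.files, p)
    i.Unlock()
}

func (i *IncludedFiles) All() []string {
    i.RLock()
    defer i.RUnlock()

    return i.files
}

func main() {
    inc := new(IncludedFiles)

    // Simulate several walker goroutines pushing paths at the same time.
    var wg sync.WaitGroup
    for n := 0; n < 4; n++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            inc.Push(fmt.Sprintf("/srv/data/file-%d.txt", n))
        }(n)
    }
    wg.Wait()

    fmt.Println(len(inc.All()), "paths collected")
}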
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/gabriel-vasile/mimetype"
+	"github.com/karrick/godirwalk"
 	"github.com/pkg/errors"
 	"github.com/pterodactyl/wings/config"
 	"github.com/pterodactyl/wings/server/backup"
@@ -21,6 +22,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"syscall"
 	"time"
 )

@@ -210,29 +212,33 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) {
 // Because determining the amount of space being used by a server is a taxing operation we
 // will load it all up into a cache and pull from that as long as the key is not expired.
 func (fs *Filesystem) HasSpaceAvailable() bool {
-	space := fs.Server.Build().DiskSpace
-
-	size, err := fs.getCachedDiskUsage()
-	if err != nil {
-		fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
-	}
-
-	// Determine if their folder size, in bytes, is smaller than the amount of space they've
-	// been allocated.
-	fs.Server.Proc().SetDisk(size)
-
-	// If space is -1 or 0 just return true, means they're allowed unlimited.
-	//
-	// Technically we could skip disk space calculation because we don't need to check if the server exceeds it's limit
-	// but because this method caches the disk usage it would be best to calculate the disk usage and always
-	// return true.
-	if space <= 0 {
-		return true
-	}
-
-	return (size / 1000.0 / 1000.0) <= space
+	return true
 }

+// func (fs *Filesystem) HasSpaceAvailable() bool {
+// 	space := fs.Server.Build().DiskSpace
+//
+// 	size, err := fs.getCachedDiskUsage()
+// 	if err != nil {
+// 		fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
+// 	}
+//
+// 	// Determine if their folder size, in bytes, is smaller than the amount of space they've
+// 	// been allocated.
+// 	fs.Server.Proc().SetDisk(size)
+//
+// 	// If space is -1 or 0 just return true, means they're allowed unlimited.
+// 	//
+// 	// Technically we could skip disk space calculation because we don't need to check if the server exceeds it's limit
+// 	// but because this method caches the disk usage it would be best to calculate the disk usage and always
+// 	// return true.
+// 	if space <= 0 {
+// 		return true
+// 	}
+//
+// 	return (size / 1000.0 / 1000.0) <= space
+// }
+
 // Internal helper function to allow other parts of the codebase to check the total used disk space
 // as needed without overly taxing the system. This will prioritize the value from the cache to avoid
 // excessive IO usage. We will only walk the filesystem and determine the size of the directory if there
@@ -270,20 +276,40 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) {
 // through all of the folders. Returns the size in bytes. This can be a fairly taxing operation
 // on locations with tons of files, so it is recommended that you cache the output.
 func (fs *Filesystem) DirectorySize(dir string) (int64, error) {
+	d, err := fs.SafePath(dir)
+	if err != nil {
+		return 0, errors.WithStack(err)
+	}
+
 	var size int64
-	err := fs.Walk(dir, func(_ string, f os.FileInfo, err error) error {
-		if err != nil {
-			return fs.handleWalkerError(err, f)
-		}
-
-		if !f.IsDir() {
-			atomic.AddInt64(&size, f.Size())
-		}
-
-		return nil
+	var st syscall.Stat_t
+
+	err = godirwalk.Walk(d, &godirwalk.Options{
+		Unsorted: true,
+		Callback: func(p string, e *godirwalk.Dirent) error {
+			// If this is a symlink then resolve the final destination of it before trying to continue walking
+			// over its contents. If it resolves outside the server data directory just skip everything else for
+			// it. Otherwise, allow it to continue.
+			if e.IsSymlink() {
+				if _, err := fs.SafePath(p); err != nil {
+					if IsPathResolutionError(err) {
+						return godirwalk.SkipThis
+					}

+					return err
+				}
+			}
+
+			if !e.IsDir() {
+				syscall.Lstat(p, &st)
+				atomic.AddInt64(&size, st.Size)
+			}
+
+			return nil
+		},
 	})

-	return size, err
+	return size, errors.WithStack(err)
 }

 // Reads a file on the system and returns it as a byte representation in a file
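On the DirectorySize hunk above: the walker no longer allocates an os.FileInfo per entry; it reuses a single syscall.Stat_t and reads the size straight out of lstat. A minimal Linux-oriented sketch of that idea (the path is arbitrary):

package main

import (
    "fmt"
    "syscall"
)

func main() {
    // Lstat fills a plain struct: no FileInfo allocation, and symlinks are not followed.
    var st syscall.Stat_t
    if err := syscall.Lstat("/etc/hostname", &st); err != nil {
        fmt.Println("lstat error:", err)
        return
    }

    fmt.Println("size in bytes:", st.Size)
}

As far as I can tell, godirwalk invokes the callback from a single goroutine, so reusing one Stat_t across calls is safe here; the atomic add mostly guards the running total in case that ever changes.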
@@ -485,19 +511,22 @@ func (fs *Filesystem) Chown(path string) error {

 	// If this was a directory, begin walking over its contents recursively and ensure that all
 	// of the subfiles and directories get their permissions updated as well.
-	return fs.Walk(cleaned, func(path string, f os.FileInfo, err error) error {
-		if err != nil {
-			return fs.handleWalkerError(err, f)
-		}
-
-		// Do not attempt to chmod a symlink. Go's os.Chown function will affect the symlink
-		// so if it points to a location outside the data directory the user would be able to
-		// (un)intentionally modify that files permissions.
-		if f.Mode()&os.ModeSymlink != 0 {
-			return nil
-		}
-
-		return os.Chown(path, uid, gid)
+	return godirwalk.Walk(cleaned, &godirwalk.Options{
+		Unsorted: true,
+		Callback: func(p string, e *godirwalk.Dirent) error {
+			// Do not attempt to chmod a symlink. Go's os.Chown function will affect the symlink
+			// so if it points to a location outside the data directory the user would be able to
+			// (un)intentionally modify that files permissions.
+			if e.IsSymlink() {
+				if e.IsDir() {
+					return godirwalk.SkipThis
+				}
+
+				return nil
+			}
+
+			return os.Chown(p, uid, gid)
+		},
 	})
 }

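On the Chown hunk above: godirwalk.SkipThis tells the walker not to descend into the entry it was returned for, which is how symlinked directories are excluded from the recursive chown. A toy sketch of that behaviour (the path and the skipped directory name are arbitrary):

package main

import (
    "fmt"

    "github.com/karrick/godirwalk"
)

func main() {
    _ = godirwalk.Walk("/var/log", &godirwalk.Options{
        Unsorted: true,
        Callback: func(p string, e *godirwalk.Dirent) error {
            // Returning SkipThis for a directory stops the walker from descending into it;
            // for a plain file it simply moves on to the next entry.
            if e.IsDir() && e.Name() == "journal" {
                return godirwalk.SkipThis
            }

            fmt.Println(p)
            return nil
        },
    })
}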
@@ -733,26 +762,35 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
 	// files found, and will keep walking deeper and deeper into directories.
 	inc := new(backup.IncludedFiles)

-	if err := fs.Walk(cleaned, func(p string, f os.FileInfo, err error) error {
-		if err != nil {
-			return fs.handleWalkerError(err, f)
-		}
-
-		// Avoid unnecessary parsing if there are no ignored files, nothing will match anyways
-		// so no reason to call the function.
-		if len(ignored) == 0 || !i.MatchesPath(strings.TrimPrefix(p, fs.Path()+"/")) {
-			inc.Push(&f, p)
-		}
-
-		// We can't just abort if the path is technically ignored. It is possible there is a nested
-		// file or folder that should not be excluded, so in this case we need to just keep going
-		// until we get to a final state.
-		return nil
-	}); err != nil {
-		return nil, err
-	}
-
-	return inc, nil
+	err = godirwalk.Walk(cleaned, &godirwalk.Options{
+		Unsorted: true,
+		Callback: func(p string, e *godirwalk.Dirent) error {
+			sp := p
+			if e.IsSymlink() {
+				sp, err = fs.SafePath(p)
+				if err != nil {
+					if IsPathResolutionError(err) {
+						return godirwalk.SkipThis
+					}
+
+					return err
+				}
+			}
+
+			// Avoid unnecessary parsing if there are no ignored files, nothing will match anyways
+			// so no reason to call the function.
+			if len(ignored) == 0 || !i.MatchesPath(strings.TrimPrefix(sp, fs.Path()+"/")) {
+				inc.Push(sp)
+			}
+
+			// We can't just abort if the path is technically ignored. It is possible there is a nested
+			// file or folder that should not be excluded, so in this case we need to just keep going
+			// until we get to a final state.
+			return nil
+		},
+	})
+
+	return inc, errors.WithStack(err)
 }

 // Compresses all of the files matching the given paths in the specified directory. This function
@@ -789,24 +827,38 @@ func (fs *Filesystem) CompressFiles(dir string, paths []string) (os.FileInfo, er
 			continue
 		}

-		if f.IsDir() {
-			err := fs.Walk(p, func(s string, info os.FileInfo, err error) error {
-				if err != nil {
-					return fs.handleWalkerError(err, info)
-				}
-
-				if !info.IsDir() {
-					inc.Push(&info, s)
-				}
-
-				return nil
+		if !f.IsDir() {
+			inc.Push(p)
+		} else {
+			err := godirwalk.Walk(p, &godirwalk.Options{
+				Unsorted: true,
+				Callback: func(p string, e *godirwalk.Dirent) error {
+					sp := p
+					if e.IsSymlink() {
+						// Ensure that any symlinks are properly resolved to their final destination. If
+						// that destination is outside the server directory skip over this entire item, otherwise
+						// use the resolved location for the rest of this function.
+						sp, err = fs.SafePath(p)
+						if err != nil {
+							if IsPathResolutionError(err) {
+								return godirwalk.SkipThis
+							}
+
+							return err
+						}
+					}
+
+					if !e.IsDir() {
+						inc.Push(sp)
+					}
+
+					return nil
+				},
 			})

 			if err != nil {
 				return nil, err
 			}
-		} else {
-			inc.Push(&f, p)
 		}
 	}

@@ -1,146 +0,0 @@
-package server
-
-import (
-	"context"
-	"github.com/gammazero/workerpool"
-	"github.com/pkg/errors"
-	"io/ioutil"
-	"os"
-	"path/filepath"
-	"sync"
-)
-
-type FileWalker struct {
-	*Filesystem
-}
-
-type PooledFileWalker struct {
-	wg       sync.WaitGroup
-	pool     *workerpool.WorkerPool
-	callback filepath.WalkFunc
-	cancel   context.CancelFunc
-
-	err     error
-	errOnce sync.Once
-
-	Filesystem *Filesystem
-}
-
-// Returns a new walker instance.
-func (fs *Filesystem) NewWalker() *FileWalker {
-	return &FileWalker{fs}
-}
-
-// Creates a new pooled file walker that will concurrently walk over a given directory but limit itself
-// to a worker pool as to not completely flood out the system or cause a process crash.
-func newPooledWalker(fs *Filesystem) *PooledFileWalker {
-	return &PooledFileWalker{
-		Filesystem: fs,
-		// Run the walker as a single threaded process to optimize disk I/O and avoid CPU issues.
-		pool: workerpool.New(1),
-	}
-}
-
-// Process a given path by calling the callback function for all of the files and directories within
-// the path, and then dropping into any directories that we come across.
-func (w *PooledFileWalker) process(p string) error {
-	files, err := ioutil.ReadDir(p)
-	if err != nil {
-		return err
-	}
-
-	// Loop over all of the files and directories in the given directory and call the provided
-	// callback function. If we encounter a directory, push that directory onto the worker queue
-	// to be processed.
-	for _, f := range files {
-		sp, err := w.Filesystem.SafeJoin(p, f)
-		if err != nil {
-			// Let the callback function handle what to do if there is a path resolution error because a
-			// dangerous path was resolved. If there is an error returned, return from this entire process
-			// otherwise just skip over this specific file. We don't care if its a file or a directory at
-			// this point since either way we're skipping it, however, still check for the SkipDir since that
-			// would be thrown otherwise.
-			if err = w.callback(sp, f, err); err != nil && err != filepath.SkipDir {
-				return err
-			}
-
-			continue
-		}
-
-		var i os.FileInfo
-		// Re-stat the file or directory if it is determined to be a symlink by statting the result of the
-		// symlink resolution rather than the initial path we received. Only do this on files we _know_
-		// will be returning a different value.
-		if f.Mode()&os.ModeSymlink != 0 {
-			i, err = os.Stat(sp)
-			// You might end up getting an error about a file or folder not existing if the given path
-			// if it is an invalid symlink. We can safely just skip over these files I believe.
-			if os.IsNotExist(err) {
-				continue
-			}
-		} else {
-			i = f
-		}
-
-		// Call the user-provided callback for this file or directory. If an error is returned that is
-		// not a SkipDir call, abort the entire process and bubble that error up.
-		if err = w.callback(sp, i, err); err != nil && err != filepath.SkipDir {
-			return err
-		}
-
-		// If this is a directory, and we didn't get a SkipDir error, continue through by pushing another
-		// job to the pool to handle it. If we requested a skip, don't do anything just continue on to the
-		// next item.
-		if i.IsDir() && err != filepath.SkipDir {
-			w.push(sp)
-		} else if !i.IsDir() && err == filepath.SkipDir {
-			// Per the spec for the callback, if we get a SkipDir error but it is returned for an item
-			// that is _not_ a directory, abort the remaining operations on the directory.
-			return nil
-		}
-	}
-
-	return nil
-}
-
-// Push a new path into the worker pool and increment the waitgroup so that we do not return too
-// early and cause panic's as internal directories attempt to submit to the pool.
-func (w *PooledFileWalker) push(path string) {
-	w.wg.Add(1)
-	w.pool.Submit(func() {
-		defer w.wg.Done()
-		if err := w.process(path); err != nil {
-			w.errOnce.Do(func() {
-				w.err = err
-				if w.cancel != nil {
-					w.cancel()
-				}
-			})
-		}
-	})
-}
-
-// Walks the given directory and executes the callback function for all of the files and directories
-// that are encountered.
-func (fs *Filesystem) Walk(dir string, callback filepath.WalkFunc) error {
-	w := newPooledWalker(fs)
-	w.callback = callback
-
-	_, cancel := context.WithCancel(context.Background())
-	w.cancel = cancel
-
-	p, err := w.Filesystem.SafePath(dir)
-	if err != nil {
-		return errors.WithStack(err)
-	}
-
-	w.push(p)
-	w.wg.Wait()
-	w.pool.StopWait()
-
-	if w.err != nil {
-		return w.err
-	}
-
-	return nil
-}
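With the pooled walker above removed, callers now use godirwalk.Walk directly (see the hunks earlier in this diff). If a thin Walk-style wrapper were still wanted, a rough, hypothetical equivalent could look like this (walkUnsorted is not part of the commit):

package main

import "github.com/karrick/godirwalk"

// walkUnsorted walks dir without sorting entries and hands each path plus its
// lightweight dirent to fn, stopping at the first error fn returns.
func walkUnsorted(dir string, fn func(path string, d *godirwalk.Dirent) error) error {
    return godirwalk.Walk(dir, &godirwalk.Options{
        Unsorted: true,
        Callback: fn,
    })
}

func main() {
    _ = walkUnsorted("/tmp", func(path string, d *godirwalk.Dirent) error {
        return nil
    })
}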