Performance improvements by using a smaller buffer size

This commit is contained in:
Dane Everitt 2020-08-23 17:18:40 -07:00
parent 999947e387
commit 09826fc7ad
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
10 changed files with 44 additions and 98 deletions

View File

@ -357,7 +357,7 @@ func configureLogging(logDir string, debug bool) error {
p := filepath.Join(logDir, "/wings.log") p := filepath.Join(logDir, "/wings.log")
w, err := logrotate.NewFile(p) w, err := logrotate.NewFile(p)
if err != nil { if err != nil {
panic(errors.WithMessage(err, "failed to open process log file")) panic(errors.Wrap(err, "failed to open process log file"))
} }
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)

View File

@ -220,7 +220,8 @@ func postTransfer(c *gin.Context) {
} }
// Copy the file. // Copy the file.
_, err = io.Copy(file, res.Body) buf := make([]byte, 1024 * 4)
_, err = io.CopyBuffer(file, res.Body, buf)
if err != nil { if err != nil {
zap.S().Errorw("failed to copy file to disk", zap.Error(err)) zap.S().Errorw("failed to copy file to disk", zap.Error(err))
return return
@ -242,7 +243,8 @@ func postTransfer(c *gin.Context) {
// Compute the sha256 checksum of the file. // Compute the sha256 checksum of the file.
hash := sha256.New() hash := sha256.New()
if _, err := io.Copy(hash, file); err != nil { buf = make([]byte, 1024 * 4)
if _, err := io.CopyBuffer(hash, file, buf); err != nil {
zap.S().Errorw("failed to copy file for checksum verification", zap.Error(err)) zap.S().Errorw("failed to copy file for checksum verification", zap.Error(err))
return return
} }

View File

@ -101,7 +101,9 @@ func (a *Archiver) Checksum() (string, error) {
defer file.Close() defer file.Close()
hash := sha256.New() hash := sha256.New()
if _, err := io.Copy(hash, file); err != nil {
buf := make([]byte, 1024*4)
if _, err := io.CopyBuffer(hash, file, buf); err != nil {
return "", err return "", err
} }

View File

@ -29,9 +29,11 @@ func (a *Archive) Create(dst string, ctx context.Context) (os.FileInfo, error) {
defer f.Close() defer f.Close()
gzw := gzip.NewWriter(f) gzw := gzip.NewWriter(f)
defer gzw.Flush()
defer gzw.Close() defer gzw.Close()
tw := tar.NewWriter(gzw) tw := tar.NewWriter(gzw)
defer tw.Flush()
defer tw.Close() defer tw.Close()
wg := sizedwaitgroup.New(10) wg := sizedwaitgroup.New(10)
@ -108,7 +110,8 @@ func (a *Archive) addToArchive(p string, s *os.FileInfo, w *tar.Writer) error {
return err return err
} }
if _, err := io.Copy(w, f); err != nil { buf := make([]byte, 4*1024)
if _, err := io.CopyBuffer(w, f, buf); err != nil {
return err return err
} }

View File

@ -97,12 +97,13 @@ func (b *Backup) Checksum() ([]byte, error) {
f, err := os.Open(b.Path()) f, err := os.Open(b.Path())
if err != nil { if err != nil {
return []byte{}, errors.WithStack(err) return nil, errors.WithStack(err)
} }
defer f.Close() defer f.Close()
if _, err := io.Copy(h, f); err != nil { buf := make([]byte, 1024*4)
return []byte{}, errors.WithStack(err) if _, err := io.CopyBuffer(h, f, buf); err != nil {
return nil, err
} }
return h.Sum(nil), nil return h.Sum(nil), nil
@ -123,7 +124,7 @@ func (b *Backup) Details() *ArchiveDetails {
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"backup": b.Identifier(), "backup": b.Identifier(),
"error": err, "error": err,
}).Error("failed to calculate checksum for backup") }).Error("failed to calculate checksum for backup")
} }
@ -151,4 +152,4 @@ func (b *Backup) Details() *ArchiveDetails {
func (b *Backup) Ignored() []string { func (b *Backup) Ignored() []string {
return b.IgnoredFiles return b.IgnoredFiles
} }

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
@ -343,36 +342,11 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
} }
defer file.Close() defer file.Close()
// Create a new buffered writer that will write to the file we just opened buf := make([]byte, 1024*4)
// and stream in the contents from the reader. sz, err := io.CopyBuffer(file, r, buf)
w := bufio.NewWriter(file)
buf := make([]byte, 1024)
var sizeWritten int
for {
n, err := r.Read(buf)
if err != nil && err != io.EOF {
return errors.WithStack(err)
}
if n == 0 {
break
}
if sz, err := w.Write(buf[:n]); err != nil {
return errors.WithStack(err)
} else {
sizeWritten += sz
}
}
if err := w.Flush(); err != nil {
return errors.WithStack(err)
}
// Adjust the disk usage to account for the old size and the new size of the file. // Adjust the disk usage to account for the old size and the new size of the file.
atomic.AddInt64(&fs.diskUsage, int64(sizeWritten) - currentSize) atomic.AddInt64(&fs.diskUsage, sz-currentSize)
// Finally, chown the file to ensure the permissions don't end up out-of-whack // Finally, chown the file to ensure the permissions don't end up out-of-whack
// if we had just created it. // if we had just created it.

View File

@ -7,11 +7,9 @@ import (
"fmt" "fmt"
"github.com/mholt/archiver/v3" "github.com/mholt/archiver/v3"
"github.com/pkg/errors" "github.com/pkg/errors"
"io"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@ -88,53 +86,19 @@ func (fs *Filesystem) DecompressFile(dir string, file string) error {
return nil return nil
} }
return fs.extractFileFromArchive(f) var name string
switch s := f.Sys().(type) {
case *tar.Header:
name = s.Name
case *gzip.Header:
name = s.Name
case *zip.FileHeader:
name = s.Name
default:
return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String()))
}
return errors.Wrap(fs.Writefile(name, f), "could not extract file from archive")
}) })
} }
// Extracts a single file from the archive and writes it to the disk after verifying that it will end
// up in the server data directory.
func (fs *Filesystem) extractFileFromArchive(f archiver.File) error {
var name string
switch s := f.Sys().(type) {
case *tar.Header:
name = s.Name
case *gzip.Header:
name = s.Name
case *zip.FileHeader:
name = s.Name
default:
return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String()))
}
// Guard against a zip-slip attack and prevent writing a file to a destination outside of
// the server root directory.
p, err := fs.SafePath(name)
if err != nil {
return err
}
// Ensure the directory structure for this file exists before trying to write the file
// to the disk, otherwise we'll have some unexpected fun.
if err := os.MkdirAll(strings.TrimSuffix(p, filepath.Base(p)), 0755); err != nil {
return err
}
// Open the file and truncate it if it already exists.
o, err := os.OpenFile(p, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer o.Close()
sz, cerr := io.Copy(o, f)
if cerr != nil {
return errors.WithStack(err)
}
atomic.AddInt64(&fs.diskUsage, sz)
return nil
}

View File

@ -295,11 +295,11 @@ func (ip *InstallationProcess) pullInstallationImage() error {
func (ip *InstallationProcess) BeforeExecute() (string, error) { func (ip *InstallationProcess) BeforeExecute() (string, error) {
fileName, err := ip.writeScriptToDisk() fileName, err := ip.writeScriptToDisk()
if err != nil { if err != nil {
return "", errors.WithMessage(err, "failed to write installation script to disk") return "", errors.Wrap(err, "failed to write installation script to disk")
} }
if err := ip.pullInstallationImage(); err != nil { if err := ip.pullInstallationImage(); err != nil {
return "", errors.WithMessage(err, "failed to pull updated installation container image for server") return "", errors.Wrap(err, "failed to pull updated installation container image for server")
} }
opts := types.ContainerRemoveOptions{ opts := types.ContainerRemoveOptions{
@ -309,7 +309,7 @@ func (ip *InstallationProcess) BeforeExecute() (string, error) {
if err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", opts); err != nil { if err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", opts); err != nil {
if !client.IsErrNotFound(err) { if !client.IsErrNotFound(err) {
return "", errors.WithMessage(err, "failed to remove existing install container for server") return "", errors.Wrap(err, "failed to remove existing install container for server")
} }
} }

View File

@ -84,12 +84,12 @@ func LoadDirectory() error {
func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
cfg := Configuration{} cfg := Configuration{}
if err := defaults.Set(&cfg); err != nil { if err := defaults.Set(&cfg); err != nil {
return nil, errors.WithMessage(err, "failed to set struct defaults for server configuration") return nil, errors.Wrap(err, "failed to set struct defaults for server configuration")
} }
s := new(Server) s := new(Server)
if err := defaults.Set(s); err != nil { if err := defaults.Set(s); err != nil {
return nil, errors.WithMessage(err, "failed to set struct defaults for server") return nil, errors.Wrap(err, "failed to set struct defaults for server")
} }
s.cfg = cfg s.cfg = cfg

View File

@ -62,13 +62,13 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
// time than that passes an error will be propagated back up the chain and this // time than that passes an error will be propagated back up the chain and this
// request will be aborted. // request will be aborted.
if err := s.powerLock.Acquire(ctx, 1); err != nil { if err := s.powerLock.Acquire(ctx, 1); err != nil {
return errors.WithMessage(err, "could not acquire lock on power state") return errors.Wrap(err, "could not acquire lock on power state")
} }
} else { } else {
// If no wait duration was provided we will attempt to immediately acquire the lock // If no wait duration was provided we will attempt to immediately acquire the lock
// and bail out with a context deadline error if it is not acquired immediately. // and bail out with a context deadline error if it is not acquired immediately.
if ok := s.powerLock.TryAcquire(1); !ok { if ok := s.powerLock.TryAcquire(1); !ok {
return errors.WithMessage(context.DeadlineExceeded, "could not acquire lock on power state") return errors.Wrap(context.DeadlineExceeded, "could not acquire lock on power state")
} }
} }
@ -132,7 +132,7 @@ func (s *Server) onBeforeStart() error {
s.Log().Info("syncing server configuration with panel") s.Log().Info("syncing server configuration with panel")
if err := s.Sync(); err != nil { if err := s.Sync(); err != nil {
return errors.WithMessage(err, "unable to sync server data from Panel instance") return errors.Wrap(err, "unable to sync server data from Panel instance")
} }
if !s.Filesystem.HasSpaceAvailable() { if !s.Filesystem.HasSpaceAvailable() {
@ -150,7 +150,7 @@ func (s *Server) onBeforeStart() error {
s.PublishConsoleOutputFromDaemon("Ensuring file permissions are set correctly, this could take a few seconds...") s.PublishConsoleOutputFromDaemon("Ensuring file permissions are set correctly, this could take a few seconds...")
// Ensure all of the server file permissions are set correctly before booting the process. // Ensure all of the server file permissions are set correctly before booting the process.
if err := s.Filesystem.Chown("/"); err != nil { if err := s.Filesystem.Chown("/"); err != nil {
return errors.WithMessage(err, "failed to chown root server directory during pre-boot process") return errors.Wrap(err, "failed to chown root server directory during pre-boot process")
} }
return nil return nil