From ddfd6d9cce4c025626ea72cecf8226495fba024f Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sun, 2 May 2021 12:28:36 -0700 Subject: [PATCH] Modify backup process to utilize contexts and exponential backoffs If a request to upload a file part to S3 fails for any 5xx reason it will begin using an exponential backoff to keep re-trying the upload until we've reached a minute of trying to access the endpoint. This should resolve temporary resolution issues with URLs and certain S3 compatible systems such as B2 that sometimes return a 5xx error and just need a retry to be successful. Also supports using the server context to ensure backups are terminated when a server is deleted, and removes the http call without a timeout, replacing it with a 2 hour timeout to account for connections as slow as 10Mbps on a huge file upload. --- go.mod | 1 + go.sum | 3 + server/backup.go | 4 +- server/backup/backup.go | 161 ++++++++++++-------------- server/backup/backup_local.go | 29 +++-- server/backup/backup_s3.go | 210 ++++++++++++++++++++++------------ 6 files changed, 239 insertions(+), 169 deletions(-) diff --git a/go.mod b/go.mod index 5fbfb95..7278b33 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef github.com/beevik/etree v1.1.0 github.com/buger/jsonparser v1.1.0 + github.com/cenkalti/backoff/v4 v4.1.0 github.com/cobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 github.com/containerd/containerd v1.4.3 // indirect github.com/containerd/fifo v0.0.0-20201026212402-0724c46b320c // indirect diff --git a/go.sum b/go.sum index e64fef9..07e2513 100644 --- a/go.sum +++ b/go.sum @@ -73,7 +73,10 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm github.com/buger/jsonparser v1.1.0 h1:EPAGdKZgZCON4ZcMD+h4l/NN4ndr6ijSpj4INh8PbUY= github.com/buger/jsonparser v1.1.0/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/casbin/casbin/v2 v2.1.2/go.mod 
h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= +github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= diff --git a/server/backup.go b/server/backup.go index 7d65882..e94fa3e 100644 --- a/server/backup.go +++ b/server/backup.go @@ -66,7 +66,7 @@ func (s *Server) Backup(b backup.BackupInterface) error { } } - ad, err := b.Generate(s.Filesystem().Path(), ignored) + ad, err := b.Generate(s.Context(), s.Filesystem().Path(), ignored) if err != nil { if err := s.notifyPanelOfBackup(b.Identifier(), &backup.ArchiveDetails{}, false); err != nil { s.Log().WithFields(log.Fields{ @@ -150,7 +150,7 @@ func (s *Server) RestoreBackup(b backup.BackupInterface, reader io.ReadCloser) ( // Attempt to restore the backup to the server by running through each entry // in the file one at a time and writing them to the disk. 
s.Log().Debug("starting file writing process for backup restoration") - err = b.Restore(reader, func(file string, r io.Reader) error { + err = b.Restore(s.Context(), reader, func(file string, r io.Reader) error { s.Events().Publish(DaemonMessageEvent, "(restoring): "+file) return s.Filesystem().Writefile(file, r) }) diff --git a/server/backup/backup.go b/server/backup/backup.go index 5e32826..a3408c7 100644 --- a/server/backup/backup.go +++ b/server/backup/backup.go @@ -1,16 +1,18 @@ package backup import ( + "context" "crypto/sha1" "encoding/hex" "io" "os" "path" - "sync" + "emperror.dev/errors" "github.com/apex/log" "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/remote" + "golang.org/x/sync/errgroup" ) type AdapterType string @@ -24,20 +26,37 @@ const ( // and remote backups allowing the files to be restored. type RestoreCallback func(file string, r io.Reader) error -type ArchiveDetails struct { - Checksum string `json:"checksum"` - ChecksumType string `json:"checksum_type"` - Size int64 `json:"size"` -} - -// ToRequest returns a request object. -func (ad *ArchiveDetails) ToRequest(successful bool) remote.BackupRequest { - return remote.BackupRequest{ - Checksum: ad.Checksum, - ChecksumType: ad.ChecksumType, - Size: ad.Size, - Successful: successful, - } +// noinspection GoNameStartsWithPackageName +type BackupInterface interface { + // SetClient sets the API request client on the backup interface. + SetClient(c remote.Client) + // Identifier returns the UUID of this backup as tracked by the panel + // instance. + Identifier() string + // WithLogContext attaches additional context to the log output for this + // backup. + WithLogContext(map[string]interface{}) + // Generate creates a backup in whatever the configured source for the + // specific implementation is. + Generate(ctx context.Context, basePath string, ignore string) (*ArchiveDetails, error) + // Ignored returns the ignored files for this backup instance. 
+ Ignored() string + // Checksum returns a SHA1 checksum for the generated backup. + Checksum() ([]byte, error) + // Size returns the size of the generated backup. + Size() (int64, error) + // Path returns the path to the backup on the machine. This is not always + // the final storage location of the backup, simply the location we're using + // to store it until it is moved to the final spot. + Path() string + // Details returns details about the archive. + Details(ctx context.Context) (*ArchiveDetails, error) + // Remove removes a backup file. + Remove() error + // Restore is called when a backup is ready to be restored to the disk from + // the given source. Not every backup implementation will support this nor + // will every implementation require a reader be provided. + Restore(ctx context.Context, reader io.Reader, callback RestoreCallback) error } type Backup struct { @@ -54,39 +73,6 @@ type Backup struct { logContext map[string]interface{} } -// noinspection GoNameStartsWithPackageName -type BackupInterface interface { - // SetClient sets the API request client on the backup interface. - SetClient(c remote.Client) - // Identifier returns the UUID of this backup as tracked by the panel - // instance. - Identifier() string - // WithLogContext attaches additional context to the log output for this - // backup. - WithLogContext(map[string]interface{}) - // Generate creates a backup in whatever the configured source for the - // specific implementation is. - Generate(string, string) (*ArchiveDetails, error) - // Ignored returns the ignored files for this backup instance. - Ignored() string - // Checksum returns a SHA1 checksum for the generated backup. - Checksum() ([]byte, error) - // Size returns the size of the generated backup. - Size() (int64, error) - // Path returns the path to the backup on the machine. This is not always - // the final storage location of the backup, simply the location we're using - // to store it until it is moved to the final spot. 
- Path() string - // Details returns details about the archive. - Details() *ArchiveDetails - // Remove removes a backup file. - Remove() error - // Restore is called when a backup is ready to be restored to the disk from - // the given source. Not every backup implementation will support this nor - // will every implementation require a reader be provided. - Restore(reader io.Reader, callback RestoreCallback) error -} - func (b *Backup) SetClient(c remote.Client) { b.client = c } @@ -95,12 +81,12 @@ func (b *Backup) Identifier() string { return b.Uuid } -// Returns the path for this specific backup. +// Path returns the path for this specific backup. func (b *Backup) Path() string { return path.Join(config.Get().System.BackupDirectory, b.Identifier()+".tar.gz") } -// Return the size of the generated backup. +// Size returns the size of the generated backup. func (b *Backup) Size() (int64, error) { st, err := os.Stat(b.Path()) if err != nil { @@ -110,7 +96,7 @@ func (b *Backup) Size() (int64, error) { return st.Size(), nil } -// Returns the SHA256 checksum of a backup. +// Checksum returns the SHA1 checksum of a backup. func (b *Backup) Checksum() ([]byte, error) { h := sha1.New() @@ -128,51 +114,34 @@ func (b *Backup) Checksum() ([]byte, error) { return h.Sum(nil), nil } -// Returns details of the archive by utilizing two go-routines to get the checksum and -// the size of the archive. -func (b *Backup) Details() *ArchiveDetails { - wg := sync.WaitGroup{} - wg.Add(2) +// Details returns both the checksum and size of the archive currently stored on +// the disk to the caller. +func (b *Backup) Details(ctx context.Context) (*ArchiveDetails, error) { + ad := ArchiveDetails{ChecksumType: "sha1"} + g, ctx := errgroup.WithContext(ctx) - l := log.WithField("backup_id", b.Uuid) - - var checksum string - // Calculate the checksum for the file. 
- go func() { - defer wg.Done() - - l.Info("computing checksum for backup...") + g.Go(func() error { resp, err := b.Checksum() if err != nil { - log.WithFields(log.Fields{ - "backup": b.Identifier(), - "error": err, - }).Error("failed to calculate checksum for backup") - return + return err } + ad.Checksum = hex.EncodeToString(resp) + return nil + }) - checksum = hex.EncodeToString(resp) - l.WithField("checksum", checksum).Info("computed checksum for backup") - }() - - var sz int64 - go func() { - defer wg.Done() - - if s, err := b.Size(); err != nil { - return - } else { - sz = s + g.Go(func() error { + s, err := b.Size() + if err != nil { + return err } - }() + ad.Size = s + return nil + }) - wg.Wait() - - return &ArchiveDetails{ - Checksum: checksum, - ChecksumType: "sha1", - Size: sz, + if err := g.Wait(); err != nil { + return nil, errors.WithStackDepth(err, 1) } + return &ad, nil } func (b *Backup) Ignored() string { @@ -188,3 +157,19 @@ func (b *Backup) log() *log.Entry { } return l } + +type ArchiveDetails struct { + Checksum string `json:"checksum"` + ChecksumType string `json:"checksum_type"` + Size int64 `json:"size"` +} + +// ToRequest returns a request object. +func (ad *ArchiveDetails) ToRequest(successful bool) remote.BackupRequest { + return remote.BackupRequest{ + Checksum: ad.Checksum, + ChecksumType: ad.ChecksumType, + Size: ad.Size, + Successful: successful, + } +} \ No newline at end of file diff --git a/server/backup/backup_local.go b/server/backup/backup_local.go index f57a0ec..68a6603 100644 --- a/server/backup/backup_local.go +++ b/server/backup/backup_local.go @@ -1,10 +1,11 @@ package backup import ( - "errors" + "context" "io" "os" + "emperror.dev/errors" "github.com/pterodactyl/wings/server/filesystem" "github.com/mholt/archiver/v3" @@ -56,28 +57,40 @@ func (b *LocalBackup) WithLogContext(c map[string]interface{}) { // Generate generates a backup of the selected files and pushes it to the // defined location for this instance. 
-func (b *LocalBackup) Generate(basePath, ignore string) (*ArchiveDetails, error) { +func (b *LocalBackup) Generate(ctx context.Context, basePath, ignore string) (*ArchiveDetails, error) { a := &filesystem.Archive{ BasePath: basePath, Ignore: ignore, } - b.log().Info("creating backup for server...") + b.log().WithField("path", b.Path()).Info("creating backup for server") if err := a.Create(b.Path()); err != nil { return nil, err } b.log().Info("created backup successfully") - return b.Details(), nil + ad, err := b.Details(ctx) + if err != nil { + return nil, errors.WrapIf(err, "backup: failed to get archive details for local backup") + } + return ad, nil } // Restore will walk over the archive and call the callback function for each // file encountered. -func (b *LocalBackup) Restore(_ io.Reader, callback RestoreCallback) error { +func (b *LocalBackup) Restore(ctx context.Context, _ io.Reader, callback RestoreCallback) error { return archiver.Walk(b.Path(), func(f archiver.File) error { - if f.IsDir() { - return nil + select { + case <-ctx.Done(): + // Stop walking if the context is canceled. 
+ return archiver.ErrStopWalk + default: + { + if f.IsDir() { + return nil + } + return callback(filesystem.ExtractNameFromArchive(f), f) + } } - return callback(filesystem.ExtractNameFromArchive(f), f) }) } diff --git a/server/backup/backup_s3.go b/server/backup/backup_s3.go index 88b2a5e..02ec13f 100644 --- a/server/backup/backup_s3.go +++ b/server/backup/backup_s3.go @@ -5,11 +5,15 @@ import ( "compress/gzip" "context" "fmt" - "github.com/pterodactyl/wings/server/filesystem" "io" "net/http" "os" "strconv" + "time" + + "emperror.dev/errors" + "github.com/cenkalti/backoff/v4" + "github.com/pterodactyl/wings/server/filesystem" "github.com/juju/ratelimit" "github.com/pterodactyl/wings/config" @@ -45,7 +49,7 @@ func (s *S3Backup) WithLogContext(c map[string]interface{}) { // Generate creates a new backup on the disk, moves it into the S3 bucket via // the provided presigned URL, and then deletes the backup from the disk. -func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) { +func (s *S3Backup) Generate(ctx context.Context, basePath, ignore string) (*ArchiveDetails, error) { defer s.Remove() a := &filesystem.Archive{ @@ -53,7 +57,7 @@ func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) { Ignore: ignore, } - s.log().Info("creating backup for server...") + s.log().WithField("path", s.Path()).Info("creating backup for server") if err := a.Create(s.Path()); err != nil { return nil, err } @@ -61,29 +65,65 @@ func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) { rc, err := os.Open(s.Path()) if err != nil { - return nil, err + return nil, errors.Wrap(err, "backup: could not read archive from disk") } defer rc.Close() - if err := s.generateRemoteRequest(rc); err != nil { + if err := s.generateRemoteRequest(ctx, rc); err != nil { return nil, err } - - return s.Details(), nil + ad, err := s.Details(ctx) + if err != nil { + return nil, errors.WrapIf(err, "backup: failed to get archive details after 
upload") + } + return ad, nil } -// Reader provides a wrapper around an existing io.Reader -// but implements io.Closer in order to satisfy an io.ReadCloser. -type Reader struct { - io.Reader -} - -func (Reader) Close() error { +// Restore will read from the provided reader assuming that it is a gzipped +// tar reader. When a file is encountered in the archive the callback function +// will be triggered. If the callback returns an error the entire process is +// stopped, otherwise this function will run until all files have been written. +// +// This restoration uses a workerpool to use up to the number of CPUs available +// on the machine when writing files to the disk. +func (s *S3Backup) Restore(ctx context.Context, r io.Reader, callback RestoreCallback) error { + reader := r + // Steal the logic we use for making backups which will be applied when restoring + // this specific backup. This allows us to prevent overloading the disk unintentionally. + if writeLimit := int64(config.Get().System.Backups.WriteLimit * 1024 * 1024); writeLimit > 0 { + reader = ratelimit.Reader(r, ratelimit.NewBucketWithRate(float64(writeLimit), writeLimit)) + } + gr, err := gzip.NewReader(reader) + if err != nil { + return err + } + defer gr.Close() + tr := tar.NewReader(gr) + for { + select { + case <-ctx.Done(): + return nil + default: + // Do nothing, fall through to the next block of code in this loop. + } + header, err := tr.Next() + if err != nil { + if err == io.EOF { + break + } + return err + } + if header.Typeflag == tar.TypeReg { + if err := callback(header.Name, tr); err != nil { + return err + } + } + } return nil } // Generates the remote S3 request and begins the upload. 
-func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { +func (s *S3Backup) generateRemoteRequest(ctx context.Context, rc io.ReadCloser) error { defer rc.Close() s.log().Debug("attempting to get size of backup...") @@ -101,37 +141,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { s.log().Debug("got S3 upload urls from the Panel") s.log().WithField("parts", len(urls.Parts)).Info("attempting to upload backup to s3 endpoint...") - handlePart := func(part string, size int64) (string, error) { - r, err := http.NewRequest(http.MethodPut, part, nil) - if err != nil { - return "", err - } - - r.ContentLength = size - r.Header.Add("Content-Length", strconv.Itoa(int(size))) - r.Header.Add("Content-Type", "application/x-gzip") - - // Limit the reader to the size of the part. - r.Body = Reader{Reader: io.LimitReader(rc, size)} - - // This http request can block forever due to it not having a timeout, - // but we are uploading up to 5GB of data, so there is not really - // a good way to handle a timeout on this. - res, err := http.DefaultClient.Do(r) - if err != nil { - return "", err - } - defer res.Body.Close() - - // Handle non-200 status codes. - if res.StatusCode != http.StatusOK { - return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status) - } - - // Get the ETag from the uploaded part, this should be sent with the CompleteMultipartUpload request. - return res.Header.Get("ETag"), nil - } - + uploader := newS3FileUploader(rc) for i, part := range urls.Parts { // Get the size for the current part. var partSize int64 @@ -144,7 +154,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { } // Attempt to upload the part. 
+ if _, err := uploader.uploadPart(ctx, part, partSize); err != nil { s.log().WithField("part_id", i+1).WithError(err).Warn("failed to upload part") return err } @@ -157,39 +167,97 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { return nil } -// Restore will read from the provided reader assuming that it is a gzipped -// tar reader. When a file is encountered in the archive the callback function -// will be triggered. If the callback returns an error the entire process is -// stopped, otherwise this function will run until all files have been written. +type s3FileUploader struct { + io.ReadCloser + client *http.Client +} + +// newS3FileUploader returns a new file uploader instance. +func newS3FileUploader(file io.ReadCloser) *s3FileUploader { + return &s3FileUploader{ + ReadCloser: file, + // We purposefully use a super high timeout on this request since we need to upload + // a 5GB file. This assumes at worst a 10Mbps connection for uploading. While technically + // you could go slower we're targeting mostly hosted servers that should have 100Mbps + // connections anyways. + client: &http.Client{Timeout: time.Hour * 2}, + } +} + +// backoff returns a new exponential backoff implementation using a context that +// will also stop the backoff if it is canceled. +func (fu *s3FileUploader) backoff(ctx context.Context) backoff.BackOffContext { + b := backoff.NewExponentialBackOff() + b.Multiplier = 2 + b.MaxElapsedTime = time.Minute + + return backoff.WithContext(b, ctx) +} + +// uploadPart attempts to upload a given S3 file part to the S3 system. If a +// 5xx error is returned from the endpoint this will continue with an exponential +// backoff to try and successfully upload the part. // -// This restoration uses a workerpool to use up to the number of CPUs available -// on the machine when writing files to the disk. 
-func (s *S3Backup) Restore(r io.Reader, callback RestoreCallback) error { - reader := r - // Steal the logic we use for making backups which will be applied when restoring - // this specific backup. This allows us to prevent overloading the disk unintentionally. - if writeLimit := int64(config.Get().System.Backups.WriteLimit * 1024 * 1024); writeLimit > 0 { - reader = ratelimit.Reader(r, ratelimit.NewBucketWithRate(float64(writeLimit), writeLimit)) - } - gr, err := gzip.NewReader(reader) +// Once uploaded the ETag is returned to the caller. +func (fu *s3FileUploader) uploadPart(ctx context.Context, part string, size int64) (string, error) { + r, err := http.NewRequestWithContext(ctx, http.MethodPut, part, nil) if err != nil { - return err + return "", errors.Wrap(err, "backup: could not create request for S3") } - defer gr.Close() - tr := tar.NewReader(gr) - for { - header, err := tr.Next() + + r.ContentLength = size + r.Header.Add("Content-Length", strconv.Itoa(int(size))) + r.Header.Add("Content-Type", "application/x-gzip") + + // Limit the reader to the size of the part. + r.Body = Reader{Reader: io.LimitReader(fu.ReadCloser, size)} + + var etag string + err = backoff.Retry(func() error { + res, err := fu.client.Do(r) if err != nil { - if err == io.EOF { - break + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return backoff.Permanent(err) } - return err + // Don't use a permanent error here, if there is a temporary resolution error with + // the URL due to DNS issues we want to keep re-trying. + return errors.Wrap(err, "backup: S3 HTTP request failed") } - if header.Typeflag == tar.TypeReg { - if err := callback(header.Name, tr); err != nil { + _ = res.Body.Close() + + if res.StatusCode != http.StatusOK { + err := errors.New(fmt.Sprintf("backup: failed to put S3 object: [HTTP/%d] %s", res.StatusCode, res.Status)) + // Only attempt a backoff retry if this error is because of a 5xx error from + // the S3 endpoint. 
Any 4xx error should be treated as an error that a retry + // would not fix. + if res.StatusCode >= http.StatusInternalServerError { return err } + return backoff.Permanent(err) } + + // Get the ETag from the uploaded part, this should be sent with the + // CompleteMultipartUpload request. + etag = res.Header.Get("ETag") + + return nil + }, fu.backoff(ctx)) + + if err != nil { + if v, ok := err.(*backoff.PermanentError); ok { + return "", v.Unwrap() + } + return "", err } + return etag, nil +} + +// Reader provides a wrapper around an existing io.Reader +// but implements io.Closer in order to satisfy an io.ReadCloser. +type Reader struct { + io.Reader +} + +func (Reader) Close() error { return nil }