Modify backup process to utilize contexts and exponential backoffs
If a request to upload a file part to S3 fails for any 5xx reason it will begin using an exponential backoff to keep re-trying the upload until we've reached a minute of trying to access the endpoint. This should resolve temporary resolution issues with URLs and certain S3 compatiable systems such as B2 that sometimes return a 5xx error and just need a retry to be successful. Also supports using the server context to ensure backups are terminated when a server is deleted, and removes the http call without a timeout, replacing it with a 2 hour timeout to account for connections as slow as 10Mbps on a huge file upload.
This commit is contained in:
parent
da74ac8291
commit
ddfd6d9cce
1
go.mod
1
go.mod
|
@ -14,6 +14,7 @@ require (
|
||||||
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef
|
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef
|
||||||
github.com/beevik/etree v1.1.0
|
github.com/beevik/etree v1.1.0
|
||||||
github.com/buger/jsonparser v1.1.0
|
github.com/buger/jsonparser v1.1.0
|
||||||
|
github.com/cenkalti/backoff/v4 v4.1.0
|
||||||
github.com/cobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249
|
github.com/cobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249
|
||||||
github.com/containerd/containerd v1.4.3 // indirect
|
github.com/containerd/containerd v1.4.3 // indirect
|
||||||
github.com/containerd/fifo v0.0.0-20201026212402-0724c46b320c // indirect
|
github.com/containerd/fifo v0.0.0-20201026212402-0724c46b320c // indirect
|
||||||
|
|
3
go.sum
3
go.sum
|
@ -73,7 +73,10 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm
|
||||||
github.com/buger/jsonparser v1.1.0 h1:EPAGdKZgZCON4ZcMD+h4l/NN4ndr6ijSpj4INh8PbUY=
|
github.com/buger/jsonparser v1.1.0 h1:EPAGdKZgZCON4ZcMD+h4l/NN4ndr6ijSpj4INh8PbUY=
|
||||||
github.com/buger/jsonparser v1.1.0/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
github.com/buger/jsonparser v1.1.0/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
||||||
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
||||||
|
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
|
||||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||||
|
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
|
||||||
|
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
|
||||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||||
|
|
|
@ -66,7 +66,7 @@ func (s *Server) Backup(b backup.BackupInterface) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ad, err := b.Generate(s.Filesystem().Path(), ignored)
|
ad, err := b.Generate(s.Context(), s.Filesystem().Path(), ignored)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err := s.notifyPanelOfBackup(b.Identifier(), &backup.ArchiveDetails{}, false); err != nil {
|
if err := s.notifyPanelOfBackup(b.Identifier(), &backup.ArchiveDetails{}, false); err != nil {
|
||||||
s.Log().WithFields(log.Fields{
|
s.Log().WithFields(log.Fields{
|
||||||
|
@ -150,7 +150,7 @@ func (s *Server) RestoreBackup(b backup.BackupInterface, reader io.ReadCloser) (
|
||||||
// Attempt to restore the backup to the server by running through each entry
|
// Attempt to restore the backup to the server by running through each entry
|
||||||
// in the file one at a time and writing them to the disk.
|
// in the file one at a time and writing them to the disk.
|
||||||
s.Log().Debug("starting file writing process for backup restoration")
|
s.Log().Debug("starting file writing process for backup restoration")
|
||||||
err = b.Restore(reader, func(file string, r io.Reader) error {
|
err = b.Restore(s.Context(), reader, func(file string, r io.Reader) error {
|
||||||
s.Events().Publish(DaemonMessageEvent, "(restoring): "+file)
|
s.Events().Publish(DaemonMessageEvent, "(restoring): "+file)
|
||||||
return s.Filesystem().Writefile(file, r)
|
return s.Filesystem().Writefile(file, r)
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,16 +1,18 @@
|
||||||
package backup
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/sha1"
|
"crypto/sha1"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
|
||||||
|
|
||||||
|
"emperror.dev/errors"
|
||||||
"github.com/apex/log"
|
"github.com/apex/log"
|
||||||
"github.com/pterodactyl/wings/config"
|
"github.com/pterodactyl/wings/config"
|
||||||
"github.com/pterodactyl/wings/remote"
|
"github.com/pterodactyl/wings/remote"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AdapterType string
|
type AdapterType string
|
||||||
|
@ -24,20 +26,37 @@ const (
|
||||||
// and remote backups allowing the files to be restored.
|
// and remote backups allowing the files to be restored.
|
||||||
type RestoreCallback func(file string, r io.Reader) error
|
type RestoreCallback func(file string, r io.Reader) error
|
||||||
|
|
||||||
type ArchiveDetails struct {
|
// noinspection GoNameStartsWithPackageName
|
||||||
Checksum string `json:"checksum"`
|
type BackupInterface interface {
|
||||||
ChecksumType string `json:"checksum_type"`
|
// SetClient sets the API request client on the backup interface.
|
||||||
Size int64 `json:"size"`
|
SetClient(c remote.Client)
|
||||||
}
|
// Identifier returns the UUID of this backup as tracked by the panel
|
||||||
|
// instance.
|
||||||
// ToRequest returns a request object.
|
Identifier() string
|
||||||
func (ad *ArchiveDetails) ToRequest(successful bool) remote.BackupRequest {
|
// WithLogContext attaches additional context to the log output for this
|
||||||
return remote.BackupRequest{
|
// backup.
|
||||||
Checksum: ad.Checksum,
|
WithLogContext(map[string]interface{})
|
||||||
ChecksumType: ad.ChecksumType,
|
// Generate creates a backup in whatever the configured source for the
|
||||||
Size: ad.Size,
|
// specific implementation is.
|
||||||
Successful: successful,
|
Generate(ctx context.Context, basePath string, ignore string) (*ArchiveDetails, error)
|
||||||
}
|
// Ignored returns the ignored files for this backup instance.
|
||||||
|
Ignored() string
|
||||||
|
// Checksum returns a SHA1 checksum for the generated backup.
|
||||||
|
Checksum() ([]byte, error)
|
||||||
|
// Size returns the size of the generated backup.
|
||||||
|
Size() (int64, error)
|
||||||
|
// Path returns the path to the backup on the machine. This is not always
|
||||||
|
// the final storage location of the backup, simply the location we're using
|
||||||
|
// to store it until it is moved to the final spot.
|
||||||
|
Path() string
|
||||||
|
// Details returns details about the archive.
|
||||||
|
Details(ctx context.Context) (*ArchiveDetails, error)
|
||||||
|
// Remove removes a backup file.
|
||||||
|
Remove() error
|
||||||
|
// Restore is called when a backup is ready to be restored to the disk from
|
||||||
|
// the given source. Not every backup implementation will support this nor
|
||||||
|
// will every implementation require a reader be provided.
|
||||||
|
Restore(ctx context.Context, reader io.Reader, callback RestoreCallback) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Backup struct {
|
type Backup struct {
|
||||||
|
@ -54,39 +73,6 @@ type Backup struct {
|
||||||
logContext map[string]interface{}
|
logContext map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// noinspection GoNameStartsWithPackageName
|
|
||||||
type BackupInterface interface {
|
|
||||||
// SetClient sets the API request client on the backup interface.
|
|
||||||
SetClient(c remote.Client)
|
|
||||||
// Identifier returns the UUID of this backup as tracked by the panel
|
|
||||||
// instance.
|
|
||||||
Identifier() string
|
|
||||||
// WithLogContext attaches additional context to the log output for this
|
|
||||||
// backup.
|
|
||||||
WithLogContext(map[string]interface{})
|
|
||||||
// Generate creates a backup in whatever the configured source for the
|
|
||||||
// specific implementation is.
|
|
||||||
Generate(string, string) (*ArchiveDetails, error)
|
|
||||||
// Ignored returns the ignored files for this backup instance.
|
|
||||||
Ignored() string
|
|
||||||
// Checksum returns a SHA1 checksum for the generated backup.
|
|
||||||
Checksum() ([]byte, error)
|
|
||||||
// Size returns the size of the generated backup.
|
|
||||||
Size() (int64, error)
|
|
||||||
// Path returns the path to the backup on the machine. This is not always
|
|
||||||
// the final storage location of the backup, simply the location we're using
|
|
||||||
// to store it until it is moved to the final spot.
|
|
||||||
Path() string
|
|
||||||
// Details returns details about the archive.
|
|
||||||
Details() *ArchiveDetails
|
|
||||||
// Remove removes a backup file.
|
|
||||||
Remove() error
|
|
||||||
// Restore is called when a backup is ready to be restored to the disk from
|
|
||||||
// the given source. Not every backup implementation will support this nor
|
|
||||||
// will every implementation require a reader be provided.
|
|
||||||
Restore(reader io.Reader, callback RestoreCallback) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Backup) SetClient(c remote.Client) {
|
func (b *Backup) SetClient(c remote.Client) {
|
||||||
b.client = c
|
b.client = c
|
||||||
}
|
}
|
||||||
|
@ -95,12 +81,12 @@ func (b *Backup) Identifier() string {
|
||||||
return b.Uuid
|
return b.Uuid
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the path for this specific backup.
|
// Path returns the path for this specific backup.
|
||||||
func (b *Backup) Path() string {
|
func (b *Backup) Path() string {
|
||||||
return path.Join(config.Get().System.BackupDirectory, b.Identifier()+".tar.gz")
|
return path.Join(config.Get().System.BackupDirectory, b.Identifier()+".tar.gz")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the size of the generated backup.
|
// Size returns the size of the generated backup.
|
||||||
func (b *Backup) Size() (int64, error) {
|
func (b *Backup) Size() (int64, error) {
|
||||||
st, err := os.Stat(b.Path())
|
st, err := os.Stat(b.Path())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -110,7 +96,7 @@ func (b *Backup) Size() (int64, error) {
|
||||||
return st.Size(), nil
|
return st.Size(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the SHA256 checksum of a backup.
|
// Checksum returns the SHA256 checksum of a backup.
|
||||||
func (b *Backup) Checksum() ([]byte, error) {
|
func (b *Backup) Checksum() ([]byte, error) {
|
||||||
h := sha1.New()
|
h := sha1.New()
|
||||||
|
|
||||||
|
@ -128,51 +114,34 @@ func (b *Backup) Checksum() ([]byte, error) {
|
||||||
return h.Sum(nil), nil
|
return h.Sum(nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns details of the archive by utilizing two go-routines to get the checksum and
|
// Details returns both the checksum and size of the archive currently stored on
|
||||||
// the size of the archive.
|
// the disk to the caller.
|
||||||
func (b *Backup) Details() *ArchiveDetails {
|
func (b *Backup) Details(ctx context.Context) (*ArchiveDetails, error) {
|
||||||
wg := sync.WaitGroup{}
|
ad := ArchiveDetails{ChecksumType: "sha1"}
|
||||||
wg.Add(2)
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
l := log.WithField("backup_id", b.Uuid)
|
g.Go(func() error {
|
||||||
|
|
||||||
var checksum string
|
|
||||||
// Calculate the checksum for the file.
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
l.Info("computing checksum for backup...")
|
|
||||||
resp, err := b.Checksum()
|
resp, err := b.Checksum()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
return err
|
||||||
"backup": b.Identifier(),
|
|
||||||
"error": err,
|
|
||||||
}).Error("failed to calculate checksum for backup")
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
ad.Checksum = hex.EncodeToString(resp)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
checksum = hex.EncodeToString(resp)
|
g.Go(func() error {
|
||||||
l.WithField("checksum", checksum).Info("computed checksum for backup")
|
s, err := b.Size()
|
||||||
}()
|
if err != nil {
|
||||||
|
return err
|
||||||
var sz int64
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
if s, err := b.Size(); err != nil {
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
sz = s
|
|
||||||
}
|
}
|
||||||
}()
|
ad.Size = s
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
wg.Wait()
|
if err := g.Wait(); err != nil {
|
||||||
|
return nil, errors.WithStackDepth(err, 1)
|
||||||
return &ArchiveDetails{
|
|
||||||
Checksum: checksum,
|
|
||||||
ChecksumType: "sha1",
|
|
||||||
Size: sz,
|
|
||||||
}
|
}
|
||||||
|
return &ad, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Backup) Ignored() string {
|
func (b *Backup) Ignored() string {
|
||||||
|
@ -188,3 +157,19 @@ func (b *Backup) log() *log.Entry {
|
||||||
}
|
}
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ArchiveDetails struct {
|
||||||
|
Checksum string `json:"checksum"`
|
||||||
|
ChecksumType string `json:"checksum_type"`
|
||||||
|
Size int64 `json:"size"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToRequest returns a request object.
|
||||||
|
func (ad *ArchiveDetails) ToRequest(successful bool) remote.BackupRequest {
|
||||||
|
return remote.BackupRequest{
|
||||||
|
Checksum: ad.Checksum,
|
||||||
|
ChecksumType: ad.ChecksumType,
|
||||||
|
Size: ad.Size,
|
||||||
|
Successful: successful,
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,10 +1,11 @@
|
||||||
package backup
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"emperror.dev/errors"
|
||||||
"github.com/pterodactyl/wings/server/filesystem"
|
"github.com/pterodactyl/wings/server/filesystem"
|
||||||
|
|
||||||
"github.com/mholt/archiver/v3"
|
"github.com/mholt/archiver/v3"
|
||||||
|
@ -56,28 +57,40 @@ func (b *LocalBackup) WithLogContext(c map[string]interface{}) {
|
||||||
|
|
||||||
// Generate generates a backup of the selected files and pushes it to the
|
// Generate generates a backup of the selected files and pushes it to the
|
||||||
// defined location for this instance.
|
// defined location for this instance.
|
||||||
func (b *LocalBackup) Generate(basePath, ignore string) (*ArchiveDetails, error) {
|
func (b *LocalBackup) Generate(ctx context.Context, basePath, ignore string) (*ArchiveDetails, error) {
|
||||||
a := &filesystem.Archive{
|
a := &filesystem.Archive{
|
||||||
BasePath: basePath,
|
BasePath: basePath,
|
||||||
Ignore: ignore,
|
Ignore: ignore,
|
||||||
}
|
}
|
||||||
|
|
||||||
b.log().Info("creating backup for server...")
|
b.log().WithField("path", b.Path()).Info("creating backup for server")
|
||||||
if err := a.Create(b.Path()); err != nil {
|
if err := a.Create(b.Path()); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
b.log().Info("created backup successfully")
|
b.log().Info("created backup successfully")
|
||||||
|
|
||||||
return b.Details(), nil
|
ad, err := b.Details(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.WrapIf(err, "backup: failed to get archive details for local backup")
|
||||||
|
}
|
||||||
|
return ad, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore will walk over the archive and call the callback function for each
|
// Restore will walk over the archive and call the callback function for each
|
||||||
// file encountered.
|
// file encountered.
|
||||||
func (b *LocalBackup) Restore(_ io.Reader, callback RestoreCallback) error {
|
func (b *LocalBackup) Restore(ctx context.Context, _ io.Reader, callback RestoreCallback) error {
|
||||||
return archiver.Walk(b.Path(), func(f archiver.File) error {
|
return archiver.Walk(b.Path(), func(f archiver.File) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// Stop walking if the context is canceled.
|
||||||
|
return archiver.ErrStopWalk
|
||||||
|
default:
|
||||||
|
{
|
||||||
if f.IsDir() {
|
if f.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return callback(filesystem.ExtractNameFromArchive(f), f)
|
return callback(filesystem.ExtractNameFromArchive(f), f)
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,11 +5,15 @@ import (
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/pterodactyl/wings/server/filesystem"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"emperror.dev/errors"
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
|
"github.com/pterodactyl/wings/server/filesystem"
|
||||||
|
|
||||||
"github.com/juju/ratelimit"
|
"github.com/juju/ratelimit"
|
||||||
"github.com/pterodactyl/wings/config"
|
"github.com/pterodactyl/wings/config"
|
||||||
|
@ -45,7 +49,7 @@ func (s *S3Backup) WithLogContext(c map[string]interface{}) {
|
||||||
|
|
||||||
// Generate creates a new backup on the disk, moves it into the S3 bucket via
|
// Generate creates a new backup on the disk, moves it into the S3 bucket via
|
||||||
// the provided presigned URL, and then deletes the backup from the disk.
|
// the provided presigned URL, and then deletes the backup from the disk.
|
||||||
func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) {
|
func (s *S3Backup) Generate(ctx context.Context, basePath, ignore string) (*ArchiveDetails, error) {
|
||||||
defer s.Remove()
|
defer s.Remove()
|
||||||
|
|
||||||
a := &filesystem.Archive{
|
a := &filesystem.Archive{
|
||||||
|
@ -53,7 +57,7 @@ func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) {
|
||||||
Ignore: ignore,
|
Ignore: ignore,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log().Info("creating backup for server...")
|
s.log().WithField("path", s.Path()).Info("creating backup for server")
|
||||||
if err := a.Create(s.Path()); err != nil {
|
if err := a.Create(s.Path()); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -61,29 +65,65 @@ func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) {
|
||||||
|
|
||||||
rc, err := os.Open(s.Path())
|
rc, err := os.Open(s.Path())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrap(err, "backup: could not read archive from disk")
|
||||||
}
|
}
|
||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
|
|
||||||
if err := s.generateRemoteRequest(rc); err != nil {
|
if err := s.generateRemoteRequest(ctx, rc); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
ad, err := s.Details(ctx)
|
||||||
return s.Details(), nil
|
if err != nil {
|
||||||
|
return nil, errors.WrapIf(err, "backup: failed to get archive details after upload")
|
||||||
|
}
|
||||||
|
return ad, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reader provides a wrapper around an existing io.Reader
|
// Restore will read from the provided reader assuming that it is a gzipped
|
||||||
// but implements io.Closer in order to satisfy an io.ReadCloser.
|
// tar reader. When a file is encountered in the archive the callback function
|
||||||
type Reader struct {
|
// will be triggered. If the callback returns an error the entire process is
|
||||||
io.Reader
|
// stopped, otherwise this function will run until all files have been written.
|
||||||
}
|
//
|
||||||
|
// This restoration uses a workerpool to use up to the number of CPUs available
|
||||||
func (Reader) Close() error {
|
// on the machine when writing files to the disk.
|
||||||
|
func (s *S3Backup) Restore(ctx context.Context, r io.Reader, callback RestoreCallback) error {
|
||||||
|
reader := r
|
||||||
|
// Steal the logic we use for making backups which will be applied when restoring
|
||||||
|
// this specific backup. This allows us to prevent overloading the disk unintentionally.
|
||||||
|
if writeLimit := int64(config.Get().System.Backups.WriteLimit * 1024 * 1024); writeLimit > 0 {
|
||||||
|
reader = ratelimit.Reader(r, ratelimit.NewBucketWithRate(float64(writeLimit), writeLimit))
|
||||||
|
}
|
||||||
|
gr, err := gzip.NewReader(reader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer gr.Close()
|
||||||
|
tr := tar.NewReader(gr)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
// Do nothing, fall through to the next block of code in this loop.
|
||||||
|
}
|
||||||
|
header, err := tr.Next()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if header.Typeflag == tar.TypeReg {
|
||||||
|
if err := callback(header.Name, tr); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generates the remote S3 request and begins the upload.
|
// Generates the remote S3 request and begins the upload.
|
||||||
func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
|
func (s *S3Backup) generateRemoteRequest(ctx context.Context, rc io.ReadCloser) error {
|
||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
|
|
||||||
s.log().Debug("attempting to get size of backup...")
|
s.log().Debug("attempting to get size of backup...")
|
||||||
|
@ -101,37 +141,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
|
||||||
s.log().Debug("got S3 upload urls from the Panel")
|
s.log().Debug("got S3 upload urls from the Panel")
|
||||||
s.log().WithField("parts", len(urls.Parts)).Info("attempting to upload backup to s3 endpoint...")
|
s.log().WithField("parts", len(urls.Parts)).Info("attempting to upload backup to s3 endpoint...")
|
||||||
|
|
||||||
handlePart := func(part string, size int64) (string, error) {
|
uploader := newS3FileUploader(rc)
|
||||||
r, err := http.NewRequest(http.MethodPut, part, nil)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
r.ContentLength = size
|
|
||||||
r.Header.Add("Content-Length", strconv.Itoa(int(size)))
|
|
||||||
r.Header.Add("Content-Type", "application/x-gzip")
|
|
||||||
|
|
||||||
// Limit the reader to the size of the part.
|
|
||||||
r.Body = Reader{Reader: io.LimitReader(rc, size)}
|
|
||||||
|
|
||||||
// This http request can block forever due to it not having a timeout,
|
|
||||||
// but we are uploading up to 5GB of data, so there is not really
|
|
||||||
// a good way to handle a timeout on this.
|
|
||||||
res, err := http.DefaultClient.Do(r)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
|
|
||||||
// Handle non-200 status codes.
|
|
||||||
if res.StatusCode != http.StatusOK {
|
|
||||||
return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the ETag from the uploaded part, this should be sent with the CompleteMultipartUpload request.
|
|
||||||
return res.Header.Get("ETag"), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, part := range urls.Parts {
|
for i, part := range urls.Parts {
|
||||||
// Get the size for the current part.
|
// Get the size for the current part.
|
||||||
var partSize int64
|
var partSize int64
|
||||||
|
@ -144,7 +154,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to upload the part.
|
// Attempt to upload the part.
|
||||||
if _, err := handlePart(part, partSize); err != nil {
|
if _, err := uploader.uploadPart(ctx, part, partSize); err != nil {
|
||||||
s.log().WithField("part_id", i+1).WithError(err).Warn("failed to upload part")
|
s.log().WithField("part_id", i+1).WithError(err).Warn("failed to upload part")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -157,39 +167,97 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore will read from the provided reader assuming that it is a gzipped
|
type s3FileUploader struct {
|
||||||
// tar reader. When a file is encountered in the archive the callback function
|
io.ReadCloser
|
||||||
// will be triggered. If the callback returns an error the entire process is
|
client *http.Client
|
||||||
// stopped, otherwise this function will run until all files have been written.
|
}
|
||||||
|
|
||||||
|
// newS3FileUploader returns a new file uploader instance.
|
||||||
|
func newS3FileUploader(file io.ReadCloser) *s3FileUploader {
|
||||||
|
return &s3FileUploader{
|
||||||
|
ReadCloser: file,
|
||||||
|
// We purposefully use a super high timeout on this request since we need to upload
|
||||||
|
// a 5GB file. This assumes at worst a 10Mbps connection for uploading. While technically
|
||||||
|
// you could go slower we're targeting mostly hosted servers that should have 100Mbps
|
||||||
|
// connections anyways.
|
||||||
|
client: &http.Client{Timeout: time.Hour * 2},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// backoff returns a new expoential backoff implementation using a context that
|
||||||
|
// will also stop the backoff if it is canceled.
|
||||||
|
func (fu *s3FileUploader) backoff(ctx context.Context) backoff.BackOffContext {
|
||||||
|
b := backoff.NewExponentialBackOff()
|
||||||
|
b.Multiplier = 2
|
||||||
|
b.MaxElapsedTime = time.Minute
|
||||||
|
|
||||||
|
return backoff.WithContext(b, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadPart attempts to upload a given S3 file part to the S3 system. If a
|
||||||
|
// 5xx error is returned from the endpoint this will continue with an exponential
|
||||||
|
// backoff to try and successfully upload the part.
|
||||||
//
|
//
|
||||||
// This restoration uses a workerpool to use up to the number of CPUs available
|
// Once uploaded the ETag is returned to the caller.
|
||||||
// on the machine when writing files to the disk.
|
func (fu *s3FileUploader) uploadPart(ctx context.Context, part string, size int64) (string, error) {
|
||||||
func (s *S3Backup) Restore(r io.Reader, callback RestoreCallback) error {
|
r, err := http.NewRequestWithContext(ctx, http.MethodPut, part, nil)
|
||||||
reader := r
|
|
||||||
// Steal the logic we use for making backups which will be applied when restoring
|
|
||||||
// this specific backup. This allows us to prevent overloading the disk unintentionally.
|
|
||||||
if writeLimit := int64(config.Get().System.Backups.WriteLimit * 1024 * 1024); writeLimit > 0 {
|
|
||||||
reader = ratelimit.Reader(r, ratelimit.NewBucketWithRate(float64(writeLimit), writeLimit))
|
|
||||||
}
|
|
||||||
gr, err := gzip.NewReader(reader)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", errors.Wrap(err, "backup: could not create request for S3")
|
||||||
}
|
}
|
||||||
defer gr.Close()
|
|
||||||
tr := tar.NewReader(gr)
|
r.ContentLength = size
|
||||||
for {
|
r.Header.Add("Content-Length", strconv.Itoa(int(size)))
|
||||||
header, err := tr.Next()
|
r.Header.Add("Content-Type", "application/x-gzip")
|
||||||
|
|
||||||
|
// Limit the reader to the size of the part.
|
||||||
|
r.Body = Reader{Reader: io.LimitReader(fu.ReadCloser, size)}
|
||||||
|
|
||||||
|
var etag string
|
||||||
|
err = backoff.Retry(func() error {
|
||||||
|
res, err := fu.client.Do(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||||
break
|
return backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
|
// Don't use a permanent error here, if there is a temporary resolution error with
|
||||||
|
// the URL due to DNS issues we want to keep re-trying.
|
||||||
|
return errors.Wrap(err, "backup: S3 HTTP request failed")
|
||||||
|
}
|
||||||
|
_ = res.Body.Close()
|
||||||
|
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
err := errors.New(fmt.Sprintf("backup: failed to put S3 object: [HTTP/%d] %s", res.StatusCode, res.Status))
|
||||||
|
// Only attempt a backoff retry if this error is because of a 5xx error from
|
||||||
|
// the S3 endpoint. Any 4xx error should be treated as an error that a retry
|
||||||
|
// would not fix.
|
||||||
|
if res.StatusCode >= http.StatusInternalServerError {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if header.Typeflag == tar.TypeReg {
|
return backoff.Permanent(err)
|
||||||
if err := callback(header.Name, tr); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the ETag from the uploaded part, this should be sent with the
|
||||||
|
// CompleteMultipartUpload request.
|
||||||
|
etag = res.Header.Get("ETag")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, fu.backoff(ctx))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if v, ok := err.(*backoff.PermanentError); ok {
|
||||||
|
return "", v.Unwrap()
|
||||||
}
|
}
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
|
return etag, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reader provides a wrapper around an existing io.Reader
|
||||||
|
// but implements io.Closer in order to satisfy an io.ReadCloser.
|
||||||
|
type Reader struct {
|
||||||
|
io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (Reader) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user