backups(s3): send uploaded parts in complete request

This commit is contained in:
Matthew Penner 2022-09-26 11:14:57 -06:00
parent 02cbf2df5b
commit 7245791214
No known key found for this signature in database
GPG Key ID: 31311906AD4CF6D6
5 changed files with 44 additions and 29 deletions

View File

@ -3,10 +3,11 @@ package remote
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/pterodactyl/wings/internal/models"
"strconv" "strconv"
"sync" "sync"
"github.com/pterodactyl/wings/internal/models"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"

View File

@ -2,11 +2,12 @@ package remote
import ( import (
"bytes" "bytes"
"github.com/apex/log"
"github.com/goccy/go-json"
"regexp" "regexp"
"strings" "strings"
"github.com/apex/log"
"github.com/goccy/go-json"
"github.com/pterodactyl/wings/parser" "github.com/pterodactyl/wings/parser"
) )
@ -156,9 +157,15 @@ type BackupRemoteUploadResponse struct {
PartSize int64 `json:"part_size"` PartSize int64 `json:"part_size"`
} }
type BackupPart struct {
ETag string `json:"etag"`
PartNumber int `json:"part_number"`
}
type BackupRequest struct { type BackupRequest struct {
Checksum string `json:"checksum"` Checksum string `json:"checksum"`
ChecksumType string `json:"checksum_type"` ChecksumType string `json:"checksum_type"`
Size int64 `json:"size"` Size int64 `json:"size"`
Successful bool `json:"successful"` Successful bool `json:"successful"`
Parts []BackupPart `json:"parts"`
} }

View File

@ -32,7 +32,7 @@ type RestoreCallback func(file string, r io.Reader, mode fs.FileMode, atime, mti
// noinspection GoNameStartsWithPackageName // noinspection GoNameStartsWithPackageName
type BackupInterface interface { type BackupInterface interface {
// SetClient sets the API request client on the backup interface. // SetClient sets the API request client on the backup interface.
SetClient(c remote.Client) SetClient(remote.Client)
// Identifier returns the UUID of this backup as tracked by the panel // Identifier returns the UUID of this backup as tracked by the panel
// instance. // instance.
Identifier() string Identifier() string
@ -41,7 +41,7 @@ type BackupInterface interface {
WithLogContext(map[string]interface{}) WithLogContext(map[string]interface{})
// Generate creates a backup in whatever the configured source for the // Generate creates a backup in whatever the configured source for the
// specific implementation is. // specific implementation is.
Generate(ctx context.Context, basePath string, ignore string) (*ArchiveDetails, error) Generate(context.Context, string, string) (*ArchiveDetails, error)
// Ignored returns the ignored files for this backup instance. // Ignored returns the ignored files for this backup instance.
Ignored() string Ignored() string
// Checksum returns a SHA1 checksum for the generated backup. // Checksum returns a SHA1 checksum for the generated backup.
@ -53,13 +53,13 @@ type BackupInterface interface {
// to store it until it is moved to the final spot. // to store it until it is moved to the final spot.
Path() string Path() string
// Details returns details about the archive. // Details returns details about the archive.
Details(ctx context.Context) (*ArchiveDetails, error) Details(context.Context, []remote.BackupPart) (*ArchiveDetails, error)
// Remove removes a backup file. // Remove removes a backup file.
Remove() error Remove() error
// Restore is called when a backup is ready to be restored to the disk from // Restore is called when a backup is ready to be restored to the disk from
// the given source. Not every backup implementation will support this nor // the given source. Not every backup implementation will support this nor
// will every implementation require a reader be provided. // will every implementation require a reader be provided.
Restore(ctx context.Context, reader io.Reader, callback RestoreCallback) error Restore(context.Context, io.Reader, RestoreCallback) error
} }
type Backup struct { type Backup struct {
@ -119,8 +119,8 @@ func (b *Backup) Checksum() ([]byte, error) {
// Details returns both the checksum and size of the archive currently stored on // Details returns both the checksum and size of the archive currently stored on
// the disk to the caller. // the disk to the caller.
func (b *Backup) Details(ctx context.Context) (*ArchiveDetails, error) { func (b *Backup) Details(ctx context.Context, parts []remote.BackupPart) (*ArchiveDetails, error) {
ad := ArchiveDetails{ChecksumType: "sha1"} ad := ArchiveDetails{ChecksumType: "sha1", Parts: parts}
g, ctx := errgroup.WithContext(ctx) g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { g.Go(func() error {
@ -165,6 +165,7 @@ type ArchiveDetails struct {
Checksum string `json:"checksum"` Checksum string `json:"checksum"`
ChecksumType string `json:"checksum_type"` ChecksumType string `json:"checksum_type"`
Size int64 `json:"size"` Size int64 `json:"size"`
Parts []remote.BackupPart `json:"parts"`
} }
// ToRequest returns a request object. // ToRequest returns a request object.
@ -174,5 +175,6 @@ func (ad *ArchiveDetails) ToRequest(successful bool) remote.BackupRequest {
ChecksumType: ad.ChecksumType, ChecksumType: ad.ChecksumType,
Size: ad.Size, Size: ad.Size,
Successful: successful, Successful: successful,
Parts: ad.Parts,
} }
} }

View File

@ -69,7 +69,7 @@ func (b *LocalBackup) Generate(ctx context.Context, basePath, ignore string) (*A
} }
b.log().Info("created backup successfully") b.log().Info("created backup successfully")
ad, err := b.Details(ctx) ad, err := b.Details(ctx, nil)
if err != nil { if err != nil {
return nil, errors.WrapIf(err, "backup: failed to get archive details for local backup") return nil, errors.WrapIf(err, "backup: failed to get archive details for local backup")
} }

View File

@ -71,10 +71,11 @@ func (s *S3Backup) Generate(ctx context.Context, basePath, ignore string) (*Arch
} }
defer rc.Close() defer rc.Close()
if err := s.generateRemoteRequest(ctx, rc); err != nil { parts, err := s.generateRemoteRequest(ctx, rc)
if err != nil {
return nil, err return nil, err
} }
ad, err := s.Details(ctx) ad, err := s.Details(ctx, parts)
if err != nil { if err != nil {
return nil, errors.WrapIf(err, "backup: failed to get archive details after upload") return nil, errors.WrapIf(err, "backup: failed to get archive details after upload")
} }
@ -125,20 +126,20 @@ func (s *S3Backup) Restore(ctx context.Context, r io.Reader, callback RestoreCal
} }
// Generates the remote S3 request and begins the upload. // Generates the remote S3 request and begins the upload.
func (s *S3Backup) generateRemoteRequest(ctx context.Context, rc io.ReadCloser) error { func (s *S3Backup) generateRemoteRequest(ctx context.Context, rc io.ReadCloser) ([]remote.BackupPart, error) {
defer rc.Close() defer rc.Close()
s.log().Debug("attempting to get size of backup...") s.log().Debug("attempting to get size of backup...")
size, err := s.Backup.Size() size, err := s.Backup.Size()
if err != nil { if err != nil {
return err return nil, err
} }
s.log().WithField("size", size).Debug("got size of backup") s.log().WithField("size", size).Debug("got size of backup")
s.log().Debug("attempting to get S3 upload urls from Panel...") s.log().Debug("attempting to get S3 upload urls from Panel...")
urls, err := s.client.GetBackupRemoteUploadURLs(context.Background(), s.Backup.Uuid, size) urls, err := s.client.GetBackupRemoteUploadURLs(context.Background(), s.Backup.Uuid, size)
if err != nil { if err != nil {
return err return nil, err
} }
s.log().Debug("got S3 upload urls from the Panel") s.log().Debug("got S3 upload urls from the Panel")
s.log().WithField("parts", len(urls.Parts)).Info("attempting to upload backup to s3 endpoint...") s.log().WithField("parts", len(urls.Parts)).Info("attempting to upload backup to s3 endpoint...")
@ -156,22 +157,26 @@ func (s *S3Backup) generateRemoteRequest(ctx context.Context, rc io.ReadCloser)
} }
// Attempt to upload the part. // Attempt to upload the part.
if _, err := uploader.uploadPart(ctx, part, partSize); err != nil { etag, err := uploader.uploadPart(ctx, part, partSize)
if err != nil {
s.log().WithField("part_id", i+1).WithError(err).Warn("failed to upload part") s.log().WithField("part_id", i+1).WithError(err).Warn("failed to upload part")
return err return nil, err
} }
uploader.uploadedParts = append(uploader.uploadedParts, remote.BackupPart{
ETag: etag,
PartNumber: i + 1,
})
s.log().WithField("part_id", i+1).Info("successfully uploaded backup part") s.log().WithField("part_id", i+1).Info("successfully uploaded backup part")
} }
s.log().WithField("parts", len(urls.Parts)).Info("backup has been successfully uploaded") s.log().WithField("parts", len(urls.Parts)).Info("backup has been successfully uploaded")
return nil return uploader.uploadedParts, nil
} }
type s3FileUploader struct { type s3FileUploader struct {
io.ReadCloser io.ReadCloser
client *http.Client client *http.Client
uploadedParts []remote.BackupPart
} }
// newS3FileUploader returns a new file uploader instance. // newS3FileUploader returns a new file uploader instance.