Merge pull request #67 from pterodactyl/issue/2599

Switch to s3 multipart uploads for backups
This commit is contained in:
Dane Everitt 2020-11-01 14:34:27 -08:00 committed by GitHub
commit 317e54acc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 165 additions and 43 deletions

View File

@ -185,7 +185,7 @@ func (r *Response) Error() error {
var bag RequestErrorBag
_ = r.Bind(&bag)
e := new(RequestError)
e := &RequestError{}
if len(bag.Errors) > 0 {
e = &bag.Errors[0]
}

View File

@ -3,8 +3,35 @@ package api
import (
"fmt"
"github.com/pkg/errors"
"strconv"
)
type BackupRemoteUploadResponse struct {
CompleteMultipartUpload string `json:"complete_multipart_upload"`
AbortMultipartUpload string `json:"abort_multipart_upload"`
Parts []string `json:"parts"`
PartSize int64 `json:"part_size"`
}
func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) {
resp, err := r.Get(fmt.Sprintf("/backups/%s", backup), Q{"size": strconv.FormatInt(size, 10)})
if err != nil {
return nil, errors.WithStack(err)
}
defer resp.Body.Close()
if resp.HasError() {
return nil, resp.Error()
}
var res BackupRemoteUploadResponse
if err := resp.Bind(&res); err != nil {
return nil, errors.WithStack(err)
}
return &res, nil
}
type BackupRequest struct {
Checksum string `json:"checksum"`
ChecksumType string `json:"checksum_type"`

View File

@ -9,7 +9,6 @@ type Request struct {
Adapter string `json:"adapter"`
Uuid string `json:"uuid"`
IgnoredFiles []string `json:"ignored_files"`
PresignedUrl string `json:"presigned_url"`
}
// Generates a new local backup struct.
@ -32,15 +31,10 @@ func (r *Request) NewS3Backup() (*S3Backup, error) {
return nil, errors.New(fmt.Sprintf("cannot create s3 backup using [%s] adapter", r.Adapter))
}
if len(r.PresignedUrl) == 0 {
return nil, errors.New("a valid presigned S3 upload URL must be provided to use the [s3] adapter")
}
return &S3Backup{
Backup: Backup{
Uuid: r.Uuid,
IgnoredFiles: r.IgnoredFiles,
},
PresignedUrl: r.PresignedUrl,
}, nil
}

View File

@ -1,24 +1,21 @@
package backup
import (
"bytes"
"context"
"fmt"
"github.com/apex/log"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"io"
"net/http"
"os"
"strconv"
"time"
)
type S3Backup struct {
Backup
// The pre-signed upload endpoint for the generated backup. This must be
// provided otherwise this request will fail. This allows us to keep all
// of the keys off the daemon instances and the panel can handle generating
// the credentials for us.
PresignedUrl string
}
var _ BackupInterface = (*S3Backup)(nil)
@ -43,14 +40,8 @@ func (s *S3Backup) Generate(included *IncludedFiles, prefix string) (*ArchiveDet
}
defer rc.Close()
if resp, err := s.generateRemoteRequest(rc); err != nil {
if err := s.generateRemoteRequest(rc); err != nil {
return nil, errors.WithStack(err)
} else {
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to put S3 object, %d:%s", resp.StatusCode, resp.Status)
}
}
return s.Details(), err
@ -61,27 +52,137 @@ func (s *S3Backup) Remove() error {
return os.Remove(s.Path())
}
// Reader provides a wrapper around an existing io.Reader
// but implements io.Closer in order to satisfy an io.ReadCloser.
type Reader struct {
io.Reader
}
func (Reader) Close() error {
return nil
}
// Generates the remote S3 request and begins the upload.
func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) (*http.Response, error) {
r, err := http.NewRequest(http.MethodPut, s.PresignedUrl, nil)
func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
defer rc.Close()
size, err := s.Backup.Size()
if err != nil {
return nil, err
return err
}
if sz, err := s.Size(); err != nil {
return nil, err
} else {
r.ContentLength = sz
r.Header.Add("Content-Length", strconv.Itoa(int(sz)))
urls, err := api.New().GetBackupRemoteUploadURLs(s.Backup.Uuid, size)
if err != nil {
return err
}
log.Debug("attempting to upload backup to remote S3 endpoint")
handlePart := func(part string, size int64) (string, error) {
r, err := http.NewRequest(http.MethodPut, part, nil)
if err != nil {
return "", err
}
r.ContentLength = size
r.Header.Add("Content-Length", strconv.Itoa(int(size)))
r.Header.Add("Content-Type", "application/x-gzip")
// Limit the reader to the size of the part.
r.Body = Reader{io.LimitReader(rc, size)}
// This http request can block forever due to it not having a timeout,
// but we are uploading up to 5GB of data, so there is not really
// a good way to handle a timeout on this.
res, err := http.DefaultClient.Do(r)
if err != nil {
return "", err
}
defer res.Body.Close()
// Handle non-200 status codes.
if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status)
}
r.Body = rc
log.WithFields(log.Fields{
"endpoint": s.PresignedUrl,
"headers": r.Header,
}).Debug("uploading backup to remote S3 endpoint")
return http.DefaultClient.Do(r)
// Get the ETag from the uploaded part, this should be sent with the CompleteMultipartUpload request.
return res.Header.Get("ETag"), nil
}
// Start assembling the body that will be sent as apart of the CompleteMultipartUpload request.
var completeUploadBody bytes.Buffer
completeUploadBody.WriteString("<CompleteMultipartUpload>\n")
partCount := len(urls.Parts)
for i, part := range urls.Parts {
// Get the size for the current part.
var partSize int64
if i+1 < partCount {
partSize = urls.PartSize
} else {
// This is the remaining size for the last part,
// there is not a minimum size limit for the last part.
partSize = size - (int64(i) * urls.PartSize)
}
// Attempt to upload the part.
etag, err := handlePart(part, partSize)
if err != nil {
log.WithError(err).Warn("failed to upload part")
// Send an AbortMultipartUpload request.
if err := s.finishUpload(urls.AbortMultipartUpload, nil); err != nil {
log.WithError(err).Warn("failed to abort multipart backup upload")
}
return err
}
// Add the part to the CompleteMultipartUpload body.
completeUploadBody.WriteString("\t<Part>\n")
completeUploadBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
completeUploadBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
completeUploadBody.WriteString("\t</Part>\n")
}
completeUploadBody.WriteString("</CompleteMultipartUpload>")
// Send a CompleteMultipartUpload request.
if err := s.finishUpload(urls.CompleteMultipartUpload, &completeUploadBody); err != nil {
return err
}
return nil
}
// finishUpload sends a requests to the specified url to either complete or abort the upload.
func (s *S3Backup) finishUpload(url string, body io.Reader) error {
r, err := http.NewRequest(http.MethodPost, url, body)
if err != nil {
return err
}
// Create a new http client with a 10 second timeout.
c := &http.Client{
Timeout: 10 * time.Second,
}
res, err := c.Do(r)
if err != nil {
return err
}
defer res.Body.Close()
// Handle non-200 status codes.
if res.StatusCode != http.StatusOK {
// If no body was sent, we were aborting the upload.
if body == nil {
return fmt.Errorf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
}
// If a body was sent we were completing the upload.
// TODO: Attempt to send abort request?
return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status)
}
return nil
}