From 1239b1c0ca235f2c2ad5447c7b761bf24eec2cbf Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Sat, 31 Oct 2020 17:47:41 -0600 Subject: [PATCH 1/2] Switch to s3 multipart uploads for backups --- api/api.go | 2 +- api/backup_endpoints.go | 27 ++++++ server/backup/backup_request.go | 6 -- server/backup/backup_s3.go | 158 ++++++++++++++++++++++++-------- 4 files changed, 150 insertions(+), 43 deletions(-) diff --git a/api/api.go b/api/api.go index 4067f8d..4adb589 100644 --- a/api/api.go +++ b/api/api.go @@ -185,7 +185,7 @@ func (r *Response) Error() error { var bag RequestErrorBag _ = r.Bind(&bag) - e := new(RequestError) + e := &RequestError{} if len(bag.Errors) > 0 { e = &bag.Errors[0] } diff --git a/api/backup_endpoints.go b/api/backup_endpoints.go index 9d30404..54cb7f0 100644 --- a/api/backup_endpoints.go +++ b/api/backup_endpoints.go @@ -3,8 +3,35 @@ package api import ( "fmt" "github.com/pkg/errors" + "strconv" ) +type BackupRemoteUploadResponse struct { + CompleteMultipartUpload string + AbortMultipartUpload string + Parts []string + PartSize int64 +} + +func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) { + resp, err := r.Get(fmt.Sprintf("/backups/%s", backup), Q{"size": strconv.FormatInt(size, 10)}) + if err != nil { + return nil, errors.WithStack(err) + } + defer resp.Body.Close() + + if resp.HasError() { + return nil, resp.Error() + } + + var res BackupRemoteUploadResponse + if err := resp.Bind(&res); err != nil { + return nil, errors.WithStack(err) + } + + return &res, nil +} + type BackupRequest struct { Checksum string `json:"checksum"` ChecksumType string `json:"checksum_type"` diff --git a/server/backup/backup_request.go b/server/backup/backup_request.go index 29b2e6e..a240596 100644 --- a/server/backup/backup_request.go +++ b/server/backup/backup_request.go @@ -9,7 +9,6 @@ type Request struct { Adapter string `json:"adapter"` Uuid string `json:"uuid"` IgnoredFiles []string `json:"ignored_files"` - PresignedUrl string `json:"presigned_url"` } // Generates a new local backup struct. @@ -32,15 +31,10 @@ func (r *Request) NewS3Backup() (*S3Backup, error) { return nil, errors.New(fmt.Sprintf("cannot create s3 backup using [%s] adapter", r.Adapter)) } - if len(r.PresignedUrl) == 0 { - return nil, errors.New("a valid presigned S3 upload URL must be provided to use the [s3] adapter") - } - return &S3Backup{ Backup: Backup{ Uuid: r.Uuid, IgnoredFiles: r.IgnoredFiles, }, - PresignedUrl: r.PresignedUrl, }, nil } diff --git a/server/backup/backup_s3.go b/server/backup/backup_s3.go index 55bd904..ce83679 100644 --- a/server/backup/backup_s3.go +++ b/server/backup/backup_s3.go @@ -1,10 +1,12 @@ package backup import ( + "bytes" "context" "fmt" "github.com/apex/log" "github.com/pkg/errors" + "github.com/pterodactyl/wings/api" "io" "net/http" "os" @@ -13,12 +15,6 @@ import ( type S3Backup struct { Backup - - // The pre-signed upload endpoint for the generated backup. This must be - // provided otherwise this request will fail. This allows us to keep all - // of the keys off the daemon instances and the panel can handle generating - // the credentials for us. 
- PresignedUrl string } var _ BackupInterface = (*S3Backup)(nil) @@ -43,14 +39,8 @@ func (s *S3Backup) Generate(included *IncludedFiles, prefix string) (*ArchiveDet } defer rc.Close() - if resp, err := s.generateRemoteRequest(rc); err != nil { + if err := s.generateRemoteRequest(rc); err != nil { return nil, errors.WithStack(err) - } else { - resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("failed to put S3 object, %d:%s", resp.StatusCode, resp.Status) - } } return s.Details(), err @@ -61,27 +51,123 @@ func (s *S3Backup) Remove() error { return os.Remove(s.Path()) } -// Generates the remote S3 request and begins the upload. -func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) (*http.Response, error) { - r, err := http.NewRequest(http.MethodPut, s.PresignedUrl, nil) - if err != nil { - return nil, err - } - - if sz, err := s.Size(); err != nil { - return nil, err - } else { - r.ContentLength = sz - r.Header.Add("Content-Length", strconv.Itoa(int(sz))) - r.Header.Add("Content-Type", "application/x-gzip") - } - - r.Body = rc - - log.WithFields(log.Fields{ - "endpoint": s.PresignedUrl, - "headers": r.Header, - }).Debug("uploading backup to remote S3 endpoint") - - return http.DefaultClient.Do(r) +// Reader provides a wrapper around an existing io.Reader +// but implements io.Closer in order to satisfy an io.ReadCloser. +type Reader struct { + io.Reader +} + +func (Reader) Close() error { + return nil +} + +// Generates the remote S3 request and begins the upload. +func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { + defer rc.Close() + + size, err := s.Backup.Size() + if err != nil { + return err + } + + urls, err := api.New().GetBackupRemoteUploadURLs(s.Backup.Uuid, size) + if err != nil { + return err + } + + log.Debug("attempting to upload backup to remote S3 endpoint") + + handlePart := func(part string, size int64) (string, error) { + r, err := http.NewRequest(http.MethodPut, part, nil) + if err != nil { + return "", err + } + + r.ContentLength = size + r.Header.Add("Content-Length", strconv.Itoa(int(size))) + r.Header.Add("Content-Type", "application/x-gzip") + + r.Body = Reader{io.LimitReader(rc, size)} + + res, err := http.DefaultClient.Do(r) + if err != nil { + return "", err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status) + } + + return res.Header.Get("ETag"), nil + } + + // Keep track of errors from individual part uploads. 
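+	// If hasError is still true when the deferred function below runs, an
+	// AbortMultipartUpload request is sent so the bucket is not left holding
+	// the parts of a failed upload.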
+	hasError := true
+	defer func() {
+		if !hasError {
+			return
+		}
+
+		r, err := http.NewRequest(http.MethodPost, urls.AbortMultipartUpload, nil)
+		if err != nil {
+			log.WithError(err).Warn("failed to create http request (AbortMultipartUpload)")
+			return
+		}
+
+		res, err := http.DefaultClient.Do(r)
+		if err != nil {
+			log.WithError(err).Warn("failed to make http request (AbortMultipartUpload)")
+			return
+		}
+		defer res.Body.Close()
+
+		if res.StatusCode != http.StatusOK {
+			log.Warnf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
+		}
+	}()
+
+	var completeBody bytes.Buffer
+	completeBody.WriteString("<CompleteMultipartUpload>\n")
+
+	partCount := len(urls.Parts)
+	for i, part := range urls.Parts {
+		var s int64
+		if i+1 < partCount {
+			s = urls.PartSize
+		} else {
+			s = size - (int64(i) * urls.PartSize)
+		}
+
+		etag, err := handlePart(part, s)
+		if err != nil {
+			return err
+		}
+
+		completeBody.WriteString("\t<Part>\n")
+		completeBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
+		completeBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
+		completeBody.WriteString("\t</Part>\n")
+	}
+	hasError = false
+
+	completeBody.WriteString("</CompleteMultipartUpload>")
+
+	r, err := http.NewRequest(http.MethodPost, urls.CompleteMultipartUpload, &completeBody)
+	if err != nil {
+		return err
+	}
+
+	res, err := http.DefaultClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode != http.StatusOK {
+		return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status)
+	}
+
+	return nil
 }

From 5475cb02c14d51575bccccb40b2642961ad3b4bc Mon Sep 17 00:00:00 2001
From: Matthew Penner
Date: Sun, 1 Nov 2020 10:30:25 -0700
Subject: [PATCH 2/2] Cleanup s3 backup uploads, add request timeouts

---
 api/backup_endpoints.go    |  8 ++--
 server/backup/backup_s3.go | 95 ++++++++++++++++++++++----------------
 2 files changed, 59 insertions(+), 44 deletions(-)

diff --git a/api/backup_endpoints.go b/api/backup_endpoints.go
index 54cb7f0..6cd0617 100644
--- a/api/backup_endpoints.go
+++ b/api/backup_endpoints.go
@@ -7,10 +7,10 @@ import (
 )
 
 type BackupRemoteUploadResponse struct {
-	CompleteMultipartUpload string
-	AbortMultipartUpload string
-	Parts []string
-	PartSize int64
+	CompleteMultipartUpload string `json:"complete_multipart_upload"`
+	AbortMultipartUpload string `json:"abort_multipart_upload"`
+	Parts []string `json:"parts"`
+	PartSize int64 `json:"part_size"`
 }
 
 func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) {
diff --git a/server/backup/backup_s3.go b/server/backup/backup_s3.go
index ce83679..9a4a6de 100644
--- a/server/backup/backup_s3.go
+++ b/server/backup/backup_s3.go
@@ -11,6 +11,7 @@ import (
 	"net/http"
 	"os"
 	"strconv"
+	"time"
 )
 
 type S3Backup struct {
@@ -87,85 +88,99 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
 		r.Header.Add("Content-Length", strconv.Itoa(int(size)))
 		r.Header.Add("Content-Type", "application/x-gzip")
 
+		// Limit the reader to the size of the part.
 		r.Body = Reader{io.LimitReader(rc, size)}
 
+		// This http request can block forever since it has no timeout, but
+		// a single part can be up to 5GB, so there is no reasonable timeout
+		// we could apply here.
 		res, err := http.DefaultClient.Do(r)
 		if err != nil {
 			return "", err
 		}
-
 		defer res.Body.Close()
 
+		// Handle non-200 status codes.
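+		// A part that fails here is surfaced to the caller, which then
+		// aborts the whole multipart upload.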
 		if res.StatusCode != http.StatusOK {
 			return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status)
 		}
 
+		// Get the ETag from the uploaded part; it must be sent back with the CompleteMultipartUpload request.
 		return res.Header.Get("ETag"), nil
 	}
 
-	// Keep track of errors from individual part uploads.
-	// If hasError is still true when the deferred function below runs, an
-	// AbortMultipartUpload request is sent so the bucket is not left holding
-	// the parts of a failed upload.
-	hasError := true
-	defer func() {
-		if !hasError {
-			return
-		}
-
-		r, err := http.NewRequest(http.MethodPost, urls.AbortMultipartUpload, nil)
-		if err != nil {
-			log.WithError(err).Warn("failed to create http request (AbortMultipartUpload)")
-			return
-		}
-
-		res, err := http.DefaultClient.Do(r)
-		if err != nil {
-			log.WithError(err).Warn("failed to make http request (AbortMultipartUpload)")
-			return
-		}
-		defer res.Body.Close()
-
-		if res.StatusCode != http.StatusOK {
-			log.Warnf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
-		}
-	}()
-
-	var completeBody bytes.Buffer
-	completeBody.WriteString("<CompleteMultipartUpload>\n")
+	// Start assembling the body that will be sent as part of the CompleteMultipartUpload request.
+	var completeUploadBody bytes.Buffer
+	completeUploadBody.WriteString("<CompleteMultipartUpload>\n")
 
 	partCount := len(urls.Parts)
 	for i, part := range urls.Parts {
-		var s int64
+		// Get the size for the current part.
+		var partSize int64
 		if i+1 < partCount {
-			s = urls.PartSize
+			partSize = urls.PartSize
 		} else {
-			s = size - (int64(i) * urls.PartSize)
+			// The last part gets whatever size remains;
+			// there is no minimum size limit for the final part.
+			partSize = size - (int64(i) * urls.PartSize)
 		}
 
-		etag, err := handlePart(part, s)
+		// Attempt to upload the part.
+		etag, err := handlePart(part, partSize)
 		if err != nil {
+			log.WithError(err).Warn("failed to upload part")
+
+			// Send an AbortMultipartUpload request.
+			if err := s.finishUpload(urls.AbortMultipartUpload, nil); err != nil {
+				log.WithError(err).Warn("failed to abort multipart backup upload")
+			}
+
 			return err
 		}
 
-		completeBody.WriteString("\t<Part>\n")
-		completeBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
-		completeBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
-		completeBody.WriteString("\t</Part>\n")
+		// Add the part to the CompleteMultipartUpload body.
+		completeUploadBody.WriteString("\t<Part>\n")
+		completeUploadBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
+		completeUploadBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
+		completeUploadBody.WriteString("\t</Part>\n")
 	}
-	hasError = false
+	completeUploadBody.WriteString("</CompleteMultipartUpload>")
 
-	completeBody.WriteString("</CompleteMultipartUpload>")
+	// Send a CompleteMultipartUpload request.
+	if err := s.finishUpload(urls.CompleteMultipartUpload, &completeUploadBody); err != nil {
+		return err
+	}
 
-	r, err := http.NewRequest(http.MethodPost, urls.CompleteMultipartUpload, &completeBody)
+	return nil
+}
+
+// finishUpload sends a request to the specified url to either complete or abort the upload.
+func (s *S3Backup) finishUpload(url string, body io.Reader) error {
+	r, err := http.NewRequest(http.MethodPost, url, body)
 	if err != nil {
 		return err
 	}
 
-	res, err := http.DefaultClient.Do(r)
+	// Create a new http client with a 10 second timeout.
+	c := &http.Client{
+		Timeout: 10 * time.Second,
+	}
+
+	res, err := c.Do(r)
 	if err != nil {
 		return err
 	}
 	defer res.Body.Close()
 
+	// Handle non-200 status codes.
 	if res.StatusCode != http.StatusOK {
+		// If no body was sent, we were aborting the upload.
+		if body == nil {
+			return fmt.Errorf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
+		}
+
+		// If a body was sent, we were completing the upload.
+		// TODO: Attempt to send an abort request?
 		return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status)
 	}
 
 	return nil
 }
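
The CompleteMultipartUpload body assembled above is the standard S3 XML
document, written out by hand rather than through an XML encoder. The
standalone sketch below reproduces the part-size math and body assembly
from these patches so the output can be inspected on its own; the archive
size, part size, and ETag values are illustrative stand-ins, not anything
wings produces.

package main

import (
	"bytes"
	"fmt"
	"strconv"
)

func main() {
	// Illustrative numbers only: a 12 GiB archive split into 5 GiB parts.
	size := int64(12 << 30)
	partSize := int64(5 << 30)
	// Stand-ins for the ETag response headers returned by each part upload.
	etags := []string{"etag-1", "etag-2", "etag-3"}
	partCount := len(etags)

	var body bytes.Buffer
	body.WriteString("<CompleteMultipartUpload>\n")
	for i, etag := range etags {
		// Every part is partSize bytes except the last, which carries the remainder.
		s := partSize
		if i+1 == partCount {
			s = size - int64(i)*partSize
		}
		fmt.Printf("part %d: %d bytes\n", i+1, s)

		body.WriteString("\t<Part>\n")
		body.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
		body.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
		body.WriteString("\t</Part>\n")
	}
	body.WriteString("</CompleteMultipartUpload>")

	fmt.Println(body.String())
}

Running this prints two 5 GiB parts and a 2 GiB final part, followed by the
XML body. The panel response that drives the upload is bound into
BackupRemoteUploadResponse through the json tags added in the second patch
(complete_multipart_upload, abort_multipart_upload, parts, part_size).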