From 5475cb02c14d51575bccccb40b2642961ad3b4bc Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Sun, 1 Nov 2020 10:30:25 -0700 Subject: [PATCH] Cleanup s3 backup uploads, add request timeouts --- api/backup_endpoints.go | 8 ++-- server/backup/backup_s3.go | 95 ++++++++++++++++++++++---------------- 2 files changed, 59 insertions(+), 44 deletions(-) diff --git a/api/backup_endpoints.go b/api/backup_endpoints.go index 54cb7f0..6cd0617 100644 --- a/api/backup_endpoints.go +++ b/api/backup_endpoints.go @@ -7,10 +7,10 @@ import ( ) type BackupRemoteUploadResponse struct { - CompleteMultipartUpload string - AbortMultipartUpload string - Parts []string - PartSize int64 + CompleteMultipartUpload string `json:"complete_multipart_upload"` + AbortMultipartUpload string `json:"abort_multipart_upload"` + Parts []string `json:"parts"` + PartSize int64 `json:"part_size"` } func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) { diff --git a/server/backup/backup_s3.go b/server/backup/backup_s3.go index ce83679..9a4a6de 100644 --- a/server/backup/backup_s3.go +++ b/server/backup/backup_s3.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "strconv" + "time" ) type S3Backup struct { @@ -87,85 +88,99 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { r.Header.Add("Content-Length", strconv.Itoa(int(size))) r.Header.Add("Content-Type", "application/x-gzip") + // Limit the reader to the size of the part. r.Body = Reader{io.LimitReader(rc, size)} + // This http request can block forever due to it not having a timeout, + // but we are uploading up to 5GB of data, so there is not really + // a good way to handle a timeout on this. res, err := http.DefaultClient.Do(r) if err != nil { return "", err } - defer res.Body.Close() + // Handle non-200 status codes. if res.StatusCode != http.StatusOK { return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status) } + // Get the ETag from the uploaded part, this should be sent with the CompleteMultipartUpload request. return res.Header.Get("ETag"), nil } - // Keep track of errors from individual part uploads. - hasError := true - defer func() { - if !hasError { - return - } - - r, err := http.NewRequest(http.MethodPost, urls.AbortMultipartUpload, nil) - if err != nil { - log.WithError(err).Warn("failed to create http request (AbortMultipartUpload)") - return - } - - res, err := http.DefaultClient.Do(r) - if err != nil { - log.WithError(err).Warn("failed to make http request (AbortMultipartUpload)") - return - } - defer res.Body.Close() - - if res.StatusCode != http.StatusOK { - log.Warnf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status) - } - }() - - var completeBody bytes.Buffer - completeBody.WriteString("\n") + // Start assembling the body that will be sent as apart of the CompleteMultipartUpload request. + var completeUploadBody bytes.Buffer + completeUploadBody.WriteString("\n") partCount := len(urls.Parts) for i, part := range urls.Parts { - var s int64 + // Get the size for the current part. + var partSize int64 if i+1 < partCount { - s = urls.PartSize + partSize = urls.PartSize } else { - s = size - (int64(i) * urls.PartSize) + // This is the remaining size for the last part, + // there is not a minimum size limit for the last part. + partSize = size - (int64(i) * urls.PartSize) } - etag, err := handlePart(part, s) + // Attempt to upload the part. + etag, err := handlePart(part, partSize) if err != nil { + log.WithError(err).Warn("failed to upload part") + + // Send an AbortMultipartUpload request. + if err := s.finishUpload(urls.AbortMultipartUpload, nil); err != nil { + log.WithError(err).Warn("failed to abort multipart backup upload") + } + return err } - completeBody.WriteString("\t\n") - completeBody.WriteString("\t\t\"" + etag + "\"\n") - completeBody.WriteString("\t\t" + strconv.Itoa(i+1) + "\n") - completeBody.WriteString("\t\n") + // Add the part to the CompleteMultipartUpload body. + completeUploadBody.WriteString("\t\n") + completeUploadBody.WriteString("\t\t\"" + etag + "\"\n") + completeUploadBody.WriteString("\t\t" + strconv.Itoa(i+1) + "\n") + completeUploadBody.WriteString("\t\n") } - hasError = false + completeUploadBody.WriteString("") - completeBody.WriteString("") + // Send a CompleteMultipartUpload request. + if err := s.finishUpload(urls.CompleteMultipartUpload, &completeUploadBody); err != nil { + return err + } - r, err := http.NewRequest(http.MethodPost, urls.CompleteMultipartUpload, &completeBody) + return nil +} + +// finishUpload sends a requests to the specified url to either complete or abort the upload. +func (s *S3Backup) finishUpload(url string, body io.Reader) error { + r, err := http.NewRequest(http.MethodPost, url, body) if err != nil { return err } - res, err := http.DefaultClient.Do(r) + // Create a new http client with a 10 second timeout. + c := &http.Client{ + Timeout: 10 * time.Second, + } + + res, err := c.Do(r) if err != nil { return err } defer res.Body.Close() + // Handle non-200 status codes. if res.StatusCode != http.StatusOK { + // If no body was sent, we were aborting the upload. + if body == nil { + return fmt.Errorf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status) + } + + // If a body was sent we were completing the upload. + // TODO: Attempt to send abort request? return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status) }