Cleanup s3 backup uploads, add request timeouts

This commit is contained in:
Matthew Penner 2020-11-01 10:30:25 -07:00
parent 1239b1c0ca
commit 5475cb02c1
2 changed files with 59 additions and 44 deletions

View File

@ -7,10 +7,10 @@ import (
) )
type BackupRemoteUploadResponse struct { type BackupRemoteUploadResponse struct {
CompleteMultipartUpload string CompleteMultipartUpload string `json:"complete_multipart_upload"`
AbortMultipartUpload string AbortMultipartUpload string `json:"abort_multipart_upload"`
Parts []string Parts []string `json:"parts"`
PartSize int64 PartSize int64 `json:"part_size"`
} }
func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) { func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) {

View File

@ -11,6 +11,7 @@ import (
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
"time"
) )
type S3Backup struct { type S3Backup struct {
@ -87,85 +88,99 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
r.Header.Add("Content-Length", strconv.Itoa(int(size))) r.Header.Add("Content-Length", strconv.Itoa(int(size)))
r.Header.Add("Content-Type", "application/x-gzip") r.Header.Add("Content-Type", "application/x-gzip")
// Limit the reader to the size of the part.
r.Body = Reader{io.LimitReader(rc, size)} r.Body = Reader{io.LimitReader(rc, size)}
// This http request can block forever due to it not having a timeout,
// but we are uploading up to 5GB of data, so there is not really
// a good way to handle a timeout on this.
res, err := http.DefaultClient.Do(r) res, err := http.DefaultClient.Do(r)
if err != nil { if err != nil {
return "", err return "", err
} }
defer res.Body.Close() defer res.Body.Close()
// Handle non-200 status codes.
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status) return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status)
} }
// Get the ETag from the uploaded part, this should be sent with the CompleteMultipartUpload request.
return res.Header.Get("ETag"), nil return res.Header.Get("ETag"), nil
} }
// Keep track of errors from individual part uploads. // Start assembling the body that will be sent as apart of the CompleteMultipartUpload request.
hasError := true var completeUploadBody bytes.Buffer
defer func() { completeUploadBody.WriteString("<CompleteMultipartUpload>\n")
if !hasError {
return
}
r, err := http.NewRequest(http.MethodPost, urls.AbortMultipartUpload, nil)
if err != nil {
log.WithError(err).Warn("failed to create http request (AbortMultipartUpload)")
return
}
res, err := http.DefaultClient.Do(r)
if err != nil {
log.WithError(err).Warn("failed to make http request (AbortMultipartUpload)")
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Warnf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
}
}()
var completeBody bytes.Buffer
completeBody.WriteString("<CompleteMultipartUpload>\n")
partCount := len(urls.Parts) partCount := len(urls.Parts)
for i, part := range urls.Parts { for i, part := range urls.Parts {
var s int64 // Get the size for the current part.
var partSize int64
if i+1 < partCount { if i+1 < partCount {
s = urls.PartSize partSize = urls.PartSize
} else { } else {
s = size - (int64(i) * urls.PartSize) // This is the remaining size for the last part,
// there is not a minimum size limit for the last part.
partSize = size - (int64(i) * urls.PartSize)
} }
etag, err := handlePart(part, s) // Attempt to upload the part.
etag, err := handlePart(part, partSize)
if err != nil {
log.WithError(err).Warn("failed to upload part")
// Send an AbortMultipartUpload request.
if err := s.finishUpload(urls.AbortMultipartUpload, nil); err != nil {
log.WithError(err).Warn("failed to abort multipart backup upload")
}
return err
}
// Add the part to the CompleteMultipartUpload body.
completeUploadBody.WriteString("\t<Part>\n")
completeUploadBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
completeUploadBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
completeUploadBody.WriteString("\t</Part>\n")
}
completeUploadBody.WriteString("</CompleteMultipartUpload>")
// Send a CompleteMultipartUpload request.
if err := s.finishUpload(urls.CompleteMultipartUpload, &completeUploadBody); err != nil {
return err
}
return nil
}
// finishUpload sends a requests to the specified url to either complete or abort the upload.
func (s *S3Backup) finishUpload(url string, body io.Reader) error {
r, err := http.NewRequest(http.MethodPost, url, body)
if err != nil { if err != nil {
return err return err
} }
completeBody.WriteString("\t<Part>\n") // Create a new http client with a 10 second timeout.
completeBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n") c := &http.Client{
completeBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n") Timeout: 10 * time.Second,
completeBody.WriteString("\t</Part>\n")
}
hasError = false
completeBody.WriteString("</CompleteMultipartUpload>")
r, err := http.NewRequest(http.MethodPost, urls.CompleteMultipartUpload, &completeBody)
if err != nil {
return err
} }
res, err := http.DefaultClient.Do(r) res, err := c.Do(r)
if err != nil { if err != nil {
return err return err
} }
defer res.Body.Close() defer res.Body.Close()
// Handle non-200 status codes.
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
// If no body was sent, we were aborting the upload.
if body == nil {
return fmt.Errorf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
}
// If a body was sent we were completing the upload.
// TODO: Attempt to send abort request?
return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status) return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status)
} }