Switch to s3 multipart uploads for backups
This commit is contained in:
parent
b8598e90d4
commit
1239b1c0ca
|
@ -185,7 +185,7 @@ func (r *Response) Error() error {
|
||||||
var bag RequestErrorBag
|
var bag RequestErrorBag
|
||||||
_ = r.Bind(&bag)
|
_ = r.Bind(&bag)
|
||||||
|
|
||||||
e := new(RequestError)
|
e := &RequestError{}
|
||||||
if len(bag.Errors) > 0 {
|
if len(bag.Errors) > 0 {
|
||||||
e = &bag.Errors[0]
|
e = &bag.Errors[0]
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,35 @@ package api
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type BackupRemoteUploadResponse struct {
|
||||||
|
CompleteMultipartUpload string
|
||||||
|
AbortMultipartUpload string
|
||||||
|
Parts []string
|
||||||
|
PartSize int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupRemoteUploadResponse, error) {
|
||||||
|
resp, err := r.Get(fmt.Sprintf("/backups/%s", backup), Q{"size": strconv.FormatInt(size, 10)})
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.HasError() {
|
||||||
|
return nil, resp.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
var res BackupRemoteUploadResponse
|
||||||
|
if err := resp.Bind(&res); err != nil {
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &res, nil
|
||||||
|
}
|
||||||
|
|
||||||
type BackupRequest struct {
|
type BackupRequest struct {
|
||||||
Checksum string `json:"checksum"`
|
Checksum string `json:"checksum"`
|
||||||
ChecksumType string `json:"checksum_type"`
|
ChecksumType string `json:"checksum_type"`
|
||||||
|
|
|
@ -9,7 +9,6 @@ type Request struct {
|
||||||
Adapter string `json:"adapter"`
|
Adapter string `json:"adapter"`
|
||||||
Uuid string `json:"uuid"`
|
Uuid string `json:"uuid"`
|
||||||
IgnoredFiles []string `json:"ignored_files"`
|
IgnoredFiles []string `json:"ignored_files"`
|
||||||
PresignedUrl string `json:"presigned_url"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generates a new local backup struct.
|
// Generates a new local backup struct.
|
||||||
|
@ -32,15 +31,10 @@ func (r *Request) NewS3Backup() (*S3Backup, error) {
|
||||||
return nil, errors.New(fmt.Sprintf("cannot create s3 backup using [%s] adapter", r.Adapter))
|
return nil, errors.New(fmt.Sprintf("cannot create s3 backup using [%s] adapter", r.Adapter))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(r.PresignedUrl) == 0 {
|
|
||||||
return nil, errors.New("a valid presigned S3 upload URL must be provided to use the [s3] adapter")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &S3Backup{
|
return &S3Backup{
|
||||||
Backup: Backup{
|
Backup: Backup{
|
||||||
Uuid: r.Uuid,
|
Uuid: r.Uuid,
|
||||||
IgnoredFiles: r.IgnoredFiles,
|
IgnoredFiles: r.IgnoredFiles,
|
||||||
},
|
},
|
||||||
PresignedUrl: r.PresignedUrl,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
package backup
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/apex/log"
|
"github.com/apex/log"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/pterodactyl/wings/api"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -13,12 +15,6 @@ import (
|
||||||
|
|
||||||
type S3Backup struct {
|
type S3Backup struct {
|
||||||
Backup
|
Backup
|
||||||
|
|
||||||
// The pre-signed upload endpoint for the generated backup. This must be
|
|
||||||
// provided otherwise this request will fail. This allows us to keep all
|
|
||||||
// of the keys off the daemon instances and the panel can handle generating
|
|
||||||
// the credentials for us.
|
|
||||||
PresignedUrl string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ BackupInterface = (*S3Backup)(nil)
|
var _ BackupInterface = (*S3Backup)(nil)
|
||||||
|
@ -43,14 +39,8 @@ func (s *S3Backup) Generate(included *IncludedFiles, prefix string) (*ArchiveDet
|
||||||
}
|
}
|
||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
|
|
||||||
if resp, err := s.generateRemoteRequest(rc); err != nil {
|
if err := s.generateRemoteRequest(rc); err != nil {
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
} else {
|
|
||||||
resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return nil, fmt.Errorf("failed to put S3 object, %d:%s", resp.StatusCode, resp.Status)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.Details(), err
|
return s.Details(), err
|
||||||
|
@ -61,27 +51,123 @@ func (s *S3Backup) Remove() error {
|
||||||
return os.Remove(s.Path())
|
return os.Remove(s.Path())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reader provides a wrapper around an existing io.Reader
|
||||||
|
// but implements io.Closer in order to satisfy an io.ReadCloser.
|
||||||
|
type Reader struct {
|
||||||
|
io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (Reader) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Generates the remote S3 request and begins the upload.
|
// Generates the remote S3 request and begins the upload.
|
||||||
func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) (*http.Response, error) {
|
func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
|
||||||
r, err := http.NewRequest(http.MethodPut, s.PresignedUrl, nil)
|
defer rc.Close()
|
||||||
|
|
||||||
|
size, err := s.Backup.Size()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if sz, err := s.Size(); err != nil {
|
urls, err := api.New().GetBackupRemoteUploadURLs(s.Backup.Uuid, size)
|
||||||
return nil, err
|
if err != nil {
|
||||||
} else {
|
return err
|
||||||
r.ContentLength = sz
|
}
|
||||||
r.Header.Add("Content-Length", strconv.Itoa(int(sz)))
|
|
||||||
|
log.Debug("attempting to upload backup to remote S3 endpoint")
|
||||||
|
|
||||||
|
handlePart := func(part string, size int64) (string, error) {
|
||||||
|
r, err := http.NewRequest(http.MethodPut, part, nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.ContentLength = size
|
||||||
|
r.Header.Add("Content-Length", strconv.Itoa(int(size)))
|
||||||
r.Header.Add("Content-Type", "application/x-gzip")
|
r.Header.Add("Content-Type", "application/x-gzip")
|
||||||
|
|
||||||
|
r.Body = Reader{io.LimitReader(rc, size)}
|
||||||
|
|
||||||
|
res, err := http.DefaultClient.Do(r)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Body = rc
|
defer res.Body.Close()
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
if res.StatusCode != http.StatusOK {
|
||||||
"endpoint": s.PresignedUrl,
|
return "", fmt.Errorf("failed to put S3 object part, %d:%s", res.StatusCode, res.Status)
|
||||||
"headers": r.Header,
|
}
|
||||||
}).Debug("uploading backup to remote S3 endpoint")
|
|
||||||
|
return res.Header.Get("ETag"), nil
|
||||||
return http.DefaultClient.Do(r)
|
}
|
||||||
|
|
||||||
|
// Keep track of errors from individual part uploads.
|
||||||
|
hasError := true
|
||||||
|
defer func() {
|
||||||
|
if !hasError {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := http.NewRequest(http.MethodPost, urls.AbortMultipartUpload, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Warn("failed to create http request (AbortMultipartUpload)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := http.DefaultClient.Do(r)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Warn("failed to make http request (AbortMultipartUpload)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
log.Warnf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var completeBody bytes.Buffer
|
||||||
|
completeBody.WriteString("<CompleteMultipartUpload>\n")
|
||||||
|
|
||||||
|
partCount := len(urls.Parts)
|
||||||
|
for i, part := range urls.Parts {
|
||||||
|
var s int64
|
||||||
|
if i+1 < partCount {
|
||||||
|
s = urls.PartSize
|
||||||
|
} else {
|
||||||
|
s = size - (int64(i) * urls.PartSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
etag, err := handlePart(part, s)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
completeBody.WriteString("\t<Part>\n")
|
||||||
|
completeBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
|
||||||
|
completeBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
|
||||||
|
completeBody.WriteString("\t</Part>\n")
|
||||||
|
}
|
||||||
|
hasError = false
|
||||||
|
|
||||||
|
completeBody.WriteString("</CompleteMultipartUpload>")
|
||||||
|
|
||||||
|
r, err := http.NewRequest(http.MethodPost, urls.CompleteMultipartUpload, &completeBody)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := http.DefaultClient.Do(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user