s3 backups: handle CompleteMultipartUpload and AbortMultipartUpload on the panel

Matthew Penner 2020-12-06 13:56:17 -07:00
parent 83f0d2c953
commit ee08829a28
6 changed files with 56 additions and 85 deletions

View File

@@ -3,11 +3,16 @@ package api
 import (
 	"fmt"
 	"strconv"
+	"sync"
 )
 
+var (
+	backupUploadIDsMx sync.Mutex
+	backupUploadIDs   = map[string]string{}
+)
+
 type BackupRemoteUploadResponse struct {
-	CompleteMultipartUpload string   `json:"complete_multipart_upload"`
-	AbortMultipartUpload    string   `json:"abort_multipart_upload"`
+	UploadID string   `json:"upload_id"`
 	Parts    []string `json:"parts"`
 	PartSize int64    `json:"part_size"`
 }
@@ -28,10 +33,16 @@ func (r *Request) GetBackupRemoteUploadURLs(backup string, size int64) (*BackupR
 		return nil, err
 	}
 
+	// Store the backup upload id for later use, this is a janky way to be able to use it later with SendBackupStatus.
+	backupUploadIDsMx.Lock()
+	backupUploadIDs[backup] = res.UploadID
+	backupUploadIDsMx.Unlock()
+
 	return &res, nil
 }
 
 type BackupRequest struct {
+	UploadID     string `json:"upload_id"`
 	Checksum     string `json:"checksum"`
 	ChecksumType string `json:"checksum_type"`
 	Size         int64  `json:"size"`
@@ -41,6 +52,13 @@ type BackupRequest struct {
 // Notifies the panel that a specific backup has been completed and is now
 // available for a user to view and download.
 func (r *Request) SendBackupStatus(backup string, data BackupRequest) error {
+	// Set the UploadID on the data.
+	backupUploadIDsMx.Lock()
+	if v, ok := backupUploadIDs[backup]; ok {
+		data.UploadID = v
+	}
+	backupUploadIDsMx.Unlock()
+
	resp, err := r.Post(fmt.Sprintf("/backups/%s", backup), data)
 	if err != nil {
 		return err
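To make the new flow concrete, here is a rough sketch of how the two endpoints fit together after this change; the uploadAndNotify wrapper and the uploadParts callback are illustrative placeholders only, not code from this commit.

package server

import "github.com/pterodactyl/wings/api"

// Illustrative only: wires GetBackupRemoteUploadURLs and SendBackupStatus
// together to show the upload-id plumbing added above.
func uploadAndNotify(uuid string, size int64, data api.BackupRequest, uploadParts func(parts []string, partSize int64) error) error {
	r := api.New()

	// The panel now returns an upload_id alongside the presigned part URLs;
	// GetBackupRemoteUploadURLs caches it, keyed by the backup UUID.
	urls, err := r.GetBackupRemoteUploadURLs(uuid, size)
	if err != nil {
		return err
	}

	// Upload each part to its presigned URL (see the s3.go changes below).
	if err := uploadParts(urls.Parts, urls.PartSize); err != nil {
		return err
	}

	// SendBackupStatus copies the cached upload_id into the request body so the
	// panel can issue CompleteMultipartUpload (or AbortMultipartUpload) itself.
	return r.SendBackupStatus(uuid, data)
}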

View File

@@ -59,11 +59,6 @@ type SystemConfiguration struct {
 	// disk usage is not a concern.
 	DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
 
-	// Determines if Wings should detect a server that stops with a normal exit code of
-	// "0" as being crashed if the process stopped without any Wings interaction. E.g.
-	// the user did not press the stop button, but the process stopped cleanly.
-	DetectCleanExitAsCrash bool `default:"true" yaml:"detect_clean_exit_as_crash"`
-
 	// If set to true, file permissions for a server will be checked when the process is
 	// booted. This can cause boot delays if the server has a large amount of files. In most
 	// cases disabling this should not have any major impact unless external processes are
@@ -78,6 +73,20 @@ type SystemConfiguration struct {
 	WebsocketLogCount int `default:"150" yaml:"websocket_log_count"`
 
 	Sftp SftpConfiguration `yaml:"sftp"`
+
+	CrashDetection CrashDetection `yaml:"crash_detection"`
+}
+
+type CrashDetection struct {
+	// Determines if Wings should detect a server that stops with a normal exit code of
+	// "0" as being crashed if the process stopped without any Wings interaction. E.g.
+	// the user did not press the stop button, but the process stopped cleanly.
+	DetectCleanExitAsCrash bool `default:"true" yaml:"detect_clean_exit_as_crash"`
+
+	// Timeout specifies the timeout between crashes that will not cause the server
+	// to be automatically restarted, this value is used to prevent servers from
+	// becoming stuck in a boot-loop after multiple consecutive crashes.
+	Timeout int `default:"60" json:"timeout"`
 }
 
 // Ensures that all of the system directories exist on the system. These directories are
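For quick reference, a minimal sketch of reading the relocated settings; the field names and defaults come from the CrashDetection struct above, while the helper itself is illustrative and not part of this commit.

package server

import (
	"time"

	"github.com/pterodactyl/wings/config"
)

// crashRestartWindow is an illustrative helper showing where the
// crash-detection options now live on the configuration struct.
func crashRestartWindow() time.Duration {
	cd := config.Get().System.CrashDetection

	// DetectCleanExitAsCrash (default true) keeps treating a clean exit code of
	// "0" as a crash when Wings did not initiate the stop.
	_ = cd.DetectCleanExitAsCrash

	// Timeout (default 60) replaces the previously hard-coded 60 second window
	// between consecutive crashes that suppresses an automatic restart.
	return time.Duration(cd.Timeout) * time.Second
}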

View File

@@ -95,7 +95,7 @@ func getErrorStack(err error, i bool) errors.StackTrace {
 		// The errors.WrapIf did not return a interface compatible with `tracer`, so
 		// we don't have an easy way to get the stacktrace, this should probably be changed
 		// at some point, but without this the application may panic when handling some errors.
-		return nil
+		return errors.WithStack(err).(tracer).StackTrace()
 	}
 
 	return getErrorStack(errors.WithMessage(err, err.Error()), true)

View File

@@ -13,8 +13,7 @@ import (
 // Notifies the panel of a backup's state and returns an error if one is encountered
 // while performing this action.
 func (s *Server) notifyPanelOfBackup(uuid string, ad *backup.ArchiveDetails, successful bool) error {
-	err := api.New().SendBackupStatus(uuid, ad.ToRequest(successful))
-	if err != nil {
+	if err := api.New().SendBackupStatus(uuid, ad.ToRequest(successful)); err != nil {
 		if !api.IsRequestError(err) {
 			s.Log().WithFields(log.Fields{
 				"backup": uuid,

View File

@@ -1,7 +1,6 @@
 package backup
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"github.com/apex/log"
@@ -10,7 +9,6 @@ import (
 	"net/http"
 	"os"
 	"strconv"
-	"time"
 )
 
 type S3Backup struct {
@@ -75,10 +73,12 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
 		return err
 	}
 
-	log.WithFields(log.Fields{
+	l := log.WithFields(log.Fields{
 		"backup_id": s.Uuid,
 		"adapter":   "s3",
-	}).Info("attempting to upload backup..")
+	})
+	l.Info("attempting to upload backup..")
 
 	handlePart := func(part string, size int64) (string, error) {
 		r, err := http.NewRequest(http.MethodPut, part, nil)
@@ -91,7 +91,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
 		r.Header.Add("Content-Type", "application/x-gzip")
 
 		// Limit the reader to the size of the part.
-		r.Body = Reader{io.LimitReader(rc, size)}
+		r.Body = Reader{Reader: io.LimitReader(rc, size)}
 
 		// This http request can block forever due to it not having a timeout,
 		// but we are uploading up to 5GB of data, so there is not really
@@ -111,10 +111,6 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
 		return res.Header.Get("ETag"), nil
 	}
 
-	// Start assembling the body that will be sent as apart of the CompleteMultipartUpload request.
-	var completeUploadBody bytes.Buffer
-	completeUploadBody.WriteString("<CompleteMultipartUpload>\n")
-
 	partCount := len(urls.Parts)
 	for i, part := range urls.Parts {
 		// Get the size for the current part.
@@ -128,67 +124,14 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
 		}
 
 		// Attempt to upload the part.
-		etag, err := handlePart(part, partSize)
+		_, err := handlePart(part, partSize)
 		if err != nil {
-			log.WithError(err).Warn("failed to upload part")
-			// Send an AbortMultipartUpload request.
-			if err := s.finishUpload(urls.AbortMultipartUpload, nil); err != nil {
-				log.WithError(err).Warn("failed to abort multipart backup upload")
-			}
+			l.WithField("part_id", part).WithError(err).Warn("failed to upload part")
 			return err
 		}
-
-		// Add the part to the CompleteMultipartUpload body.
-		completeUploadBody.WriteString("\t<Part>\n")
-		completeUploadBody.WriteString("\t\t<ETag>\"" + etag + "\"</ETag>\n")
-		completeUploadBody.WriteString("\t\t<PartNumber>" + strconv.Itoa(i+1) + "</PartNumber>\n")
-		completeUploadBody.WriteString("\t</Part>\n")
 	}
-	completeUploadBody.WriteString("</CompleteMultipartUpload>")
-
-	// Send a CompleteMultipartUpload request.
-	if err := s.finishUpload(urls.CompleteMultipartUpload, &completeUploadBody); err != nil {
-		return err
-	}
 
-	log.WithFields(log.Fields{
-		"backup_id": s.Uuid,
-		"adapter":   "s3",
-	}).Info("backup has been successfully uploaded")
+	l.Info("backup has been successfully uploaded")
 
 	return nil
 }
-
-// finishUpload sends a requests to the specified url to either complete or abort the upload.
-func (s *S3Backup) finishUpload(url string, body io.Reader) error {
-	r, err := http.NewRequest(http.MethodPost, url, body)
-	if err != nil {
-		return err
-	}
-
-	// Create a new http client with a 10 second timeout.
-	c := &http.Client{
-		Timeout: 10 * time.Second,
-	}
-
-	res, err := c.Do(r)
-	if err != nil {
-		return err
-	}
-	defer res.Body.Close()
-
-	// Handle non-200 status codes.
-	if res.StatusCode != http.StatusOK {
-		// If no body was sent, we were aborting the upload.
-		if body == nil {
-			return fmt.Errorf("failed to abort S3 multipart upload, %d:%s", res.StatusCode, res.Status)
-		}
-
-		// If a body was sent we were completing the upload.
-		// TODO: Attempt to send abort request?
-		return fmt.Errorf("failed to complete S3 multipart upload, %d:%s", res.StatusCode, res.Status)
-	}
-
-	return nil
-}

View File

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/pterodactyl/wings/config"
 	"github.com/pterodactyl/wings/environment"
+	"strconv"
 	"sync"
 	"time"
 )
@@ -47,8 +48,7 @@ func (s *Server) handleServerCrash() error {
 	if s.Environment.State() != environment.ProcessOfflineState || !s.Config().CrashDetectionEnabled {
 		if !s.Config().CrashDetectionEnabled {
 			s.Log().Debug("server triggered crash detection but handler is disabled for server process")
-
-			s.PublishConsoleOutputFromDaemon("Aborting automatic restart, crash detection is disabled for this instance.")
+			s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.")
 		}
 
 		return nil
@@ -61,9 +61,8 @@ func (s *Server) handleServerCrash() error {
 	// If the system is not configured to detect a clean exit code as a crash, and the
 	// crash is not the result of the program running out of memory, do nothing.
-	if exitCode == 0 && !oomKilled && !config.Get().System.DetectCleanExitAsCrash {
+	if exitCode == 0 && !oomKilled && !config.Get().System.CrashDetection.DetectCleanExitAsCrash {
 		s.Log().Debug("server exited with successful exit code; system is configured to not detect this as a crash")
 		return nil
 	}
@@ -72,11 +71,14 @@ func (s *Server) handleServerCrash() error {
 	s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled))
 
 	c := s.crasher.LastCrashTime()
-	// If the last crash time was within the last 60 seconds we do not want to perform
-	// an automatic reboot of the process. Return an error that can be handled.
-	if !c.IsZero() && c.Add(time.Second*60).After(time.Now()) {
-		s.PublishConsoleOutputFromDaemon("Aborting automatic reboot: last crash occurred less than 60 seconds ago.")
+	timeout := config.Get().System.CrashDetection.Timeout
+
+	// If the last crash time was within the last `timeout` seconds we do not want to perform
+	// an automatic reboot of the process. Return an error that can be handled.
+	//
+	// If timeout is set to 0, always reboot the server (this is probably a terrible idea, but some people want it)
+	if timeout != 0 && !c.IsZero() && c.Add(time.Second*time.Duration(config.Get().System.CrashDetection.Timeout)).After(time.Now()) {
+		s.PublishConsoleOutputFromDaemon("Aborting automatic restart, last crash occurred less than " + strconv.Itoa(timeout) + " seconds ago.")
 		return &crashTooFrequent{}
 	}