From 5c78cb9ab384a32d4d9f9651958a5b345b68cca2 Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Fri, 25 Dec 2020 14:32:41 -0700 Subject: [PATCH] Add transfer logging output (#77) Co-authored-by: Dane Everitt --- api/server_endpoints.go | 19 +- config/config_system.go | 14 +- config/config_throttles.go | 2 +- environment/docker/power.go | 4 +- router/router_server_ws.go | 4 +- router/router_transfer.go | 509 +++++++++++++++++++++------------- router/websocket/listeners.go | 2 + router/websocket/websocket.go | 22 +- server/backup/backup_s3.go | 2 +- server/collection.go | 2 + server/errors.go | 8 +- server/events.go | 2 + server/install.go | 11 +- server/power.go | 8 + server/server.go | 3 + system/utils.go | 13 + 16 files changed, 398 insertions(+), 227 deletions(-) diff --git a/api/server_endpoints.go b/api/server_endpoints.go index f7fedbe..ad47c75 100644 --- a/api/server_endpoints.go +++ b/api/server_endpoints.go @@ -189,22 +189,15 @@ func (r *Request) SendArchiveStatus(uuid string, successful bool) error { return resp.Error() } -func (r *Request) SendTransferFailure(uuid string) error { - resp, err := r.Get(fmt.Sprintf("/servers/%s/transfer/failure", uuid), nil) +func (r *Request) SendTransferStatus(uuid string, successful bool) error { + state := "failure" + if successful { + state = "success" + } + resp, err := r.Get(fmt.Sprintf("/servers/%s/transfer/%s", uuid, state), nil) if err != nil { return err } defer resp.Body.Close() - - return resp.Error() -} - -func (r *Request) SendTransferSuccess(uuid string) error { - resp, err := r.Get(fmt.Sprintf("/servers/%s/transfer/success", uuid), nil) - if err != nil { - return err - } - defer resp.Body.Close() - return resp.Error() } diff --git a/config/config_system.go b/config/config_system.go index c5bd0b4..b97cc76 100644 --- a/config/config_system.go +++ b/config/config_system.go @@ -77,6 +77,8 @@ type SystemConfiguration struct { CrashDetection CrashDetection `yaml:"crash_detection"` Backups Backups `yaml:"backups"` + + Transfers Transfers `yaml:"transfers"` } type CrashDetection struct { @@ -97,12 +99,22 @@ type Backups struct { // upload it to any external storage provider. // // If the value is less than 1, the write speed is unlimited, - // if the value is greater than 0, the write speed is the value in MB/s. + // if the value is greater than 0, the write speed is the value in MiB/s. // // Defaults to 0 (unlimited) WriteLimit int `default:"0" yaml:"write_limit"` } +type Transfers struct { + // DownloadLimit imposes a Network I/O read limit when downloading a transfer archive. + // + // If the value is less than 1, the write speed is unlimited, + // if the value is greater than 0, the write speed is the value in MiB/s. + // + // Defaults to 0 (unlimited) + DownloadLimit int `default:"0" yaml:"download_limit"` +} + // Ensures that all of the system directories exist on the system. These directories are // created so that only the owner can read the data, and no other users. func (sc *SystemConfiguration) ConfigureDirectories() error { diff --git a/config/config_throttles.go b/config/config_throttles.go index 656f7c1..e9c5531 100644 --- a/config/config_throttles.go +++ b/config/config_throttles.go @@ -22,6 +22,6 @@ type ConsoleThrottles struct { DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"` // The amount of time that a server is allowed to be stopping for before it is terminated - // forfully if it triggers output throttles. + // forcefully if it triggers output throttles. StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"` } diff --git a/environment/docker/power.go b/environment/docker/power.go index d8654c9..980c596 100644 --- a/environment/docker/power.go +++ b/environment/docker/power.go @@ -147,8 +147,8 @@ func (e *Environment) Stop() error { } t := time.Second * 30 - err := e.client.ContainerStop(context.Background(), e.Id, &t) - if err != nil { + + if err := e.client.ContainerStop(context.Background(), e.Id, &t); err != nil { // If the container does not exist just mark the process as stopped and return without // an error. if client.IsErrNotFound(err) { diff --git a/router/router_server_ws.go b/router/router_server_ws.go index 2633b03..8c7dadd 100644 --- a/router/router_server_ws.go +++ b/router/router_server_ws.go @@ -24,14 +24,14 @@ func getServerWebsocket(c *gin.Context) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Track this open connection on the server so that we can close them all programtically + // Track this open connection on the server so that we can close them all programmatically // if the server is deleted. s.Websockets().Push(handler.Uuid(), &cancel) defer s.Websockets().Remove(handler.Uuid()) // Listen for the context being canceled and then close the websocket connection. This normally // just happens because you're disconnecting from the socket in the browser, however in some - // cases we close the connections programatically (e.g. deleting the server) and need to send + // cases we close the connections programmatically (e.g. deleting the server) and need to send // a close message to the websocket so it disconnects. go func(ctx context.Context, c *ws.Conn) { ListenerLoop: diff --git a/router/router_transfer.go b/router/router_transfer.go index f009f99..16bf1a1 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -2,28 +2,52 @@ package router import ( "bufio" - "bytes" "crypto/sha256" "emperror.dev/errors" "encoding/hex" + "encoding/json" + "fmt" "github.com/apex/log" - "github.com/buger/jsonparser" "github.com/gin-gonic/gin" + "github.com/juju/ratelimit" "github.com/mholt/archiver/v3" + "github.com/mitchellh/colorstring" "github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/installer" "github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/server" + "github.com/pterodactyl/wings/system" "io" - "io/ioutil" "net/http" "os" "path/filepath" "strconv" "strings" + "sync/atomic" + "time" ) +// Number of ticks in the progress bar +const ticks = 25 + +// 100% / number of ticks = percentage represented by each tick +const tickPercentage = 100 / ticks + +type downloadProgress struct { + size int64 + progress int64 +} + +// Data passed over to initiate a server transfer. +type serverTransferRequest struct { + ServerID string `binding:"required" json:"server_id"` + URL string `binding:"required" json:"url"` + Token string `binding:"required" json:"token"` + Server json.RawMessage `json:"server"` +} + +// Returns the archive for a server so that it can be transfered to a new node. func getServerArchive(c *gin.Context) { auth := strings.SplitN(c.GetHeader("Authorization"), " ", 2) @@ -41,22 +65,20 @@ func getServerArchive(c *gin.Context) { return } - if token.Subject != c.Param("server") { + s := ExtractServer(c) + if token.Subject != s.Id() { c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ - "error": "( .. •˘___˘• .. )", + "error": "Missing required token subject, or subject is not valid for the requested server.", }) return } - s := GetServer(c.Param("server")) - st, err := s.Archiver.Stat() if err != nil { if !errors.Is(err, os.ErrNotExist) { - NewServerError(err, s).SetMessage("failed to stat archive").Abort(c) + WithError(c, err) return } - c.AbortWithStatus(http.StatusNotFound) return } @@ -69,14 +91,7 @@ func getServerArchive(c *gin.Context) { file, err := os.Open(s.Archiver.Path()) if err != nil { - tserr := NewServerError(err, s) - if !os.IsNotExist(err) { - tserr.SetMessage("failed to open archive for reading") - } else { - tserr.SetMessage("failed to open archive") - } - - tserr.Abort(c) + WithError(c, err) return } defer file.Close() @@ -91,65 +106,21 @@ func getServerArchive(c *gin.Context) { } func postServerArchive(c *gin.Context) { - s := GetServer(c.Param("server")) + s := ExtractServer(c) go func(s *server.Server) { r := api.New() l := log.WithField("server", s.Id()) - // Attempt to get an archive of the server. This **WILL NOT** modify the source files of a server, - // this process is 100% safe and will not corrupt a server's files if it fails. - if err := s.Archiver.Archive(); err != nil { - l.WithField("error", err).Error("failed to get transfer archive for server") - - if err := r.SendArchiveStatus(s.Id(), false); err != nil { - if !api.IsRequestError(err) { - l.WithField("error", err).Error("failed to notify panel of failed archive status") - return - } - - l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a failed archive status") - return - } - - l.Info("successfully notified panel of failed archive status") - return + // This function automatically adds the Source Node prefix and Timestamp to the log + // output before sending it over the websocket. + sendTransferLog := func(data string) { + output := colorstring.Color(fmt.Sprintf("[yellow][bold]%s [Pterodactyl Transfer System] [Source Node]:[default] %s", time.Now().Format(time.RFC1123), data)) + s.Events().Publish(server.TransferLogsEvent, output) } - l.Info("successfully created server transfer archive, notifying panel..") - - if err := r.SendArchiveStatus(s.Id(), true); err != nil { - if !api.IsRequestError(err) { - l.WithField("error", err).Error("failed to notify panel of successful archive status") - return - } - - l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a successful archive status") - return - } - - l.Info("successfully notified panel of successful transfer archive status") - }(s) - - c.Status(http.StatusAccepted) -} - -func postTransfer(c *gin.Context) { - var buf bytes.Buffer - if _, err := buf.ReadFrom(c.Request.Body); err != nil { - c.AbortWithStatus(http.StatusBadRequest) - return - } - - go func(data []byte) { - serverID, _ := jsonparser.GetString(data, "server_id") - url, _ := jsonparser.GetString(data, "url") - token, _ := jsonparser.GetString(data, "token") - - l := log.WithField("server", serverID) - l.Info("incoming transfer for server") - // Create an http client with no timeout. - client := &http.Client{Timeout: 0} + s.Events().Publish(server.TransferStatusEvent, "starting") + sendTransferLog("Attempting to archive server...") hasError := true defer func() { @@ -157,162 +128,318 @@ func postTransfer(c *gin.Context) { return } - l.Info("server transfer failed, notifying panel") - if err := api.New().SendTransferFailure(serverID); err != nil { + s.Events().Publish(server.TransferStatusEvent, "failure") + + sendTransferLog("Attempting to notify panel of archive failure..") + if err := r.SendArchiveStatus(s.Id(), false); err != nil { if !api.IsRequestError(err) { - l.WithField("error", err).Error("failed to notify panel with transfer failure") + sendTransferLog("Failed to notify panel of archive failure: " + err.Error()) + l.WithField("error", err).Error("failed to notify panel of failed archive status") return } - l.WithField("error", err.Error()).Error("received error response from panel while notifying of transfer failure") + sendTransferLog("Panel returned an error while notifying it of a failed archive: " + err.Error()) + l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a failed archive status") return } - l.Debug("notified panel of transfer failure") + sendTransferLog("Successfully notified panel of failed archive status") + l.Info("successfully notified panel of failed archive status") }() - // Make a new GET request to the URL the panel gave us. - req, err := http.NewRequest("GET", url, nil) - if err != nil { - log.WithField("error", err).Error("failed to create http request for archive transfer") + // Mark the server as transferring to prevent problems. + s.SetTransferring(true) + + // Ensure the server is offline. Sometimes a "No such container" error gets through + // which means the server is already stopped. We can ignore that. + if err := s.Environment.WaitForStop(60, false); err != nil && !strings.Contains(strings.ToLower(err.Error()), "no such container") { + sendTransferLog("Failed to stop server, aborting transfer..") + l.WithField("error", err).Error("failed to stop server") return } - // Add the authorization header. - req.Header.Set("Authorization", token) + // Attempt to get an archive of the server. + if err := s.Archiver.Archive(); err != nil { + sendTransferLog("An error occurred while archiving the server: " + err.Error()) + l.WithField("error", err).Error("failed to get transfer archive for server") + return + } - l.Info("requesting archive for server transfer..") - // Execute the http request. - res, err := client.Do(req) + sendTransferLog("Successfully created archive, attempting to notify panel..") + l.Info("successfully created server transfer archive, notifying panel..") + + if err := r.SendArchiveStatus(s.Id(), true); err != nil { + if !api.IsRequestError(err) { + sendTransferLog("Failed to notify panel of archive success: " + err.Error()) + l.WithField("error", err).Error("failed to notify panel of successful archive status") + return + } + + sendTransferLog("Panel returned an error while notifying it of a successful archive: " + err.Error()) + l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a successful archive status") + return + } + + hasError = false + + // This log may not be displayed by the client due to the status event being sent before or at the same time. + sendTransferLog("Successfully notified panel of successful archive status") + + l.Info("successfully notified panel of successful transfer archive status") + s.Events().Publish(server.TransferStatusEvent, "archived") + }(s) + + c.Status(http.StatusAccepted) +} + +func (w *downloadProgress) Write(v []byte) (int, error) { + n := len(v) + atomic.AddInt64(&w.progress, int64(n)) + return n, nil +} + +// Log helper function to attach all errors and info output to a consistently formatted +// log string for easier querying. +func (str serverTransferRequest) log() *log.Entry { + return log.WithField("subsystem", "transfers").WithField("server_id", str.ServerID) +} + +// Downloads an archive from the machine that the server currently lives on. +func (str serverTransferRequest) downloadArchive() (*http.Response, error) { + client := http.Client{Timeout: 0} + req, err := http.NewRequest(http.MethodGet, str.URL, nil) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", str.Token) + res, err := client.Do(req) + if err != nil { + return nil, err + } + return res, nil +} + +// Returns the path to the local archive on the system. +func (str serverTransferRequest) path() string { + return filepath.Join(config.Get().System.ArchiveDirectory, str.ServerID+".tar.gz") +} + +// Creates the archive location on this machine by first checking that the required file +// does not already exist. If it does exist, the file is deleted and then re-created as +// an empty file. +func (str serverTransferRequest) createArchiveFile() (*os.File, error) { + p := str.path() + if _, err := os.Stat(p); err != nil { + if !os.IsNotExist(err) { + return nil, err + } + } else if err := os.Remove(p); err != nil { + return nil, err + } + return os.Create(p) +} + +// Deletes the archive from the local filesystem. This is executed as a deferred function. +func (str serverTransferRequest) removeArchivePath() { + p := str.path() + str.log().Debug("deleting temporary transfer archive") + if err := os.Remove(p); err != nil && !os.IsNotExist(err) { + str.log().WithField("path", p).WithField("error", err).Error("failed to delete temporary transfer archive file") + return + } + str.log().Debug("deleted temporary transfer archive successfully") +} + +// Verifies that the SHA-256 checksum of the file on the local filesystem matches the +// expected value from the transfer request. The string value returned is the computed +// checksum on the system. +func (str serverTransferRequest) verifyChecksum(matches string) (bool, string, error) { + file, err := os.Open(str.path()) + if err != nil { + return false, "", err + } + defer file.Close() + hash := sha256.New() + buf := make([]byte, 1024*4) + if _, err := io.CopyBuffer(hash, file, buf); err != nil { + return false, "", err + } + checksum := hex.EncodeToString(hash.Sum(nil)) + return checksum == matches, checksum, nil +} + +// Sends a notification to the Panel letting it know what the status of this transfer is. +func (str serverTransferRequest) sendTransferStatus(successful bool) error { + lg := str.log().WithField("transfer_successful", successful) + lg.Info("notifying Panel of server transfer state") + if err := api.New().SendTransferStatus(str.ServerID, successful); err != nil { + lg.WithField("error", err).Error("error notifying panel of transfer state") + return err + } + lg.Debug("notified panel of transfer state") + return nil +} + +// Initiates a transfer between two nodes for a server by downloading an archive from the +// remote node and then applying the server details to this machine. +func postTransfer(c *gin.Context) { + var data serverTransferRequest + if err := c.BindJSON(&data); err != nil { + return + } + + data.log().Info("handling incoming server transfer request") + go func(data *serverTransferRequest) { + hasError := true + defer func() { + _ = data.sendTransferStatus(!hasError) + }() + + // Create a new server installer. This will only configure the environment and not + // run the installer scripts. + i, err := installer.New(data.Server) if err != nil { - l.WithField("error", err).Error("failed to send archive http request") + data.log().WithField("error", err).Error("failed to validate received server data") + return + } + + // This function automatically adds the Target Node prefix and Timestamp to the log output before sending it + // over the websocket. + sendTransferLog := func(data string) { + output := colorstring.Color(fmt.Sprintf("[yellow][bold]%s [Pterodactyl Transfer System] [Target Node]:[default] %s", time.Now().Format(time.RFC1123), data)) + i.Server().Events().Publish(server.TransferLogsEvent, output) + } + + // Mark the server as transferring to prevent problems later on during the process and + // then push the server into the global server collection for this instance. + i.Server().SetTransferring(true) + server.GetServers().Add(i.Server()) + defer func(s *server.Server) { + // In the event that this transfer call fails, remove the server from the global + // server tracking so that we don't have a dangling instance. + if hasError { + sendTransferLog("Server transfer failed, check Wings logs for additional information.") + s.Events().Publish(server.TransferStatusEvent, "failure") + server.GetServers().Remove(func(s2 *server.Server) bool { + return s.Id() == s2.Id() + }) + } else { + i.Server().SetTransferring(false) + i.Server().Events().Publish(server.TransferStatusEvent, "success") + } + }(i.Server()) + + data.log().Info("downloading server archive from current server node") + sendTransferLog("Received incoming transfer from Panel, attempting to download archive from source node...") + res, err := data.downloadArchive() + if err != nil { + sendTransferLog("Failed to retrieve server archive from remote node: " + err.Error()) + data.log().WithField("error", err).Error("failed to download archive for server transfer") return } defer res.Body.Close() - - // Handle non-200 status codes. if res.StatusCode != 200 { - _, err := ioutil.ReadAll(res.Body) - if err != nil { - l.WithField("error", err).WithField("status", res.StatusCode).Error("failed read transfer response body") - return - } - - l.WithField("error", err).WithField("status", res.StatusCode).Error("failed to request server archive") + data.log().WithField("error", err).WithField("status", res.StatusCode).Error("unexpected error response from transfer endpoint") return } - // Get the path to the archive. - archivePath := filepath.Join(config.Get().System.ArchiveDirectory, serverID+".tar.gz") - - // Check if the archive already exists and delete it if it does. - if _, err = os.Stat(archivePath); err != nil { - if !os.IsNotExist(err) { - l.WithField("error", err).Error("failed to stat archive file") - return - } - } else if err := os.Remove(archivePath); err != nil { - l.WithField("error", err).Warn("failed to remove old archive file") + size := res.ContentLength + if size == 0 { + data.log().WithField("error", err).Error("recieved an archive response with Content-Length of 0") return } - - // Create the file. - file, err := os.Create(archivePath) + sendTransferLog("Got server archive response from remote node. (Content-Length: " + strconv.Itoa(int(size)) + ")") + sendTransferLog("Creating local archive file...") + file, err := data.createArchiveFile() if err != nil { - l.WithField("error", err).Error("failed to open archive on disk") + data.log().WithField("error", err).Error("failed to create archive file on local filesystem") return } - l.Info("writing transfer archive to disk..") + sendTransferLog("Writing archive to disk...") + data.log().Info("writing transfer archive to disk..") + // Copy the file. - buf := make([]byte, 1024*4) - _, err = io.CopyBuffer(file, res.Body, buf) - if err != nil { - l.WithField("error", err).Error("failed to copy archive file to disk") - return - } - - // Close the file so it can be opened to verify the checksum. - if err := file.Close(); err != nil { - l.WithField("error", err).Error("failed to close archive file") - return - } - l.Info("finished writing transfer archive to disk") - - // Whenever the transfer fails or succeeds, delete the temporary transfer archive. - defer func() { - log.WithField("server", serverID).Debug("deleting temporary transfer archive..") - if err := os.Remove(archivePath); err != nil && !os.IsNotExist(err) { - l.WithFields(log.Fields{ - "server": serverID, - "error": err, - }).Warn("failed to delete transfer archive") - } else { - l.Debug("deleted temporary transfer archive successfully") + progress := &downloadProgress{size: size} + ticker := time.NewTicker(3 * time.Second) + go func(progress *downloadProgress, t *time.Ticker) { + for range ticker.C { + // p = 100 (Downloaded) + // size = 1000 (Content-Length) + // p / size = 0.1 + // * 100 = 10% (Multiply by 100 to get a percentage of the download) + // 10% / tickPercentage = (10% / (100 / 25)) (Divide by tick percentage to get the number of ticks) + // 2.5 (Number of ticks as a float64) + // 2 (convert to an integer) + p := atomic.LoadInt64(&progress.progress) + // We have to cast these numbers to float in order to get a float result from the division. + width := ((float64(p) / float64(size)) * 100) / tickPercentage + bar := strings.Repeat("=", int(width)) + strings.Repeat(" ", ticks-int(width)) + sendTransferLog("Downloading [" + bar + "] " + system.FormatBytes(p) + " / " + system.FormatBytes(progress.size)) } - }() + }(progress, ticker) - l.Info("server transfer archive downloaded, computing checksum...") - - // Open the archive file for computing a checksum. - file, err = os.Open(archivePath) - if err != nil { - l.WithField("error", err).Error("failed to open archive on disk") - return + var reader io.Reader = res.Body + downloadLimit := float64(config.Get().System.Transfers.DownloadLimit) * 1024 * 1024 + if downloadLimit > 0 { + // Wrap the body with a reader that is limited to the defined download limit speed. + reader = ratelimit.Reader(res.Body, ratelimit.NewBucketWithRate(downloadLimit, int64(downloadLimit))) } - // Compute the sha256 checksum of the file. - hash := sha256.New() - buf = make([]byte, 1024*4) - if _, err := io.CopyBuffer(hash, file, buf); err != nil { - l.WithField("error", err).Error("failed to copy archive file for checksum verification") + buf := make([]byte, 1024*4) + if _, err := io.CopyBuffer(file, io.TeeReader(reader, progress), buf); err != nil { + ticker.Stop() + sendTransferLog("Failed while writing archive file to disk: " + err.Error()) + data.log().WithField("error", err).Error("failed to copy archive file to disk") return } + ticker.Stop() - checksum := hex.EncodeToString(hash.Sum(nil)) - l.WithField("checksum", checksum).Info("computed checksum of transfer archive") + // Show 100% completion. + humanSize := system.FormatBytes(progress.size) + sendTransferLog("Downloading [" + strings.Repeat("=", ticks) + "] " + humanSize + " / " + humanSize) - // Verify the two checksums. - if checksum != res.Header.Get("X-Checksum") { - l.WithField("source_checksum", res.Header.Get("X-Checksum")).Error("checksum verification failed for archive") - return - } - - // Close the file. if err := file.Close(); err != nil { - l.WithField("error", err).Error("failed to close archive file after calculating checksum") + data.log().WithField("error", err).Error("unable to close archive file on local filesystem") + return + } + data.log().Info("finished writing transfer archive to disk") + sendTransferLog("Successfully wrote archive to disk.") + + // Whenever the transfer fails or succeeds, delete the temporary transfer archive that + // was created on the disk. + defer data.removeArchivePath() + + sendTransferLog("Verifying checksum of downloaded archive...") + data.log().Info("computing checksum of downloaded archive file") + expected := res.Header.Get("X-Checksum") + if matches, computed, err := data.verifyChecksum(expected); err != nil { + data.log().WithField("error", err).Error("encountered an error while calculating local filesystem archive checksum") + return + } else if !matches { + sendTransferLog("@@@@@ CHECKSUM VERIFICATION FAILED @@@@@") + sendTransferLog(" - Source Checksum: " + expected) + sendTransferLog(" - Computed Checksum: " + computed) + data.log().WithField("expected_sum", expected).WithField("computed_checksum", computed).Error("checksum mismatch when verifying integrity of local archive") return } - l.Info("server archive transfer checksums have been validated, creating server environment..") - - // Get the server data from the request. - serverData, t, _, _ := jsonparser.Get(data, "server") - if t != jsonparser.Object { - l.Error("invalid server data passed in request") - return - } - - // Create a new server installer (note this does not execute the install script) - i, err := installer.New(serverData) - if err != nil { - l.WithField("error", err).Error("failed to validate received server data") - return - } - - // Add the server to the collection. - server.GetServers().Add(i.Server()) - - // Create the server's environment (note this does not execute the install script) + // Create the server's environment. + sendTransferLog("Creating server environment, this could take a while..") + data.log().Info("creating server environment") if err := i.Server().CreateEnvironment(); err != nil { - l.WithField("error", err).Error("failed to create server environment") + data.log().WithField("error", err).Error("failed to create server environment") return } - l.Info("server environment configured, extracting transfer archive..") - // Extract the transfer archive. - if err := archiver.NewTarGz().Unarchive(archivePath, i.Server().Filesystem().Path()); err != nil { - l.WithField("error", err).Error("failed to extract server archive") + sendTransferLog("Server environment has been created, extracting transfer archive..") + data.log().Info("server environment configured, extracting transfer archive") + if err := archiver.NewTarGz().Unarchive(data.path(), i.Server().Filesystem().Path()); err != nil { + // Unarchiving failed, delete the server's data directory. + if err := os.RemoveAll(i.Server().Filesystem().Path()); err != nil && !os.IsNotExist(err) { + data.log().WithField("error", err).Warn("failed to delete local server files directory") + } + data.log().WithField("error", err).Error("failed to extract server archive") return } @@ -322,23 +449,9 @@ func postTransfer(c *gin.Context) { // It may be useful to retry sending the transfer success every so often just in case of a small // hiccup or the fix of whatever error causing the success request to fail. hasError = false - - l.Info("server transfer archive has been extracted, notifying panel..") - - // Notify the panel that the transfer succeeded. - err = api.New().SendTransferSuccess(serverID) - if err != nil { - if !api.IsRequestError(err) { - l.WithField("error", err).Error("failed to notify panel of transfer success") - return - } - - l.WithField("error", err.Error()).Error("panel responded with error after transfer success") - return - } - - l.Info("successfully notified panel of transfer success") - }(buf.Bytes()) + data.log().Info("archive transfered successfully, notifying panel of status") + sendTransferLog("Archive transfered successfully.") + }(&data) c.Status(http.StatusAccepted) } diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index 3856714..40bb5ce 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -45,6 +45,8 @@ var e = []string{ server.InstallCompletedEvent, server.DaemonMessageEvent, server.BackupCompletedEvent, + server.TransferLogsEvent, + server.TransferStatusEvent, } // Listens for different events happening on a server and sends them along diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index dd74484..f9f3313 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -28,6 +28,7 @@ const ( PermissionSendPowerRestart = "control.restart" PermissionReceiveErrors = "admin.websocket.errors" PermissionReceiveInstall = "admin.websocket.install" + PermissionReceiveTransfer = "admin.websocket.transfer" PermissionReceiveBackups = "backup.read" ) @@ -149,6 +150,13 @@ func (h *Handler) SendJson(v *Message) error { return nil } } + + // If we are sending transfer output, only send it to the user if they have the required permissions. + if v.Event == server.TransferLogsEvent { + if !j.HasPermission(PermissionReceiveTransfer) { + return nil + } + } } if err := h.unsafeSendJson(v); err != nil { @@ -320,13 +328,15 @@ func (h *Handler) HandleInbound(m Message) error { // Only send the current disk usage if the server is offline, if docker container is running, // Environment#EnableResourcePolling() will send this data to all clients. if state == environment.ProcessOfflineState { - _ = h.server.Filesystem().HasSpaceAvailable(false) + if !h.server.IsInstalling() && !h.server.IsTransferring() { + _ = h.server.Filesystem().HasSpaceAvailable(false) - b, _ := json.Marshal(h.server.Proc()) - h.SendJson(&Message{ - Event: server.StatsEvent, - Args: []string{string(b)}, - }) + b, _ := json.Marshal(h.server.Proc()) + h.SendJson(&Message{ + Event: server.StatsEvent, + Args: []string{string(b)}, + }) + } } return nil diff --git a/server/backup/backup_s3.go b/server/backup/backup_s3.go index abcd417..cbb6ed3 100644 --- a/server/backup/backup_s3.go +++ b/server/backup/backup_s3.go @@ -130,7 +130,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error { } } - l.Info("backup has been successfully uploaded") + l.WithField("parts", partCount).Info("backup has been successfully uploaded") return nil } diff --git a/server/collection.go b/server/collection.go index c675024..5b7e46d 100644 --- a/server/collection.go +++ b/server/collection.go @@ -60,6 +60,8 @@ func (c *Collection) Find(filter func(*Server) bool) *Server { } // Removes all items from the collection that match the filter function. +// +// TODO: cancel the context? func (c *Collection) Remove(filter func(*Server) bool) { c.Lock() defer c.Unlock() diff --git a/server/errors.go b/server/errors.go index 826052e..ffdc7fa 100644 --- a/server/errors.go +++ b/server/errors.go @@ -4,8 +4,12 @@ import ( "emperror.dev/errors" ) -var ErrIsRunning = errors.New("server is running") -var ErrSuspended = errors.New("server is currently in a suspended state") +var ( + ErrIsRunning = errors.New("server is running") + ErrSuspended = errors.New("server is currently in a suspended state") + ErrServerIsInstalling = errors.New("server is currently installing") + ErrServerIsTransferring = errors.New("server is currently being transferred") +) type crashTooFrequent struct { } diff --git a/server/events.go b/server/events.go index 288e012..971f4a0 100644 --- a/server/events.go +++ b/server/events.go @@ -15,6 +15,8 @@ const ( StatusEvent = "status" StatsEvent = "stats" BackupCompletedEvent = "backup completed" + TransferLogsEvent = "transfer logs" + TransferStatusEvent = "transfer status" ) // Returns the server's emitter instance. diff --git a/server/install.go b/server/install.go index 394eb69..78e9799 100644 --- a/server/install.go +++ b/server/install.go @@ -144,7 +144,8 @@ func (s *Server) acquireInstallationLock() error { s.installer.sem = semaphore.NewWeighted(1) } - ctx, _ := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() return s.installer.sem.Acquire(ctx, 1) } @@ -168,6 +169,14 @@ func (s *Server) IsInstalling() bool { return true } +func (s *Server) IsTransferring() bool { + return s.transferring.Get() +} + +func (s *Server) SetTransferring(state bool) { + s.transferring.Set(state) +} + // Removes the installer container for the server. func (ip *InstallationProcess) RemoveContainer() { err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ diff --git a/server/power.go b/server/power.go index 3a52df9..0b4713c 100644 --- a/server/power.go +++ b/server/power.go @@ -61,6 +61,14 @@ func (s *Server) ExecutingPowerAction() bool { // function rather than making direct calls to the start/stop/restart functions on the // environment struct. func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error { + if s.IsInstalling() { + return ErrServerIsInstalling + } + + if s.IsTransferring() { + return ErrServerIsTransferring + } + if s.powerLock == nil { s.powerLock = semaphore.NewWeighted(1) } diff --git a/server/server.go b/server/server.go index 52cb03f..4d9c2f9 100644 --- a/server/server.go +++ b/server/server.go @@ -12,6 +12,7 @@ import ( "github.com/pterodactyl/wings/environment/docker" "github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/server/filesystem" + "github.com/pterodactyl/wings/system" "golang.org/x/sync/semaphore" "strings" "sync" @@ -56,6 +57,8 @@ type Server struct { // installer process is still running. installer InstallerDetails + transferring system.AtomicBool + // The console throttler instance used to control outputs. throttler *ConsoleThrottler diff --git a/system/utils.go b/system/utils.go index 2a7fe67..5e97735 100644 --- a/system/utils.go +++ b/system/utils.go @@ -2,6 +2,7 @@ package system import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -23,6 +24,18 @@ func Every(ctx context.Context, d time.Duration, work func(t time.Time)) { }() } +func FormatBytes(b int64) string { + if b < 1024 { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(1024), 0 + for n := b / 1024; n >= 1024; n /= 1024 { + div *= 1024 + exp++ + } + return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) +} + type AtomicBool struct { flag uint32 }