Add transfer logging output (#77)

Co-authored-by: Dane Everitt <dane@daneeveritt.com>
This commit is contained in:
Matthew Penner 2020-12-25 14:32:41 -07:00 committed by GitHub
parent 901ab1157d
commit 5c78cb9ab3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 398 additions and 227 deletions

View File

@ -189,22 +189,15 @@ func (r *Request) SendArchiveStatus(uuid string, successful bool) error {
return resp.Error() return resp.Error()
} }
func (r *Request) SendTransferFailure(uuid string) error { func (r *Request) SendTransferStatus(uuid string, successful bool) error {
resp, err := r.Get(fmt.Sprintf("/servers/%s/transfer/failure", uuid), nil) state := "failure"
if successful {
state = "success"
}
resp, err := r.Get(fmt.Sprintf("/servers/%s/transfer/%s", uuid, state), nil)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
return resp.Error()
}
func (r *Request) SendTransferSuccess(uuid string) error {
resp, err := r.Get(fmt.Sprintf("/servers/%s/transfer/success", uuid), nil)
if err != nil {
return err
}
defer resp.Body.Close()
return resp.Error() return resp.Error()
} }

View File

@ -77,6 +77,8 @@ type SystemConfiguration struct {
CrashDetection CrashDetection `yaml:"crash_detection"` CrashDetection CrashDetection `yaml:"crash_detection"`
Backups Backups `yaml:"backups"` Backups Backups `yaml:"backups"`
Transfers Transfers `yaml:"transfers"`
} }
type CrashDetection struct { type CrashDetection struct {
@ -97,12 +99,22 @@ type Backups struct {
// upload it to any external storage provider. // upload it to any external storage provider.
// //
// If the value is less than 1, the write speed is unlimited, // If the value is less than 1, the write speed is unlimited,
// if the value is greater than 0, the write speed is the value in MB/s. // if the value is greater than 0, the write speed is the value in MiB/s.
// //
// Defaults to 0 (unlimited) // Defaults to 0 (unlimited)
WriteLimit int `default:"0" yaml:"write_limit"` WriteLimit int `default:"0" yaml:"write_limit"`
} }
type Transfers struct {
// DownloadLimit imposes a Network I/O read limit when downloading a transfer archive.
//
// If the value is less than 1, the write speed is unlimited,
// if the value is greater than 0, the write speed is the value in MiB/s.
//
// Defaults to 0 (unlimited)
DownloadLimit int `default:"0" yaml:"download_limit"`
}
// Ensures that all of the system directories exist on the system. These directories are // Ensures that all of the system directories exist on the system. These directories are
// created so that only the owner can read the data, and no other users. // created so that only the owner can read the data, and no other users.
func (sc *SystemConfiguration) ConfigureDirectories() error { func (sc *SystemConfiguration) ConfigureDirectories() error {

View File

@ -22,6 +22,6 @@ type ConsoleThrottles struct {
DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"` DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"`
// The amount of time that a server is allowed to be stopping for before it is terminated // The amount of time that a server is allowed to be stopping for before it is terminated
// forfully if it triggers output throttles. // forcefully if it triggers output throttles.
StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"` StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"`
} }

View File

@ -147,8 +147,8 @@ func (e *Environment) Stop() error {
} }
t := time.Second * 30 t := time.Second * 30
err := e.client.ContainerStop(context.Background(), e.Id, &t)
if err != nil { if err := e.client.ContainerStop(context.Background(), e.Id, &t); err != nil {
// If the container does not exist just mark the process as stopped and return without // If the container does not exist just mark the process as stopped and return without
// an error. // an error.
if client.IsErrNotFound(err) { if client.IsErrNotFound(err) {

View File

@ -24,14 +24,14 @@ func getServerWebsocket(c *gin.Context) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// Track this open connection on the server so that we can close them all programtically // Track this open connection on the server so that we can close them all programmatically
// if the server is deleted. // if the server is deleted.
s.Websockets().Push(handler.Uuid(), &cancel) s.Websockets().Push(handler.Uuid(), &cancel)
defer s.Websockets().Remove(handler.Uuid()) defer s.Websockets().Remove(handler.Uuid())
// Listen for the context being canceled and then close the websocket connection. This normally // Listen for the context being canceled and then close the websocket connection. This normally
// just happens because you're disconnecting from the socket in the browser, however in some // just happens because you're disconnecting from the socket in the browser, however in some
// cases we close the connections programatically (e.g. deleting the server) and need to send // cases we close the connections programmatically (e.g. deleting the server) and need to send
// a close message to the websocket so it disconnects. // a close message to the websocket so it disconnects.
go func(ctx context.Context, c *ws.Conn) { go func(ctx context.Context, c *ws.Conn) {
ListenerLoop: ListenerLoop:

View File

@ -2,28 +2,52 @@ package router
import ( import (
"bufio" "bufio"
"bytes"
"crypto/sha256" "crypto/sha256"
"emperror.dev/errors" "emperror.dev/errors"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt"
"github.com/apex/log" "github.com/apex/log"
"github.com/buger/jsonparser"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/juju/ratelimit"
"github.com/mholt/archiver/v3" "github.com/mholt/archiver/v3"
"github.com/mitchellh/colorstring"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/installer" "github.com/pterodactyl/wings/installer"
"github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/router/tokens"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/system"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time"
) )
// Number of ticks in the progress bar
const ticks = 25
// 100% / number of ticks = percentage represented by each tick
const tickPercentage = 100 / ticks
type downloadProgress struct {
size int64
progress int64
}
// Data passed over to initiate a server transfer.
type serverTransferRequest struct {
ServerID string `binding:"required" json:"server_id"`
URL string `binding:"required" json:"url"`
Token string `binding:"required" json:"token"`
Server json.RawMessage `json:"server"`
}
// Returns the archive for a server so that it can be transfered to a new node.
func getServerArchive(c *gin.Context) { func getServerArchive(c *gin.Context) {
auth := strings.SplitN(c.GetHeader("Authorization"), " ", 2) auth := strings.SplitN(c.GetHeader("Authorization"), " ", 2)
@ -41,22 +65,20 @@ func getServerArchive(c *gin.Context) {
return return
} }
if token.Subject != c.Param("server") { s := ExtractServer(c)
if token.Subject != s.Id() {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": "( .. •˘___˘• .. )", "error": "Missing required token subject, or subject is not valid for the requested server.",
}) })
return return
} }
s := GetServer(c.Param("server"))
st, err := s.Archiver.Stat() st, err := s.Archiver.Stat()
if err != nil { if err != nil {
if !errors.Is(err, os.ErrNotExist) { if !errors.Is(err, os.ErrNotExist) {
NewServerError(err, s).SetMessage("failed to stat archive").Abort(c) WithError(c, err)
return return
} }
c.AbortWithStatus(http.StatusNotFound) c.AbortWithStatus(http.StatusNotFound)
return return
} }
@ -69,14 +91,7 @@ func getServerArchive(c *gin.Context) {
file, err := os.Open(s.Archiver.Path()) file, err := os.Open(s.Archiver.Path())
if err != nil { if err != nil {
tserr := NewServerError(err, s) WithError(c, err)
if !os.IsNotExist(err) {
tserr.SetMessage("failed to open archive for reading")
} else {
tserr.SetMessage("failed to open archive")
}
tserr.Abort(c)
return return
} }
defer file.Close() defer file.Close()
@ -91,65 +106,21 @@ func getServerArchive(c *gin.Context) {
} }
func postServerArchive(c *gin.Context) { func postServerArchive(c *gin.Context) {
s := GetServer(c.Param("server")) s := ExtractServer(c)
go func(s *server.Server) { go func(s *server.Server) {
r := api.New() r := api.New()
l := log.WithField("server", s.Id()) l := log.WithField("server", s.Id())
// Attempt to get an archive of the server. This **WILL NOT** modify the source files of a server, // This function automatically adds the Source Node prefix and Timestamp to the log
// this process is 100% safe and will not corrupt a server's files if it fails. // output before sending it over the websocket.
if err := s.Archiver.Archive(); err != nil { sendTransferLog := func(data string) {
l.WithField("error", err).Error("failed to get transfer archive for server") output := colorstring.Color(fmt.Sprintf("[yellow][bold]%s [Pterodactyl Transfer System] [Source Node]:[default] %s", time.Now().Format(time.RFC1123), data))
s.Events().Publish(server.TransferLogsEvent, output)
if err := r.SendArchiveStatus(s.Id(), false); err != nil {
if !api.IsRequestError(err) {
l.WithField("error", err).Error("failed to notify panel of failed archive status")
return
} }
l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a failed archive status") s.Events().Publish(server.TransferStatusEvent, "starting")
return sendTransferLog("Attempting to archive server...")
}
l.Info("successfully notified panel of failed archive status")
return
}
l.Info("successfully created server transfer archive, notifying panel..")
if err := r.SendArchiveStatus(s.Id(), true); err != nil {
if !api.IsRequestError(err) {
l.WithField("error", err).Error("failed to notify panel of successful archive status")
return
}
l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a successful archive status")
return
}
l.Info("successfully notified panel of successful transfer archive status")
}(s)
c.Status(http.StatusAccepted)
}
func postTransfer(c *gin.Context) {
var buf bytes.Buffer
if _, err := buf.ReadFrom(c.Request.Body); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
go func(data []byte) {
serverID, _ := jsonparser.GetString(data, "server_id")
url, _ := jsonparser.GetString(data, "url")
token, _ := jsonparser.GetString(data, "token")
l := log.WithField("server", serverID)
l.Info("incoming transfer for server")
// Create an http client with no timeout.
client := &http.Client{Timeout: 0}
hasError := true hasError := true
defer func() { defer func() {
@ -157,162 +128,318 @@ func postTransfer(c *gin.Context) {
return return
} }
l.Info("server transfer failed, notifying panel") s.Events().Publish(server.TransferStatusEvent, "failure")
if err := api.New().SendTransferFailure(serverID); err != nil {
sendTransferLog("Attempting to notify panel of archive failure..")
if err := r.SendArchiveStatus(s.Id(), false); err != nil {
if !api.IsRequestError(err) { if !api.IsRequestError(err) {
l.WithField("error", err).Error("failed to notify panel with transfer failure") sendTransferLog("Failed to notify panel of archive failure: " + err.Error())
l.WithField("error", err).Error("failed to notify panel of failed archive status")
return return
} }
l.WithField("error", err.Error()).Error("received error response from panel while notifying of transfer failure") sendTransferLog("Panel returned an error while notifying it of a failed archive: " + err.Error())
l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a failed archive status")
return return
} }
l.Debug("notified panel of transfer failure") sendTransferLog("Successfully notified panel of failed archive status")
l.Info("successfully notified panel of failed archive status")
}() }()
// Make a new GET request to the URL the panel gave us. // Mark the server as transferring to prevent problems.
req, err := http.NewRequest("GET", url, nil) s.SetTransferring(true)
if err != nil {
log.WithField("error", err).Error("failed to create http request for archive transfer") // Ensure the server is offline. Sometimes a "No such container" error gets through
// which means the server is already stopped. We can ignore that.
if err := s.Environment.WaitForStop(60, false); err != nil && !strings.Contains(strings.ToLower(err.Error()), "no such container") {
sendTransferLog("Failed to stop server, aborting transfer..")
l.WithField("error", err).Error("failed to stop server")
return return
} }
// Add the authorization header. // Attempt to get an archive of the server.
req.Header.Set("Authorization", token) if err := s.Archiver.Archive(); err != nil {
sendTransferLog("An error occurred while archiving the server: " + err.Error())
l.WithField("error", err).Error("failed to get transfer archive for server")
return
}
l.Info("requesting archive for server transfer..") sendTransferLog("Successfully created archive, attempting to notify panel..")
// Execute the http request. l.Info("successfully created server transfer archive, notifying panel..")
if err := r.SendArchiveStatus(s.Id(), true); err != nil {
if !api.IsRequestError(err) {
sendTransferLog("Failed to notify panel of archive success: " + err.Error())
l.WithField("error", err).Error("failed to notify panel of successful archive status")
return
}
sendTransferLog("Panel returned an error while notifying it of a successful archive: " + err.Error())
l.WithField("error", err.Error()).Error("panel returned an error when notifying it of a successful archive status")
return
}
hasError = false
// This log may not be displayed by the client due to the status event being sent before or at the same time.
sendTransferLog("Successfully notified panel of successful archive status")
l.Info("successfully notified panel of successful transfer archive status")
s.Events().Publish(server.TransferStatusEvent, "archived")
}(s)
c.Status(http.StatusAccepted)
}
func (w *downloadProgress) Write(v []byte) (int, error) {
n := len(v)
atomic.AddInt64(&w.progress, int64(n))
return n, nil
}
// Log helper function to attach all errors and info output to a consistently formatted
// log string for easier querying.
func (str serverTransferRequest) log() *log.Entry {
return log.WithField("subsystem", "transfers").WithField("server_id", str.ServerID)
}
// Downloads an archive from the machine that the server currently lives on.
func (str serverTransferRequest) downloadArchive() (*http.Response, error) {
client := http.Client{Timeout: 0}
req, err := http.NewRequest(http.MethodGet, str.URL, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", str.Token)
res, err := client.Do(req) res, err := client.Do(req)
if err != nil { if err != nil {
l.WithField("error", err).Error("failed to send archive http request") return nil, err
}
return res, nil
}
// Returns the path to the local archive on the system.
func (str serverTransferRequest) path() string {
return filepath.Join(config.Get().System.ArchiveDirectory, str.ServerID+".tar.gz")
}
// Creates the archive location on this machine by first checking that the required file
// does not already exist. If it does exist, the file is deleted and then re-created as
// an empty file.
func (str serverTransferRequest) createArchiveFile() (*os.File, error) {
p := str.path()
if _, err := os.Stat(p); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else if err := os.Remove(p); err != nil {
return nil, err
}
return os.Create(p)
}
// Deletes the archive from the local filesystem. This is executed as a deferred function.
func (str serverTransferRequest) removeArchivePath() {
p := str.path()
str.log().Debug("deleting temporary transfer archive")
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
str.log().WithField("path", p).WithField("error", err).Error("failed to delete temporary transfer archive file")
return
}
str.log().Debug("deleted temporary transfer archive successfully")
}
// Verifies that the SHA-256 checksum of the file on the local filesystem matches the
// expected value from the transfer request. The string value returned is the computed
// checksum on the system.
func (str serverTransferRequest) verifyChecksum(matches string) (bool, string, error) {
file, err := os.Open(str.path())
if err != nil {
return false, "", err
}
defer file.Close()
hash := sha256.New()
buf := make([]byte, 1024*4)
if _, err := io.CopyBuffer(hash, file, buf); err != nil {
return false, "", err
}
checksum := hex.EncodeToString(hash.Sum(nil))
return checksum == matches, checksum, nil
}
// Sends a notification to the Panel letting it know what the status of this transfer is.
func (str serverTransferRequest) sendTransferStatus(successful bool) error {
lg := str.log().WithField("transfer_successful", successful)
lg.Info("notifying Panel of server transfer state")
if err := api.New().SendTransferStatus(str.ServerID, successful); err != nil {
lg.WithField("error", err).Error("error notifying panel of transfer state")
return err
}
lg.Debug("notified panel of transfer state")
return nil
}
// Initiates a transfer between two nodes for a server by downloading an archive from the
// remote node and then applying the server details to this machine.
func postTransfer(c *gin.Context) {
var data serverTransferRequest
if err := c.BindJSON(&data); err != nil {
return
}
data.log().Info("handling incoming server transfer request")
go func(data *serverTransferRequest) {
hasError := true
defer func() {
_ = data.sendTransferStatus(!hasError)
}()
// Create a new server installer. This will only configure the environment and not
// run the installer scripts.
i, err := installer.New(data.Server)
if err != nil {
data.log().WithField("error", err).Error("failed to validate received server data")
return
}
// This function automatically adds the Target Node prefix and Timestamp to the log output before sending it
// over the websocket.
sendTransferLog := func(data string) {
output := colorstring.Color(fmt.Sprintf("[yellow][bold]%s [Pterodactyl Transfer System] [Target Node]:[default] %s", time.Now().Format(time.RFC1123), data))
i.Server().Events().Publish(server.TransferLogsEvent, output)
}
// Mark the server as transferring to prevent problems later on during the process and
// then push the server into the global server collection for this instance.
i.Server().SetTransferring(true)
server.GetServers().Add(i.Server())
defer func(s *server.Server) {
// In the event that this transfer call fails, remove the server from the global
// server tracking so that we don't have a dangling instance.
if hasError {
sendTransferLog("Server transfer failed, check Wings logs for additional information.")
s.Events().Publish(server.TransferStatusEvent, "failure")
server.GetServers().Remove(func(s2 *server.Server) bool {
return s.Id() == s2.Id()
})
} else {
i.Server().SetTransferring(false)
i.Server().Events().Publish(server.TransferStatusEvent, "success")
}
}(i.Server())
data.log().Info("downloading server archive from current server node")
sendTransferLog("Received incoming transfer from Panel, attempting to download archive from source node...")
res, err := data.downloadArchive()
if err != nil {
sendTransferLog("Failed to retrieve server archive from remote node: " + err.Error())
data.log().WithField("error", err).Error("failed to download archive for server transfer")
return return
} }
defer res.Body.Close() defer res.Body.Close()
// Handle non-200 status codes.
if res.StatusCode != 200 { if res.StatusCode != 200 {
_, err := ioutil.ReadAll(res.Body) data.log().WithField("error", err).WithField("status", res.StatusCode).Error("unexpected error response from transfer endpoint")
return
}
size := res.ContentLength
if size == 0 {
data.log().WithField("error", err).Error("recieved an archive response with Content-Length of 0")
return
}
sendTransferLog("Got server archive response from remote node. (Content-Length: " + strconv.Itoa(int(size)) + ")")
sendTransferLog("Creating local archive file...")
file, err := data.createArchiveFile()
if err != nil { if err != nil {
l.WithField("error", err).WithField("status", res.StatusCode).Error("failed read transfer response body") data.log().WithField("error", err).Error("failed to create archive file on local filesystem")
return return
} }
l.WithField("error", err).WithField("status", res.StatusCode).Error("failed to request server archive") sendTransferLog("Writing archive to disk...")
return data.log().Info("writing transfer archive to disk..")
}
// Get the path to the archive.
archivePath := filepath.Join(config.Get().System.ArchiveDirectory, serverID+".tar.gz")
// Check if the archive already exists and delete it if it does.
if _, err = os.Stat(archivePath); err != nil {
if !os.IsNotExist(err) {
l.WithField("error", err).Error("failed to stat archive file")
return
}
} else if err := os.Remove(archivePath); err != nil {
l.WithField("error", err).Warn("failed to remove old archive file")
return
}
// Create the file.
file, err := os.Create(archivePath)
if err != nil {
l.WithField("error", err).Error("failed to open archive on disk")
return
}
l.Info("writing transfer archive to disk..")
// Copy the file. // Copy the file.
progress := &downloadProgress{size: size}
ticker := time.NewTicker(3 * time.Second)
go func(progress *downloadProgress, t *time.Ticker) {
for range ticker.C {
// p = 100 (Downloaded)
// size = 1000 (Content-Length)
// p / size = 0.1
// * 100 = 10% (Multiply by 100 to get a percentage of the download)
// 10% / tickPercentage = (10% / (100 / 25)) (Divide by tick percentage to get the number of ticks)
// 2.5 (Number of ticks as a float64)
// 2 (convert to an integer)
p := atomic.LoadInt64(&progress.progress)
// We have to cast these numbers to float in order to get a float result from the division.
width := ((float64(p) / float64(size)) * 100) / tickPercentage
bar := strings.Repeat("=", int(width)) + strings.Repeat(" ", ticks-int(width))
sendTransferLog("Downloading [" + bar + "] " + system.FormatBytes(p) + " / " + system.FormatBytes(progress.size))
}
}(progress, ticker)
var reader io.Reader = res.Body
downloadLimit := float64(config.Get().System.Transfers.DownloadLimit) * 1024 * 1024
if downloadLimit > 0 {
// Wrap the body with a reader that is limited to the defined download limit speed.
reader = ratelimit.Reader(res.Body, ratelimit.NewBucketWithRate(downloadLimit, int64(downloadLimit)))
}
buf := make([]byte, 1024*4) buf := make([]byte, 1024*4)
_, err = io.CopyBuffer(file, res.Body, buf) if _, err := io.CopyBuffer(file, io.TeeReader(reader, progress), buf); err != nil {
if err != nil { ticker.Stop()
l.WithField("error", err).Error("failed to copy archive file to disk") sendTransferLog("Failed while writing archive file to disk: " + err.Error())
data.log().WithField("error", err).Error("failed to copy archive file to disk")
return return
} }
ticker.Stop()
// Show 100% completion.
humanSize := system.FormatBytes(progress.size)
sendTransferLog("Downloading [" + strings.Repeat("=", ticks) + "] " + humanSize + " / " + humanSize)
// Close the file so it can be opened to verify the checksum.
if err := file.Close(); err != nil { if err := file.Close(); err != nil {
l.WithField("error", err).Error("failed to close archive file") data.log().WithField("error", err).Error("unable to close archive file on local filesystem")
return return
} }
l.Info("finished writing transfer archive to disk") data.log().Info("finished writing transfer archive to disk")
sendTransferLog("Successfully wrote archive to disk.")
// Whenever the transfer fails or succeeds, delete the temporary transfer archive. // Whenever the transfer fails or succeeds, delete the temporary transfer archive that
defer func() { // was created on the disk.
log.WithField("server", serverID).Debug("deleting temporary transfer archive..") defer data.removeArchivePath()
if err := os.Remove(archivePath); err != nil && !os.IsNotExist(err) {
l.WithFields(log.Fields{
"server": serverID,
"error": err,
}).Warn("failed to delete transfer archive")
} else {
l.Debug("deleted temporary transfer archive successfully")
}
}()
l.Info("server transfer archive downloaded, computing checksum...") sendTransferLog("Verifying checksum of downloaded archive...")
data.log().Info("computing checksum of downloaded archive file")
// Open the archive file for computing a checksum. expected := res.Header.Get("X-Checksum")
file, err = os.Open(archivePath) if matches, computed, err := data.verifyChecksum(expected); err != nil {
if err != nil { data.log().WithField("error", err).Error("encountered an error while calculating local filesystem archive checksum")
l.WithField("error", err).Error("failed to open archive on disk") return
} else if !matches {
sendTransferLog("@@@@@ CHECKSUM VERIFICATION FAILED @@@@@")
sendTransferLog(" - Source Checksum: " + expected)
sendTransferLog(" - Computed Checksum: " + computed)
data.log().WithField("expected_sum", expected).WithField("computed_checksum", computed).Error("checksum mismatch when verifying integrity of local archive")
return return
} }
// Compute the sha256 checksum of the file. // Create the server's environment.
hash := sha256.New() sendTransferLog("Creating server environment, this could take a while..")
buf = make([]byte, 1024*4) data.log().Info("creating server environment")
if _, err := io.CopyBuffer(hash, file, buf); err != nil {
l.WithField("error", err).Error("failed to copy archive file for checksum verification")
return
}
checksum := hex.EncodeToString(hash.Sum(nil))
l.WithField("checksum", checksum).Info("computed checksum of transfer archive")
// Verify the two checksums.
if checksum != res.Header.Get("X-Checksum") {
l.WithField("source_checksum", res.Header.Get("X-Checksum")).Error("checksum verification failed for archive")
return
}
// Close the file.
if err := file.Close(); err != nil {
l.WithField("error", err).Error("failed to close archive file after calculating checksum")
return
}
l.Info("server archive transfer checksums have been validated, creating server environment..")
// Get the server data from the request.
serverData, t, _, _ := jsonparser.Get(data, "server")
if t != jsonparser.Object {
l.Error("invalid server data passed in request")
return
}
// Create a new server installer (note this does not execute the install script)
i, err := installer.New(serverData)
if err != nil {
l.WithField("error", err).Error("failed to validate received server data")
return
}
// Add the server to the collection.
server.GetServers().Add(i.Server())
// Create the server's environment (note this does not execute the install script)
if err := i.Server().CreateEnvironment(); err != nil { if err := i.Server().CreateEnvironment(); err != nil {
l.WithField("error", err).Error("failed to create server environment") data.log().WithField("error", err).Error("failed to create server environment")
return return
} }
l.Info("server environment configured, extracting transfer archive..") sendTransferLog("Server environment has been created, extracting transfer archive..")
// Extract the transfer archive. data.log().Info("server environment configured, extracting transfer archive")
if err := archiver.NewTarGz().Unarchive(archivePath, i.Server().Filesystem().Path()); err != nil { if err := archiver.NewTarGz().Unarchive(data.path(), i.Server().Filesystem().Path()); err != nil {
l.WithField("error", err).Error("failed to extract server archive") // Unarchiving failed, delete the server's data directory.
if err := os.RemoveAll(i.Server().Filesystem().Path()); err != nil && !os.IsNotExist(err) {
data.log().WithField("error", err).Warn("failed to delete local server files directory")
}
data.log().WithField("error", err).Error("failed to extract server archive")
return return
} }
@ -322,23 +449,9 @@ func postTransfer(c *gin.Context) {
// It may be useful to retry sending the transfer success every so often just in case of a small // It may be useful to retry sending the transfer success every so often just in case of a small
// hiccup or the fix of whatever error causing the success request to fail. // hiccup or the fix of whatever error causing the success request to fail.
hasError = false hasError = false
data.log().Info("archive transfered successfully, notifying panel of status")
l.Info("server transfer archive has been extracted, notifying panel..") sendTransferLog("Archive transfered successfully.")
}(&data)
// Notify the panel that the transfer succeeded.
err = api.New().SendTransferSuccess(serverID)
if err != nil {
if !api.IsRequestError(err) {
l.WithField("error", err).Error("failed to notify panel of transfer success")
return
}
l.WithField("error", err.Error()).Error("panel responded with error after transfer success")
return
}
l.Info("successfully notified panel of transfer success")
}(buf.Bytes())
c.Status(http.StatusAccepted) c.Status(http.StatusAccepted)
} }

View File

@ -45,6 +45,8 @@ var e = []string{
server.InstallCompletedEvent, server.InstallCompletedEvent,
server.DaemonMessageEvent, server.DaemonMessageEvent,
server.BackupCompletedEvent, server.BackupCompletedEvent,
server.TransferLogsEvent,
server.TransferStatusEvent,
} }
// Listens for different events happening on a server and sends them along // Listens for different events happening on a server and sends them along

View File

@ -28,6 +28,7 @@ const (
PermissionSendPowerRestart = "control.restart" PermissionSendPowerRestart = "control.restart"
PermissionReceiveErrors = "admin.websocket.errors" PermissionReceiveErrors = "admin.websocket.errors"
PermissionReceiveInstall = "admin.websocket.install" PermissionReceiveInstall = "admin.websocket.install"
PermissionReceiveTransfer = "admin.websocket.transfer"
PermissionReceiveBackups = "backup.read" PermissionReceiveBackups = "backup.read"
) )
@ -149,6 +150,13 @@ func (h *Handler) SendJson(v *Message) error {
return nil return nil
} }
} }
// If we are sending transfer output, only send it to the user if they have the required permissions.
if v.Event == server.TransferLogsEvent {
if !j.HasPermission(PermissionReceiveTransfer) {
return nil
}
}
} }
if err := h.unsafeSendJson(v); err != nil { if err := h.unsafeSendJson(v); err != nil {
@ -320,6 +328,7 @@ func (h *Handler) HandleInbound(m Message) error {
// Only send the current disk usage if the server is offline, if docker container is running, // Only send the current disk usage if the server is offline, if docker container is running,
// Environment#EnableResourcePolling() will send this data to all clients. // Environment#EnableResourcePolling() will send this data to all clients.
if state == environment.ProcessOfflineState { if state == environment.ProcessOfflineState {
if !h.server.IsInstalling() && !h.server.IsTransferring() {
_ = h.server.Filesystem().HasSpaceAvailable(false) _ = h.server.Filesystem().HasSpaceAvailable(false)
b, _ := json.Marshal(h.server.Proc()) b, _ := json.Marshal(h.server.Proc())
@ -328,6 +337,7 @@ func (h *Handler) HandleInbound(m Message) error {
Args: []string{string(b)}, Args: []string{string(b)},
}) })
} }
}
return nil return nil
} }

View File

@ -130,7 +130,7 @@ func (s *S3Backup) generateRemoteRequest(rc io.ReadCloser) error {
} }
} }
l.Info("backup has been successfully uploaded") l.WithField("parts", partCount).Info("backup has been successfully uploaded")
return nil return nil
} }

View File

@ -60,6 +60,8 @@ func (c *Collection) Find(filter func(*Server) bool) *Server {
} }
// Removes all items from the collection that match the filter function. // Removes all items from the collection that match the filter function.
//
// TODO: cancel the context?
func (c *Collection) Remove(filter func(*Server) bool) { func (c *Collection) Remove(filter func(*Server) bool) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()

View File

@ -4,8 +4,12 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
) )
var ErrIsRunning = errors.New("server is running") var (
var ErrSuspended = errors.New("server is currently in a suspended state") ErrIsRunning = errors.New("server is running")
ErrSuspended = errors.New("server is currently in a suspended state")
ErrServerIsInstalling = errors.New("server is currently installing")
ErrServerIsTransferring = errors.New("server is currently being transferred")
)
type crashTooFrequent struct { type crashTooFrequent struct {
} }

View File

@ -15,6 +15,8 @@ const (
StatusEvent = "status" StatusEvent = "status"
StatsEvent = "stats" StatsEvent = "stats"
BackupCompletedEvent = "backup completed" BackupCompletedEvent = "backup completed"
TransferLogsEvent = "transfer logs"
TransferStatusEvent = "transfer status"
) )
// Returns the server's emitter instance. // Returns the server's emitter instance.

View File

@ -144,7 +144,8 @@ func (s *Server) acquireInstallationLock() error {
s.installer.sem = semaphore.NewWeighted(1) s.installer.sem = semaphore.NewWeighted(1)
} }
ctx, _ := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return s.installer.sem.Acquire(ctx, 1) return s.installer.sem.Acquire(ctx, 1)
} }
@ -168,6 +169,14 @@ func (s *Server) IsInstalling() bool {
return true return true
} }
func (s *Server) IsTransferring() bool {
return s.transferring.Get()
}
func (s *Server) SetTransferring(state bool) {
s.transferring.Set(state)
}
// Removes the installer container for the server. // Removes the installer container for the server.
func (ip *InstallationProcess) RemoveContainer() { func (ip *InstallationProcess) RemoveContainer() {
err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{

View File

@ -61,6 +61,14 @@ func (s *Server) ExecutingPowerAction() bool {
// function rather than making direct calls to the start/stop/restart functions on the // function rather than making direct calls to the start/stop/restart functions on the
// environment struct. // environment struct.
func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error { func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error {
if s.IsInstalling() {
return ErrServerIsInstalling
}
if s.IsTransferring() {
return ErrServerIsTransferring
}
if s.powerLock == nil { if s.powerLock == nil {
s.powerLock = semaphore.NewWeighted(1) s.powerLock = semaphore.NewWeighted(1)
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/pterodactyl/wings/environment/docker" "github.com/pterodactyl/wings/environment/docker"
"github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/server/filesystem" "github.com/pterodactyl/wings/server/filesystem"
"github.com/pterodactyl/wings/system"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"strings" "strings"
"sync" "sync"
@ -56,6 +57,8 @@ type Server struct {
// installer process is still running. // installer process is still running.
installer InstallerDetails installer InstallerDetails
transferring system.AtomicBool
// The console throttler instance used to control outputs. // The console throttler instance used to control outputs.
throttler *ConsoleThrottler throttler *ConsoleThrottler

View File

@ -2,6 +2,7 @@ package system
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -23,6 +24,18 @@ func Every(ctx context.Context, d time.Duration, work func(t time.Time)) {
}() }()
} }
func FormatBytes(b int64) string {
if b < 1024 {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(1024), 0
for n := b / 1024; n >= 1024; n /= 1024 {
div *= 1024
exp++
}
return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
}
type AtomicBool struct { type AtomicBool struct {
flag uint32 flag uint32
} }