server(transfers): track progress of archive creation and extraction (#143)

This commit is contained in:
Matthew Penner 2022-10-04 20:35:48 -06:00 committed by GitHub
parent 3edec80efa
commit 6fb61261b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 133 additions and 55 deletions

View File

@ -12,7 +12,6 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
"emperror.dev/errors" "emperror.dev/errors"
@ -30,19 +29,9 @@ import (
"github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/router/tokens"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/server/filesystem" "github.com/pterodactyl/wings/server/filesystem"
"github.com/pterodactyl/wings/system"
) )
// Number of ticks in the progress bar const progressWidth = 25
const ticks = 25
// 100% / number of ticks = percentage represented by each tick
const tickPercentage = 100 / ticks
type downloadProgress struct {
size int64
progress int64
}
// Data passed over to initiate a server transfer. // Data passed over to initiate a server transfer.
type serverTransferRequest struct { type serverTransferRequest struct {
@ -95,7 +84,7 @@ func getServerArchive(c *gin.Context) {
return return
} }
// Compute sha1 checksum. // Compute sha256 checksum.
h := sha256.New() h := sha256.New()
f, err := os.Open(archivePath) f, err := os.Open(archivePath)
if err != nil { if err != nil {
@ -184,11 +173,35 @@ func postServerArchive(c *gin.Context) {
return return
} }
// Get the disk usage of the server (used to calculate the progress of the archive process)
rawSize, err := s.Filesystem().DiskUsage(true)
if err != nil {
sendTransferLog("Failed to get disk usage for server, aborting transfer..")
l.WithField("error", err).Error("failed to get disk usage for server")
return
}
// Create an archive of the entire server's data directory. // Create an archive of the entire server's data directory.
a := &filesystem.Archive{ a := &filesystem.Archive{
BasePath: s.Filesystem().Path(), BasePath: s.Filesystem().Path(),
Progress: filesystem.NewProgress(rawSize),
} }
// Send the archive progress to the websocket every 3 seconds.
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
go func(ctx context.Context, p *filesystem.Progress, t *time.Ticker) {
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
sendTransferLog("Archiving " + p.Progress(progressWidth))
}
}
}(ctx, a.Progress, time.NewTicker(5*time.Second))
// Attempt to get an archive of the server. // Attempt to get an archive of the server.
if err := a.Create(getArchivePath(s.ID())); err != nil { if err := a.Create(getArchivePath(s.ID())); err != nil {
sendTransferLog("An error occurred while archiving the server: " + err.Error()) sendTransferLog("An error occurred while archiving the server: " + err.Error())
@ -196,6 +209,12 @@ func postServerArchive(c *gin.Context) {
return return
} }
// Cancel the progress ticker.
cancel()
// Show 100% completion.
sendTransferLog("Archiving " + a.Progress.Progress(progressWidth))
sendTransferLog("Successfully created archive, attempting to notify panel..") sendTransferLog("Successfully created archive, attempting to notify panel..")
l.Info("successfully created server transfer archive, notifying panel..") l.Info("successfully created server transfer archive, notifying panel..")
@ -223,12 +242,6 @@ func postServerArchive(c *gin.Context) {
c.Status(http.StatusAccepted) c.Status(http.StatusAccepted)
} }
func (w *downloadProgress) Write(v []byte) (int, error) {
n := len(v)
atomic.AddInt64(&w.progress, int64(n))
return n, nil
}
// Log helper function to attach all errors and info output to a consistently formatted // Log helper function to attach all errors and info output to a consistently formatted
// log string for easier querying. // log string for easier querying.
func (str serverTransferRequest) log() *log.Entry { func (str serverTransferRequest) log() *log.Entry {
@ -321,7 +334,7 @@ func postTransfer(c *gin.Context) {
manager := middleware.ExtractManager(c) manager := middleware.ExtractManager(c)
u, err := uuid.Parse(data.ServerID) u, err := uuid.Parse(data.ServerID)
if err != nil { if err != nil {
WithError(c, err) _ = WithError(c, err)
return return
} }
// Force the server ID to be a valid UUID string at this point. If it is not an error // Force the server ID to be a valid UUID string at this point. If it is not an error
@ -331,11 +344,12 @@ func postTransfer(c *gin.Context) {
data.log().Info("handling incoming server transfer request") data.log().Info("handling incoming server transfer request")
go func(data *serverTransferRequest) { go func(data *serverTransferRequest) {
ctx := context.Background()
hasError := true hasError := true
// Create a new server installer. This will only configure the environment and not // Create a new server installer. This will only configure the environment and not
// run the installer scripts. // run the installer scripts.
i, err := installer.New(context.Background(), manager, data.Server) i, err := installer.New(ctx, manager, data.Server)
if err != nil { if err != nil {
_ = data.sendTransferStatus(manager.Client(), false) _ = data.sendTransferStatus(manager.Client(), false)
data.log().WithField("error", err).Error("failed to validate received server data") data.log().WithField("error", err).Error("failed to validate received server data")
@ -407,25 +421,22 @@ func postTransfer(c *gin.Context) {
sendTransferLog("Writing archive to disk...") sendTransferLog("Writing archive to disk...")
data.log().Info("writing transfer archive to disk...") data.log().Info("writing transfer archive to disk...")
// Copy the file. progress := filesystem.NewProgress(size)
progress := &downloadProgress{size: size}
ticker := time.NewTicker(3 * time.Second) // Send the archive progress to the websocket every 3 seconds.
go func(progress *downloadProgress, t *time.Ticker) { ctx, cancel := context.WithCancel(ctx)
for range ticker.C { defer cancel()
// p = 100 (Downloaded) go func(ctx context.Context, p *filesystem.Progress, t *time.Ticker) {
// size = 1000 (Content-Length) defer t.Stop()
// p / size = 0.1 for {
// * 100 = 10% (Multiply by 100 to get a percentage of the download) select {
// 10% / tickPercentage = (10% / (100 / 25)) (Divide by tick percentage to get the number of ticks) case <-ctx.Done():
// 2.5 (Number of ticks as a float64) return
// 2 (convert to an integer) case <-t.C:
p := atomic.LoadInt64(&progress.progress) sendTransferLog("Downloading " + p.Progress(progressWidth))
// We have to cast these numbers to float in order to get a float result from the division. }
width := ((float64(p) / float64(size)) * 100) / tickPercentage
bar := strings.Repeat("=", int(width)) + strings.Repeat(" ", ticks-int(width))
sendTransferLog("Downloading [" + bar + "] " + system.FormatBytes(p) + " / " + system.FormatBytes(progress.size))
} }
}(progress, ticker) }(ctx, progress, time.NewTicker(5*time.Second))
var reader io.Reader var reader io.Reader
downloadLimit := float64(config.Get().System.Transfers.DownloadLimit) * 1024 * 1024 downloadLimit := float64(config.Get().System.Transfers.DownloadLimit) * 1024 * 1024
@ -438,18 +449,16 @@ func postTransfer(c *gin.Context) {
buf := make([]byte, 1024*4) buf := make([]byte, 1024*4)
if _, err := io.CopyBuffer(file, io.TeeReader(reader, progress), buf); err != nil { if _, err := io.CopyBuffer(file, io.TeeReader(reader, progress), buf); err != nil {
ticker.Stop()
_ = file.Close() _ = file.Close()
sendTransferLog("Failed while writing archive file to disk: " + err.Error()) sendTransferLog("Failed while writing archive file to disk: " + err.Error())
data.log().WithField("error", err).Error("failed to copy archive file to disk") data.log().WithField("error", err).Error("failed to copy archive file to disk")
return return
} }
ticker.Stop() cancel()
// Show 100% completion. // Show 100% completion.
humanSize := system.FormatBytes(progress.size) sendTransferLog("Downloading " + progress.Progress(progressWidth))
sendTransferLog("Downloading [" + strings.Repeat("=", ticks) + "] " + humanSize + " / " + humanSize)
if err := file.Close(); err != nil { if err := file.Close(); err != nil {
data.log().WithField("error", err).Error("unable to close archive file on local filesystem") data.log().WithField("error", err).Error("unable to close archive file on local filesystem")

View File

@ -8,6 +8,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
@ -17,6 +18,7 @@ import (
ignore "github.com/sabhiram/go-gitignore" ignore "github.com/sabhiram/go-gitignore"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/system"
) )
const memory = 4 * 1024 const memory = 4 * 1024
@ -28,6 +30,62 @@ var pool = sync.Pool{
}, },
} }
// Progress is used to track the progress of any I/O operation that are being
// performed.
type Progress struct {
// written is the total size of the files that have been written to the writer.
written int64
// Total is the total size of the archive in bytes.
total int64
// w .
w io.Writer
}
// NewProgress .
func NewProgress(total int64) *Progress {
return &Progress{total: total}
}
// Written returns the total number of bytes written.
// This function should be used when the progress is tracking data being written.
func (p *Progress) Written() int64 {
return atomic.LoadInt64(&p.written)
}
// Total returns the total size in bytes.
func (p *Progress) Total() int64 {
return atomic.LoadInt64(&p.total)
}
// Write totals the number of bytes that have been written to the writer.
func (p *Progress) Write(v []byte) (int, error) {
n := len(v)
atomic.AddInt64(&p.written, int64(n))
if p.w != nil {
return p.w.Write(v)
}
return n, nil
}
// Progress returns a formatted progress string for the current progress.
func (p *Progress) Progress(width int) string {
current := p.Written()
total := p.Total()
// v = 100 (Progress)
// size = 1000 (Content-Length)
// p / size = 0.1
// * 100 = 10% (Multiply by 100 to get a percentage of the download)
// 10% / tickPercentage = (10% / (100 / 25)) (Divide by tick percentage to get the number of ticks)
// 2.5 (Number of ticks as a float64)
// 2 (convert to an integer)
// We have to cast these numbers to float in order to get a float result from the division.
ticks := ((float64(current) / float64(total)) * 100) / (float64(100) / float64(width))
bar := strings.Repeat("=", int(ticks)) + strings.Repeat(" ", width-int(ticks))
return "[" + bar + "] " + system.FormatBytes(current) + " / " + system.FormatBytes(total)
}
type Archive struct { type Archive struct {
// BasePath is the absolute path to create the archive from where Files and Ignore are // BasePath is the absolute path to create the archive from where Files and Ignore are
// relative to. // relative to.
@ -40,10 +98,13 @@ type Archive struct {
// Files specifies the files to archive, this takes priority over the Ignore option, if // Files specifies the files to archive, this takes priority over the Ignore option, if
// unspecified, all files in the BasePath will be archived unless Ignore is set. // unspecified, all files in the BasePath will be archived unless Ignore is set.
Files []string Files []string
// Progress wraps the writer of the archive to pass through the progress tracker.
Progress *Progress
} }
// Create creates an archive at dst with all of the files defined in the // Create creates an archive at dst with all the files defined in the
// included files struct. // included Files array.
func (a *Archive) Create(dst string) error { func (a *Archive) Create(dst string) error {
f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600) f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil { if err != nil {
@ -62,26 +123,34 @@ func (a *Archive) Create(dst string) error {
writer = f writer = f
} }
// The default compression level is BestSpeed
var cl = pgzip.BestSpeed
// Choose which compression level to use based on the compression_level configuration option // Choose which compression level to use based on the compression_level configuration option
var compressionLevel int
switch config.Get().System.Backups.CompressionLevel { switch config.Get().System.Backups.CompressionLevel {
case "none": case "none":
cl = pgzip.NoCompression compressionLevel = pgzip.NoCompression
case "best_speed":
cl = pgzip.BestSpeed
case "best_compression": case "best_compression":
cl = pgzip.BestCompression compressionLevel = pgzip.BestCompression
case "best_speed":
fallthrough
default:
compressionLevel = pgzip.BestSpeed
} }
// Create a new gzip writer around the file. // Create a new gzip writer around the file.
gw, _ := pgzip.NewWriterLevel(writer, cl) gw, _ := pgzip.NewWriterLevel(writer, compressionLevel)
_ = gw.SetConcurrency(1<<20, 1) _ = gw.SetConcurrency(1<<20, 1)
defer gw.Close() defer gw.Close()
var pw io.Writer
if a.Progress != nil {
a.Progress.w = gw
pw = a.Progress
} else {
pw = gw
}
// Create a new tar writer around the gzip writer. // Create a new tar writer around the gzip writer.
tw := tar.NewWriter(gw) tw := tar.NewWriter(pw)
defer tw.Close() defer tw.Close()
// Configure godirwalk. // Configure godirwalk.
@ -116,7 +185,7 @@ func (a *Archive) Create(dst string) error {
// being generated. // being generated.
func (a *Archive) callback(tw *tar.Writer, opts ...func(path string, relative string) error) func(path string, de *godirwalk.Dirent) error { func (a *Archive) callback(tw *tar.Writer, opts ...func(path string, relative string) error) func(path string, de *godirwalk.Dirent) error {
return func(path string, de *godirwalk.Dirent) error { return func(path string, de *godirwalk.Dirent) error {
// Skip directories because we walking them recursively. // Skip directories because we are walking them recursively.
if de.IsDir() { if de.IsDir() {
return nil return nil
} }