From 6fb61261b0f37f87ec87c0ff5faacebe2fa36a70 Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Tue, 4 Oct 2022 20:35:48 -0600 Subject: [PATCH] server(transfers): track progress of archive creation and extraction (#143) --- router/router_transfer.go | 95 ++++++++++++++++++++---------------- server/filesystem/archive.go | 93 ++++++++++++++++++++++++++++++----- 2 files changed, 133 insertions(+), 55 deletions(-) diff --git a/router/router_transfer.go b/router/router_transfer.go index b3153e0..8202c3d 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -12,7 +12,6 @@ import ( "path/filepath" "strconv" "strings" - "sync/atomic" "time" "emperror.dev/errors" @@ -30,19 +29,9 @@ import ( "github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server/filesystem" - "github.com/pterodactyl/wings/system" ) -// Number of ticks in the progress bar -const ticks = 25 - -// 100% / number of ticks = percentage represented by each tick -const tickPercentage = 100 / ticks - -type downloadProgress struct { - size int64 - progress int64 -} +const progressWidth = 25 // Data passed over to initiate a server transfer. type serverTransferRequest struct { @@ -95,7 +84,7 @@ func getServerArchive(c *gin.Context) { return } - // Compute sha1 checksum. + // Compute sha256 checksum. h := sha256.New() f, err := os.Open(archivePath) if err != nil { @@ -184,11 +173,35 @@ func postServerArchive(c *gin.Context) { return } + // Get the disk usage of the server (used to calculate the progress of the archive process) + rawSize, err := s.Filesystem().DiskUsage(true) + if err != nil { + sendTransferLog("Failed to get disk usage for server, aborting transfer..") + l.WithField("error", err).Error("failed to get disk usage for server") + return + } + // Create an archive of the entire server's data directory. a := &filesystem.Archive{ BasePath: s.Filesystem().Path(), + Progress: filesystem.NewProgress(rawSize), } + // Send the archive progress to the websocket every 3 seconds. + ctx, cancel := context.WithCancel(s.Context()) + defer cancel() + go func(ctx context.Context, p *filesystem.Progress, t *time.Ticker) { + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + sendTransferLog("Archiving " + p.Progress(progressWidth)) + } + } + }(ctx, a.Progress, time.NewTicker(5*time.Second)) + // Attempt to get an archive of the server. if err := a.Create(getArchivePath(s.ID())); err != nil { sendTransferLog("An error occurred while archiving the server: " + err.Error()) @@ -196,6 +209,12 @@ func postServerArchive(c *gin.Context) { return } + // Cancel the progress ticker. + cancel() + + // Show 100% completion. + sendTransferLog("Archiving " + a.Progress.Progress(progressWidth)) + sendTransferLog("Successfully created archive, attempting to notify panel..") l.Info("successfully created server transfer archive, notifying panel..") @@ -223,12 +242,6 @@ func postServerArchive(c *gin.Context) { c.Status(http.StatusAccepted) } -func (w *downloadProgress) Write(v []byte) (int, error) { - n := len(v) - atomic.AddInt64(&w.progress, int64(n)) - return n, nil -} - // Log helper function to attach all errors and info output to a consistently formatted // log string for easier querying. func (str serverTransferRequest) log() *log.Entry { @@ -321,7 +334,7 @@ func postTransfer(c *gin.Context) { manager := middleware.ExtractManager(c) u, err := uuid.Parse(data.ServerID) if err != nil { - WithError(c, err) + _ = WithError(c, err) return } // Force the server ID to be a valid UUID string at this point. If it is not an error @@ -331,11 +344,12 @@ func postTransfer(c *gin.Context) { data.log().Info("handling incoming server transfer request") go func(data *serverTransferRequest) { + ctx := context.Background() hasError := true // Create a new server installer. This will only configure the environment and not // run the installer scripts. - i, err := installer.New(context.Background(), manager, data.Server) + i, err := installer.New(ctx, manager, data.Server) if err != nil { _ = data.sendTransferStatus(manager.Client(), false) data.log().WithField("error", err).Error("failed to validate received server data") @@ -407,25 +421,22 @@ func postTransfer(c *gin.Context) { sendTransferLog("Writing archive to disk...") data.log().Info("writing transfer archive to disk...") - // Copy the file. - progress := &downloadProgress{size: size} - ticker := time.NewTicker(3 * time.Second) - go func(progress *downloadProgress, t *time.Ticker) { - for range ticker.C { - // p = 100 (Downloaded) - // size = 1000 (Content-Length) - // p / size = 0.1 - // * 100 = 10% (Multiply by 100 to get a percentage of the download) - // 10% / tickPercentage = (10% / (100 / 25)) (Divide by tick percentage to get the number of ticks) - // 2.5 (Number of ticks as a float64) - // 2 (convert to an integer) - p := atomic.LoadInt64(&progress.progress) - // We have to cast these numbers to float in order to get a float result from the division. - width := ((float64(p) / float64(size)) * 100) / tickPercentage - bar := strings.Repeat("=", int(width)) + strings.Repeat(" ", ticks-int(width)) - sendTransferLog("Downloading [" + bar + "] " + system.FormatBytes(p) + " / " + system.FormatBytes(progress.size)) + progress := filesystem.NewProgress(size) + + // Send the archive progress to the websocket every 3 seconds. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func(ctx context.Context, p *filesystem.Progress, t *time.Ticker) { + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + sendTransferLog("Downloading " + p.Progress(progressWidth)) + } } - }(progress, ticker) + }(ctx, progress, time.NewTicker(5*time.Second)) var reader io.Reader downloadLimit := float64(config.Get().System.Transfers.DownloadLimit) * 1024 * 1024 @@ -438,18 +449,16 @@ func postTransfer(c *gin.Context) { buf := make([]byte, 1024*4) if _, err := io.CopyBuffer(file, io.TeeReader(reader, progress), buf); err != nil { - ticker.Stop() _ = file.Close() sendTransferLog("Failed while writing archive file to disk: " + err.Error()) data.log().WithField("error", err).Error("failed to copy archive file to disk") return } - ticker.Stop() + cancel() // Show 100% completion. - humanSize := system.FormatBytes(progress.size) - sendTransferLog("Downloading [" + strings.Repeat("=", ticks) + "] " + humanSize + " / " + humanSize) + sendTransferLog("Downloading " + progress.Progress(progressWidth)) if err := file.Close(); err != nil { data.log().WithField("error", err).Error("unable to close archive file on local filesystem") diff --git a/server/filesystem/archive.go b/server/filesystem/archive.go index 668d8bd..206907a 100644 --- a/server/filesystem/archive.go +++ b/server/filesystem/archive.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "emperror.dev/errors" "github.com/apex/log" @@ -17,6 +18,7 @@ import ( ignore "github.com/sabhiram/go-gitignore" "github.com/pterodactyl/wings/config" + "github.com/pterodactyl/wings/system" ) const memory = 4 * 1024 @@ -28,6 +30,62 @@ var pool = sync.Pool{ }, } +// Progress is used to track the progress of any I/O operation that are being +// performed. +type Progress struct { + // written is the total size of the files that have been written to the writer. + written int64 + // Total is the total size of the archive in bytes. + total int64 + // w . + w io.Writer +} + +// NewProgress . +func NewProgress(total int64) *Progress { + return &Progress{total: total} +} + +// Written returns the total number of bytes written. +// This function should be used when the progress is tracking data being written. +func (p *Progress) Written() int64 { + return atomic.LoadInt64(&p.written) +} + +// Total returns the total size in bytes. +func (p *Progress) Total() int64 { + return atomic.LoadInt64(&p.total) +} + +// Write totals the number of bytes that have been written to the writer. +func (p *Progress) Write(v []byte) (int, error) { + n := len(v) + atomic.AddInt64(&p.written, int64(n)) + if p.w != nil { + return p.w.Write(v) + } + return n, nil +} + +// Progress returns a formatted progress string for the current progress. +func (p *Progress) Progress(width int) string { + current := p.Written() + total := p.Total() + + // v = 100 (Progress) + // size = 1000 (Content-Length) + // p / size = 0.1 + // * 100 = 10% (Multiply by 100 to get a percentage of the download) + // 10% / tickPercentage = (10% / (100 / 25)) (Divide by tick percentage to get the number of ticks) + // 2.5 (Number of ticks as a float64) + // 2 (convert to an integer) + + // We have to cast these numbers to float in order to get a float result from the division. + ticks := ((float64(current) / float64(total)) * 100) / (float64(100) / float64(width)) + bar := strings.Repeat("=", int(ticks)) + strings.Repeat(" ", width-int(ticks)) + return "[" + bar + "] " + system.FormatBytes(current) + " / " + system.FormatBytes(total) +} + type Archive struct { // BasePath is the absolute path to create the archive from where Files and Ignore are // relative to. @@ -40,10 +98,13 @@ type Archive struct { // Files specifies the files to archive, this takes priority over the Ignore option, if // unspecified, all files in the BasePath will be archived unless Ignore is set. Files []string + + // Progress wraps the writer of the archive to pass through the progress tracker. + Progress *Progress } -// Create creates an archive at dst with all of the files defined in the -// included files struct. +// Create creates an archive at dst with all the files defined in the +// included Files array. func (a *Archive) Create(dst string) error { f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600) if err != nil { @@ -62,26 +123,34 @@ func (a *Archive) Create(dst string) error { writer = f } - // The default compression level is BestSpeed - var cl = pgzip.BestSpeed - // Choose which compression level to use based on the compression_level configuration option + var compressionLevel int switch config.Get().System.Backups.CompressionLevel { case "none": - cl = pgzip.NoCompression - case "best_speed": - cl = pgzip.BestSpeed + compressionLevel = pgzip.NoCompression case "best_compression": - cl = pgzip.BestCompression + compressionLevel = pgzip.BestCompression + case "best_speed": + fallthrough + default: + compressionLevel = pgzip.BestSpeed } // Create a new gzip writer around the file. - gw, _ := pgzip.NewWriterLevel(writer, cl) + gw, _ := pgzip.NewWriterLevel(writer, compressionLevel) _ = gw.SetConcurrency(1<<20, 1) defer gw.Close() + var pw io.Writer + if a.Progress != nil { + a.Progress.w = gw + pw = a.Progress + } else { + pw = gw + } + // Create a new tar writer around the gzip writer. - tw := tar.NewWriter(gw) + tw := tar.NewWriter(pw) defer tw.Close() // Configure godirwalk. @@ -116,7 +185,7 @@ func (a *Archive) Create(dst string) error { // being generated. func (a *Archive) callback(tw *tar.Writer, opts ...func(path string, relative string) error) func(path string, de *godirwalk.Dirent) error { return func(path string, de *godirwalk.Dirent) error { - // Skip directories because we walking them recursively. + // Skip directories because we are walking them recursively. if de.IsDir() { return nil }