Rework archiving logic to be more consistent and less impactful on disk IO (#79)
Co-authored-by: Dane Everitt <dane@daneeveritt.com>
@@ -1,150 +0,0 @@
-package backup
-
-import (
-	"archive/tar"
-	"context"
-	"emperror.dev/errors"
-	"github.com/apex/log"
-	"github.com/juju/ratelimit"
-	gzip "github.com/klauspost/pgzip"
-	"github.com/pterodactyl/wings/config"
-	"github.com/remeh/sizedwaitgroup"
-	"golang.org/x/sync/errgroup"
-	"io"
-	"os"
-	"runtime"
-	"strings"
-	"sync"
-)
-
-type Archive struct {
-	sync.Mutex
-
-	TrimPrefix string
-	Files      *IncludedFiles
-}
-
-// Creates an archive at dst with all of the files defined in the included files struct.
-func (a *Archive) Create(dst string, ctx context.Context) error {
-	f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
-	// Select a writer based off of the WriteLimit configuration option.
-	var writer io.Writer
-	if writeLimit := config.Get().System.Backups.WriteLimit; writeLimit < 1 {
-		// If there is no write limit, use the file as the writer.
-		writer = f
-	} else {
-		// Token bucket with a capacity of "writeLimit" MiB, adding "writeLimit" MiB/s.
-		bucket := ratelimit.NewBucketWithRate(float64(writeLimit)*1024*1024, int64(writeLimit)*1024*1024)
-
-		// Wrap the file writer with the token bucket limiter.
-		writer = ratelimit.Writer(f, bucket)
-	}
-
-	maxCpu := runtime.NumCPU() / 2
-	if maxCpu > 4 {
-		maxCpu = 4
-	}
-
-	gzw, err := gzip.NewWriterLevel(writer, gzip.BestSpeed)
-	if err != nil {
-		return errors.WithMessage(err, "failed to create gzip writer")
-	}
-	if err := gzw.SetConcurrency(1<<20, maxCpu); err != nil {
-		return errors.WithMessage(err, "failed to set gzip concurrency")
-	}
-
-	defer gzw.Flush()
-	defer gzw.Close()
-
-	tw := tar.NewWriter(gzw)
-	defer tw.Flush()
-	defer tw.Close()
-
-	wg := sizedwaitgroup.New(10)
-	g, ctx := errgroup.WithContext(ctx)
-	// Iterate over all of the files to be included and put them into the archive. This is
-	// done as a concurrent goroutine to speed things along. If an error is encountered at
-	// any step, the entire process is aborted.
-	for _, p := range a.Files.All() {
-		p := p
-		g.Go(func() error {
-			wg.Add()
-			defer wg.Done()
-
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			default:
-				return a.addToArchive(p, tw)
-			}
-		})
-	}
-
-	// Block until the entire routine is completed.
-	if err := g.Wait(); err != nil {
-		f.Close()
-
-		// Attempt to remove the archive if there is an error, report that error to
-		// the logger if it fails.
-		if rerr := os.Remove(dst); rerr != nil && !os.IsNotExist(rerr) {
-			log.WithField("location", dst).Warn("failed to delete corrupted backup archive")
-		}
-
-		return err
-	}
-
-	return nil
-}
-
-// Adds a single file to the existing tar archive writer.
-func (a *Archive) addToArchive(p string, w *tar.Writer) error {
-	f, err := os.Open(p)
-	if err != nil {
-		// If you try to backup something that no longer exists (got deleted somewhere during the process
-		// but not by this process), just skip over it and don't kill the entire backup.
-		if os.IsNotExist(err) {
-			return nil
-		}
-
-		return err
-	}
-	defer f.Close()
-
-	s, err := f.Stat()
-	if err != nil {
-		// Same as above, don't kill the process just because the file no longer exists.
-		if os.IsNotExist(err) {
-			return nil
-		}
-
-		return err
-	}
-
-	name := strings.TrimPrefix(p, a.TrimPrefix)
-	header, err := tar.FileInfoHeader(s, name)
-	if err != nil {
-		return errors.WithMessage(err, "failed to get tar#FileInfoHeader for "+name)
-	}
-	header.Name = name
-
-	// These actions must occur sequentially, even if this function is called multiple
-	// times in parallel. You'll get some nasty panics otherwise.
-	a.Lock()
-	defer a.Unlock()
-
-	if err := w.WriteHeader(header); err != nil {
-		return err
-	}
-
-	buf := make([]byte, 4*1024)
-	if _, err := io.CopyBuffer(w, io.LimitReader(f, header.Size), buf); err != nil {
-		return errors.WithMessage(err, "failed to copy "+header.Name+" to archive")
-	}
-
-	return nil
-}
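The deleted Create above stacked two throttles: a juju/ratelimit token bucket on disk writes and a capped pgzip concurrency level. A minimal, self-contained sketch of that writer stack, assuming an illustrative 10 MiB/s limit and output path (neither value comes from the config shown here):

    package main

    import (
        "archive/tar"
        "os"

        "github.com/juju/ratelimit"
        gzip "github.com/klauspost/pgzip"
    )

    func main() {
        f, err := os.Create("/tmp/example.tar.gz") // illustrative destination
        if err != nil {
            panic(err)
        }
        defer f.Close()

        // Token bucket holding 10 MiB, refilled at 10 MiB/s -- the same shape
        // as the writeLimit branch above, with an assumed limit of 10.
        bucket := ratelimit.NewBucketWithRate(10*1024*1024, 10*1024*1024)
        w := ratelimit.Writer(f, bucket)

        gzw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
        if err != nil {
            panic(err)
        }
        defer gzw.Close()
        // 1 MiB blocks compressed on up to 2 goroutines, mirroring the capped
        // runtime.NumCPU()/2 calculation in the deleted code.
        if err := gzw.SetConcurrency(1<<20, 2); err != nil {
            panic(err)
        }

        tw := tar.NewWriter(gzw)
        defer tw.Close()
        // ... write tar entries through tw, as addToArchive did ...
    }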
@@ -40,7 +40,7 @@ type Backup struct {

 	// An array of files to ignore when generating this backup. This should be
 	// compatible with a standard .gitignore structure.
-	IgnoredFiles []string `json:"ignored_files"`
+	Ignore string `json:"ignore"`
 }

 // noinspection GoNameStartsWithPackageName
@@ -50,12 +50,12 @@ type BackupInterface interface {

 	// Generates a backup in whatever the configured source for the specific
 	// implementation is.
-	Generate(*IncludedFiles, string) (*ArchiveDetails, error)
+	Generate(string, string) (*ArchiveDetails, error)

 	// Returns the ignored files for this backup instance.
-	Ignored() []string
+	Ignored() string

-	// Returns a SHA256 checksum for the generated backup.
+	// Returns a SHA1 checksum for the generated backup.
 	Checksum() ([]byte, error)

 	// Returns the size of the generated backup.
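The interface comment now promises a SHA1 checksum rather than SHA256. The Checksum implementation itself is not part of this diff; a plausible stdlib sketch of hashing a finished archive (path is illustrative):

    package main

    import (
        "crypto/sha1"
        "fmt"
        "io"
        "os"
    )

    // checksum streams the file through a SHA1 hash so even large archives
    // are hashed without being held in memory.
    func checksum(path string) ([]byte, error) {
        f, err := os.Open(path)
        if err != nil {
            return nil, err
        }
        defer f.Close()

        h := sha1.New()
        if _, err := io.Copy(h, f); err != nil {
            return nil, err
        }
        return h.Sum(nil), nil
    }

    func main() {
        sum, err := checksum("/tmp/example.tar.gz") // illustrative path
        if err != nil {
            panic(err)
        }
        fmt.Printf("%x\n", sum)
    }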
@@ -153,6 +153,6 @@ func (b *Backup) Details() *ArchiveDetails {
 	}
 }

-func (b *Backup) Ignored() []string {
-	return b.IgnoredFiles
+func (b *Backup) Ignored() string {
+	return b.Ignore
 }
@@ -1,8 +1,8 @@
 package backup

 import (
-	"context"
 	"errors"
+	"github.com/pterodactyl/wings/server/filesystem"
 	"os"
 )

@@ -17,8 +17,8 @@ var _ BackupInterface = (*LocalBackup)(nil)
 func LocateLocal(uuid string) (*LocalBackup, os.FileInfo, error) {
 	b := &LocalBackup{
 		Backup{
-			Uuid:         uuid,
-			IgnoredFiles: nil,
+			Uuid:   uuid,
+			Ignore: "",
 		},
 	}

@@ -41,13 +41,13 @@ func (b *LocalBackup) Remove() error {

 // Generates a backup of the selected files and pushes it to the defined location
 // for this instance.
-func (b *LocalBackup) Generate(included *IncludedFiles, prefix string) (*ArchiveDetails, error) {
-	a := &Archive{
-		TrimPrefix: prefix,
-		Files:      included,
+func (b *LocalBackup) Generate(basePath, ignore string) (*ArchiveDetails, error) {
+	a := &filesystem.Archive{
+		BasePath: basePath,
+		Ignore:   ignore,
 	}

-	if err := a.Create(b.Path(), context.Background()); err != nil {
+	if err := a.Create(b.Path()); err != nil {
 		return nil, err
 	}

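Both backup implementations now hand the archiver a base path plus a gitignore-style string instead of a precomputed IncludedFiles list, and Create no longer takes a context. Pieced together from this diff alone, the new call shape looks roughly like this (paths and ignore rules are illustrative):

    package main

    import "github.com/pterodactyl/wings/server/filesystem"

    func main() {
        a := &filesystem.Archive{
            BasePath: "/srv/daemon-data/some-uuid", // root of the files to back up (illustrative)
            Ignore:   "*.log\nnode_modules/\n",     // gitignore-style rules (illustrative)
        }
        // Create now derives the file list itself; no context or
        // precomputed IncludedFiles is passed in.
        if err := a.Create("/tmp/backup.tar.gz"); err != nil {
            panic(err)
        }
    }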
@@ -6,9 +6,9 @@ import (
 )

 type Request struct {
-	Adapter      string   `json:"adapter"`
-	Uuid         string   `json:"uuid"`
-	IgnoredFiles []string `json:"ignored_files"`
+	Adapter string `json:"adapter"`
+	Uuid    string `json:"uuid"`
+	Ignore  string `json:"ignore"`
 }

 // Generates a new local backup struct.
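On the wire, the panel's request drops the "ignored_files" array for a single "ignore" string, per the struct tags above. A small decode sketch; the payload values, including the adapter name, are illustrative:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Request mirrors the new struct above.
    type Request struct {
        Adapter string `json:"adapter"`
        Uuid    string `json:"uuid"`
        Ignore  string `json:"ignore"`
    }

    func main() {
        payload := []byte(`{"adapter":"wings","uuid":"abc-123","ignore":"*.log\n"}`)
        var r Request
        if err := json.Unmarshal(payload, &r); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", r)
    }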
@@ -19,8 +19,8 @@ func (r *Request) NewLocalBackup() (*LocalBackup, error) {

 	return &LocalBackup{
 		Backup{
-			Uuid:         r.Uuid,
-			IgnoredFiles: r.IgnoredFiles,
+			Uuid:   r.Uuid,
+			Ignore: r.Ignore,
 		},
 	}, nil
 }
@@ -33,8 +33,8 @@ func (r *Request) NewS3Backup() (*S3Backup, error) {

 	return &S3Backup{
 		Backup: Backup{
-			Uuid:         r.Uuid,
-			IgnoredFiles: r.IgnoredFiles,
+			Uuid:   r.Uuid,
+			Ignore: r.Ignore,
 		},
 	}, nil
 }
@@ -1,10 +1,10 @@
 package backup

 import (
-	"context"
 	"fmt"
 	"github.com/apex/log"
 	"github.com/pterodactyl/wings/api"
+	"github.com/pterodactyl/wings/server/filesystem"
 	"io"
 	"net/http"
 	"os"
@@ -19,15 +19,15 @@ var _ BackupInterface = (*S3Backup)(nil)

 // Generates a new backup on the disk, moves it into the S3 bucket via the provided
 // presigned URL, and then deletes the backup from the disk.
-func (s *S3Backup) Generate(included *IncludedFiles, prefix string) (*ArchiveDetails, error) {
+func (s *S3Backup) Generate(basePath, ignore string) (*ArchiveDetails, error) {
 	defer s.Remove()

-	a := &Archive{
-		TrimPrefix: prefix,
-		Files:      included,
+	a := &filesystem.Archive{
+		BasePath: basePath,
+		Ignore:   ignore,
 	}

-	if err := a.Create(s.Path(), context.Background()); err != nil {
+	if err := a.Create(s.Path()); err != nil {
 		return nil, err
 	}

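The comment above describes the S3 flow: generate locally, push the archive to a presigned URL, then remove the local file. Wings' actual upload code is not in this diff; a generic sketch of such a presigned upload (URL and path are illustrative):

    package main

    import (
        "fmt"
        "net/http"
        "os"
    )

    func uploadPresigned(url, archive string) error {
        f, err := os.Open(archive)
        if err != nil {
            return err
        }
        defer f.Close()

        st, err := f.Stat()
        if err != nil {
            return err
        }

        req, err := http.NewRequest(http.MethodPut, url, f)
        if err != nil {
            return err
        }
        // Presigned S3 PUTs generally need an explicit Content-Length rather
        // than a chunked body.
        req.ContentLength = st.Size()

        res, err := http.DefaultClient.Do(req)
        if err != nil {
            return err
        }
        defer res.Body.Close()
        if res.StatusCode != http.StatusOK {
            return fmt.Errorf("unexpected response uploading archive: %s", res.Status)
        }
        return nil
    }

    func main() {
        // Both arguments are illustrative.
        if err := uploadPresigned("https://bucket.s3.amazonaws.com/obj?X-Amz-Signature=abc", "/tmp/backup.tar.gz"); err != nil {
            panic(err)
        }
    }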
@@ -1,25 +0,0 @@
-package backup
-
-import (
-	"sync"
-)
-
-type IncludedFiles struct {
-	sync.RWMutex
-	files []string
-}
-
-// Pushes an additional file or folder onto the struct.
-func (i *IncludedFiles) Push(p string) {
-	i.Lock()
-	i.files = append(i.files, p)
-	i.Unlock()
-}
-
-// Returns all of the files that were marked as being included.
-func (i *IncludedFiles) All() []string {
-	i.RLock()
-	defer i.RUnlock()
-
-	return i.files
-}
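With IncludedFiles gone, file selection presumably happens while walking the base path and testing each entry against the ignore rules, rather than locking a shared slice to build the whole list up front. A naive sketch of that idea using filepath.Match; real gitignore semantics (negation, anchored paths, directory-only rules) are richer, and the matcher Wings actually uses is not shown in this diff:

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
        "strings"
    )

    // walk prints the files that would be archived under base, skipping
    // anything whose name matches one of the whitespace-separated patterns.
    func walk(base, ignore string) error {
        patterns := strings.Fields(ignore)
        return filepath.Walk(base, func(p string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            for _, pat := range patterns {
                if ok, _ := filepath.Match(pat, info.Name()); ok {
                    if info.IsDir() {
                        return filepath.SkipDir // prune ignored directories entirely
                    }
                    return nil
                }
            }
            if !info.IsDir() {
                fmt.Println("would archive:", p)
            }
            return nil
        })
    }

    func main() {
        if err := walk("/srv/daemon-data/some-uuid", "*.log backups"); err != nil { // illustrative
            panic(err)
        }
    }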