Streaming Transfers (#153)
This commit is contained in:
52
server/transfer/archive.go
Normal file
52
server/transfer/archive.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package transfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/pterodactyl/wings/internal/progress"
|
||||
"github.com/pterodactyl/wings/server/filesystem"
|
||||
)
|
||||
|
||||
// Archive returns an archive that can be used to stream the contents of the
|
||||
// contents of a server.
|
||||
func (t *Transfer) Archive() (*Archive, error) {
|
||||
if t.archive == nil {
|
||||
// Get the disk usage of the server (used to calculate the progress of the archive process)
|
||||
rawSize, err := t.Server.Filesystem().DiskUsage(true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("transfer: failed to get server disk usage: %w", err)
|
||||
}
|
||||
|
||||
// Create a new archive instance and assign it to the transfer.
|
||||
t.archive = NewArchive(t, uint64(rawSize))
|
||||
}
|
||||
|
||||
return t.archive, nil
|
||||
}
|
||||
|
||||
// Archive represents an archive used to transfer the contents of a server.
|
||||
type Archive struct {
|
||||
archive *filesystem.Archive
|
||||
}
|
||||
|
||||
// NewArchive returns a new archive associated with the given transfer.
|
||||
func NewArchive(t *Transfer, size uint64) *Archive {
|
||||
return &Archive{
|
||||
archive: &filesystem.Archive{
|
||||
BasePath: t.Server.Filesystem().Path(),
|
||||
Progress: progress.NewProgress(size),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Stream returns a reader that can be used to stream the contents of the archive.
|
||||
func (a *Archive) Stream(ctx context.Context, w io.Writer) error {
|
||||
return a.archive.Stream(ctx, w)
|
||||
}
|
||||
|
||||
// Progress returns the current progress of the archive.
|
||||
func (a *Archive) Progress() *progress.Progress {
|
||||
return a.archive.Progress
|
||||
}
|
||||
4
server/transfer/doc.go
Normal file
4
server/transfer/doc.go
Normal file
@@ -0,0 +1,4 @@
|
||||
// Package transfer handles all logic related to transferring servers between
|
||||
// two nodes. This includes the logic for archiving a server on the source node
|
||||
// and logic for importing a server from the source node into the target node.
|
||||
package transfer
|
||||
57
server/transfer/manager.go
Normal file
57
server/transfer/manager.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package transfer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
incomingTransfers = NewManager()
|
||||
outgoingTransfers = NewManager()
|
||||
)
|
||||
|
||||
// Incoming returns a transfer manager for incoming transfers.
|
||||
func Incoming() *Manager {
|
||||
return incomingTransfers
|
||||
}
|
||||
|
||||
// Outgoing returns a transfer manager for outgoing transfers.
|
||||
func Outgoing() *Manager {
|
||||
return outgoingTransfers
|
||||
}
|
||||
|
||||
// Manager manages transfers.
|
||||
type Manager struct {
|
||||
mu sync.RWMutex
|
||||
transfers map[string]*Transfer
|
||||
}
|
||||
|
||||
// NewManager returns a new transfer manager.
|
||||
func NewManager() *Manager {
|
||||
return &Manager{
|
||||
transfers: make(map[string]*Transfer),
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a transfer to the manager.
|
||||
func (m *Manager) Add(transfer *Transfer) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.transfers[transfer.Server.ID()] = transfer
|
||||
}
|
||||
|
||||
// Remove removes a transfer from the manager.
|
||||
func (m *Manager) Remove(transfer *Transfer) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
delete(m.transfers, transfer.Server.ID())
|
||||
}
|
||||
|
||||
// Get gets a transfer from the manager using a server ID.
|
||||
func (m *Manager) Get(id string) *Transfer {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
return m.transfers[id]
|
||||
}
|
||||
159
server/transfer/source.go
Normal file
159
server/transfer/source.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package transfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/pterodactyl/wings/internal/progress"
|
||||
)
|
||||
|
||||
// PushArchiveToTarget POSTs the archive to the target node and returns the
|
||||
// response body.
|
||||
func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) {
|
||||
ctx, cancel := context.WithCancel(t.ctx)
|
||||
defer cancel()
|
||||
|
||||
t.SendMessage("Preparing to stream server data to destination...")
|
||||
t.SetStatus(StatusProcessing)
|
||||
|
||||
a, err := t.Archive()
|
||||
if err != nil {
|
||||
t.Error(err, "Failed to get archive for transfer.")
|
||||
return nil, errors.New("failed to get archive for transfer")
|
||||
}
|
||||
|
||||
t.SendMessage("Streaming archive to destination...")
|
||||
|
||||
// Send the upload progress to the websocket every 5 seconds.
|
||||
ctx2, cancel2 := context.WithCancel(ctx)
|
||||
defer cancel2()
|
||||
go func(ctx context.Context, p *progress.Progress, tc *time.Ticker) {
|
||||
defer tc.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-tc.C:
|
||||
t.SendMessage("Uploading " + p.Progress(25))
|
||||
}
|
||||
}
|
||||
}(ctx2, a.Progress(), time.NewTicker(5*time.Second))
|
||||
|
||||
// Create a new request using the pipe as the body.
|
||||
body, writer := io.Pipe()
|
||||
defer body.Close()
|
||||
defer writer.Close()
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Authorization", token)
|
||||
|
||||
// Create a new multipart writer that writes the archive to the pipe.
|
||||
mp := multipart.NewWriter(writer)
|
||||
defer mp.Close()
|
||||
req.Header.Set("Content-Type", mp.FormDataContentType())
|
||||
|
||||
// Create a new goroutine to write the archive to the pipe used by the
|
||||
// multipart writer.
|
||||
errChan := make(chan error)
|
||||
go func() {
|
||||
defer close(errChan)
|
||||
defer writer.Close()
|
||||
defer mp.Close()
|
||||
|
||||
src, pw := io.Pipe()
|
||||
defer src.Close()
|
||||
defer pw.Close()
|
||||
|
||||
h := sha256.New()
|
||||
tee := io.TeeReader(src, h)
|
||||
|
||||
dest, err := mp.CreateFormFile("archive", "archive.tar.gz")
|
||||
if err != nil {
|
||||
errChan <- errors.New("failed to create form file")
|
||||
return
|
||||
}
|
||||
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
|
||||
if _, err := io.Copy(dest, tee); err != nil {
|
||||
ch <- fmt.Errorf("failed to stream archive to destination: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Log().Debug("finished copying dest to tee")
|
||||
}()
|
||||
|
||||
if err := a.Stream(ctx, pw); err != nil {
|
||||
errChan <- errors.New("failed to stream archive to pipe")
|
||||
return
|
||||
}
|
||||
t.Log().Debug("finished streaming archive to pipe")
|
||||
|
||||
// Close the pipe writer early to release resources and ensure that the data gets flushed.
|
||||
_ = pw.Close()
|
||||
|
||||
// Wait for the copy to finish before we continue.
|
||||
t.Log().Debug("waiting on copy to finish")
|
||||
if err := <-ch; err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err := mp.WriteField("checksum", hex.EncodeToString(h.Sum(nil))); err != nil {
|
||||
errChan <- errors.New("failed to stream checksum")
|
||||
return
|
||||
}
|
||||
|
||||
cancel2()
|
||||
t.SendMessage("Finished streaming archive to destination.")
|
||||
|
||||
if err := mp.Close(); err != nil {
|
||||
t.Log().WithError(err).Error("error while closing multipart writer")
|
||||
}
|
||||
t.Log().Debug("closed multipart writer")
|
||||
}()
|
||||
|
||||
t.Log().Debug("sending archive to destination")
|
||||
client := http.Client{Timeout: 0}
|
||||
res, err := client.Do(req)
|
||||
t.Log().Debug("waiting for stream to complete")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case err2 := <-errChan:
|
||||
t.Log().Debug("stream completed")
|
||||
if err != nil || err2 != nil {
|
||||
if err == context.Canceled {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.Log().WithError(err).Debug("failed to send archive to destination")
|
||||
return nil, fmt.Errorf("http error: %w, multipart error: %v", err, err2)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
t.Log().Debug("received response from destination")
|
||||
|
||||
v, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, errors.New(string(v))
|
||||
}
|
||||
|
||||
return v, nil
|
||||
}
|
||||
}
|
||||
128
server/transfer/transfer.go
Normal file
128
server/transfer/transfer.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package transfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/apex/log"
|
||||
"github.com/mitchellh/colorstring"
|
||||
|
||||
"github.com/pterodactyl/wings/server"
|
||||
"github.com/pterodactyl/wings/system"
|
||||
)
|
||||
|
||||
// Status represents the current status of a transfer.
|
||||
type Status string
|
||||
|
||||
// String satisfies the fmt.Stringer interface.
|
||||
func (s Status) String() string {
|
||||
return string(s)
|
||||
}
|
||||
|
||||
const (
|
||||
// StatusPending is the status of a transfer when it is first created.
|
||||
StatusPending Status = "pending"
|
||||
// StatusProcessing is the status of a transfer when it is currently in
|
||||
// progress, such as when the archive is being streamed to the target node.
|
||||
StatusProcessing Status = "processing"
|
||||
|
||||
// StatusCancelling is the status of a transfer when it is in the process of
|
||||
// being cancelled.
|
||||
StatusCancelling Status = "cancelling"
|
||||
|
||||
// StatusCancelled is the final status of a transfer when it has been
|
||||
// cancelled.
|
||||
StatusCancelled Status = "cancelled"
|
||||
// StatusFailed is the final status of a transfer when it has failed.
|
||||
StatusFailed Status = "failed"
|
||||
// StatusCompleted is the final status of a transfer when it has completed.
|
||||
StatusCompleted Status = "completed"
|
||||
)
|
||||
|
||||
// Transfer represents a transfer of a server from one node to another.
|
||||
type Transfer struct {
|
||||
// ctx is the context for the transfer.
|
||||
ctx context.Context
|
||||
// cancel is used to cancel all ongoing transfer operations for the server.
|
||||
cancel *context.CancelFunc
|
||||
|
||||
// Server associated with the transfer.
|
||||
Server *server.Server
|
||||
// status of the transfer.
|
||||
status *system.Atomic[Status]
|
||||
|
||||
// archive is the archive that is being created for the transfer.
|
||||
archive *Archive
|
||||
}
|
||||
|
||||
// New returns a new transfer instance for the given server.
|
||||
func New(ctx context.Context, s *server.Server) *Transfer {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
return &Transfer{
|
||||
ctx: ctx,
|
||||
cancel: &cancel,
|
||||
|
||||
Server: s,
|
||||
status: system.NewAtomic(StatusPending),
|
||||
}
|
||||
}
|
||||
|
||||
// Context returns the context for the transfer.
|
||||
func (t *Transfer) Context() context.Context {
|
||||
return t.ctx
|
||||
}
|
||||
|
||||
// Cancel cancels the transfer.
|
||||
func (t *Transfer) Cancel() {
|
||||
status := t.Status()
|
||||
if status == StatusCancelling ||
|
||||
status == StatusCancelled ||
|
||||
status == StatusCompleted ||
|
||||
status == StatusFailed {
|
||||
return
|
||||
}
|
||||
|
||||
if t.cancel == nil {
|
||||
return
|
||||
}
|
||||
|
||||
t.SetStatus(StatusCancelling)
|
||||
(*t.cancel)()
|
||||
}
|
||||
|
||||
// Status returns the current status of the transfer.
|
||||
func (t *Transfer) Status() Status {
|
||||
return t.status.Load()
|
||||
}
|
||||
|
||||
// SetStatus sets the status of the transfer.
|
||||
func (t *Transfer) SetStatus(s Status) {
|
||||
// TODO: prevent certain status changes from happening.
|
||||
// If we are cancelling, then we can't go back to processing.
|
||||
t.status.Store(s)
|
||||
|
||||
t.Server.Events().Publish(server.TransferStatusEvent, s)
|
||||
}
|
||||
|
||||
// SendMessage sends a message to the server's console.
|
||||
func (t *Transfer) SendMessage(v string) {
|
||||
t.Server.Events().Publish(
|
||||
server.TransferLogsEvent,
|
||||
colorstring.Color("[yellow][bold]"+time.Now().Format(time.RFC1123)+" [Transfer System] [Source Node]:[default] "+v),
|
||||
)
|
||||
}
|
||||
|
||||
// Error logs an error that occurred on the source node.
|
||||
func (t *Transfer) Error(err error, v string) {
|
||||
t.Log().WithError(err).Error(v)
|
||||
t.SendMessage(v)
|
||||
}
|
||||
|
||||
// Log returns a logger for the transfer.
|
||||
func (t *Transfer) Log() *log.Entry {
|
||||
if t.Server == nil {
|
||||
return log.WithField("subsystem", "transfer")
|
||||
}
|
||||
return t.Server.Log().WithField("subsystem", "transfer")
|
||||
}
|
||||
Reference in New Issue
Block a user