Support canceling an in-progress download
This commit is contained in:
parent
c718da20e3
commit
c8d297a056
|
@ -5,6 +5,7 @@ import (
|
||||||
"emperror.dev/errors"
|
"emperror.dev/errors"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/pterodactyl/wings/server"
|
"github.com/pterodactyl/wings/server"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -13,6 +14,18 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Counter struct {
|
||||||
|
total int
|
||||||
|
onWrite func(total int)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Counter) Write(p []byte) (int, error) {
|
||||||
|
n := len(p)
|
||||||
|
c.total += n
|
||||||
|
c.onWrite(c.total)
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
downloadCache map[string]Download
|
downloadCache map[string]Download
|
||||||
|
@ -28,6 +41,7 @@ type Download struct {
|
||||||
Identifier string
|
Identifier string
|
||||||
req DownloadRequest
|
req DownloadRequest
|
||||||
server *server.Server
|
server *server.Server
|
||||||
|
progress float64
|
||||||
cancelFunc *context.CancelFunc
|
cancelFunc *context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,6 +67,28 @@ func New(s *server.Server, r DownloadRequest) *Download {
|
||||||
return &dl
|
return &dl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns all of the tracked downloads for a given server instance.
|
||||||
|
func ByServer(sid string) []Download {
|
||||||
|
var downloads []Download
|
||||||
|
if v, ok := instance.serverCache[sid]; ok {
|
||||||
|
for _, id := range v {
|
||||||
|
if dl, dlok := instance.downloadCache[id]; dlok {
|
||||||
|
downloads = append(downloads, dl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return downloads
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns a single Download matching a given identifier. If no download is found
|
||||||
|
// the second argument in the response will be false.
|
||||||
|
func ByID(dlid string) (Download, bool) {
|
||||||
|
if v, ok := instance.downloadCache[dlid]; ok {
|
||||||
|
return v, true
|
||||||
|
}
|
||||||
|
return Download{}, false
|
||||||
|
}
|
||||||
|
|
||||||
// Executes a given download for the server and begins writing the file to the disk. Once
|
// Executes a given download for the server and begins writing the file to the disk. Once
|
||||||
// completed the download will be removed from the cache.
|
// completed the download will be removed from the cache.
|
||||||
func (dl *Download) Execute() error {
|
func (dl *Download) Execute() error {
|
||||||
|
@ -82,7 +118,9 @@ func (dl *Download) Execute() error {
|
||||||
fnameparts := strings.Split(dl.req.URL.Path, "/")
|
fnameparts := strings.Split(dl.req.URL.Path, "/")
|
||||||
p := filepath.Join(dl.req.Directory, fnameparts[len(fnameparts)-1])
|
p := filepath.Join(dl.req.Directory, fnameparts[len(fnameparts)-1])
|
||||||
dl.server.Log().WithField("path", p).Debug("writing remote file to disk")
|
dl.server.Log().WithField("path", p).Debug("writing remote file to disk")
|
||||||
if err := dl.server.Filesystem().Writefile(p, res.Body); err != nil {
|
|
||||||
|
r := io.TeeReader(res.Body, dl.counter(res.ContentLength))
|
||||||
|
if err := dl.server.Filesystem().Writefile(p, r); err != nil {
|
||||||
return errors.WrapIf(err, "downloader: failed to write file to server directory")
|
return errors.WrapIf(err, "downloader: failed to write file to server directory")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -97,6 +135,23 @@ func (dl *Download) Cancel() {
|
||||||
instance.remove(dl.Identifier)
|
instance.remove(dl.Identifier)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Checks if the given download belongs to the provided server.
|
||||||
|
func (dl *Download) BelongsTo(s *server.Server) bool {
|
||||||
|
return dl.server.Id() == s.Id()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handles a write event by updating the progress completed percentage and firing off
|
||||||
|
// events to the server websocket as needed.
|
||||||
|
func (dl *Download) counter(contentLength int64) *Counter {
|
||||||
|
onWrite := func(t int) {
|
||||||
|
dl.progress = float64(t) / float64(contentLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Counter{
|
||||||
|
onWrite: onWrite,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Tracks a download in the internal cache for this instance.
|
// Tracks a download in the internal cache for this instance.
|
||||||
func (d *Downloader) track(dl Download) {
|
func (d *Downloader) track(dl Download) {
|
||||||
d.mu.Lock()
|
d.mu.Lock()
|
||||||
|
|
|
@ -121,16 +121,20 @@ func (e *RequestError) Abort(c *gin.Context) {
|
||||||
// Looks at the given RequestError and determines if it is a specific filesystem error that
|
// Looks at the given RequestError and determines if it is a specific filesystem error that
|
||||||
// we can process and return differently for the user.
|
// we can process and return differently for the user.
|
||||||
func (e *RequestError) getAsFilesystemError() (int, string) {
|
func (e *RequestError) getAsFilesystemError() (int, string) {
|
||||||
if errors.Is(e.err, os.ErrNotExist) || filesystem.IsErrorCode(e.err, filesystem.ErrCodePathResolution) {
|
err := errors.Unwrap(e.err)
|
||||||
|
if err == nil {
|
||||||
|
return 0, ""
|
||||||
|
}
|
||||||
|
if errors.Is(err, os.ErrNotExist) || filesystem.IsErrorCode(err, filesystem.ErrCodePathResolution) {
|
||||||
return http.StatusNotFound, "The requested resource was not found on the system."
|
return http.StatusNotFound, "The requested resource was not found on the system."
|
||||||
}
|
}
|
||||||
if filesystem.IsErrorCode(e.err, filesystem.ErrCodeDiskSpace) {
|
if filesystem.IsErrorCode(err, filesystem.ErrCodeDiskSpace) {
|
||||||
return http.StatusConflict, "There is not enough disk space available to perform that action."
|
return http.StatusConflict, "There is not enough disk space available to perform that action."
|
||||||
}
|
}
|
||||||
if strings.HasSuffix(e.err.Error(), "file name too long") {
|
if strings.HasSuffix(err.Error(), "file name too long") {
|
||||||
return http.StatusBadRequest, "Cannot perform that action: file name is too long."
|
return http.StatusBadRequest, "Cannot perform that action: file name is too long."
|
||||||
}
|
}
|
||||||
if e, ok := e.err.(*os.SyscallError); ok && e.Syscall == "readdirent" {
|
if e, ok := err.(*os.SyscallError); ok && e.Syscall == "readdirent" {
|
||||||
return http.StatusNotFound, "The requested directory does not exist."
|
return http.StatusNotFound, "The requested directory does not exist."
|
||||||
}
|
}
|
||||||
return 0, ""
|
return 0, ""
|
||||||
|
|
|
@ -83,6 +83,7 @@ func Configure() *gin.Engine {
|
||||||
files.POST("/copy", postServerCopyFile)
|
files.POST("/copy", postServerCopyFile)
|
||||||
files.POST("/write", postServerWriteFile)
|
files.POST("/write", postServerWriteFile)
|
||||||
files.POST("/pull", postServerPullRemoteFile)
|
files.POST("/pull", postServerPullRemoteFile)
|
||||||
|
files.DELETE("/pull/:download", deleteServerPullRemoteFile)
|
||||||
files.POST("/create-directory", postServerCreateDirectory)
|
files.POST("/create-directory", postServerCreateDirectory)
|
||||||
files.POST("/delete", postServerDeleteFiles)
|
files.POST("/delete", postServerDeleteFiles)
|
||||||
files.POST("/compress", postServerCompressFiles)
|
files.POST("/compress", postServerCompressFiles)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"emperror.dev/errors"
|
"emperror.dev/errors"
|
||||||
"github.com/apex/log"
|
"github.com/apex/log"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/pterodactyl/wings/router/downloader"
|
||||||
"github.com/pterodactyl/wings/router/tokens"
|
"github.com/pterodactyl/wings/router/tokens"
|
||||||
"github.com/pterodactyl/wings/server"
|
"github.com/pterodactyl/wings/server"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -210,6 +211,11 @@ func deleteServer(c *gin.Context) {
|
||||||
s.Throttler().StopTimer()
|
s.Throttler().StopTimer()
|
||||||
s.Websockets().CancelAll()
|
s.Websockets().CancelAll()
|
||||||
|
|
||||||
|
// Remove any pending remote file downloads for the server.
|
||||||
|
for _, dl := range downloader.ByServer(s.Id()) {
|
||||||
|
dl.Cancel()
|
||||||
|
}
|
||||||
|
|
||||||
// Destroy the environment; in Docker this will handle a running container and
|
// Destroy the environment; in Docker this will handle a running container and
|
||||||
// forcibly terminate it before removing the container, so we do not need to handle
|
// forcibly terminate it before removing the container, so we do not need to handle
|
||||||
// that here.
|
// that here.
|
||||||
|
|
|
@ -266,13 +266,27 @@ func postServerPullRemoteFile(c *gin.Context) {
|
||||||
Directory: data.Directory,
|
Directory: data.Directory,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Execute this pull in a seperate thread since it may take a long time to complete.
|
||||||
|
go func() {
|
||||||
s.Log().WithField("download_id", dl.Identifier).WithField("url", u.String()).Info("starting pull of remote file to disk")
|
s.Log().WithField("download_id", dl.Identifier).WithField("url", u.String()).Info("starting pull of remote file to disk")
|
||||||
if err := dl.Execute(); err != nil {
|
if err := dl.Execute(); err != nil {
|
||||||
WithError(c, err)
|
s.Log().WithField("download_id", dl.Identifier).WithField("error", err).Error("failed to pull remote file")
|
||||||
return
|
} else {
|
||||||
}
|
|
||||||
s.Log().WithField("download_id", dl.Identifier).Info("completed pull of remote file")
|
s.Log().WithField("download_id", dl.Identifier).Info("completed pull of remote file")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.JSON(http.StatusAccepted, gin.H{
|
||||||
|
"identifier": dl.Identifier,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stops a remote file download if it exists and belongs to this server.
|
||||||
|
func deleteServerPullRemoteFile(c *gin.Context) {
|
||||||
|
s := ExtractServer(c)
|
||||||
|
if dl, ok := downloader.ByID(c.Param("download")); ok && dl.BelongsTo(s) {
|
||||||
|
dl.Cancel()
|
||||||
|
}
|
||||||
c.Status(http.StatusNoContent)
|
c.Status(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user