Compare commits

..

1 Commits

Author SHA1 Message Date
Pterodactyl CI
be5ad761ea bump version for release 2022-01-31 01:31:09 +00:00
47 changed files with 846 additions and 1017 deletions

View File

@@ -1,37 +1,5 @@
# Changelog # Changelog
## v1.6.4
### Fixed
* Fixes a bug causing CPU limiting to not be properly applied to servers.
* Fixes a bug causing zip archives to decompress without taking into account nested folder structures.
## v1.6.3
### Fixed
* Fixes SFTP authentication failing for administrative users due to a permissions adjustment on the Panel.
## v1.6.2
### Fixed
* Fixes file upload size not being properly enforced.
* Fixes a bug that prevented listing a directory when it contained a named pipe. Also added a check to prevent attempting to read a named pipe directly.
* Fixes a bug with the archiver logic that would include folders that had the same name prefix. (for example, requesting only `map` would also include `map2` and `map3`)
* Requests to the Panel that return a client error (4xx response code) no longer trigger an exponential backoff, they immediately stop the request.
### Changed
* CPU limit fields are only set on the Docker container if they have been specified for the server — otherwise they are left empty.
### Added
* Added the ability to define the location of the temporary folder used by Wings — defaults to `/tmp/pterodactyl`.
* Adds the ability to authenticate for SFTP using public keys (requires `Panel@1.8.0`).
## v1.6.1
### Fixed
* Fixes error that would sometimes occur when starting a server that would cause the temporary power action lock to never be released due to a blocked channel.
* Fixes a bug causing the CPU usage of Wings to get stuck at 100% when a server is deleted while the installation process is running.
### Changed
* Cleans up a lot of the logic for handling events between the server and environment process to make it easier to make modifications to down the road.
* Cleans up logic handling the `StopAndWait` logic for stopping a server gracefully before terminating the process if it does not respond.
## v1.6.0 ## v1.6.0
### Fixed ### Fixed
* Internal logic for processing a server start event has been adjusted to attach to the Docker container before attempting to start the container. This should fix issues where a server would get stuck after pulling the container image. * Internal logic for processing a server start event has been adjusted to attach to the Docker container before attempting to start the container. This should fix issues where a server would get stuck after pulling the container image.

View File

@@ -6,7 +6,7 @@ build:
debug: debug:
go build -ldflags="-X github.com/pterodactyl/wings/system.Version=$(GIT_HEAD)" go build -ldflags="-X github.com/pterodactyl/wings/system.Version=$(GIT_HEAD)"
sudo ./wings --debug --ignore-certificate-errors --config config.yml --pprof --pprof-block-rate 1 sudo ./wings --debug --ignore-certificate-errors --config config.yml --pprof
# Runs a remotly debuggable session for Wings allowing an IDE to connect and target # Runs a remotly debuggable session for Wings allowing an IDE to connect and target
# different breakpoints. # different breakpoints.

View File

@@ -11,7 +11,6 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"runtime"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -77,7 +76,6 @@ func init() {
// Flags specifically used when running the API. // Flags specifically used when running the API.
rootCommand.Flags().Bool("pprof", false, "if the pprof profiler should be enabled. The profiler will bind to localhost:6060 by default") rootCommand.Flags().Bool("pprof", false, "if the pprof profiler should be enabled. The profiler will bind to localhost:6060 by default")
rootCommand.Flags().Int("pprof-block-rate", 0, "enables block profile support, may have performance impacts")
rootCommand.Flags().Int("pprof-port", 6060, "If provided with --pprof, the port it will run on") rootCommand.Flags().Int("pprof-port", 6060, "If provided with --pprof, the port it will run on")
rootCommand.Flags().Bool("auto-tls", false, "pass in order to have wings generate and manage it's own SSL certificates using Let's Encrypt") rootCommand.Flags().Bool("auto-tls", false, "pass in order to have wings generate and manage it's own SSL certificates using Let's Encrypt")
rootCommand.Flags().String("tls-hostname", "", "required with --auto-tls, the FQDN for the generated SSL certificate") rootCommand.Flags().String("tls-hostname", "", "required with --auto-tls, the FQDN for the generated SSL certificate")
@@ -311,12 +309,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
profile, _ := cmd.Flags().GetBool("pprof") profile, _ := cmd.Flags().GetBool("pprof")
if profile { if profile {
if r, _ := cmd.Flags().GetInt("pprof-block-rate"); r > 0 {
runtime.SetBlockProfileRate(r)
}
// Catch at least 1% of mutex contention issues.
runtime.SetMutexProfileFraction(100)
profilePort, _ := cmd.Flags().GetInt("pprof-port") profilePort, _ := cmd.Flags().GetInt("pprof-port")
go func() { go func() {
http.ListenAndServe(fmt.Sprintf("localhost:%d", profilePort), nil) http.ListenAndServe(fmt.Sprintf("localhost:%d", profilePort), nil)

View File

@@ -89,8 +89,8 @@ type ApiConfiguration struct {
// servers. // servers.
DisableRemoteDownload bool `json:"disable_remote_download" yaml:"disable_remote_download"` DisableRemoteDownload bool `json:"disable_remote_download" yaml:"disable_remote_download"`
// The maximum size for files uploaded through the Panel in MB. // The maximum size for files uploaded through the Panel in bytes.
UploadLimit int64 `default:"100" json:"upload_limit" yaml:"upload_limit"` UploadLimit int `default:"100" json:"upload_limit" yaml:"upload_limit"`
} }
// RemoteQueryConfiguration defines the configuration settings for remote requests // RemoteQueryConfiguration defines the configuration settings for remote requests
@@ -132,10 +132,6 @@ type SystemConfiguration struct {
// Directory where local backups will be stored on the machine. // Directory where local backups will be stored on the machine.
BackupDirectory string `default:"/var/lib/pterodactyl/backups" yaml:"backup_directory"` BackupDirectory string `default:"/var/lib/pterodactyl/backups" yaml:"backup_directory"`
// TmpDirectory specifies where temporary files for Pterodactyl installation processes
// should be created. This supports environments running docker-in-docker.
TmpDirectory string `default:"/tmp/pterodactyl" yaml:"tmp_directory"`
// The user that should own all of the server files, and be used for containers. // The user that should own all of the server files, and be used for containers.
Username string `default:"pterodactyl" yaml:"username"` Username string `default:"pterodactyl" yaml:"username"`

View File

@@ -73,9 +73,6 @@ func (e *Environment) ContainerInspect(ctx context.Context) (types.ContainerJSON
res, err := e.client.HTTPClient().Do(req) res, err := e.client.HTTPClient().Do(req)
if err != nil { if err != nil {
if res == nil {
return st, errdefs.Unknown(err)
}
return st, errdefs.FromStatusCode(err, res.StatusCode) return st, errdefs.FromStatusCode(err, res.StatusCode)
} }

View File

@@ -480,3 +480,21 @@ func (e *Environment) convertMounts() []mount.Mount {
return out return out
} }
func (e *Environment) resources() container.Resources {
l := e.Configuration.Limits()
pids := l.ProcessLimit()
return container.Resources{
Memory: l.BoundedMemoryLimit(),
MemoryReservation: l.MemoryLimit * 1_000_000,
MemorySwap: l.ConvertedSwap(),
CPUQuota: l.ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: l.IoWeight,
OomKillDisable: &l.OOMDisabled,
CpusetCpus: l.Threads,
PidsLimit: &pids,
}
}

View File

@@ -26,7 +26,7 @@ type Metadata struct {
var _ environment.ProcessEnvironment = (*Environment)(nil) var _ environment.ProcessEnvironment = (*Environment)(nil)
type Environment struct { type Environment struct {
mu sync.RWMutex mu sync.RWMutex
// The public identifier for this environment. In this case it is the Docker container // The public identifier for this environment. In this case it is the Docker container
// name that will be used for all instances created under it. // name that will be used for all instances created under it.

View File

@@ -138,7 +138,9 @@ func (e *Environment) Start(ctx context.Context) error {
// You most likely want to be using WaitForStop() rather than this function, // You most likely want to be using WaitForStop() rather than this function,
// since this will return as soon as the command is sent, rather than waiting // since this will return as soon as the command is sent, rather than waiting
// for the process to be completed stopped. // for the process to be completed stopped.
func (e *Environment) Stop(ctx context.Context) error { //
// TODO: pass context through from the server instance.
func (e *Environment) Stop() error {
e.mu.RLock() e.mu.RLock()
s := e.meta.Stop s := e.meta.Stop
e.mu.RUnlock() e.mu.RUnlock()
@@ -162,7 +164,7 @@ func (e *Environment) Stop(ctx context.Context) error {
case "SIGTERM": case "SIGTERM":
signal = syscall.SIGTERM signal = syscall.SIGTERM
} }
return e.Terminate(ctx, signal) return e.Terminate(signal)
} }
// If the process is already offline don't switch it back to stopping. Just leave it how // If the process is already offline don't switch it back to stopping. Just leave it how
@@ -177,10 +179,8 @@ func (e *Environment) Stop(ctx context.Context) error {
return e.SendCommand(s.Value) return e.SendCommand(s.Value)
} }
// Allow the stop action to run for however long it takes, similar to executing a command t := time.Second * 30
// and using a different logic pathway to wait for the container to stop successfully. if err := e.client.ContainerStop(context.Background(), e.Id, &t); err != nil {
t := time.Duration(-1)
if err := e.client.ContainerStop(ctx, e.Id, &t); err != nil {
// If the container does not exist just mark the process as stopped and return without // If the container does not exist just mark the process as stopped and return without
// an error. // an error.
if client.IsErrNotFound(err) { if client.IsErrNotFound(err) {
@@ -198,66 +198,45 @@ func (e *Environment) Stop(ctx context.Context) error {
// command. If the server does not stop after seconds have passed, an error will // command. If the server does not stop after seconds have passed, an error will
// be returned, or the instance will be terminated forcefully depending on the // be returned, or the instance will be terminated forcefully depending on the
// value of the second argument. // value of the second argument.
// func (e *Environment) WaitForStop(seconds uint, terminate bool) error {
// Calls to Environment.Terminate() in this function use the context passed if err := e.Stop(); err != nil {
// through since we don't want to prevent termination of the server instance
// just because the context.WithTimeout() has expired.
func (e *Environment) WaitForStop(ctx context.Context, duration time.Duration, terminate bool) error {
tctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
// If the parent context is canceled, abort the timed context for termination.
go func() {
select {
case <-ctx.Done():
cancel()
case <-tctx.Done():
// When the timed context is canceled, terminate this routine since we no longer
// need to worry about the parent routine being canceled.
break
}
}()
doTermination := func(s string) error {
e.log().WithField("step", s).WithField("duration", duration).Warn("container stop did not complete in time, terminating process...")
return e.Terminate(ctx, os.Kill)
}
// We pass through the timed context for this stop action so that if one of the
// internal docker calls fails to ever finish before we've exhausted the time limit
// the resources get cleaned up, and the exection is stopped.
if err := e.Stop(tctx); err != nil {
if terminate && errors.Is(err, context.DeadlineExceeded) {
return doTermination("stop")
}
return err return err
} }
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
defer cancel()
// Block the return of this function until the container as been marked as no // Block the return of this function until the container as been marked as no
// longer running. If this wait does not end by the time seconds have passed, // longer running. If this wait does not end by the time seconds have passed,
// attempt to terminate the container, or return an error. // attempt to terminate the container, or return an error.
ok, errChan := e.client.ContainerWait(tctx, e.Id, container.WaitConditionNotRunning) ok, errChan := e.client.ContainerWait(ctx, e.Id, container.WaitConditionNotRunning)
select { select {
case <-ctx.Done(): case <-ctx.Done():
if err := ctx.Err(); err != nil { if ctxErr := ctx.Err(); ctxErr != nil {
if terminate { if terminate {
return doTermination("parent-context") log.WithField("container_id", e.Id).Info("server did not stop in time, executing process termination")
return e.Terminate(os.Kill)
} }
return err
return ctxErr
} }
case err := <-errChan: case err := <-errChan:
// If the error stems from the container not existing there is no point in wasting // If the error stems from the container not existing there is no point in wasting
// CPU time to then try and terminate it. // CPU time to then try and terminate it.
if err == nil || client.IsErrNotFound(err) { if err != nil && !client.IsErrNotFound(err) {
return nil if terminate {
} l := log.WithField("container_id", e.Id)
if terminate { if errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, context.DeadlineExceeded) { l.Warn("deadline exceeded for container stop; terminating process")
e.log().WithField("error", err).Warn("error while waiting for container stop; terminating process") } else {
l.WithField("error", err).Warn("error while waiting for container stop; terminating process")
}
return e.Terminate(os.Kill)
} }
return doTermination("wait") return errors.WrapIf(err, "environment/docker: error waiting on container to enter \"not-running\" state")
} }
return errors.WrapIf(err, "environment/docker: error waiting on container to enter \"not-running\" state")
case <-ok: case <-ok:
} }
@@ -265,8 +244,8 @@ func (e *Environment) WaitForStop(ctx context.Context, duration time.Duration, t
} }
// Terminate forcefully terminates the container using the signal provided. // Terminate forcefully terminates the container using the signal provided.
func (e *Environment) Terminate(ctx context.Context, signal os.Signal) error { func (e *Environment) Terminate(signal os.Signal) error {
c, err := e.ContainerInspect(ctx) c, err := e.ContainerInspect(context.Background())
if err != nil { if err != nil {
// Treat missing containers as an okay error state, means it is obviously // Treat missing containers as an okay error state, means it is obviously
// already terminated at this point. // already terminated at this point.
@@ -291,7 +270,7 @@ func (e *Environment) Terminate(ctx context.Context, signal os.Signal) error {
// We set it to stopping than offline to prevent crash detection from being triggered. // We set it to stopping than offline to prevent crash detection from being triggered.
e.SetState(environment.ProcessStoppingState) e.SetState(environment.ProcessStoppingState)
sig := strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed") sig := strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed")
if err := e.client.ContainerKill(ctx, e.Id, sig); err != nil && !client.IsErrNotFound(err) { if err := e.client.ContainerKill(context.Background(), e.Id, sig); err != nil && !client.IsErrNotFound(err) {
return errors.WithStack(err) return errors.WithStack(err)
} }
e.SetState(environment.ProcessOfflineState) e.SetState(environment.ProcessOfflineState)

View File

@@ -3,7 +3,6 @@ package environment
import ( import (
"context" "context"
"os" "os"
"time"
"github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/events"
) )
@@ -59,20 +58,18 @@ type ProcessEnvironment interface {
// can be started an error should be returned. // can be started an error should be returned.
Start(ctx context.Context) error Start(ctx context.Context) error
// Stop stops a server instance. If the server is already stopped an error will // Stops a server instance. If the server is already stopped an error should
// not be returned, this function will act as a no-op. // not be returned.
Stop(ctx context.Context) error Stop() error
// WaitForStop waits for a server instance to stop gracefully. If the server is // Waits for a server instance to stop gracefully. If the server is still detected
// still detected as running after "duration", an error will be returned, or the server // as running after seconds, an error will be returned, or the server will be terminated
// will be terminated depending on the value of the second argument. If the context // depending on the value of the second argument.
// provided is canceled the underlying wait conditions will be stopped and the WaitForStop(seconds uint, terminate bool) error
// entire loop will be ended (potentially without stopping or terminating).
WaitForStop(ctx context.Context, duration time.Duration, terminate bool) error
// Terminate stops a running server instance using the provided signal. This function // Terminates a running server instance using the provided signal. If the server
// is a no-op if the server is already stopped. // is not running no error should be returned.
Terminate(ctx context.Context, signal os.Signal) error Terminate(signal os.Signal) error
// Destroys the environment removing any containers that were created (in Docker // Destroys the environment removing any containers that were created (in Docker
// environments at least). // environments at least).

View File

@@ -99,36 +99,21 @@ func (l Limits) ProcessLimit() int64 {
return config.Get().Docker.ContainerPidLimit return config.Get().Docker.ContainerPidLimit
} }
// AsContainerResources returns the available resources for a container in a format
// that Docker understands.
func (l Limits) AsContainerResources() container.Resources { func (l Limits) AsContainerResources() container.Resources {
pids := l.ProcessLimit() pids := l.ProcessLimit()
resources := container.Resources{
return container.Resources{
Memory: l.BoundedMemoryLimit(), Memory: l.BoundedMemoryLimit(),
MemoryReservation: l.MemoryLimit * 1_000_000, MemoryReservation: l.MemoryLimit * 1_000_000,
MemorySwap: l.ConvertedSwap(), MemorySwap: l.ConvertedSwap(),
CPUQuota: l.ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: l.IoWeight, BlkioWeight: l.IoWeight,
OomKillDisable: &l.OOMDisabled, OomKillDisable: &l.OOMDisabled,
CpusetCpus: l.Threads,
PidsLimit: &pids, PidsLimit: &pids,
} }
// If the CPU Limit is not set, don't send any of these fields through. Providing
// them seems to break some Java services that try to read the available processors.
//
// @see https://github.com/pterodactyl/panel/issues/3988
if l.CpuLimit > 0 {
resources.CPUQuota = l.CpuLimit * 1_000
resources.CPUPeriod = 100_000
resources.CPUShares = 1024
}
// Similar to above, don't set the specific assigned CPUs if we didn't actually limit
// the server to any of them.
if l.Threads != "" {
resources.CpusetCpus = l.Threads
}
return resources
} }
type Variables map[string]interface{} type Variables map[string]interface{}

View File

@@ -2,12 +2,11 @@ package events
import ( import (
"strings" "strings"
"sync"
"emperror.dev/errors"
"github.com/goccy/go-json"
"github.com/pterodactyl/wings/system"
) )
type Listener chan Event
// Event represents an Event sent over a Bus. // Event represents an Event sent over a Bus.
type Event struct { type Event struct {
Topic string Topic string
@@ -16,55 +15,137 @@ type Event struct {
// Bus represents an Event Bus. // Bus represents an Event Bus.
type Bus struct { type Bus struct {
*system.SinkPool listenersMx sync.Mutex
listeners map[string][]Listener
} }
// NewBus returns a new empty Bus. This is simply a nicer wrapper around the // NewBus returns a new empty Event Bus.
// system.SinkPool implementation that allows for more simplistic usage within
// the codebase.
//
// All of the events emitted out of this bus are byte slices that can be decoded
// back into an events.Event interface.
func NewBus() *Bus { func NewBus() *Bus {
return &Bus{ return &Bus{
system.NewSinkPool(), listeners: make(map[string][]Listener),
}
}
// Off unregisters a listener from the specified topics on the Bus.
func (b *Bus) Off(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
var closed bool
for _, topic := range topics {
ok := b.off(topic, listener)
if !closed && ok {
close(listener)
closed = true
}
}
}
func (b *Bus) off(topic string, listener Listener) bool {
listeners, ok := b.listeners[topic]
if !ok {
return false
}
for i, l := range listeners {
if l != listener {
continue
}
listeners = append(listeners[:i], listeners[i+1:]...)
b.listeners[topic] = listeners
return true
}
return false
}
// On registers a listener to the specified topics on the Bus.
func (b *Bus) On(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
for _, topic := range topics {
b.on(topic, listener)
}
}
func (b *Bus) on(topic string, listener Listener) {
listeners, ok := b.listeners[topic]
if !ok {
b.listeners[topic] = []Listener{listener}
} else {
b.listeners[topic] = append(listeners, listener)
} }
} }
// Publish publishes a message to the Bus. // Publish publishes a message to the Bus.
func (b *Bus) Publish(topic string, data interface{}) { func (b *Bus) Publish(topic string, data interface{}) {
// Some of our actions for the socket support passing a more specific namespace, // Some of our topics for the socket support passing a more specific namespace,
// such as "backup completed:1234" to indicate which specific backup was completed. // such as "backup completed:1234" to indicate which specific backup was completed.
// //
// In these cases, we still need to send the event using the standard listener // In these cases, we still need to send the event using the standard listener
// name of "backup completed". // name of "backup completed".
if strings.Contains(topic, ":") { if strings.Contains(topic, ":") {
parts := strings.SplitN(topic, ":", 2) parts := strings.SplitN(topic, ":", 2)
if len(parts) == 2 { if len(parts) == 2 {
topic = parts[0] topic = parts[0]
} }
} }
enc, err := json.Marshal(Event{Topic: topic, Data: data}) b.listenersMx.Lock()
if err != nil { defer b.listenersMx.Unlock()
panic(errors.WithStack(err))
listeners, ok := b.listeners[topic]
if !ok {
return
} }
b.Push(enc) if len(listeners) < 1 {
return
}
var wg sync.WaitGroup
event := Event{Topic: topic, Data: data}
for _, listener := range listeners {
l := listener
wg.Add(1)
go func(l Listener, event Event) {
defer wg.Done()
l <- event
}(l, event)
}
wg.Wait()
} }
// MustDecode decodes the event byte slice back into an events.Event struct or // Destroy destroys the Event Bus by unregistering and closing all listeners.
// panics if an error is encountered during this process. func (b *Bus) Destroy() {
func MustDecode(data []byte) (e Event) { b.listenersMx.Lock()
if err := DecodeTo(data, &e); err != nil { defer b.listenersMx.Unlock()
panic(err)
// Track what listeners have already been closed. Because the same listener
// can be listening on multiple topics, we need a way to essentially
// "de-duplicate" all the listeners across all the topics.
var closed []Listener
for _, listeners := range b.listeners {
for _, listener := range listeners {
if contains(closed, listener) {
continue
}
close(listener)
closed = append(closed, listener)
}
} }
return
b.listeners = make(map[string][]Listener)
} }
// DecodeTo decodes a byte slice of event data into the given interface. func contains(closed []Listener, listener Listener) bool {
func DecodeTo(data []byte, v interface{}) error { for _, c := range closed {
if err := json.Unmarshal(data, &v); err != nil { if c == listener {
return errors.Wrap(err, "events: failed to decode byte slice") return true
}
} }
return nil return false
} }

View File

@@ -9,90 +9,162 @@ import (
func TestNewBus(t *testing.T) { func TestNewBus(t *testing.T) {
g := Goblin(t) g := Goblin(t)
bus := NewBus()
g.Describe("Events", func() { g.Describe("NewBus", func() {
var bus *Bus g.It("is not nil", func() {
g.BeforeEach(func() { g.Assert(bus).IsNotNil("Bus expected to not be nil")
bus = NewBus() g.Assert(bus.listeners).IsNotNil("Bus#listeners expected to not be nil")
}) })
})
g.Describe("NewBus", func() { }
g.It("is not nil", func() {
g.Assert(bus).IsNotNil("Bus expected to not be nil") func TestBus_Off(t *testing.T) {
}) g := Goblin(t)
})
const topic = "test"
g.Describe("Publish", func() {
const topic = "test" g.Describe("Off", func() {
const message = "this is a test message!" g.It("unregisters listener", func() {
bus := NewBus()
g.It("publishes message", func() {
bus := NewBus() g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan []byte) listener := make(chan Event)
bus.On(listener) bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
done := make(chan struct{}, 1)
go func() { bus.Off(listener, topic)
select { g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners")
case v := <-listener: })
m := MustDecode(v)
g.Assert(m.Topic).Equal(topic) g.It("unregisters correct listener", func() {
g.Assert(m.Data).Equal(message) bus := NewBus()
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time") listener := make(chan Event)
} listener2 := make(chan Event)
done <- struct{}{} listener3 := make(chan Event)
}() bus.On(listener, topic)
bus.Publish(topic, message) bus.On(listener2, topic)
<-done bus.On(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(3, "Listeners were not registered")
// Cleanup
bus.Off(listener) bus.Off(listener, topic)
}) bus.Off(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Expected 1 listener to remain")
g.It("publishes message to all listeners", func() {
bus := NewBus() if bus.listeners[topic][0] != listener2 {
// A normal Assert does not properly compare channels.
listener := make(chan []byte) g.Fail("wrong listener unregistered")
listener2 := make(chan []byte) }
listener3 := make(chan []byte)
bus.On(listener) // Cleanup
bus.On(listener2) bus.Off(listener2, topic)
bus.On(listener3) })
})
done := make(chan struct{}, 1) }
go func() {
for i := 0; i < 3; i++ { func TestBus_On(t *testing.T) {
select { g := Goblin(t)
case v := <-listener:
m := MustDecode(v) const topic = "test"
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message) g.Describe("On", func() {
case v := <-listener2: g.It("registers listener", func() {
m := MustDecode(v) bus := NewBus()
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message) g.Assert(bus.listeners[topic]).IsNotNil()
case v := <-listener3: g.Assert(len(bus.listeners[topic])).IsZero()
m := MustDecode(v) listener := make(chan Event)
g.Assert(m.Topic).Equal(topic) bus.On(listener, topic)
g.Assert(m.Data).Equal(message) g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
case <-time.After(1 * time.Second):
g.Fail("all listeners did not receive the message in time") if bus.listeners[topic][0] != listener {
i = 3 // A normal Assert does not properly compare channels.
} g.Fail("wrong listener registered")
} }
done <- struct{}{} // Cleanup
}() bus.Off(listener, topic)
bus.Publish(topic, message) })
<-done })
}
// Cleanup
bus.Off(listener) func TestBus_Publish(t *testing.T) {
bus.Off(listener2) g := Goblin(t)
bus.Off(listener3)
}) const topic = "test"
const message = "this is a test message!"
g.Describe("Publish", func() {
g.It("publishes message", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
// Cleanup
bus.Off(listener, topic)
})
g.It("publishes message to all listeners", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
listener2 := make(chan Event)
listener3 := make(chan Event)
bus.On(listener, topic)
bus.On(listener2, topic)
bus.On(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(3, "Listener was not registered")
done := make(chan struct{}, 1)
go func() {
for i := 0; i < 3; i++ {
select {
case m := <-listener:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case m := <-listener2:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case m := <-listener3:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second):
g.Fail("all listeners did not receive the message in time")
i = 3
}
}
done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
// Cleanup
bus.Off(listener, topic)
bus.Off(listener2, topic)
bus.Off(listener3, topic)
}) })
}) })
} }

105
go.mod
View File

@@ -3,113 +3,116 @@ module github.com/pterodactyl/wings
go 1.17 go 1.17
require ( require (
emperror.dev/errors v0.8.1 emperror.dev/errors v0.8.0
github.com/AlecAivazis/survey/v2 v2.3.4 github.com/AlecAivazis/survey/v2 v2.2.15
github.com/Jeffail/gabs/v2 v2.6.1 github.com/Jeffail/gabs/v2 v2.6.1
github.com/NYTimes/logrotate v1.0.0 github.com/NYTimes/logrotate v1.0.0
github.com/apex/log v1.9.0 github.com/apex/log v1.9.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/beevik/etree v1.1.0 github.com/beevik/etree v1.1.0
github.com/buger/jsonparser v1.1.1 github.com/buger/jsonparser v1.1.1
github.com/cenkalti/backoff/v4 v4.1.2 github.com/cenkalti/backoff/v4 v4.1.1
github.com/cobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 github.com/cobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249
github.com/creasty/defaults v1.5.2 github.com/creasty/defaults v1.5.1
github.com/docker/docker v20.10.14+incompatible github.com/docker/docker v20.10.7+incompatible
github.com/docker/go-connections v0.4.0 github.com/docker/go-connections v0.4.0
github.com/fatih/color v1.13.0 github.com/fatih/color v1.12.0
github.com/franela/goblin v0.0.0-20200825194134-80c0062ed6cd github.com/franela/goblin v0.0.0-20200825194134-80c0062ed6cd
github.com/gabriel-vasile/mimetype v1.4.0 github.com/gabriel-vasile/mimetype v1.3.1
github.com/gammazero/workerpool v1.1.2 github.com/gammazero/workerpool v1.1.2
github.com/gbrlsnchs/jwt/v3 v3.0.1 github.com/gbrlsnchs/jwt/v3 v3.0.1
github.com/gin-gonic/gin v1.7.7 github.com/gin-gonic/gin v1.7.2
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.4.2
github.com/iancoleman/strcase v0.2.0 github.com/iancoleman/strcase v0.2.0
github.com/icza/dyno v0.0.0-20210726202311-f1bafe5d9996 github.com/icza/dyno v0.0.0-20210726202311-f1bafe5d9996
github.com/juju/ratelimit v1.0.1 github.com/juju/ratelimit v1.0.1
github.com/karrick/godirwalk v1.16.1 github.com/karrick/godirwalk v1.16.1
github.com/klauspost/pgzip v1.2.5 github.com/klauspost/pgzip v1.2.5
github.com/magiconair/properties v1.8.6 github.com/magiconair/properties v1.8.5
github.com/mattn/go-colorable v0.1.12 github.com/mattn/go-colorable v0.1.8
github.com/mholt/archiver/v3 v3.5.1 github.com/mholt/archiver/v3 v3.5.0
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/sftp v1.13.4 github.com/pkg/profile v1.6.0
github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06 github.com/pkg/sftp v1.13.2
github.com/spf13/cobra v1.4.0 github.com/sabhiram/go-gitignore v0.0.0-20201211210132-54b8a0bf510f
github.com/spf13/cobra v1.2.1
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/ini.v1 v1.66.4 gopkg.in/ini.v1 v1.62.0
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )
require github.com/goccy/go-json v0.9.6 require github.com/goccy/go-json v0.9.4
require golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect require golang.org/x/sys v0.0.0-20211110154304-99a53858aa08 // indirect
require ( require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/Microsoft/hcsshim v0.9.2 // indirect github.com/Microsoft/hcsshim v0.8.20 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect github.com/andybalholm/brotli v1.0.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/containerd/containerd v1.6.2 // indirect github.com/containerd/containerd v1.5.5 // indirect
github.com/containerd/fifo v1.0.0 // indirect github.com/containerd/fifo v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect github.com/dsnet/compress v0.0.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gammazero/deque v0.1.1 // indirect github.com/gammazero/deque v0.1.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.10.1 // indirect github.com/go-playground/validator/v10 v10.8.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/mux v1.7.4 // indirect github.com/gorilla/mux v1.7.4 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.11 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.15.1 // indirect github.com/klauspost/compress v1.13.2 // indirect
github.com/kr/fs v0.1.0 // indirect github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect
github.com/magefile/mage v1.13.0 // indirect github.com/magefile/mage v1.11.0 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-isatty v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
github.com/nwaples/rardecode v1.1.3 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/nwaples/rardecode v1.1.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/procfs v0.7.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect github.com/ugorji/go/codec v1.1.7 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect github.com/ulikunitz/xz v0.5.10 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect go.uber.org/multierr v1.7.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb // indirect google.golang.org/genproto v0.0.0-20210729151513-df9385d47c1b // indirect
google.golang.org/grpc v1.45.0 // indirect google.golang.org/grpc v1.39.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
) )

474
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -11,9 +11,9 @@ import (
"github.com/apex/log" "github.com/apex/log"
"github.com/beevik/etree" "github.com/beevik/etree"
"github.com/buger/jsonparser" "github.com/buger/jsonparser"
"github.com/goccy/go-json"
"github.com/icza/dyno" "github.com/icza/dyno"
"github.com/magiconair/properties" "github.com/magiconair/properties"
"github.com/goccy/go-json"
"gopkg.in/ini.v1" "gopkg.in/ini.v1"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"

View File

@@ -142,10 +142,12 @@ func (c *client) request(ctx context.Context, method, path string, body io.Reade
if r.HasError() { if r.HasError() {
// Close the request body after returning the error to free up resources. // Close the request body after returning the error to free up resources.
defer r.Body.Close() defer r.Body.Close()
// Don't keep attempting to access this endpoint if the response is a 4XX // Don't keep spamming the endpoint if we've already made too many requests or
// level error which indicates a client mistake. Only retry when the error // if we're not even authenticated correctly. Retrying generally won't fix either
// is due to a server issue (5XX error). // of these issues.
if r.StatusCode >= 400 && r.StatusCode < 500 { if r.StatusCode == http.StatusForbidden ||
r.StatusCode == http.StatusTooManyRequests ||
r.StatusCode == http.StatusUnauthorized {
return backoff.Permanent(r.Error()) return backoff.Permanent(r.Error())
} }
return r.Error() return r.Error()

View File

@@ -11,11 +11,6 @@ import (
"github.com/pterodactyl/wings/parser" "github.com/pterodactyl/wings/parser"
) )
const (
SftpAuthPassword = SftpAuthRequestType("password")
SftpAuthPublicKey = SftpAuthRequestType("public_key")
)
// A generic type allowing for easy binding use when making requests to API // A generic type allowing for easy binding use when making requests to API
// endpoints that only expect a singular argument or something that would not // endpoints that only expect a singular argument or something that would not
// benefit from being a typed struct. // benefit from being a typed struct.
@@ -68,17 +63,14 @@ type RawServerData struct {
ProcessConfiguration json.RawMessage `json:"process_configuration"` ProcessConfiguration json.RawMessage `json:"process_configuration"`
} }
type SftpAuthRequestType string
// SftpAuthRequest defines the request details that are passed along to the Panel // SftpAuthRequest defines the request details that are passed along to the Panel
// when determining if the credentials provided to Wings are valid. // when determining if the credentials provided to Wings are valid.
type SftpAuthRequest struct { type SftpAuthRequest struct {
Type SftpAuthRequestType `json:"type"` User string `json:"username"`
User string `json:"username"` Pass string `json:"password"`
Pass string `json:"password"` IP string `json:"ip"`
IP string `json:"ip"` SessionID []byte `json:"session_id"`
SessionID []byte `json:"session_id"` ClientVersion []byte `json:"client_version"`
ClientVersion []byte `json:"client_version"`
} }
// SftpAuthResponse is returned by the Panel when a pair of SFTP credentials // SftpAuthResponse is returned by the Panel when a pair of SFTP credentials
@@ -87,6 +79,7 @@ type SftpAuthRequest struct {
// user for the SFTP subsystem. // user for the SFTP subsystem.
type SftpAuthResponse struct { type SftpAuthResponse struct {
Server string `json:"server"` Server string `json:"server"`
Token string `json:"token"`
Permissions []string `json:"permissions"` Permissions []string `json:"permissions"`
} }

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"mime"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@@ -14,8 +13,8 @@ import (
"time" "time"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/goccy/go-json"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/goccy/go-json"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
) )
@@ -78,13 +77,10 @@ func (c *Counter) Write(p []byte) (int, error) {
type DownloadRequest struct { type DownloadRequest struct {
Directory string Directory string
URL *url.URL URL *url.URL
FileName string
UseHeader bool
} }
type Download struct { type Download struct {
Identifier string Identifier string
path string
mu sync.RWMutex mu sync.RWMutex
req DownloadRequest req DownloadRequest
server *server.Server server *server.Server
@@ -176,28 +172,8 @@ func (dl *Download) Execute() error {
} }
} }
if dl.req.UseHeader { fnameparts := strings.Split(dl.req.URL.Path, "/")
if contentDisposition := res.Header.Get("Content-Disposition"); contentDisposition != "" { p := filepath.Join(dl.req.Directory, fnameparts[len(fnameparts)-1])
_, params, err := mime.ParseMediaType(contentDisposition)
if err != nil {
return errors.WrapIf(err, "downloader: invalid \"Content-Disposition\" header")
}
if v, ok := params["filename"]; ok {
dl.path = v
}
}
}
if dl.path == "" {
if dl.req.FileName != "" {
dl.path = dl.req.FileName
} else {
parts := strings.Split(dl.req.URL.Path, "/")
dl.path = parts[len(parts)-1]
}
}
p := dl.Path()
dl.server.Log().WithField("path", p).Debug("writing remote file to disk") dl.server.Log().WithField("path", p).Debug("writing remote file to disk")
r := io.TeeReader(res.Body, dl.counter(res.ContentLength)) r := io.TeeReader(res.Body, dl.counter(res.ContentLength))
@@ -229,10 +205,6 @@ func (dl *Download) Progress() float64 {
return dl.progress return dl.progress
} }
func (dl *Download) Path() string {
return filepath.Join(dl.req.Directory, dl.path)
}
// Handles a write event by updating the progress completed percentage and firing off // Handles a write event by updating the progress completed percentage and firing off
// events to the server websocket as needed. // events to the server websocket as needed.
func (dl *Download) counter(contentLength int64) *Counter { func (dl *Download) counter(contentLength int64) *Counter {

View File

@@ -13,8 +13,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/pterodactyl/wings/config"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -37,15 +35,6 @@ func getServerFileContents(c *gin.Context) {
return return
} }
defer f.Close() defer f.Close()
// Don't allow a named pipe to be opened.
//
// @see https://github.com/pterodactyl/panel/issues/4059
if st.Mode()&os.ModeNamedPipe != 0 {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "Cannot open files of this type.",
})
return
}
c.Header("X-Mime-Type", st.Mimetype) c.Header("X-Mime-Type", st.Mimetype)
c.Header("Content-Length", strconv.Itoa(int(st.Size()))) c.Header("Content-Length", strconv.Itoa(int(st.Size())))
@@ -131,10 +120,6 @@ func putServerRenameFiles(c *gin.Context) {
// Return nil if the error is an is not exists. // Return nil if the error is an is not exists.
// NOTE: os.IsNotExist() does not work if the error is wrapped. // NOTE: os.IsNotExist() does not work if the error is wrapped.
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {
s.Log().WithField("error", err).
WithField("from_path", pf).
WithField("to_path", pt).
Warn("failed to rename: source or target does not exist")
return nil return nil
} }
return err return err
@@ -270,12 +255,9 @@ func postServerPullRemoteFile(c *gin.Context) {
s := ExtractServer(c) s := ExtractServer(c)
var data struct { var data struct {
// Deprecated // Deprecated
Directory string `binding:"required_without=RootPath,omitempty" json:"directory"` Directory string `binding:"required_without=RootPath,omitempty" json:"directory"`
RootPath string `binding:"required_without=Directory,omitempty" json:"root"` RootPath string `binding:"required_without=Directory,omitempty" json:"root"`
URL string `binding:"required" json:"url"` URL string `binding:"required" json:"url"`
FileName string `json:"file_name"`
UseHeader bool `json:"use_header"`
Foreground bool `json:"foreground"`
} }
if err := c.BindJSON(&data); err != nil { if err := c.BindJSON(&data); err != nil {
return return
@@ -313,41 +295,21 @@ func postServerPullRemoteFile(c *gin.Context) {
dl := downloader.New(s, downloader.DownloadRequest{ dl := downloader.New(s, downloader.DownloadRequest{
Directory: data.RootPath, Directory: data.RootPath,
URL: u, URL: u,
FileName: data.FileName,
UseHeader: data.UseHeader,
}) })
download := func() error { // Execute this pull in a separate thread since it may take a long time to complete.
go func() {
s.Log().WithField("download_id", dl.Identifier).WithField("url", u.String()).Info("starting pull of remote file to disk") s.Log().WithField("download_id", dl.Identifier).WithField("url", u.String()).Info("starting pull of remote file to disk")
if err := dl.Execute(); err != nil { if err := dl.Execute(); err != nil {
s.Log().WithField("download_id", dl.Identifier).WithField("error", err).Error("failed to pull remote file") s.Log().WithField("download_id", dl.Identifier).WithField("error", err).Error("failed to pull remote file")
return err
} else { } else {
s.Log().WithField("download_id", dl.Identifier).Info("completed pull of remote file") s.Log().WithField("download_id", dl.Identifier).Info("completed pull of remote file")
} }
return nil }()
}
if !data.Foreground {
go func() {
_ = download()
}()
c.JSON(http.StatusAccepted, gin.H{
"identifier": dl.Identifier,
})
return
}
if err := download(); err != nil { c.JSON(http.StatusAccepted, gin.H{
NewServerError(err, s).Abort(c) "identifier": dl.Identifier,
return })
}
st, err := s.Filesystem().Stat(dl.Path())
if err != nil {
NewServerError(err, s).AbortFilesystemError(c)
return
}
c.JSON(http.StatusOK, &st)
} }
// Stops a remote file download if it exists and belongs to this server. // Stops a remote file download if it exists and belongs to this server.
@@ -575,16 +537,8 @@ func postServerUploadFiles(c *gin.Context) {
directory := c.Query("directory") directory := c.Query("directory")
maxFileSize := config.Get().Api.UploadLimit
maxFileSizeBytes := maxFileSize * 1024 * 1024
var totalSize int64 var totalSize int64
for _, header := range headers { for _, header := range headers {
if header.Size > maxFileSizeBytes {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "File " + header.Filename + " is larger than the maximum file upload size of " + strconv.FormatInt(maxFileSize, 10) + " MB.",
})
return
}
totalSize += header.Size totalSize += header.Size
} }

View File

@@ -5,8 +5,8 @@ import (
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/goccy/go-json"
ws "github.com/gorilla/websocket" ws "github.com/gorilla/websocket"
"github.com/goccy/go-json"
"github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/router/websocket" "github.com/pterodactyl/wings/router/websocket"

View File

@@ -178,7 +178,7 @@ func postServerArchive(c *gin.Context) {
// Ensure the server is offline. Sometimes a "No such container" error gets through // Ensure the server is offline. Sometimes a "No such container" error gets through
// which means the server is already stopped. We can ignore that. // which means the server is already stopped. We can ignore that.
if err := s.Environment.WaitForStop(s.Context(), time.Minute, false); err != nil && !strings.Contains(strings.ToLower(err.Error()), "no such container") { if err := s.Environment.WaitForStop(60, false); err != nil && !strings.Contains(strings.ToLower(err.Error()), "no such container") {
sendTransferLog("Failed to stop server, aborting transfer..") sendTransferLog("Failed to stop server, aborting transfer..")
l.WithField("error", err).Error("failed to stop server") l.WithField("error", err).Error("failed to stop server")
return return

View File

@@ -7,9 +7,8 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
"github.com/goccy/go-json" "github.com/goccy/go-json"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/system"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
) )
@@ -89,13 +88,12 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
eventChan := make(chan []byte) eventChan := make(chan events.Event)
logOutput := make(chan []byte, 8) logOutput := make(chan []byte, 8)
installOutput := make(chan []byte, 4) installOutput := make(chan []byte, 4)
h.server.Events().On(eventChan, e...)
h.server.Events().On(eventChan) // TODO: make a sinky h.server.Sink(server.LogSink).On(logOutput)
h.server.Sink(system.LogSink).On(logOutput) h.server.Sink(server.InstallSink).On(installOutput)
h.server.Sink(system.InstallSink).On(installOutput)
onError := func(evt string, err2 error) { onError := func(evt string, err2 error) {
h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket") h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket")
@@ -112,23 +110,19 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
break break
case b := <-logOutput: case e := <-logOutput:
sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(b)}}) sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(e)}})
if sendErr == nil { if sendErr == nil {
continue continue
} }
onError(server.ConsoleOutputEvent, sendErr) onError(server.ConsoleOutputEvent, sendErr)
case b := <-installOutput: case e := <-installOutput:
sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(b)}}) sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(e)}})
if sendErr == nil { if sendErr == nil {
continue continue
} }
onError(server.InstallOutputEvent, sendErr) onError(server.InstallOutputEvent, sendErr)
case b := <-eventChan: case e := <-eventChan:
var e events.Event
if err := events.DecodeTo(b, &e); err != nil {
continue
}
var sendErr error var sendErr error
message := Message{Event: e.Topic} message := Message{Event: e.Topic}
if str, ok := e.Data.(string); ok { if str, ok := e.Data.(string); ok {
@@ -154,9 +148,9 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
} }
// These functions will automatically close the channel if it hasn't been already. // These functions will automatically close the channel if it hasn't been already.
h.server.Events().Off(eventChan) h.server.Events().Off(eventChan, e...)
h.server.Sink(system.LogSink).Off(logOutput) h.server.Sink(server.LogSink).Off(logOutput)
h.server.Sink(system.InstallSink).Off(installOutput) h.server.Sink(server.InstallSink).Off(installOutput)
// If the internal context is stopped it is either because the parent context // If the internal context is stopped it is either because the parent context
// got canceled or because we ran into an error. If the "err" variable is nil // got canceled or because we ran into an error. If the "err" variable is nil

View File

@@ -11,10 +11,9 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
"github.com/gbrlsnchs/jwt/v3" "github.com/gbrlsnchs/jwt/v3"
"github.com/goccy/go-json"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pterodactyl/wings/system" "github.com/goccy/go-json"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
@@ -354,7 +353,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
} }
err := h.server.HandlePowerAction(action) err := h.server.HandlePowerAction(action)
if errors.Is(err, system.ErrLockerLocked) { if errors.Is(err, context.DeadlineExceeded) {
m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later") m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later")
_ = h.SendJson(Message{ _ = h.SendJson(Message{

View File

@@ -142,7 +142,7 @@ func (s *Server) RestoreBackup(b backup.BackupInterface, reader io.ReadCloser) (
// instance, otherwise you'll likely hit all types of write errors due to the // instance, otherwise you'll likely hit all types of write errors due to the
// server being suspended. // server being suspended.
if s.Environment.State() != environment.ProcessOfflineState { if s.Environment.State() != environment.ProcessOfflineState {
if err = s.Environment.WaitForStop(s.Context(), time.Minute*2, false); err != nil { if err = s.Environment.WaitForStop(120, false); err != nil {
if !client.IsErrNotFound(err) { if !client.IsErrNotFound(err) {
return errors.WrapIf(err, "server/backup: restore: failed to wait for container stop") return errors.WrapIf(err, "server/backup: restore: failed to wait for container stop")
} }

View File

@@ -6,14 +6,12 @@ import (
"github.com/gammazero/workerpool" "github.com/gammazero/workerpool"
) )
// UpdateConfigurationFiles updates all of the defined configuration files for // Parent function that will update all of the defined configuration files for a server
// a server automatically to ensure that they always use the specified values. // automatically to ensure that they always use the specified values.
func (s *Server) UpdateConfigurationFiles() { func (s *Server) UpdateConfigurationFiles() {
pool := workerpool.New(runtime.NumCPU()) pool := workerpool.New(runtime.NumCPU())
s.Log().Debug("acquiring process configuration files...")
files := s.ProcessConfiguration().ConfigurationFiles files := s.ProcessConfiguration().ConfigurationFiles
s.Log().Debug("acquired process configuration files")
for _, cf := range files { for _, cf := range files {
f := cf f := cf
@@ -28,8 +26,6 @@ func (s *Server) UpdateConfigurationFiles() {
if err := f.Parse(p, false); err != nil { if err := f.Parse(p, false); err != nil {
s.Log().WithField("error", err).Error("failed to parse and update server configuration file") s.Log().WithField("error", err).Error("failed to parse and update server configuration file")
} }
s.Log().WithField("path", f.FileName).Debug("finished processing server configuration file")
}) })
} }

View File

@@ -19,7 +19,7 @@ func TestName(t *testing.T) {
}) })
g.It("calls strike once per time period", func() { g.It("calls strike once per time period", func() {
t := newConsoleThrottle(1, time.Millisecond*20) t := newConsoleThrottle(1, time.Millisecond * 20)
var times int var times int
t.strike = func() { t.strike = func() {
@@ -53,10 +53,10 @@ func TestName(t *testing.T) {
} }
func BenchmarkConsoleThrottle(b *testing.B) { func BenchmarkConsoleThrottle(b *testing.B) {
t := newConsoleThrottle(10, time.Millisecond*10) t := newConsoleThrottle(10, time.Millisecond * 10)
b.ReportAllocs() b.ReportAllocs()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
t.Allow() t.Allow()
} }
} }

View File

@@ -2,7 +2,6 @@ package server
import ( import (
"github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/system"
) )
// Defines all of the possible output events for a server. // Defines all of the possible output events for a server.
@@ -21,7 +20,7 @@ const (
TransferStatusEvent = "transfer status" TransferStatusEvent = "transfer status"
) )
// Events returns the server's emitter instance. // Returns the server's emitter instance.
func (s *Server) Events() *events.Bus { func (s *Server) Events() *events.Bus {
s.emitterLock.Lock() s.emitterLock.Lock()
defer s.emitterLock.Unlock() defer s.emitterLock.Unlock()
@@ -32,24 +31,3 @@ func (s *Server) Events() *events.Bus {
return s.emitter return s.emitter
} }
// Sink returns the instantiated and named sink for a server. If the sink has
// not been configured yet this function will cause a panic condition.
func (s *Server) Sink(name system.SinkName) *system.SinkPool {
sink, ok := s.sinks[name]
if !ok {
s.Log().Fatalf("attempt to access nil sink: %s", name)
}
return sink
}
// DestroyAllSinks iterates over all of the sinks configured for the server and
// destroys their instances. Note that this will cause a panic if you attempt
// to call Server.Sink() again after. This function is only used when a server
// is being deleted from the system.
func (s *Server) DestroyAllSinks() {
s.Log().Info("destroying all registered sinks for server instance")
for _, sink := range s.sinks {
sink.Destroy()
}
}

View File

@@ -130,7 +130,7 @@ func (a *Archive) withFilesCallback(tw *tar.Writer) func(path string, de *godirw
for _, f := range a.Files { for _, f := range a.Files {
// If the given doesn't match, or doesn't have the same prefix continue // If the given doesn't match, or doesn't have the same prefix continue
// to the next item in the loop. // to the next item in the loop.
if p != f && !strings.HasPrefix(strings.TrimSuffix(p, "/")+"/", f) { if p != f && !strings.HasPrefix(p, f) {
continue continue
} }

View File

@@ -5,12 +5,9 @@ import (
"archive/zip" "archive/zip"
"compress/gzip" "compress/gzip"
"fmt" "fmt"
gzip2 "github.com/klauspost/compress/gzip"
zip2 "github.com/klauspost/compress/zip"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"reflect"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -175,26 +172,13 @@ func ExtractNameFromArchive(f archiver.File) string {
return f.Name() return f.Name()
} }
switch s := sys.(type) { switch s := sys.(type) {
case *zip.FileHeader:
return s.Name
case *zip2.FileHeader:
return s.Name
case *tar.Header: case *tar.Header:
return s.Name return s.Name
case *gzip.Header: case *gzip.Header:
return s.Name return s.Name
case *gzip2.Header: case *zip.FileHeader:
return s.Name return s.Name
default: default:
// At this point we cannot figure out what type of archive this might be so
// just try to find the name field in the struct. If it is found return it.
field := reflect.Indirect(reflect.ValueOf(sys)).FieldByName("Name")
if field.IsValid() {
return field.String()
}
// Fallback to the basename of the file at this point. There is nothing we can really
// do to try and figure out what the underlying directory of the file is supposed to
// be since it didn't implement a name field.
return f.Name() return f.Name()
} }
} }

View File

@@ -115,6 +115,19 @@ func (fs *Filesystem) Touch(p string, flag int) (*os.File, error) {
return f, nil return f, nil
} }
// Reads a file on the system and returns it as a byte representation in a file
// reader. This is not the most memory efficient usage since it will be reading the
// entirety of the file into memory.
func (fs *Filesystem) Readfile(p string, w io.Writer) error {
file, _, err := fs.File(p)
if err != nil {
return err
}
defer file.Close()
_, err = bufio.NewReader(file).WriteTo(w)
return err
}
// Writefile writes a file to the system. If the file does not already exist one // Writefile writes a file to the system. If the file does not already exist one
// will be created. This will also properly recalculate the disk space used by // will be created. This will also properly recalculate the disk space used by
// the server when writing new files or modifying existing ones. // the server when writing new files or modifying existing ones.
@@ -171,16 +184,16 @@ func (fs *Filesystem) CreateDirectory(name string, p string) error {
return os.MkdirAll(cleaned, 0o755) return os.MkdirAll(cleaned, 0o755)
} }
// Rename moves (or renames) a file or directory. // Moves (or renames) a file or directory.
func (fs *Filesystem) Rename(from string, to string) error { func (fs *Filesystem) Rename(from string, to string) error {
cleanedFrom, err := fs.SafePath(from) cleanedFrom, err := fs.SafePath(from)
if err != nil { if err != nil {
return errors.WithStack(err) return err
} }
cleanedTo, err := fs.SafePath(to) cleanedTo, err := fs.SafePath(to)
if err != nil { if err != nil {
return errors.WithStack(err) return err
} }
// If the target file or directory already exists the rename function will fail, so just // If the target file or directory already exists the rename function will fail, so just
@@ -202,10 +215,7 @@ func (fs *Filesystem) Rename(from string, to string) error {
} }
} }
if err := os.Rename(cleanedFrom, cleanedTo); err != nil { return os.Rename(cleanedFrom, cleanedTo)
return errors.WithStack(err)
}
return nil
} }
// Recursively iterates over a file or directory and sets the permissions on all of the // Recursively iterates over a file or directory and sets the permissions on all of the
@@ -482,11 +492,7 @@ func (fs *Filesystem) ListDirectory(p string) ([]Stat, error) {
cleanedp, _ = fs.SafePath(filepath.Join(cleaned, f.Name())) cleanedp, _ = fs.SafePath(filepath.Join(cleaned, f.Name()))
} }
// Don't try to detect the type on a pipe — this will just hang the application and if cleanedp != "" {
// you'll never get a response back.
//
// @see https://github.com/pterodactyl/panel/issues/4059
if cleanedp != "" && f.Mode()&os.ModeNamedPipe == 0 {
m, _ = mimetype.DetectFile(filepath.Join(cleaned, f.Name())) m, _ = mimetype.DetectFile(filepath.Join(cleaned, f.Name()))
} else { } else {
// Just pass this for an unknown type because the file could not safely be resolved within // Just pass this for an unknown type because the file could not safely be resolved within

View File

@@ -1,7 +1,6 @@
package filesystem package filesystem
import ( import (
"bufio"
"bytes" "bytes"
"errors" "errors"
"math/rand" "math/rand"
@@ -45,14 +44,6 @@ type rootFs struct {
root string root string
} }
func getFileContent(file *os.File) string {
var w bytes.Buffer
if _, err := bufio.NewReader(file).WriteTo(&w); err != nil {
panic(err)
}
return w.String()
}
func (rfs *rootFs) CreateServerFile(p string, c []byte) error { func (rfs *rootFs) CreateServerFile(p string, c []byte) error {
f, err := os.Create(filepath.Join(rfs.root, "/server", p)) f, err := os.Create(filepath.Join(rfs.root, "/server", p))
@@ -84,6 +75,54 @@ func (rfs *rootFs) reset() {
} }
} }
func TestFilesystem_Readfile(t *testing.T) {
g := Goblin(t)
fs, rfs := NewFs()
g.Describe("Readfile", func() {
buf := &bytes.Buffer{}
g.It("opens a file if it exists on the system", func() {
err := rfs.CreateServerFileFromString("test.txt", "testing")
g.Assert(err).IsNil()
err = fs.Readfile("test.txt", buf)
g.Assert(err).IsNil()
g.Assert(buf.String()).Equal("testing")
})
g.It("returns an error if the file does not exist", func() {
err := fs.Readfile("test.txt", buf)
g.Assert(err).IsNotNil()
g.Assert(errors.Is(err, os.ErrNotExist)).IsTrue()
})
g.It("returns an error if the \"file\" is a directory", func() {
err := os.Mkdir(filepath.Join(rfs.root, "/server/test.txt"), 0o755)
g.Assert(err).IsNil()
err = fs.Readfile("test.txt", buf)
g.Assert(err).IsNotNil()
g.Assert(IsErrorCode(err, ErrCodeIsDirectory)).IsTrue()
})
g.It("cannot open a file outside the root directory", func() {
err := rfs.CreateServerFileFromString("/../test.txt", "testing")
g.Assert(err).IsNil()
err = fs.Readfile("/../test.txt", buf)
g.Assert(err).IsNotNil()
g.Assert(IsErrorCode(err, ErrCodePathResolution)).IsTrue()
})
g.AfterEach(func() {
buf.Truncate(0)
atomic.StoreInt64(&fs.diskUsed, 0)
rfs.reset()
})
})
}
func TestFilesystem_Writefile(t *testing.T) { func TestFilesystem_Writefile(t *testing.T) {
g := Goblin(t) g := Goblin(t)
fs, rfs := NewFs() fs, rfs := NewFs()
@@ -101,10 +140,9 @@ func TestFilesystem_Writefile(t *testing.T) {
err := fs.Writefile("test.txt", r) err := fs.Writefile("test.txt", r)
g.Assert(err).IsNil() g.Assert(err).IsNil()
f, _, err := fs.File("test.txt") err = fs.Readfile("test.txt", buf)
g.Assert(err).IsNil() g.Assert(err).IsNil()
defer f.Close() g.Assert(buf.String()).Equal("test file content")
g.Assert(getFileContent(f)).Equal("test file content")
g.Assert(atomic.LoadInt64(&fs.diskUsed)).Equal(r.Size()) g.Assert(atomic.LoadInt64(&fs.diskUsed)).Equal(r.Size())
}) })
@@ -114,10 +152,9 @@ func TestFilesystem_Writefile(t *testing.T) {
err := fs.Writefile("/some/nested/test.txt", r) err := fs.Writefile("/some/nested/test.txt", r)
g.Assert(err).IsNil() g.Assert(err).IsNil()
f, _, err := fs.File("/some/nested/test.txt") err = fs.Readfile("/some/nested/test.txt", buf)
g.Assert(err).IsNil() g.Assert(err).IsNil()
defer f.Close() g.Assert(buf.String()).Equal("test file content")
g.Assert(getFileContent(f)).Equal("test file content")
}) })
g.It("can create a new file inside a nested directory without a trailing slash", func() { g.It("can create a new file inside a nested directory without a trailing slash", func() {
@@ -126,10 +163,9 @@ func TestFilesystem_Writefile(t *testing.T) {
err := fs.Writefile("some/../foo/bar/test.txt", r) err := fs.Writefile("some/../foo/bar/test.txt", r)
g.Assert(err).IsNil() g.Assert(err).IsNil()
f, _, err := fs.File("foo/bar/test.txt") err = fs.Readfile("foo/bar/test.txt", buf)
g.Assert(err).IsNil() g.Assert(err).IsNil()
defer f.Close() g.Assert(buf.String()).Equal("test file content")
g.Assert(getFileContent(f)).Equal("test file content")
}) })
g.It("cannot create a file outside the root directory", func() { g.It("cannot create a file outside the root directory", func() {
@@ -154,6 +190,28 @@ func TestFilesystem_Writefile(t *testing.T) {
g.Assert(IsErrorCode(err, ErrCodeDiskSpace)).IsTrue() g.Assert(IsErrorCode(err, ErrCodeDiskSpace)).IsTrue()
}) })
/*g.It("updates the total space used when a file is appended to", func() {
atomic.StoreInt64(&fs.diskUsed, 100)
b := make([]byte, 100)
_, _ = rand.Read(b)
r := bytes.NewReader(b)
err := fs.Writefile("test.txt", r)
g.Assert(err).IsNil()
g.Assert(atomic.LoadInt64(&fs.diskUsed)).Equal(int64(200))
// If we write less data than already exists, we should expect the total
// disk used to be decremented.
b = make([]byte, 50)
_, _ = rand.Read(b)
r = bytes.NewReader(b)
err = fs.Writefile("test.txt", r)
g.Assert(err).IsNil()
g.Assert(atomic.LoadInt64(&fs.diskUsed)).Equal(int64(150))
})*/
g.It("truncates the file when writing new contents", func() { g.It("truncates the file when writing new contents", func() {
r := bytes.NewReader([]byte("original data")) r := bytes.NewReader([]byte("original data"))
err := fs.Writefile("test.txt", r) err := fs.Writefile("test.txt", r)
@@ -163,10 +221,9 @@ func TestFilesystem_Writefile(t *testing.T) {
err = fs.Writefile("test.txt", r) err = fs.Writefile("test.txt", r)
g.Assert(err).IsNil() g.Assert(err).IsNil()
f, _, err := fs.File("test.txt") err = fs.Readfile("test.txt", buf)
g.Assert(err).IsNil() g.Assert(err).IsNil()
defer f.Close() g.Assert(buf.String()).Equal("new data")
g.Assert(getFileContent(f)).Equal("new data")
}) })
g.AfterEach(func() { g.AfterEach(func() {

View File

@@ -119,6 +119,16 @@ func TestFilesystem_Blocks_Symlinks(t *testing.T) {
panic(err) panic(err)
} }
g.Describe("Readfile", func() {
g.It("cannot read a file symlinked outside the root", func() {
b := bytes.Buffer{}
err := fs.Readfile("symlinked.txt", &b)
g.Assert(err).IsNotNil()
g.Assert(IsErrorCode(err, ErrCodePathResolution)).IsTrue()
})
})
g.Describe("Writefile", func() { g.Describe("Writefile", func() {
g.It("cannot write to a file symlinked outside the root", func() { g.It("cannot write to a file symlinked outside the root", func() {
r := bytes.NewReader([]byte("testing")) r := bytes.NewReader([]byte("testing"))

View File

@@ -10,7 +10,6 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
@@ -18,23 +17,23 @@ import (
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/remote" "github.com/pterodactyl/wings/remote"
"github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/system"
) )
// Install executes the installation stack for a server process. Bubbles any // Executes the installation stack for a server process. Bubbles any errors up to the calling
// errors up to the calling function which should handle contacting the panel to // function which should handle contacting the panel to notify it of the server state.
// notify it of the server state.
// //
// Pass true as the first argument in order to execute a server sync before the // Pass true as the first argument in order to execute a server sync before the process to
// process to ensure the latest information is used. // ensure the latest information is used.
func (s *Server) Install(sync bool) error { func (s *Server) Install(sync bool) error {
if sync { if sync {
s.Log().Info("syncing server state with remote source before executing installation process") s.Log().Info("syncing server state with remote source before executing installation process")
if err := s.Sync(); err != nil { if err := s.Sync(); err != nil {
return errors.WrapIf(err, "install: failed to sync server state with Panel") return err
} }
} }
@@ -58,7 +57,7 @@ func (s *Server) Install(sync bool) error {
// error to this log entry. Otherwise ignore it in this log since whatever is calling // error to this log entry. Otherwise ignore it in this log since whatever is calling
// this function should handle the error and will end up logging the same one. // this function should handle the error and will end up logging the same one.
if err == nil { if err == nil {
l.WithField("error", err) l.WithField("error", serr)
} }
l.Warn("failed to notify panel of server install state") l.Warn("failed to notify panel of server install state")
@@ -72,7 +71,7 @@ func (s *Server) Install(sync bool) error {
// the install is completed. // the install is completed.
s.Events().Publish(InstallCompletedEvent, "") s.Events().Publish(InstallCompletedEvent, "")
return errors.WithStackIf(err) return err
} }
// Reinstalls a server's software by utilizing the install script for the server egg. This // Reinstalls a server's software by utilizing the install script for the server egg. This
@@ -80,8 +79,8 @@ func (s *Server) Install(sync bool) error {
func (s *Server) Reinstall() error { func (s *Server) Reinstall() error {
if s.Environment.State() != environment.ProcessOfflineState { if s.Environment.State() != environment.ProcessOfflineState {
s.Log().Debug("waiting for server instance to enter a stopped state") s.Log().Debug("waiting for server instance to enter a stopped state")
if err := s.Environment.WaitForStop(s.Context(), time.Second*10, true); err != nil { if err := s.Environment.WaitForStop(10, true); err != nil {
return errors.WrapIf(err, "install: failed to stop running environment") return err
} }
} }
@@ -111,7 +110,9 @@ func (s *Server) internalInstall() error {
type InstallationProcess struct { type InstallationProcess struct {
Server *Server Server *Server
Script *remote.InstallationScript Script *remote.InstallationScript
client *client.Client
client *client.Client
context context.Context
} }
// Generates a new installation process struct that will be used to create containers, // Generates a new installation process struct that will be used to create containers,
@@ -126,6 +127,7 @@ func NewInstallationProcess(s *Server, script *remote.InstallationScript) (*Inst
return nil, err return nil, err
} else { } else {
proc.client = c proc.client = c
proc.context = s.Context()
} }
return proc, nil return proc, nil
@@ -155,7 +157,7 @@ func (s *Server) SetRestoring(state bool) {
// Removes the installer container for the server. // Removes the installer container for the server.
func (ip *InstallationProcess) RemoveContainer() error { func (ip *InstallationProcess) RemoveContainer() error {
err := ip.client.ContainerRemove(ip.Server.Context(), ip.Server.ID()+"_installer", types.ContainerRemoveOptions{ err := ip.client.ContainerRemove(ip.context, ip.Server.ID()+"_installer", types.ContainerRemoveOptions{
RemoveVolumes: true, RemoveVolumes: true,
Force: true, Force: true,
}) })
@@ -165,10 +167,11 @@ func (ip *InstallationProcess) RemoveContainer() error {
return nil return nil
} }
// Run runs the installation process, this is done as in a background thread. // Runs the installation process, this is done as in a background thread. This will configure
// This will configure the required environment, and then spin up the // the required environment, and then spin up the installation container.
// installation container. Once the container finishes installing the results //
// are stored in an installation log in the server's configuration directory. // Once the container finishes installing the results will be stored in an installation
// log in the server's configuration directory.
func (ip *InstallationProcess) Run() error { func (ip *InstallationProcess) Run() error {
ip.Server.Log().Debug("acquiring installation process lock") ip.Server.Log().Debug("acquiring installation process lock")
if !ip.Server.installing.SwapIf(true) { if !ip.Server.installing.SwapIf(true) {
@@ -204,7 +207,7 @@ func (ip *InstallationProcess) Run() error {
// Returns the location of the temporary data for the installation process. // Returns the location of the temporary data for the installation process.
func (ip *InstallationProcess) tempDir() string { func (ip *InstallationProcess) tempDir() string {
return filepath.Join(config.Get().System.TmpDirectory, ip.Server.ID()) return filepath.Join(os.TempDir(), "pterodactyl/", ip.Server.ID())
} }
// Writes the installation script to a temporary file on the host machine so that it // Writes the installation script to a temporary file on the host machine so that it
@@ -264,9 +267,9 @@ func (ip *InstallationProcess) pullInstallationImage() error {
imagePullOptions.RegistryAuth = b64 imagePullOptions.RegistryAuth = b64
} }
r, err := ip.client.ImagePull(ip.Server.Context(), ip.Script.ContainerImage, imagePullOptions) r, err := ip.client.ImagePull(context.Background(), ip.Script.ContainerImage, imagePullOptions)
if err != nil { if err != nil {
images, ierr := ip.client.ImageList(ip.Server.Context(), types.ImageListOptions{}) images, ierr := ip.client.ImageList(context.Background(), types.ImageListOptions{})
if ierr != nil { if ierr != nil {
// Well damn, something has gone really wrong here, just go ahead and abort there // Well damn, something has gone really wrong here, just go ahead and abort there
// isn't much anything we can do to try and self-recover from this. // isn't much anything we can do to try and self-recover from this.
@@ -309,10 +312,9 @@ func (ip *InstallationProcess) pullInstallationImage() error {
return nil return nil
} }
// BeforeExecute runs before the container is executed. This pulls down the // Runs before the container is executed. This pulls down the required docker container image
// required docker container image as well as writes the installation script to // as well as writes the installation script to the disk. This process is executed in an async
// the disk. This process is executed in an async manner, if either one fails // manner, if either one fails the error is returned.
// the error is returned.
func (ip *InstallationProcess) BeforeExecute() error { func (ip *InstallationProcess) BeforeExecute() error {
if err := ip.writeScriptToDisk(); err != nil { if err := ip.writeScriptToDisk(); err != nil {
return errors.WithMessage(err, "failed to write installation script to disk") return errors.WithMessage(err, "failed to write installation script to disk")
@@ -338,7 +340,7 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error {
defer ip.RemoveContainer() defer ip.RemoveContainer()
ip.Server.Log().WithField("container_id", containerId).Debug("pulling installation logs for server") ip.Server.Log().WithField("container_id", containerId).Debug("pulling installation logs for server")
reader, err := ip.client.ContainerLogs(ip.Server.Context(), containerId, types.ContainerLogsOptions{ reader, err := ip.client.ContainerLogs(ip.context, containerId, types.ContainerLogsOptions{
ShowStdout: true, ShowStdout: true,
ShowStderr: true, ShowStderr: true,
Follow: false, Follow: false,
@@ -393,13 +395,12 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error {
return nil return nil
} }
// Execute executes the installation process inside a specially created docker // Executes the installation process inside a specially created docker container.
// container.
func (ip *InstallationProcess) Execute() (string, error) { func (ip *InstallationProcess) Execute() (string, error) {
// Create a child context that is canceled once this function is done running. This // Create a child context that is canceled once this function is done running. This
// will also be canceled if the parent context (from the Server struct) is canceled // will also be canceled if the parent context (from the Server struct) is canceled
// which occurs if the server is deleted. // which occurs if the server is deleted.
ctx, cancel := context.WithCancel(ip.Server.Context()) ctx, cancel := context.WithCancel(ip.context)
defer cancel() defer cancel()
conf := &container.Config{ conf := &container.Config{
@@ -510,15 +511,18 @@ func (ip *InstallationProcess) Execute() (string, error) {
// the server configuration directory, as well as to a websocket listener so // the server configuration directory, as well as to a websocket listener so
// that the process can be viewed in the panel by administrators. // that the process can be viewed in the panel by administrators.
func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error { func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error {
opts := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true} reader, err := ip.client.ContainerLogs(ctx, id, types.ContainerLogsOptions{
reader, err := ip.client.ContainerLogs(ctx, id, opts) ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil { if err != nil {
return err return err
} }
defer reader.Close() defer reader.Close()
err = system.ScanReader(reader, ip.Server.Sink(system.InstallSink).Push) err = system.ScanReader(reader, ip.Server.Sink(InstallSink).Push)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil {
ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines") ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines")
} }
return nil return nil

View File

@@ -5,13 +5,11 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"sync" "sync"
"time"
"github.com/apex/log" "github.com/apex/log"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/system"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/remote" "github.com/pterodactyl/wings/remote"
) )
@@ -46,7 +44,7 @@ func (dsl *diskSpaceLimiter) Reset() {
func (dsl *diskSpaceLimiter) Trigger() { func (dsl *diskSpaceLimiter) Trigger() {
dsl.o.Do(func() { dsl.o.Do(func() {
dsl.server.PublishConsoleOutputFromDaemon("Server is exceeding the assigned disk space limit, stopping process now.") dsl.server.PublishConsoleOutputFromDaemon("Server is exceeding the assigned disk space limit, stopping process now.")
if err := dsl.server.Environment.WaitForStop(dsl.server.Context(), time.Minute, true); err != nil { if err := dsl.server.Environment.WaitForStop(60, true); err != nil {
dsl.server.Log().WithField("error", err).Error("failed to stop server after exceeding space limit!") dsl.server.Log().WithField("error", err).Error("failed to stop server after exceeding space limit!")
} }
}) })
@@ -74,57 +72,47 @@ func (s *Server) processConsoleOutputEvent(v []byte) {
return return
} }
s.Sink(system.LogSink).Push(v) s.Sink(LogSink).Push(v)
} }
// StartEventListeners adds all the internal event listeners we want to use for // StartEventListeners adds all the internal event listeners we want to use for
// a server. These listeners can only be removed by deleting the server as they // a server. These listeners can only be removed by deleting the server as they
// should last for the duration of the process' lifetime. // should last for the duration of the process' lifetime.
func (s *Server) StartEventListeners() { func (s *Server) StartEventListeners() {
c := make(chan []byte, 8) state := make(chan events.Event)
limit := newDiskLimiter(s) stats := make(chan events.Event)
docker := make(chan events.Event)
s.Log().Debug("registering event listeners: console, state, resources...")
s.Environment.Events().On(c)
s.Environment.SetLogCallback(s.processConsoleOutputEvent)
go func() { go func() {
l := newDiskLimiter(s)
for { for {
select { select {
case v := <-c: case e := <-state:
go func(v []byte, limit *diskSpaceLimiter) { go func() {
var e events.Event // Reset the throttler when the process is started.
if err := events.DecodeTo(v, &e); err != nil { if e.Data == environment.ProcessStartingState {
return l.Reset()
s.Throttler().Reset()
} }
s.OnStateChange()
}()
case e := <-stats:
go func() {
s.resources.UpdateStats(e.Data.(environment.Stats))
// If there is no disk space available at this point, trigger the server
// disk limiter logic which will start to stop the running instance.
if !s.Filesystem().HasSpaceAvailable(true) {
l.Trigger()
}
s.Events().Publish(StatsEvent, s.Proc())
}()
case e := <-docker:
go func() {
switch e.Topic { switch e.Topic {
case environment.ResourceEvent:
{
var stats struct {
Topic string
Data environment.Stats
}
if err := events.DecodeTo(v, &stats); err != nil {
s.Log().WithField("error", err).Warn("failed to decode server resource event")
return
}
s.resources.UpdateStats(stats.Data)
// If there is no disk space available at this point, trigger the server
// disk limiter logic which will start to stop the running instance.
if !s.Filesystem().HasSpaceAvailable(true) {
limit.Trigger()
}
s.Events().Publish(StatsEvent, s.Proc())
}
case environment.StateChangeEvent:
{
// Reset the throttler when the process is started.
if e.Data == environment.ProcessStartingState {
limit.Reset()
s.Throttler().Reset()
}
s.OnStateChange()
}
case environment.DockerImagePullStatus: case environment.DockerImagePullStatus:
s.Events().Publish(InstallOutputEvent, e.Data) s.Events().Publish(InstallOutputEvent, e.Data)
case environment.DockerImagePullStarted: case environment.DockerImagePullStarted:
@@ -132,13 +120,18 @@ func (s *Server) StartEventListeners() {
case environment.DockerImagePullCompleted: case environment.DockerImagePullCompleted:
s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image") s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
default: default:
s.Log().WithField("topic", e.Topic).Error("unhandled docker event topic")
} }
}(v, limit) }()
case <-s.Context().Done():
return
} }
} }
}() }()
s.Log().Debug("registering event listeners: console, state, resources...")
s.Environment.SetLogCallback(s.processConsoleOutputEvent)
s.Environment.Events().On(state, environment.StateChangeEvent)
s.Environment.Events().On(stats, environment.ResourceEvent)
s.Environment.Events().On(docker, dockerEvents...)
} }
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))") var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")

View File

@@ -133,11 +133,11 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
return s.Environment.Start(s.Context()) return s.Environment.Start(s.Context())
case PowerActionStop: case PowerActionStop:
fallthrough // We're specifically waiting for the process to be stopped here, otherwise the lock is released
// too soon, and you can rack up all sorts of issues.
return s.Environment.WaitForStop(10*60, true)
case PowerActionRestart: case PowerActionRestart:
// We're specifically waiting for the process to be stopped here, otherwise the lock is if err := s.Environment.WaitForStop(10*60, true); err != nil {
// released too soon, and you can rack up all sorts of issues.
if err := s.Environment.WaitForStop(s.Context(), time.Minute*10, true); err != nil {
// Even timeout errors should be bubbled back up the stack. If the process didn't stop // Even timeout errors should be bubbled back up the stack. If the process didn't stop
// nicely, but the terminate argument was passed then the server is stopped without an // nicely, but the terminate argument was passed then the server is stopped without an
// error being returned. // error being returned.
@@ -149,10 +149,6 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
return err return err
} }
if action == PowerActionStop {
return nil
}
// Now actually try to start the process by executing the normal pre-boot logic. // Now actually try to start the process by executing the normal pre-boot logic.
if err := s.onBeforeStart(); err != nil { if err := s.onBeforeStart(); err != nil {
return err return err
@@ -160,7 +156,7 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
return s.Environment.Start(s.Context()) return s.Environment.Start(s.Context())
case PowerActionTerminate: case PowerActionTerminate:
return s.Environment.Terminate(s.Context(), os.Kill) return s.Environment.Terminate(os.Kill)
} }
return errors.New("attempting to handle unknown power action") return errors.New("attempting to handle unknown power action")
@@ -201,19 +197,15 @@ func (s *Server) onBeforeStart() error {
// we don't need to actively do anything about it at this point, worse comes to worst the // we don't need to actively do anything about it at this point, worse comes to worst the
// server starts in a weird state and the user can manually adjust. // server starts in a weird state and the user can manually adjust.
s.PublishConsoleOutputFromDaemon("Updating process configuration files...") s.PublishConsoleOutputFromDaemon("Updating process configuration files...")
s.Log().Debug("updating server configuration files...")
s.UpdateConfigurationFiles() s.UpdateConfigurationFiles()
s.Log().Debug("updated server configuration files")
if config.Get().System.CheckPermissionsOnBoot { if config.Get().System.CheckPermissionsOnBoot {
s.PublishConsoleOutputFromDaemon("Ensuring file permissions are set correctly, this could take a few seconds...") s.PublishConsoleOutputFromDaemon("Ensuring file permissions are set correctly, this could take a few seconds...")
// Ensure all the server file permissions are set correctly before booting the process. // Ensure all the server file permissions are set correctly before booting the process.
s.Log().Debug("chowning server root directory...")
if err := s.Filesystem().Chown("/"); err != nil { if err := s.Filesystem().Chown("/"); err != nil {
return errors.WithMessage(err, "failed to chown root server directory during pre-boot process") return errors.WithMessage(err, "failed to chown root server directory during pre-boot process")
} }
} }
s.Log().Info("completed server preflight, starting boot process...")
return nil return nil
} }

View File

@@ -70,10 +70,10 @@ type Server struct {
wsBag *WebsocketBag wsBag *WebsocketBag
wsBagLocker sync.Mutex wsBagLocker sync.Mutex
sinks map[system.SinkName]*system.SinkPool sinks map[SinkName]*sinkPool
logSink *system.SinkPool logSink *sinkPool
installSink *system.SinkPool installSink *sinkPool
} }
// New returns a new server instance with a context and all of the default // New returns a new server instance with a context and all of the default
@@ -88,9 +88,9 @@ func New(client remote.Client) (*Server, error) {
transferring: system.NewAtomicBool(false), transferring: system.NewAtomicBool(false),
restoring: system.NewAtomicBool(false), restoring: system.NewAtomicBool(false),
powerLock: system.NewLocker(), powerLock: system.NewLocker(),
sinks: map[system.SinkName]*system.SinkPool{ sinks: map[SinkName]*sinkPool{
system.LogSink: system.NewSinkPool(), LogSink: newSinkPool(),
system.InstallSink: system.NewSinkPool(), InstallSink: newSinkPool(),
}, },
} }
if err := defaults.Set(&s); err != nil { if err := defaults.Set(&s); err != nil {

View File

@@ -1,4 +1,4 @@
package system package server
import ( import (
"sync" "sync"
@@ -16,20 +16,20 @@ const (
InstallSink SinkName = "install" InstallSink SinkName = "install"
) )
// SinkPool represents a pool with sinks. // sinkPool represents a pool with sinks.
type SinkPool struct { type sinkPool struct {
mu sync.RWMutex mu sync.RWMutex
sinks []chan []byte sinks []chan []byte
} }
// NewSinkPool returns a new empty SinkPool. A sink pool generally lives with a // newSinkPool returns a new empty sinkPool. A sink pool generally lives with a
// server instance for it's full lifetime. // server instance for it's full lifetime.
func NewSinkPool() *SinkPool { func newSinkPool() *sinkPool {
return &SinkPool{} return &sinkPool{}
} }
// On adds a channel to the sink pool instance. // On adds a channel to the sink pool instance.
func (p *SinkPool) On(c chan []byte) { func (p *sinkPool) On(c chan []byte) {
p.mu.Lock() p.mu.Lock()
p.sinks = append(p.sinks, c) p.sinks = append(p.sinks, c)
p.mu.Unlock() p.mu.Unlock()
@@ -37,7 +37,7 @@ func (p *SinkPool) On(c chan []byte) {
// Off removes a given channel from the sink pool. If no matching sink is found // Off removes a given channel from the sink pool. If no matching sink is found
// this function is a no-op. If a matching channel is found, it will be removed. // this function is a no-op. If a matching channel is found, it will be removed.
func (p *SinkPool) Off(c chan []byte) { func (p *sinkPool) Off(c chan []byte) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@@ -66,7 +66,7 @@ func (p *SinkPool) Off(c chan []byte) {
// Destroy destroys the pool by removing and closing all sinks and destroying // Destroy destroys the pool by removing and closing all sinks and destroying
// all of the channels that are present. // all of the channels that are present.
func (p *SinkPool) Destroy() { func (p *sinkPool) Destroy() {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@@ -95,7 +95,7 @@ func (p *SinkPool) Destroy() {
// likely the best option anyways. This uses waitgroups to allow every channel // likely the best option anyways. This uses waitgroups to allow every channel
// to attempt its send concurrently thus making the total blocking time of this // to attempt its send concurrently thus making the total blocking time of this
// function "O(1)" instead of "O(n)". // function "O(1)" instead of "O(n)".
func (p *SinkPool) Push(data []byte) { func (p *sinkPool) Push(data []byte) {
p.mu.RLock() p.mu.RLock()
defer p.mu.RUnlock() defer p.mu.RUnlock()
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -119,3 +119,24 @@ func (p *SinkPool) Push(data []byte) {
} }
wg.Wait() wg.Wait()
} }
// Sink returns the instantiated and named sink for a server. If the sink has
// not been configured yet this function will cause a panic condition.
func (s *Server) Sink(name SinkName) *sinkPool {
sink, ok := s.sinks[name]
if !ok {
s.Log().Fatalf("attempt to access nil sink: %s", name)
}
return sink
}
// DestroyAllSinks iterates over all of the sinks configured for the server and
// destroys their instances. Note that this will cause a panic if you attempt
// to call Server.Sink() again after. This function is only used when a server
// is being deleted from the system.
func (s *Server) DestroyAllSinks() {
s.Log().Info("destroying all registered sinks for server instance")
for _, sink := range s.sinks {
sink.Destroy()
}
}

View File

@@ -1,4 +1,4 @@
package system package server
import ( import (
"fmt" "fmt"
@@ -23,7 +23,7 @@ func TestSink(t *testing.T) {
g.Describe("SinkPool#On", func() { g.Describe("SinkPool#On", func() {
g.It("pushes additional channels to a sink", func() { g.It("pushes additional channels to a sink", func() {
pool := &SinkPool{} pool := &sinkPool{}
g.Assert(pool.sinks).IsZero() g.Assert(pool.sinks).IsZero()
@@ -36,9 +36,9 @@ func TestSink(t *testing.T) {
}) })
g.Describe("SinkPool#Off", func() { g.Describe("SinkPool#Off", func() {
var pool *SinkPool var pool *sinkPool
g.BeforeEach(func() { g.BeforeEach(func() {
pool = &SinkPool{} pool = &sinkPool{}
}) })
g.It("works when no sinks are registered", func() { g.It("works when no sinks are registered", func() {
@@ -97,9 +97,9 @@ func TestSink(t *testing.T) {
}) })
g.Describe("SinkPool#Push", func() { g.Describe("SinkPool#Push", func() {
var pool *SinkPool var pool *sinkPool
g.BeforeEach(func() { g.BeforeEach(func() {
pool = &SinkPool{} pool = &sinkPool{}
}) })
g.It("works when no sinks are registered", func() { g.It("works when no sinks are registered", func() {
@@ -190,9 +190,9 @@ func TestSink(t *testing.T) {
}) })
g.Describe("SinkPool#Destroy", func() { g.Describe("SinkPool#Destroy", func() {
var pool *SinkPool var pool *sinkPool
g.BeforeEach(func() { g.BeforeEach(func() {
pool = &SinkPool{} pool = &sinkPool{}
}) })
g.It("works if no sinks are registered", func() { g.It("works if no sinks are registered", func() {

View File

@@ -1,8 +1,6 @@
package server package server
import ( import (
"time"
"github.com/pterodactyl/wings/environment/docker" "github.com/pterodactyl/wings/environment/docker"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
@@ -60,7 +58,7 @@ func (s *Server) SyncWithEnvironment() {
s.Log().Info("server suspended with running process state, terminating now") s.Log().Info("server suspended with running process state, terminating now")
go func(s *Server) { go func(s *Server) {
if err := s.Environment.WaitForStop(s.Context(), time.Minute, true); err != nil { if err := s.Environment.WaitForStop(60, true); err != nil {
s.Log().WithField("error", err).Warn("failed to terminate server environment after suspension") s.Log().WithField("error", err).Warn("failed to terminate server environment after suspension")
} }
}(s) }(s)

View File

@@ -288,10 +288,14 @@ func (h *Handler) can(permission string) bool {
return false return false
} }
// SFTPServer owners and super admins have their permissions returned as '[*]' via the Panel
// API, so for the sake of speed do an initial check for that before iterating over the
// entire array of permissions.
if len(h.permissions) == 1 && h.permissions[0] == "*" {
return true
}
for _, p := range h.permissions { for _, p := range h.permissions {
// If we match the permission specifically, or the user has been granted the "*" if p == permission {
// permission because they're an admin, let them through.
if p == permission || p == "*" {
return true return true
} }
} }

View File

@@ -68,14 +68,9 @@ func (c *SFTPServer) Run() error {
} }
conf := &ssh.ServerConfig{ conf := &ssh.ServerConfig{
NoClientAuth: false, NoClientAuth: false,
MaxAuthTries: 6, MaxAuthTries: 6,
PasswordCallback: func(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) { PasswordCallback: c.passwordCallback,
return c.makeCredentialsRequest(conn, remote.SftpAuthPassword, string(password))
},
PublicKeyCallback: func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) {
return c.makeCredentialsRequest(conn, remote.SftpAuthPublicKey, string(ssh.MarshalAuthorizedKey(key)))
},
} }
conf.AddHostKey(private) conf.AddHostKey(private)
@@ -182,17 +177,17 @@ func (c *SFTPServer) generateED25519PrivateKey() error {
return nil return nil
} }
func (c *SFTPServer) makeCredentialsRequest(conn ssh.ConnMetadata, t remote.SftpAuthRequestType, p string) (*ssh.Permissions, error) { // A function capable of validating user credentials with the Panel API.
func (c *SFTPServer) passwordCallback(conn ssh.ConnMetadata, pass []byte) (*ssh.Permissions, error) {
request := remote.SftpAuthRequest{ request := remote.SftpAuthRequest{
Type: t,
User: conn.User(), User: conn.User(),
Pass: p, Pass: string(pass),
IP: conn.RemoteAddr().String(), IP: conn.RemoteAddr().String(),
SessionID: conn.SessionID(), SessionID: conn.SessionID(),
ClientVersion: conn.ClientVersion(), ClientVersion: conn.ClientVersion(),
} }
logger := log.WithFields(log.Fields{"subsystem": "sftp", "method": request.Type, "username": request.User, "ip": request.IP}) logger := log.WithFields(log.Fields{"subsystem": "sftp", "username": conn.User(), "ip": conn.RemoteAddr().String()})
logger.Debug("validating credentials for SFTP connection") logger.Debug("validating credentials for SFTP connection")
if !validUsernameRegexp.MatchString(request.User) { if !validUsernameRegexp.MatchString(request.User) {
@@ -211,7 +206,7 @@ func (c *SFTPServer) makeCredentialsRequest(conn ssh.ConnMetadata, t remote.Sftp
} }
logger.WithField("server", resp.Server).Debug("credentials validated and matched to server instance") logger.WithField("server", resp.Server).Debug("credentials validated and matched to server instance")
permissions := ssh.Permissions{ sshPerm := &ssh.Permissions{
Extensions: map[string]string{ Extensions: map[string]string{
"uuid": resp.Server, "uuid": resp.Server,
"user": conn.User(), "user": conn.User(),
@@ -219,7 +214,7 @@ func (c *SFTPServer) makeCredentialsRequest(conn ssh.ConnMetadata, t remote.Sftp
}, },
} }
return &permissions, nil return sshPerm, nil
} }
// PrivateKeyPath returns the path the host private key for this server instance. // PrivateKeyPath returns the path the host private key for this server instance.

View File

@@ -1,3 +1,3 @@
package system package system
var Version = "develop" var Version = "1.6.0"

View File

@@ -42,6 +42,7 @@ func (l *Locker) Acquire() error {
return nil return nil
} }
// TryAcquire will attempt to acquire a power-lock until the context provided // TryAcquire will attempt to acquire a power-lock until the context provided
// is canceled. // is canceled.
func (l *Locker) TryAcquire(ctx context.Context) error { func (l *Locker) TryAcquire(ctx context.Context) error {
@@ -50,9 +51,7 @@ func (l *Locker) TryAcquire(ctx context.Context) error {
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { return err
return ErrLockerLocked
}
} }
return nil return nil
} }

View File

@@ -81,7 +81,7 @@ func TestPower(t *testing.T) {
err := l.TryAcquire(ctx) err := l.TryAcquire(ctx)
g.Assert(err).IsNotNil() g.Assert(err).IsNotNil()
g.Assert(errors.Is(err, ErrLockerLocked)).IsTrue() g.Assert(errors.Is(err, context.DeadlineExceeded)).IsTrue()
g.Assert(cap(l.ch)).Equal(1) g.Assert(cap(l.ch)).Equal(1)
g.Assert(len(l.ch)).Equal(1) g.Assert(len(l.ch)).Equal(1)
g.Assert(l.IsLocked()).IsTrue() g.Assert(l.IsLocked()).IsTrue()
@@ -95,7 +95,7 @@ func TestPower(t *testing.T) {
l.Acquire() l.Acquire()
go func() { go func() {
time.AfterFunc(time.Millisecond*50, func() { time.AfterFunc(time.Millisecond * 50, func() {
l.Release() l.Release()
}) })
}() }()

View File

@@ -44,7 +44,7 @@ func (r *Rate) Try() bool {
// Reset resets the internal state of the rate limiter back to zero. // Reset resets the internal state of the rate limiter back to zero.
func (r *Rate) Reset() { func (r *Rate) Reset() {
r.mu.Lock() r.mu.Lock()
r.count = 0 r.count = 0
r.last = time.Now() r.last = time.Now()
r.mu.Unlock() r.mu.Unlock()
} }

View File

@@ -47,7 +47,7 @@ func TestRate(t *testing.T) {
g.It("resets back to zero when called", func() { g.It("resets back to zero when called", func() {
r := NewRate(10, time.Second) r := NewRate(10, time.Second)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
if i%10 == 0 { if i % 10 == 0 {
r.Reset() r.Reset()
} }
g.Assert(r.Try()).IsTrue() g.Assert(r.Try()).IsTrue()

View File

@@ -3,10 +3,12 @@ package system
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"strconv" "strconv"
"sync" "sync"
"time"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/goccy/go-json" "github.com/goccy/go-json"
@@ -88,16 +90,16 @@ func ScanReader(r io.Reader, callback func(line []byte)) error {
} else { } else {
buf.Write(line) buf.Write(line)
} }
// If we encountered an error with something in ReadLine that was not an
// EOF just abort the entire process here.
if err != nil && err != io.EOF {
return err
}
// Finish this loop and begin outputting the line if there is no prefix // Finish this loop and begin outputting the line if there is no prefix
// (the line fit into the default buffer), or if we hit the end of the line. // (the line fit into the default buffer), or if we hit the end of the line.
if !isPrefix || err == io.EOF { if !isPrefix || err == io.EOF {
break break
} }
// If we encountered an error with something in ReadLine that was not an
// EOF just abort the entire process here.
if err != nil {
return err
}
} }
// Send the full buffer length over to the event handler to be emitted in // Send the full buffer length over to the event handler to be emitted in
@@ -120,6 +122,22 @@ func ScanReader(r io.Reader, callback func(line []byte)) error {
return nil return nil
} }
// Runs a given work function every "d" duration until the provided context is canceled.
func Every(ctx context.Context, d time.Duration, work func(t time.Time)) {
ticker := time.NewTicker(d)
go func() {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case t := <-ticker.C:
work(t)
}
}
}()
}
func FormatBytes(b int64) string { func FormatBytes(b int64) string {
if b < 1024 { if b < 1024 {
return fmt.Sprintf("%d B", b) return fmt.Sprintf("%d B", b)