diff --git a/config/config_throttles.go b/config/config_throttles.go index bf3cc93..d63bb50 100644 --- a/config/config_throttles.go +++ b/config/config_throttles.go @@ -4,20 +4,24 @@ type ConsoleThrottles struct { // Whether or not the throttler is enabled for this instance. Enabled bool `json:"enabled" yaml:"enabled" default:"true"` - // The total number of throttle activations that must accumulate before a server is - // forcibly stopped for violating these limits. - KillAtCount uint64 `json:"kill_at_count" yaml:"kill_at_count" default:"5"` - - // The amount of time in milliseconds that a server process must go through without - // triggering an output warning before the throttle activation count begins decreasing. - // This time is measured in milliseconds. - Decay uint64 `json:"decay" yaml:"decay" default:"10000"` - - // The total number of lines that can be output in a given CheckInterval period before + // The total number of lines that can be output in a given LineResetInterval period before // a warning is triggered and counted against the server. Lines uint64 `json:"lines" yaml:"lines" default:"1000"` - // The amount of time that must pass between intervals before the count is reset. This - // value is in milliseconds. - CheckInterval uint64 `json:"check_interval" yaml:"check_interval" default:"100"` + // The total number of throttle activations that can accumulate before a server is considered + // to be breaching and will be stopped. This value is decremented by one every DecayInterval. + MaximumTriggerCount uint64 `json:"maximum_trigger_count" yaml:"maximum_trigger_count" default:"5"` + + // The amount of time after which the number of lines processed is reset to 0. This runs in + // a constant loop and is not affected by the current console output volumes. By default, this + // will reset the processed line count back to 0 every 100ms. + LineResetInterval uint64 `json:"line_reset_interval" yaml:"line_reset_interval" default:"100"` + + // The amount of time in milliseconds that must pass without an output warning being triggered + // before a throttle activation is decremented. + DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"` + + // The amount of time that a server is allowed to be stopping for before it is terminated + // forfully if it triggers output throttles. + StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"` } diff --git a/environment/docker/power.go b/environment/docker/power.go index 01ab4a7..20bcaa7 100644 --- a/environment/docker/power.go +++ b/environment/docker/power.go @@ -183,13 +183,21 @@ func (e *Environment) WaitForStop(seconds uint, terminate bool) error { case <-ctx.Done(): if ctxErr := ctx.Err(); ctxErr != nil { if terminate { - return e.Terminate(os.Kill) + log.WithField("container_id", e.Id).Debug("server did not stop in time, executing process termination") + + return errors.WithStack(e.Terminate(os.Kill)) } return errors.WithStack(ctxErr) } case err := <-errChan: if err != nil { + if terminate { + log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while waiting for container stop, attempting process termination") + + return errors.WithStack(e.Terminate(os.Kill)) + } + return errors.WithStack(err) } case <-ok: diff --git a/router/router_server.go b/router/router_server.go index 74b74db..231da0a 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -199,6 +199,7 @@ func deleteServer(c *gin.Context) { // Unsubscribe all of the event listeners. s.Events().Destroy() + s.Throttler().StopTimer() // Destroy the environment; in Docker this will handle a running container and // forcibly terminate it before removing the container, so we do not need to handle diff --git a/server/console.go b/server/console.go index 7268cf0..e6637ea 100644 --- a/server/console.go +++ b/server/console.go @@ -1,60 +1,111 @@ package server import ( + "context" "fmt" "github.com/mitchellh/colorstring" + "github.com/pkg/errors" "github.com/pterodactyl/wings/config" + "github.com/pterodactyl/wings/system" "sync" "sync/atomic" "time" ) +var ErrTooMuchConsoleData = errors.New("console is outputting too much data") + type ConsoleThrottler struct { - sync.RWMutex + mu sync.Mutex config.ConsoleThrottles // The total number of activations that have occurred thus far. activations uint64 + // The total number of lines that have been sent since the last reset timer period. + count uint64 + + // Wether or not the console output is being throttled. It is up to calling code to + // determine what to do if it is. + isThrottled system.AtomicBool + // The total number of lines processed so far during the given time period. - lines uint64 - - lastIntervalTime *time.Time - lastDecayTime *time.Time + timerCancel *context.CancelFunc } -// Increments the number of activations for a server. -func (ct *ConsoleThrottler) AddActivation() uint64 { - ct.Lock() - defer ct.Unlock() - - ct.activations += 1 - - return ct.activations +// Resets the state of the throttler. +func (ct *ConsoleThrottler) Reset() { + atomic.StoreUint64(&ct.count, 0) + atomic.StoreUint64(&ct.activations, 0) + ct.isThrottled.Set(false) } -// Decrements the number of activations for a server. -func (ct *ConsoleThrottler) RemoveActivation() uint64 { - ct.Lock() - defer ct.Unlock() +// Triggers an activation for a server. You can also decrement the number of activations +// by passing a negative number. +func (ct *ConsoleThrottler) markActivation(increment bool) uint64 { + if !increment { + if atomic.LoadUint64(&ct.activations) == 0 { + return 0 + } - if ct.activations == 0 { - return 0 + // This weird dohicky subtracts 1 from the activation count. + return atomic.AddUint64(&ct.activations, ^uint64(0)) } - ct.activations -= 1 - - return ct.activations + return atomic.AddUint64(&ct.activations, 1) } -// Increment the total count of lines that we have processed so far. -func (ct *ConsoleThrottler) IncrementLineCount() uint64 { - return atomic.AddUint64(&ct.lines, 1) +// Determines if the console is currently being throttled. Calls to this function can be used to +// determine if output should be funneled along to the websocket processes. +func (ct *ConsoleThrottler) Throttled() bool { + return ct.isThrottled.Get() } -// Reset the line count to zero. -func (ct *ConsoleThrottler) ResetLineCount() { - atomic.SwapUint64(&ct.lines, 0) +// Starts a timer that runs in a seperate thread and will continually decrement the lines processed +// and number of activations, regardless of the current console message volume. +func (ct *ConsoleThrottler) StartTimer() { + ctx, cancel := context.WithCancel(context.Background()) + + reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond) + decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond) + + go func() { + for { + select { + case <-ctx.Done(): + reset.Stop() + return + case <-reset.C: + ct.isThrottled.Set(false) + atomic.StoreUint64(&ct.count, 0) + } + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + decay.Stop() + return + case <-decay.C: + ct.markActivation(false) + } + } + }() + + ct.timerCancel = &cancel +} + +// Stops a running timer processes if one exists. This is only called when the server is deleted since +// we want this to always be running. If there is no process currently running nothing will really happen. +func (ct *ConsoleThrottler) StopTimer() { + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.timerCancel != nil { + c := *ct.timerCancel + c() + ct.timerCancel = nil + } } // Handles output from a server's console. This code ensures that a server is not outputting @@ -70,30 +121,41 @@ func (ct *ConsoleThrottler) ResetLineCount() { // data all at once. These values are all configurable via the wings configuration file, however the // defaults have been in the wild for almost two years at the time of this writing, so I feel quite // confident in them. -func (ct *ConsoleThrottler) Handle() { +// +// This function returns an error if the server should be stopped due to violating throttle constraints +// and a boolean value indicating if a throttle is being violated when it is checked. +func (ct *ConsoleThrottler) Increment(onTrigger func()) error { + if !ct.Enabled { + return nil + } + // Increment the line count and if we have now output more lines than are allowed, trigger a throttle + // activation. Once the throttle is triggered and has passed the kill at value we will trigger a server + // stop automatically. + if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() { + ct.isThrottled.Set(true) + if ct.markActivation(true) >= ct.MaximumTriggerCount { + return ErrTooMuchConsoleData + } + + onTrigger() + } + + return nil } // Returns the throttler instance for the server or creates a new one. func (s *Server) Throttler() *ConsoleThrottler { - s.throttleLock.RLock() + s.throttleLock.Lock() + defer s.throttleLock.Unlock() if s.throttler == nil { - // Release the read lock so that we can acquire a normal lock on the process and - // make modifications to the throttler. - s.throttleLock.RUnlock() - - s.throttleLock.Lock() s.throttler = &ConsoleThrottler{ ConsoleThrottles: config.Get().Throttles, } - s.throttleLock.Unlock() - - return s.throttler - } else { - defer s.throttleLock.RUnlock() - return s.throttler } + + return s.throttler } // Sends output to the server console formatted to appear correctly as being sent diff --git a/server/listeners.go b/server/listeners.go index bcef8fc..4029de4 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -5,6 +5,7 @@ import ( "github.com/apex/log" "github.com/pkg/errors" "github.com/pterodactyl/wings/api" + "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/events" "regexp" @@ -21,16 +22,50 @@ var dockerEvents = []string{ // removed by deleting the server as they should last for the duration of the process' lifetime. func (s *Server) StartEventListeners() { console := func(e events.Event) { - // Immediately emit this event back over the server event stream since it is - // being called from the environment event stream and things probably aren't - // listening to that event. - s.Events().Publish(ConsoleOutputEvent, e.Data) + t := s.Throttler() + err := t.Increment(func () { + s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.") + }) + + // An error is only returned if the server has breached the thresholds set. + if err != nil { + // If the process is already stopping, just let it continue with that action rather than attempting + // to terminate again. + if s.GetState() != environment.ProcessStoppingState { + s.SetState(environment.ProcessStoppingState) + go func() { + s.Log().Warn("stopping server instance, violating throttle limits") + s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.") + // Completely skip over server power actions and terminate the running instance. This gives the + // server 15 seconds to finish stopping gracefully before it is forcefully terminated. + if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil { + // If there is an error set the process back to running so that this throttler is called + // again and hopefully kills the server. + if s.GetState() != environment.ProcessOfflineState { + s.SetState(environment.ProcessRunningState) + } + + s.Log().WithField("error", errors.WithStack(err)).Error("failed to terminate environment after triggering throttle") + } + }() + } + } + + // If we are not throttled, go ahead and output the data. + if !t.Throttled() { + s.Events().Publish(ConsoleOutputEvent, e.Data) + } // Also pass the data along to the console output channel. s.onConsoleOutput(e.Data) } state := func(e events.Event) { + // Reset the throttler when the process is started. + if e.Data == environment.ProcessStartingState { + s.Throttler().Reset() + } + s.SetState(e.Data) } diff --git a/server/loader.go b/server/loader.go index 75095f8..dec6f79 100644 --- a/server/loader.go +++ b/server/loader.go @@ -111,6 +111,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { } else { s.Environment = env s.StartEventListeners() + s.Throttler().StartTimer() } // Forces the configuration to be synced with the panel. diff --git a/server/server.go b/server/server.go index 2f6996d..a24e185 100644 --- a/server/server.go +++ b/server/server.go @@ -22,7 +22,7 @@ type Server struct { sync.RWMutex emitterLock sync.Mutex powerLock *semaphore.Weighted - throttleLock sync.RWMutex + throttleLock sync.Mutex // Maintains the configuration for the server. This is the data that gets returned by the Panel // such as build settings and container images. diff --git a/system/bool.go b/system/bool.go new file mode 100644 index 0000000..f3cd08e --- /dev/null +++ b/system/bool.go @@ -0,0 +1,20 @@ +package system + +import "sync/atomic" + +type AtomicBool struct { + flag uint32 +} + +func (ab *AtomicBool) Set(v bool) { + i := 0 + if v { + i = 1 + } + + atomic.StoreUint32(&ab.flag, uint32(i)) +} + +func (ab *AtomicBool) Get() bool { + return atomic.LoadUint32(&ab.flag) == 1 +} \ No newline at end of file