Add console throttling; closes pterodactyl/panel#2214 (#60)
This commit is contained in:
parent
ce76b9339e
commit
6ba1b75696
|
@ -4,20 +4,24 @@ type ConsoleThrottles struct {
|
|||
// Whether or not the throttler is enabled for this instance.
|
||||
Enabled bool `json:"enabled" yaml:"enabled" default:"true"`
|
||||
|
||||
// The total number of throttle activations that must accumulate before a server is
|
||||
// forcibly stopped for violating these limits.
|
||||
KillAtCount uint64 `json:"kill_at_count" yaml:"kill_at_count" default:"5"`
|
||||
|
||||
// The amount of time in milliseconds that a server process must go through without
|
||||
// triggering an output warning before the throttle activation count begins decreasing.
|
||||
// This time is measured in milliseconds.
|
||||
Decay uint64 `json:"decay" yaml:"decay" default:"10000"`
|
||||
|
||||
// The total number of lines that can be output in a given CheckInterval period before
|
||||
// The total number of lines that can be output in a given LineResetInterval period before
|
||||
// a warning is triggered and counted against the server.
|
||||
Lines uint64 `json:"lines" yaml:"lines" default:"1000"`
|
||||
|
||||
// The amount of time that must pass between intervals before the count is reset. This
|
||||
// value is in milliseconds.
|
||||
CheckInterval uint64 `json:"check_interval" yaml:"check_interval" default:"100"`
|
||||
// The total number of throttle activations that can accumulate before a server is considered
|
||||
// to be breaching and will be stopped. This value is decremented by one every DecayInterval.
|
||||
MaximumTriggerCount uint64 `json:"maximum_trigger_count" yaml:"maximum_trigger_count" default:"5"`
|
||||
|
||||
// The amount of time after which the number of lines processed is reset to 0. This runs in
|
||||
// a constant loop and is not affected by the current console output volumes. By default, this
|
||||
// will reset the processed line count back to 0 every 100ms.
|
||||
LineResetInterval uint64 `json:"line_reset_interval" yaml:"line_reset_interval" default:"100"`
|
||||
|
||||
// The amount of time in milliseconds that must pass without an output warning being triggered
|
||||
// before a throttle activation is decremented.
|
||||
DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"`
|
||||
|
||||
// The amount of time that a server is allowed to be stopping for before it is terminated
|
||||
// forfully if it triggers output throttles.
|
||||
StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"`
|
||||
}
|
||||
|
|
|
@ -183,13 +183,21 @@ func (e *Environment) WaitForStop(seconds uint, terminate bool) error {
|
|||
case <-ctx.Done():
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
if terminate {
|
||||
return e.Terminate(os.Kill)
|
||||
log.WithField("container_id", e.Id).Debug("server did not stop in time, executing process termination")
|
||||
|
||||
return errors.WithStack(e.Terminate(os.Kill))
|
||||
}
|
||||
|
||||
return errors.WithStack(ctxErr)
|
||||
}
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
if terminate {
|
||||
log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while waiting for container stop, attempting process termination")
|
||||
|
||||
return errors.WithStack(e.Terminate(os.Kill))
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
case <-ok:
|
||||
|
|
|
@ -199,6 +199,7 @@ func deleteServer(c *gin.Context) {
|
|||
|
||||
// Unsubscribe all of the event listeners.
|
||||
s.Events().Destroy()
|
||||
s.Throttler().StopTimer()
|
||||
|
||||
// Destroy the environment; in Docker this will handle a running container and
|
||||
// forcibly terminate it before removing the container, so we do not need to handle
|
||||
|
|
|
@ -1,60 +1,111 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/mitchellh/colorstring"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pterodactyl/wings/config"
|
||||
"github.com/pterodactyl/wings/system"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrTooMuchConsoleData = errors.New("console is outputting too much data")
|
||||
|
||||
type ConsoleThrottler struct {
|
||||
sync.RWMutex
|
||||
mu sync.Mutex
|
||||
config.ConsoleThrottles
|
||||
|
||||
// The total number of activations that have occurred thus far.
|
||||
activations uint64
|
||||
|
||||
// The total number of lines that have been sent since the last reset timer period.
|
||||
count uint64
|
||||
|
||||
// Wether or not the console output is being throttled. It is up to calling code to
|
||||
// determine what to do if it is.
|
||||
isThrottled system.AtomicBool
|
||||
|
||||
// The total number of lines processed so far during the given time period.
|
||||
lines uint64
|
||||
|
||||
lastIntervalTime *time.Time
|
||||
lastDecayTime *time.Time
|
||||
timerCancel *context.CancelFunc
|
||||
}
|
||||
|
||||
// Increments the number of activations for a server.
|
||||
func (ct *ConsoleThrottler) AddActivation() uint64 {
|
||||
ct.Lock()
|
||||
defer ct.Unlock()
|
||||
|
||||
ct.activations += 1
|
||||
|
||||
return ct.activations
|
||||
// Resets the state of the throttler.
|
||||
func (ct *ConsoleThrottler) Reset() {
|
||||
atomic.StoreUint64(&ct.count, 0)
|
||||
atomic.StoreUint64(&ct.activations, 0)
|
||||
ct.isThrottled.Set(false)
|
||||
}
|
||||
|
||||
// Decrements the number of activations for a server.
|
||||
func (ct *ConsoleThrottler) RemoveActivation() uint64 {
|
||||
ct.Lock()
|
||||
defer ct.Unlock()
|
||||
|
||||
if ct.activations == 0 {
|
||||
// Triggers an activation for a server. You can also decrement the number of activations
|
||||
// by passing a negative number.
|
||||
func (ct *ConsoleThrottler) markActivation(increment bool) uint64 {
|
||||
if !increment {
|
||||
if atomic.LoadUint64(&ct.activations) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
ct.activations -= 1
|
||||
// This weird dohicky subtracts 1 from the activation count.
|
||||
return atomic.AddUint64(&ct.activations, ^uint64(0))
|
||||
}
|
||||
|
||||
return ct.activations
|
||||
return atomic.AddUint64(&ct.activations, 1)
|
||||
}
|
||||
|
||||
// Increment the total count of lines that we have processed so far.
|
||||
func (ct *ConsoleThrottler) IncrementLineCount() uint64 {
|
||||
return atomic.AddUint64(&ct.lines, 1)
|
||||
// Determines if the console is currently being throttled. Calls to this function can be used to
|
||||
// determine if output should be funneled along to the websocket processes.
|
||||
func (ct *ConsoleThrottler) Throttled() bool {
|
||||
return ct.isThrottled.Get()
|
||||
}
|
||||
|
||||
// Reset the line count to zero.
|
||||
func (ct *ConsoleThrottler) ResetLineCount() {
|
||||
atomic.SwapUint64(&ct.lines, 0)
|
||||
// Starts a timer that runs in a seperate thread and will continually decrement the lines processed
|
||||
// and number of activations, regardless of the current console message volume.
|
||||
func (ct *ConsoleThrottler) StartTimer() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond)
|
||||
decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
reset.Stop()
|
||||
return
|
||||
case <-reset.C:
|
||||
ct.isThrottled.Set(false)
|
||||
atomic.StoreUint64(&ct.count, 0)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
decay.Stop()
|
||||
return
|
||||
case <-decay.C:
|
||||
ct.markActivation(false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
ct.timerCancel = &cancel
|
||||
}
|
||||
|
||||
// Stops a running timer processes if one exists. This is only called when the server is deleted since
|
||||
// we want this to always be running. If there is no process currently running nothing will really happen.
|
||||
func (ct *ConsoleThrottler) StopTimer() {
|
||||
ct.mu.Lock()
|
||||
defer ct.mu.Unlock()
|
||||
if ct.timerCancel != nil {
|
||||
c := *ct.timerCancel
|
||||
c()
|
||||
ct.timerCancel = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Handles output from a server's console. This code ensures that a server is not outputting
|
||||
|
@ -70,30 +121,41 @@ func (ct *ConsoleThrottler) ResetLineCount() {
|
|||
// data all at once. These values are all configurable via the wings configuration file, however the
|
||||
// defaults have been in the wild for almost two years at the time of this writing, so I feel quite
|
||||
// confident in them.
|
||||
func (ct *ConsoleThrottler) Handle() {
|
||||
//
|
||||
// This function returns an error if the server should be stopped due to violating throttle constraints
|
||||
// and a boolean value indicating if a throttle is being violated when it is checked.
|
||||
func (ct *ConsoleThrottler) Increment(onTrigger func()) error {
|
||||
if !ct.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Increment the line count and if we have now output more lines than are allowed, trigger a throttle
|
||||
// activation. Once the throttle is triggered and has passed the kill at value we will trigger a server
|
||||
// stop automatically.
|
||||
if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() {
|
||||
ct.isThrottled.Set(true)
|
||||
if ct.markActivation(true) >= ct.MaximumTriggerCount {
|
||||
return ErrTooMuchConsoleData
|
||||
}
|
||||
|
||||
onTrigger()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns the throttler instance for the server or creates a new one.
|
||||
func (s *Server) Throttler() *ConsoleThrottler {
|
||||
s.throttleLock.RLock()
|
||||
s.throttleLock.Lock()
|
||||
defer s.throttleLock.Unlock()
|
||||
|
||||
if s.throttler == nil {
|
||||
// Release the read lock so that we can acquire a normal lock on the process and
|
||||
// make modifications to the throttler.
|
||||
s.throttleLock.RUnlock()
|
||||
|
||||
s.throttleLock.Lock()
|
||||
s.throttler = &ConsoleThrottler{
|
||||
ConsoleThrottles: config.Get().Throttles,
|
||||
}
|
||||
s.throttleLock.Unlock()
|
||||
}
|
||||
|
||||
return s.throttler
|
||||
} else {
|
||||
defer s.throttleLock.RUnlock()
|
||||
return s.throttler
|
||||
}
|
||||
}
|
||||
|
||||
// Sends output to the server console formatted to appear correctly as being sent
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"github.com/apex/log"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pterodactyl/wings/api"
|
||||
"github.com/pterodactyl/wings/config"
|
||||
"github.com/pterodactyl/wings/environment"
|
||||
"github.com/pterodactyl/wings/events"
|
||||
"regexp"
|
||||
|
@ -21,16 +22,50 @@ var dockerEvents = []string{
|
|||
// removed by deleting the server as they should last for the duration of the process' lifetime.
|
||||
func (s *Server) StartEventListeners() {
|
||||
console := func(e events.Event) {
|
||||
// Immediately emit this event back over the server event stream since it is
|
||||
// being called from the environment event stream and things probably aren't
|
||||
// listening to that event.
|
||||
t := s.Throttler()
|
||||
err := t.Increment(func () {
|
||||
s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.")
|
||||
})
|
||||
|
||||
// An error is only returned if the server has breached the thresholds set.
|
||||
if err != nil {
|
||||
// If the process is already stopping, just let it continue with that action rather than attempting
|
||||
// to terminate again.
|
||||
if s.GetState() != environment.ProcessStoppingState {
|
||||
s.SetState(environment.ProcessStoppingState)
|
||||
go func() {
|
||||
s.Log().Warn("stopping server instance, violating throttle limits")
|
||||
s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.")
|
||||
// Completely skip over server power actions and terminate the running instance. This gives the
|
||||
// server 15 seconds to finish stopping gracefully before it is forcefully terminated.
|
||||
if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil {
|
||||
// If there is an error set the process back to running so that this throttler is called
|
||||
// again and hopefully kills the server.
|
||||
if s.GetState() != environment.ProcessOfflineState {
|
||||
s.SetState(environment.ProcessRunningState)
|
||||
}
|
||||
|
||||
s.Log().WithField("error", errors.WithStack(err)).Error("failed to terminate environment after triggering throttle")
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// If we are not throttled, go ahead and output the data.
|
||||
if !t.Throttled() {
|
||||
s.Events().Publish(ConsoleOutputEvent, e.Data)
|
||||
}
|
||||
|
||||
// Also pass the data along to the console output channel.
|
||||
s.onConsoleOutput(e.Data)
|
||||
}
|
||||
|
||||
state := func(e events.Event) {
|
||||
// Reset the throttler when the process is started.
|
||||
if e.Data == environment.ProcessStartingState {
|
||||
s.Throttler().Reset()
|
||||
}
|
||||
|
||||
s.SetState(e.Data)
|
||||
}
|
||||
|
||||
|
|
|
@ -111,6 +111,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
|
|||
} else {
|
||||
s.Environment = env
|
||||
s.StartEventListeners()
|
||||
s.Throttler().StartTimer()
|
||||
}
|
||||
|
||||
// Forces the configuration to be synced with the panel.
|
||||
|
|
|
@ -22,7 +22,7 @@ type Server struct {
|
|||
sync.RWMutex
|
||||
emitterLock sync.Mutex
|
||||
powerLock *semaphore.Weighted
|
||||
throttleLock sync.RWMutex
|
||||
throttleLock sync.Mutex
|
||||
|
||||
// Maintains the configuration for the server. This is the data that gets returned by the Panel
|
||||
// such as build settings and container images.
|
||||
|
|
20
system/bool.go
Normal file
20
system/bool.go
Normal file
|
@ -0,0 +1,20 @@
|
|||
package system
|
||||
|
||||
import "sync/atomic"
|
||||
|
||||
type AtomicBool struct {
|
||||
flag uint32
|
||||
}
|
||||
|
||||
func (ab *AtomicBool) Set(v bool) {
|
||||
i := 0
|
||||
if v {
|
||||
i = 1
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&ab.flag, uint32(i))
|
||||
}
|
||||
|
||||
func (ab *AtomicBool) Get() bool {
|
||||
return atomic.LoadUint32(&ab.flag) == 1
|
||||
}
|
Loading…
Reference in New Issue
Block a user