Refactor power handling logic to be more robust and able to handle spam clicking and duplicate power actions

This commit is contained in:
Dane Everitt 2020-08-01 15:20:39 -07:00
parent ecb2cb05ce
commit 177aa8e436
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
8 changed files with 129 additions and 70 deletions

View File

@ -205,7 +205,7 @@ func rootCmdRun(*cobra.Command, []string) {
// is that it was running, but we see that the container process is not currently running. // is that it was running, but we see that the container process is not currently running.
if r || (!r && s.IsRunning()) { if r || (!r && s.IsRunning()) {
s.Log().Info("detected server is running, re-attaching to process...") s.Log().Info("detected server is running, re-attaching to process...")
if err := s.Environment.Start(); err != nil { if err := s.HandlePowerAction(server.PowerActionStart); err != nil {
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to properly start server detected as already running") s.Log().WithField("error", errors.WithStack(err)).Warn("failed to properly start server detected as already running")
} }

View File

@ -2,6 +2,7 @@ package router
import ( import (
"bytes" "bytes"
"context"
"github.com/apex/log" "github.com/apex/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -47,13 +48,15 @@ func getServerLogs(c *gin.Context) {
func postServerPower(c *gin.Context) { func postServerPower(c *gin.Context) {
s := GetServer(c.Param("server")) s := GetServer(c.Param("server"))
var data server.PowerAction var data struct{
// BindJSON sends 400 if the request fails, all we need to do is return Action server.PowerAction `json:"action"`
}
if err := c.BindJSON(&data); err != nil { if err := c.BindJSON(&data); err != nil {
return return
} }
if !data.IsValid() { if !data.Action.IsValid() {
c.AbortWithStatusJSON(http.StatusUnprocessableEntity, gin.H{ c.AbortWithStatusJSON(http.StatusUnprocessableEntity, gin.H{
"error": "The power action provided was not valid, should be one of \"stop\", \"start\", \"restart\", \"kill\"", "error": "The power action provided was not valid, should be one of \"stop\", \"start\", \"restart\", \"kill\"",
}) })
@ -66,7 +69,7 @@ func postServerPower(c *gin.Context) {
// //
// We don't really care about any of the other actions at this point, they'll all result // We don't really care about any of the other actions at this point, they'll all result
// in the process being stopped, which should have happened anyways if the server is suspended. // in the process being stopped, which should have happened anyways if the server is suspended.
if (data.Action == "start" || data.Action == "restart") && s.IsSuspended() { if (data.Action == server.PowerActionStart || data.Action == server.PowerActionRestart) && s.IsSuspended() {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "Cannot start or restart a server that is suspended.", "error": "Cannot start or restart a server that is suspended.",
}) })
@ -76,11 +79,16 @@ func postServerPower(c *gin.Context) {
// Pass the actual heavy processing off to a seperate thread to handle so that // Pass the actual heavy processing off to a seperate thread to handle so that
// we can immediately return a response from the server. Some of these actions // we can immediately return a response from the server. Some of these actions
// can take quite some time, especially stopping or restarting. // can take quite some time, especially stopping or restarting.
go func(server *server.Server) { go func(s *server.Server) {
if err := server.HandlePowerAction(data); err != nil { if err := s.HandlePowerAction(data.Action, 30); err != nil {
server.Log().WithFields(log.Fields{"action": data, "error": err}). if errors.Is(err, context.DeadlineExceeded) {
s.Log().WithField("action", data.Action).
Warn("could not acquire a lock while attempting to perform a power action")
} else {
s.Log().WithFields(log.Fields{"action": data, "error": err}).
Error("encountered error processing a server power action in the background") Error("encountered error processing a server power action in the background")
} }
}
}(s) }(s)
c.Status(http.StatusAccepted) c.Status(http.StatusAccepted)

View File

@ -1,6 +1,7 @@
package websocket package websocket
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/apex/log" "github.com/apex/log"
@ -12,7 +13,6 @@ import (
"github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/router/tokens"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
"net/http" "net/http"
"os"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -150,7 +150,7 @@ func (h *Handler) TokenValid() error {
// Sends an error back to the connected websocket instance by checking the permissions // Sends an error back to the connected websocket instance by checking the permissions
// of the token. If the user has the "receive-errors" grant we will send back the actual // of the token. If the user has the "receive-errors" grant we will send back the actual
// error message, otherwise we just send back a standard error message. // error message, otherwise we just send back a standard error message.
func (h *Handler) SendErrorJson(msg Message, err error) error { func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error {
j := h.GetJwt() j := h.GetJwt()
message := "an unexpected error was encountered while handling this request" message := "an unexpected error was encountered while handling this request"
@ -163,10 +163,12 @@ func (h *Handler) SendErrorJson(msg Message, err error) error {
wsm := Message{Event: ErrorEvent} wsm := Message{Event: ErrorEvent}
wsm.Args = []string{m} wsm.Args = []string{m}
if len(shouldLog) == 0 || (len(shouldLog) == 1 && shouldLog[0] == true) {
if !server.IsSuspendedError(err) { if !server.IsSuspendedError(err) {
h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}). h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}).
Error("failed to handle websocket process; an error was encountered processing an event") Error("failed to handle websocket process; an error was encountered processing an event")
} }
}
return h.unsafeSendJson(wsm) return h.unsafeSendJson(wsm)
} }
@ -271,37 +273,34 @@ func (h *Handler) HandleInbound(m Message) error {
} }
case SetStateEvent: case SetStateEvent:
{ {
switch strings.Join(m.Args, "") { action := server.PowerAction(strings.Join(m.Args, ""))
case "start":
if h.GetJwt().HasPermission(PermissionSendPowerStart) { actions := make(map[server.PowerAction]string)
return h.server.Environment.Start() actions[server.PowerActionStart] = PermissionSendPowerStart
actions[server.PowerActionStop] = PermissionSendPowerStop
actions[server.PowerActionRestart] = PermissionSendPowerRestart
actions[server.PowerActionTerminate] = PermissionSendPowerStop
// Check that they have permission to perform this action if it is needed.
if permission, exists := actions[action]; exists {
if !h.GetJwt().HasPermission(permission) {
return nil
} }
break
case "stop":
if h.GetJwt().HasPermission(PermissionSendPowerStop) {
return h.server.Environment.Stop()
} }
break
case "restart": err := h.server.HandlePowerAction(action)
if h.GetJwt().HasPermission(PermissionSendPowerRestart) { if errors.Is(err, context.DeadlineExceeded) {
// If the server is alreay restarting don't do anything. Perhaps we send back an event m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later")
// in the future for this? For now no reason to knowingly trigger an error by trying to
// restart a process already restarting. h.SendJson(&Message{
if h.server.Environment.IsRestarting() { Event: ErrorEvent,
Args: []string{m},
})
return nil return nil
} }
return h.server.Environment.Restart() return err
}
break
case "kill":
if h.GetJwt().HasPermission(PermissionSendPowerStop) {
return h.server.Environment.Terminate(os.Kill)
}
break
}
return nil
} }
case SendServerLogsEvent: case SendServerLogsEvent:
{ {

View File

@ -82,5 +82,5 @@ func (s *Server) handleServerCrash() error {
s.crasher.SetLastCrash(time.Now()) s.crasher.SetLastCrash(time.Now())
return s.Environment.Start() return s.HandlePowerAction(PowerActionStart)
} }

View File

@ -36,7 +36,6 @@ type Environment interface {
// unnecessary double/triple/quad looping issues if multiple people press restart or spam the // unnecessary double/triple/quad looping issues if multiple people press restart or spam the
// button to restart. // button to restart.
Restart() error Restart() error
IsRestarting() bool
// Waits for a server instance to stop gracefully. If the server is still detected // Waits for a server instance to stop gracefully. If the server is still detected
// as running after seconds, an error will be returned, or the server will be terminated // as running after seconds, an error will be returned, or the server will be terminated

View File

@ -342,13 +342,13 @@ func (d *DockerEnvironment) acquireRestartLock() error {
// start command. This will return an error if there is already a restart process executing for the // start command. This will return an error if there is already a restart process executing for the
// server. The lock is released when the process is stopped and a start has begun. // server. The lock is released when the process is stopped and a start has begun.
func (d *DockerEnvironment) Restart() error { func (d *DockerEnvironment) Restart() error {
d.Server.Log().Debug("attempting to acquire restart lock...") d.Server.Log().Debug("acquiring process restart lock...")
if err := d.acquireRestartLock(); err != nil { if err := d.acquireRestartLock(); err != nil {
d.Server.Log().Warn("failed to acquire restart lock; already acquired by a different process") d.Server.Log().Warn("failed to acquire restart lock; already acquired by a different process")
return err return err
} }
d.Server.Log().Debug("acquired restart lock") d.Server.Log().Info("acquired process lock; beginning restart process...")
err := d.WaitForStop(60, false) err := d.WaitForStop(60, false)
if err != nil { if err != nil {
@ -496,12 +496,14 @@ func (d *DockerEnvironment) Attach() error {
} }
var err error var err error
d.Lock()
d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Id(), types.ContainerAttachOptions{ d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Id(), types.ContainerAttachOptions{
Stdin: true, Stdin: true,
Stdout: true, Stdout: true,
Stderr: true, Stderr: true,
Stream: true, Stream: true,
}) })
d.Unlock()
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)

View File

@ -1,12 +1,80 @@
package server package server
type PowerAction struct { import (
Action string `json:"action"` "context"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
"os"
"time"
)
type PowerAction string
// The power actions that can be performed for a given server. This taps into the given server
// environment and performs them in a way that prevents a race condition from occurring. For
// example, sending two "start" actions back to back will not process the second action until
// the first action has been completed.
//
// This utilizes a workerpool with a limit of one worker so that all of the actions execute
// in a sync manner.
const (
PowerActionStart = "start"
PowerActionStop = "stop"
PowerActionRestart = "restart"
PowerActionTerminate = "kill"
)
// Checks if the power action being received is valid.
func (pa PowerAction) IsValid() bool {
return pa == PowerActionStart ||
pa == PowerActionStop ||
pa == PowerActionTerminate ||
pa == PowerActionRestart
} }
func (pr *PowerAction) IsValid() bool { // Helper function that can receive a power action and then process the actions that need
return pr.Action == "start" || // to occur for it. This guards against someone calling Start() twice at the same time, or
pr.Action == "stop" || // trying to restart while another restart process is currently running.
pr.Action == "kill" || //
pr.Action == "restart" // However, the code design for the daemon does depend on the user correctly calling this
// function rather than making direct calls to the start/stop/restart functions on the
// environment struct.
func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error {
if s.powerLock == nil {
s.powerLock = semaphore.NewWeighted(1)
}
// Determines if we should wait for the lock or not. If a value greater than 0 is passed
// into this function we will wait that long for a lock to be acquired.
if len(waitSeconds) > 0 && waitSeconds[0] != 0 {
ctx, _ := context.WithTimeout(context.Background(), time.Second*time.Duration(waitSeconds[0]))
// Attempt to acquire a lock on the power action lock for up to 30 seconds. If more
// time than that passes an error will be propagated back up the chain and this
// request will be aborted.
if err := s.powerLock.Acquire(ctx, 1); err != nil {
return errors.WithMessage(err, "could not acquire lock on power state")
}
} else {
// If no wait duration was provided we will attempt to immediately acquire the lock
// and bail out with a context deadline error if it is not acquired immediately.
if ok := s.powerLock.TryAcquire(1); !ok {
return errors.WithMessage(context.DeadlineExceeded, "could not acquire lock on power state")
}
}
// Release the lock once the process being requested has finished executing.
defer s.powerLock.Release(1)
switch action {
case PowerActionStart:
return s.Environment.Start()
case PowerActionStop:
return s.Environment.Stop()
case PowerActionRestart:
return s.Environment.Restart()
case PowerActionTerminate:
return s.Environment.Terminate(os.Kill)
}
return errors.New("attempting to handle unknown power action")
} }

View File

@ -8,7 +8,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"os"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -20,6 +19,7 @@ type Server struct {
// writing the configuration to the disk. // writing the configuration to the disk.
sync.RWMutex sync.RWMutex
emitterLock sync.Mutex emitterLock sync.Mutex
powerLock *semaphore.Weighted
// Maintains the configuration for the server. This is the data that gets returned by the Panel // Maintains the configuration for the server. This is the data that gets returned by the Panel
// such as build settings and container images. // such as build settings and container images.
@ -158,23 +158,6 @@ func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *a
return api.NewRequester().GetServerConfiguration(s.Id()) return api.NewRequester().GetServerConfiguration(s.Id())
} }
// Helper function that can receive a power action and then process the
// actions that need to occur for it.
func (s *Server) HandlePowerAction(action PowerAction) error {
switch action.Action {
case "start":
return s.Environment.Start()
case "restart":
return s.Environment.Restart()
case "stop":
return s.Environment.Stop()
case "kill":
return s.Environment.Terminate(os.Kill)
default:
return errors.New("an invalid power action was provided")
}
}
// Checks if the server is marked as being suspended or not on the system. // Checks if the server is marked as being suspended or not on the system.
func (s *Server) IsSuspended() bool { func (s *Server) IsSuspended() bool {
return s.Config().Suspended return s.Config().Suspended