From 4c8f5c21a377f016f820804506deb6149ef778e1 Mon Sep 17 00:00:00 2001
From: Dane Everitt
Date: Sun, 23 Jan 2022 09:49:35 -0800
Subject: [PATCH] Improve power lock logic (#118)

---
 router/router_server.go |  24 +++---
 server/power.go         | 153 ++++++++++++++++++++++++++++----------
 server/power_test.go    | 158 ++++++++++++++++++++++++++++++++++++++++
 server/server.go        |  16 +++-
 server/sink_test.go     |   2 +-
 5 files changed, 299 insertions(+), 54 deletions(-)
 create mode 100644 server/power_test.go

diff --git a/router/router_server.go b/router/router_server.go
index 2ae2030..83fc078 100644
--- a/router/router_server.go
+++ b/router/router_server.go
@@ -52,7 +52,8 @@ func postServerPower(c *gin.Context) {
 	s := ExtractServer(c)
 
 	var data struct {
-		Action server.PowerAction `json:"action"`
+		Action      server.PowerAction `json:"action"`
+		WaitSeconds int                `json:"wait_seconds"`
 	}
 
 	if err := c.BindJSON(&data); err != nil {
@@ -83,12 +84,16 @@ func postServerPower(c *gin.Context) {
 	// we can immediately return a response from the server. Some of these actions
 	// can take quite some time, especially stopping or restarting.
 	go func(s *server.Server) {
-		if err := s.HandlePowerAction(data.Action, 30); err != nil {
+		if data.WaitSeconds < 0 || data.WaitSeconds > 300 {
+			data.WaitSeconds = 30
+		}
+		if err := s.HandlePowerAction(data.Action, data.WaitSeconds); err != nil {
 			if errors.Is(err, context.DeadlineExceeded) {
-				s.Log().WithField("action", data.Action).
-					Warn("could not acquire a lock while attempting to perform a power action")
+				s.Log().WithField("action", data.Action).WithField("error", err).Warn("could not process server power action")
+			} else if errors.Is(err, server.ErrIsRunning) {
+				// Do nothing, this isn't something we care about for logging.
 			} else {
-				s.Log().WithFields(log.Fields{"action": data, "error": err}).
+				s.Log().WithFields(log.Fields{"action": data.Action, "wait_seconds": data.WaitSeconds, "error": err}).
 					Error("encountered error processing a server power action in the background")
 			}
 		}
@@ -182,14 +187,7 @@ func deleteServer(c *gin.Context) {
 	// Immediately suspend the server to prevent a user from attempting
 	// to start it while this process is running.
 	s.Config().SetSuspended(true)
-
-	// Stop all running background tasks for this server that are using the context on
-	// the server struct. This will cancel any running install processes for the server
-	// as well.
-	s.CtxCancel()
-	s.Events().Destroy()
-	s.DestroyAllSinks()
-	s.Websockets().CancelAll()
+	s.CleanupForDestroy()
 
 	// Remove any pending remote file downloads for the server.
 	for _, dl := range downloader.ByServer(s.ID()) {
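The router change above gives API clients control over how long Wings waits on the power lock: `wait_seconds` is read from the request body and clamped to a sane range, with anything below 0 or above 300 falling back to the previous hardcoded default of 30. As a rough illustration of the calling side, a hypothetical client might look like the sketch below; the host, server ID, and token are invented placeholders, not values from this patch.

    // Hypothetical client sketch for the updated power endpoint; the URL,
    // server ID, and bearer token are placeholders.
    package main

    import (
        "bytes"
        "fmt"
        "net/http"
    )

    func main() {
        // Ask Wings to wait up to 120 seconds for the power lock before
        // giving up; out-of-range values are clamped back to 30.
        body := []byte(`{"action": "restart", "wait_seconds": 120}`)
        req, err := http.NewRequest(http.MethodPost, "https://wings.example.com/api/servers/SERVER_ID/power", bytes.NewReader(body))
        if err != nil {
            panic(err)
        }
        req.Header.Set("Authorization", "Bearer TOKEN")
        req.Header.Set("Content-Type", "application/json")

        res, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer res.Body.Close()
        // The handler responds immediately; the power action itself runs
        // in a background goroutine on the Wings side.
        fmt.Println(res.Status)
    }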
diff --git a/server/power.go b/server/power.go
index 0f83304..543c1a7 100644
--- a/server/power.go
+++ b/server/power.go
@@ -2,12 +2,13 @@ package server
 
 import (
 	"context"
+	"fmt"
 	"os"
+	"sync"
 	"time"
 
 	"emperror.dev/errors"
-	"golang.org/x/sync/semaphore"
-
+	"github.com/google/uuid"
 	"github.com/pterodactyl/wings/config"
 	"github.com/pterodactyl/wings/environment"
 )
@@ -40,19 +41,85 @@ func (pa PowerAction) IsStart() bool {
 	return pa == PowerActionStart || pa == PowerActionRestart
 }
 
-// ExecutingPowerAction checks if there is currently a power action being processed for the server.
+type powerLocker struct {
+	mu sync.RWMutex
+	ch chan bool
+}
+
+func newPowerLocker() *powerLocker {
+	return &powerLocker{
+		ch: make(chan bool, 1),
+	}
+}
+
+type errPowerLockerLocked struct{}
+
+func (e errPowerLockerLocked) Error() string {
+	return "cannot acquire a lock on the power state: already locked"
+}
+
+var ErrPowerLockerLocked error = errPowerLockerLocked{}
+
+// IsLocked returns the current state of the locker channel. If there is
+// currently a value in the channel, it is assumed to be locked.
+func (pl *powerLocker) IsLocked() bool {
+	pl.mu.RLock()
+	defer pl.mu.RUnlock()
+	return len(pl.ch) == 1
+}
+
+// Acquire will acquire the power lock if it is not currently locked. If it is
+// already locked, Acquire will fail to acquire the lock and will return an error.
+func (pl *powerLocker) Acquire() error {
+	pl.mu.Lock()
+	defer pl.mu.Unlock()
+	if len(pl.ch) == 1 {
+		return errors.WithStack(ErrPowerLockerLocked)
+	}
+	pl.ch <- true
+	return nil
+}
+
+// TryAcquire will attempt to acquire a power-lock until the context provided
+// is canceled.
+func (pl *powerLocker) TryAcquire(ctx context.Context) error {
+	select {
+	case pl.ch <- true:
+		return nil
+	case <-ctx.Done():
+		if err := ctx.Err(); err != nil {
+			return errors.WithStack(err)
+		}
+		return nil
+	}
+}
+
+// Release will drain the locker channel so that we can properly re-acquire it
+// at a later time.
+func (pl *powerLocker) Release() {
+	pl.mu.Lock()
+	if len(pl.ch) == 1 {
+		<-pl.ch
+	}
+	pl.mu.Unlock()
+}
+
+// Destroy cleans up the power locker by closing the channel.
+func (pl *powerLocker) Destroy() {
+	pl.mu.Lock()
+	if pl.ch != nil {
+		if len(pl.ch) == 1 {
+			<-pl.ch
+		}
+		close(pl.ch)
+	}
+	pl.mu.Unlock()
+}
+
+// ExecutingPowerAction checks if there is currently a power action being
+// processed for the server.
 func (s *Server) ExecutingPowerAction() bool {
-	if s.powerLock == nil {
-		return false
-	}
-
-	ok := s.powerLock.TryAcquire(1)
-	if ok {
-		s.powerLock.Release(1)
-	}
-
-	// Remember, if we acquired a lock it means nothing was running.
-	return !ok
+	return s.powerLock.IsLocked()
 }
 
 // HandlePowerAction is a helper function that can receive a power action and then process the
@@ -63,22 +130,29 @@ func (s *Server) ExecutingPowerAction() bool {
 // function rather than making direct calls to the start/stop/restart functions on the
 // environment struct.
 func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error {
-	if s.IsInstalling() {
+	if s.IsInstalling() || s.IsTransferring() || s.IsRestoring() {
+		if s.IsRestoring() {
+			return ErrServerIsRestoring
+		} else if s.IsTransferring() {
+			return ErrServerIsTransferring
+		}
 		return ErrServerIsInstalling
 	}
 
-	if s.IsTransferring() {
-		return ErrServerIsTransferring
+	lockId, _ := uuid.NewUUID()
+	log := s.Log().WithField("lock_id", lockId.String()).WithField("action", action)
+
+	cleanup := func() {
+		log.Info("releasing exclusive lock for power action")
+		s.powerLock.Release()
 	}
 
-	if s.IsRestoring() {
-		return ErrServerIsRestoring
-	}
-
-	if s.powerLock == nil {
-		s.powerLock = semaphore.NewWeighted(1)
+	var wait int
+	if len(waitSeconds) > 0 && waitSeconds[0] > 0 {
+		wait = waitSeconds[0]
 	}
+	log.WithField("wait_seconds", wait).Debug("acquiring power action lock for instance")
 
 	// Only attempt to acquire a lock on the process if this is not a termination event. We want to
 	// just allow those events to pass right through for good reason. If a server is currently trying
 	// to process a power action but has gotten stuck you still should be able to pass through the
@@ -87,33 +161,38 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
 	if action != PowerActionTerminate {
 		// Determines if we should wait for the lock or not. If a value greater than 0 is passed
 		// into this function we will wait that long for a lock to be acquired.
-		if len(waitSeconds) > 0 && waitSeconds[0] != 0 {
-			ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(waitSeconds[0]))
+		if wait > 0 {
+			ctx, cancel := context.WithTimeout(s.ctx, time.Second*time.Duration(wait))
 			defer cancel()
 
 			// Attempt to acquire a lock on the power action lock for up to 30 seconds. If more
 			// time than that passes an error will be propagated back up the chain and this
 			// request will be aborted.
-			if err := s.powerLock.Acquire(ctx, 1); err != nil {
-				return errors.WithMessage(err, "could not acquire lock on power state")
+			if err := s.powerLock.TryAcquire(ctx); err != nil {
+				return errors.Wrap(err, fmt.Sprintf("could not acquire lock on power action after %d seconds", wait))
 			}
 		} else {
 			// If no wait duration was provided we will attempt to immediately acquire the lock
 			// and bail out with a context deadline error if it is not acquired immediately.
-			if ok := s.powerLock.TryAcquire(1); !ok {
-				return errors.WithMessage(context.DeadlineExceeded, "could not acquire lock on power state")
+			if err := s.powerLock.Acquire(); err != nil {
+				return errors.Wrap(err, "failed to acquire exclusive lock for power actions")
 			}
 		}
 
-		// Release the lock once the process being requested has finished executing.
-		defer s.powerLock.Release(1)
+		log.Info("acquired exclusive lock on power actions, processing event...")
+		defer cleanup()
 	} else {
-		// Still try to acquire the lock if terminating, and it is available, just so that other power
-		// actions are blocked until it has completed. However, if it is unavailable we won't stop
-		// the entire process.
-		if ok := s.powerLock.TryAcquire(1); ok {
-			// If we managed to acquire the lock be sure to released it once this process is completed.
-			defer s.powerLock.Release(1)
+		// Still try to acquire the lock if terminating, and it is available, just so that
+		// other power actions are blocked until it has completed. However, if it cannot be
+		// acquired we won't stop the entire process.
+		//
+		// If we did successfully acquire the lock, make sure we release it once we're done
+		// executing the power actions.
+		if err := s.powerLock.Acquire(); err == nil {
+			log.Info("acquired exclusive lock on power actions, processing event...")
+			defer cleanup()
+		} else {
+			log.Warn("failed to acquire exclusive lock, ignoring failure for termination event")
+		}
 	}
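The powerLocker introduced above is, at heart, a try-lock built from a buffered channel of capacity one: a successful send takes the lock, a receive releases it, and `len(ch)` doubles as the locked flag, with the RWMutex guarding the length checks and drains against a concurrent Destroy. For readers unfamiliar with the pattern, here is a minimal standalone sketch of the same idea that uses a `select` with a `default` arm instead of the mutex-guarded length check; it is illustrative only, assumes nothing beyond the standard library, and is not code from this patch.

    // Minimal sketch of the one-slot-channel try-lock pattern.
    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    type tryLock struct{ ch chan struct{} }

    func newTryLock() *tryLock { return &tryLock{ch: make(chan struct{}, 1)} }

    // acquire fails immediately if the slot is already taken.
    func (l *tryLock) acquire() error {
        select {
        case l.ch <- struct{}{}:
            return nil
        default:
            return errors.New("already locked")
        }
    }

    // tryAcquire waits for the slot until the context expires.
    func (l *tryLock) tryAcquire(ctx context.Context) error {
        select {
        case l.ch <- struct{}{}:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    // release drains the slot so the lock can be taken again.
    func (l *tryLock) release() {
        select {
        case <-l.ch:
        default:
        }
    }

    func main() {
        l := newTryLock()
        fmt.Println(l.acquire()) // nil: lock acquired
        fmt.Println(l.acquire()) // error: already locked

        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()
        fmt.Println(l.tryAcquire(ctx)) // context deadline exceeded

        l.release()
        fmt.Println(l.acquire()) // nil again after release
    }

The one-slot channel is what makes both acquisition styles cheap to offer from a single primitive: Acquire maps to a non-blocking send and TryAcquire to a send raced against `ctx.Done()`, something a plain sync.Mutex cannot express.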
diff --git a/server/power_test.go b/server/power_test.go
new file mode 100644
index 0000000..02446e7
--- /dev/null
+++ b/server/power_test.go
@@ -0,0 +1,158 @@
+package server
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"emperror.dev/errors"
+	. "github.com/franela/goblin"
+)
+
+func TestPower(t *testing.T) {
+	g := Goblin(t)
+
+	g.Describe("PowerLocker", func() {
+		var pl *powerLocker
+		g.BeforeEach(func() {
+			pl = newPowerLocker()
+		})
+
+		g.Describe("PowerLocker#IsLocked", func() {
+			g.It("should return false when the channel is empty", func() {
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(pl.IsLocked()).IsFalse()
+			})
+
+			g.It("should return true when the channel is at capacity", func() {
+				pl.ch <- true
+
+				g.Assert(pl.IsLocked()).IsTrue()
+				<-pl.ch
+				g.Assert(pl.IsLocked()).IsFalse()
+
+				// We don't care what the channel value is, just that there is
+				// something in it.
+				pl.ch <- false
+				g.Assert(pl.IsLocked()).IsTrue()
+				g.Assert(cap(pl.ch)).Equal(1)
+			})
+		})
+
+		g.Describe("PowerLocker#Acquire", func() {
+			g.It("should acquire a lock when channel is empty", func() {
+				err := pl.Acquire()
+
+				g.Assert(err).IsNil()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(1)
+			})
+
+			g.It("should return an error when the channel is full", func() {
+				pl.ch <- true
+
+				err := pl.Acquire()
+
+				g.Assert(err).IsNotNil()
+				g.Assert(errors.Is(err, ErrPowerLockerLocked)).IsTrue()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(1)
+			})
+		})
+
+		g.Describe("PowerLocker#TryAcquire", func() {
+			g.It("should acquire a lock when channel is empty", func() {
+				g.Timeout(time.Second)
+
+				err := pl.TryAcquire(context.Background())
+
+				g.Assert(err).IsNil()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(1)
+				g.Assert(pl.IsLocked()).IsTrue()
+			})
+
+			g.It("should block until context is canceled if channel is full", func() {
+				g.Timeout(time.Second)
+				ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
+				defer cancel()
+
+				pl.ch <- true
+				err := pl.TryAcquire(ctx)
+
+				g.Assert(err).IsNotNil()
+				g.Assert(errors.Is(err, context.DeadlineExceeded)).IsTrue()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(1)
+				g.Assert(pl.IsLocked()).IsTrue()
+			})
+
+			g.It("should block until lock can be acquired", func() {
+				g.Timeout(time.Second)
+
+				ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
+				defer cancel()
+
+				pl.Acquire()
+				go func() {
+					time.AfterFunc(time.Millisecond*50, func() {
+						pl.Release()
+					})
+				}()
+
+				err := pl.TryAcquire(ctx)
+				g.Assert(err).IsNil()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(1)
+				g.Assert(pl.IsLocked()).IsTrue()
+			})
+		})
+
+		g.Describe("PowerLocker#Release", func() {
+			g.It("should release when channel is full", func() {
+				pl.Acquire()
+				g.Assert(pl.IsLocked()).IsTrue()
+				pl.Release()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(0)
+				g.Assert(pl.IsLocked()).IsFalse()
+			})
+
+			g.It("should release when channel is empty", func() {
+				g.Assert(pl.IsLocked()).IsFalse()
+				pl.Release()
+				g.Assert(cap(pl.ch)).Equal(1)
+				g.Assert(len(pl.ch)).Equal(0)
+				g.Assert(pl.IsLocked()).IsFalse()
+			})
+		})
+
+		g.Describe("PowerLocker#Destroy", func() {
+			g.It("should unlock and close the channel", func() {
+				pl.Acquire()
+				g.Assert(pl.IsLocked()).IsTrue()
+				pl.Destroy()
+				g.Assert(pl.IsLocked()).IsFalse()
+
+				defer func() {
+					r := recover()
+
+					g.Assert(r).IsNotNil()
+					g.Assert(r.(error).Error()).Equal("send on closed channel")
+				}()
+
+				pl.Acquire()
+			})
+		})
+	})
+
+	g.Describe("Server#ExecutingPowerAction", func() {
+		g.It("should return based on locker status", func() {
+			s := &Server{powerLock: newPowerLocker()}
+
+			g.Assert(s.ExecutingPowerAction()).IsFalse()
+			s.powerLock.Acquire()
+			g.Assert(s.ExecutingPowerAction()).IsTrue()
+		})
+	})
+}
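One small nit on the "should block until lock can be acquired" case above: time.AfterFunc already runs its callback on its own goroutine, so wrapping it in another `go func()` adds nothing. Assuming no other intent behind the wrapper, the delayed release could be reduced to:

    // time.AfterFunc fires the callback in its own goroutine, so no
    // surrounding go statement is needed for the delayed Release.
    time.AfterFunc(50*time.Millisecond, pl.Release)

The observable behavior is identical either way; the wrapper only adds one extra goroutine.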
"github.com/franela/goblin" +) + +func TestPower(t *testing.T) { + g := Goblin(t) + + g.Describe("PowerLocker", func() { + var pl *powerLocker + g.BeforeEach(func() { + pl = newPowerLocker() + }) + + g.Describe("PowerLocker#IsLocked", func() { + g.It("should return false when the channel is empty", func() { + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(pl.IsLocked()).IsFalse() + }) + + g.It("should return true when the channel is at capacity", func() { + pl.ch <- true + + g.Assert(pl.IsLocked()).IsTrue() + <-pl.ch + g.Assert(pl.IsLocked()).IsFalse() + + // We don't care what the channel value is, just that there is + // something in it. + pl.ch <- false + g.Assert(pl.IsLocked()).IsTrue() + g.Assert(cap(pl.ch)).Equal(1) + }) + }) + + g.Describe("PowerLocker#Acquire", func() { + g.It("should acquire a lock when channel is empty", func() { + err := pl.Acquire() + + g.Assert(err).IsNil() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(1) + }) + + g.It("should return an error when the channel is full", func() { + pl.ch <- true + + err := pl.Acquire() + + g.Assert(err).IsNotNil() + g.Assert(errors.Is(err, ErrPowerLockerLocked)).IsTrue() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(1) + }) + }) + + g.Describe("PowerLocker#TryAcquire", func() { + g.It("should acquire a lock when channel is empty", func() { + g.Timeout(time.Second) + + err := pl.TryAcquire(context.Background()) + + g.Assert(err).IsNil() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(1) + g.Assert(pl.IsLocked()).IsTrue() + }) + + g.It("should block until context is canceled if channel is full", func() { + g.Timeout(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + pl.ch <- true + err := pl.TryAcquire(ctx) + + g.Assert(err).IsNotNil() + g.Assert(errors.Is(err, context.DeadlineExceeded)).IsTrue() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(1) + g.Assert(pl.IsLocked()).IsTrue() + }) + + g.It("should block until lock can be acquired", func() { + g.Timeout(time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) + defer cancel() + + pl.Acquire() + go func() { + time.AfterFunc(time.Millisecond * 50, func() { + pl.Release() + }) + }() + + err := pl.TryAcquire(ctx) + g.Assert(err).IsNil() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(1) + g.Assert(pl.IsLocked()).IsTrue() + }) + }) + + g.Describe("PowerLocker#Release", func() { + g.It("should release when channel is full", func() { + pl.Acquire() + g.Assert(pl.IsLocked()).IsTrue() + pl.Release() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(0) + g.Assert(pl.IsLocked()).IsFalse() + }) + + g.It("should release when channel is empty", func() { + g.Assert(pl.IsLocked()).IsFalse() + pl.Release() + g.Assert(cap(pl.ch)).Equal(1) + g.Assert(len(pl.ch)).Equal(0) + g.Assert(pl.IsLocked()).IsFalse() + }) + }) + + g.Describe("PowerLocker#Destroy", func() { + g.It("should unlock and close the channel", func() { + pl.Acquire() + g.Assert(pl.IsLocked()).IsTrue() + pl.Destroy() + g.Assert(pl.IsLocked()).IsFalse() + + defer func() { + r := recover() + + g.Assert(r).IsNotNil() + g.Assert(r.(error).Error()).Equal("send on closed channel") + }() + + pl.Acquire() + }) + }) + }) + + g.Describe("Server#ExecutingPowerAction", func() { + g.It("should return based on locker status", func() { + s := &Server{powerLock: newPowerLocker()} + + g.Assert(s.ExecutingPowerAction()).IsFalse() + s.powerLock.Acquire() + 
diff --git a/server/sink_test.go b/server/sink_test.go
index 5713cee..97c763f 100644
--- a/server/sink_test.go
+++ b/server/sink_test.go
@@ -16,7 +16,7 @@ func MutexLocked(m *sync.RWMutex) bool {
 	return state.Int()&1 == 1 || v.FieldByName("readerCount").Int() > 0
 }
 
-func Test(t *testing.T) {
+func TestSink(t *testing.T) {
 	g := Goblin(t)
 
 	g.Describe("SinkPool#On", func() {
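Finally, renaming the generic Test entry point to TestSink keeps the test names in the server package unambiguous now that TestPower lives alongside it (e.g. `go test ./server -run TestPower` selects just the new suite). For callers of HandlePowerAction, the practical upshot of the patch is that the two lock-failure modes are now distinguishable. A hedged consumer-side sketch, assuming the sentinels exported above and the errors.Is-compatible wrapping used by emperror.dev/errors:

    // With a positive wait, the wrapped cause on timeout is
    // context.DeadlineExceeded; a zero-wait call that loses the race
    // surfaces ErrPowerLockerLocked instead.
    err := s.HandlePowerAction(server.PowerActionStop, 10)
    switch {
    case err == nil:
        // The stop action ran normally.
    case errors.Is(err, context.DeadlineExceeded):
        // The lock stayed held for the full 10-second window.
    case errors.Is(err, server.ErrPowerLockerLocked):
        // Returned by the immediate-acquire path when no wait is given.
    }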