Improve power lock logic (#118)

This commit is contained in:
Dane Everitt 2022-01-23 09:49:35 -08:00 committed by GitHub
parent c52db4eec0
commit 4c8f5c21a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 299 additions and 54 deletions

View File

@ -53,6 +53,7 @@ func postServerPower(c *gin.Context) {
var data struct {
Action server.PowerAction `json:"action"`
WaitSeconds int `json:"wait_seconds"`
}
if err := c.BindJSON(&data); err != nil {
@ -83,12 +84,16 @@ func postServerPower(c *gin.Context) {
// we can immediately return a response from the server. Some of these actions
// can take quite some time, especially stopping or restarting.
go func(s *server.Server) {
if err := s.HandlePowerAction(data.Action, 30); err != nil {
if data.WaitSeconds < 0 || data.WaitSeconds > 300 {
data.WaitSeconds = 30
}
if err := s.HandlePowerAction(data.Action, data.WaitSeconds); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
s.Log().WithField("action", data.Action).
Warn("could not acquire a lock while attempting to perform a power action")
s.Log().WithField("action", data.Action).WithField("error", err).Warn("could not process server power action")
} else if errors.Is(err, server.ErrIsRunning) {
// Do nothing, this isn't something we care about for logging,
} else {
s.Log().WithFields(log.Fields{"action": data, "error": err}).
s.Log().WithFields(log.Fields{"action": data.Action, "wait_seconds": data.WaitSeconds, "error": err}).
Error("encountered error processing a server power action in the background")
}
}
@ -182,14 +187,7 @@ func deleteServer(c *gin.Context) {
// Immediately suspend the server to prevent a user from attempting
// to start it while this process is running.
s.Config().SetSuspended(true)
// Stop all running background tasks for this server that are using the context on
// the server struct. This will cancel any running install processes for the server
// as well.
s.CtxCancel()
s.Events().Destroy()
s.DestroyAllSinks()
s.Websockets().CancelAll()
s.CleanupForDestroy()
// Remove any pending remote file downloads for the server.
for _, dl := range downloader.ByServer(s.ID()) {

View File

@ -2,12 +2,13 @@ package server
import (
"context"
"fmt"
"os"
"sync"
"time"
"emperror.dev/errors"
"golang.org/x/sync/semaphore"
"github.com/google/uuid"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
)
@ -40,19 +41,85 @@ func (pa PowerAction) IsStart() bool {
return pa == PowerActionStart || pa == PowerActionRestart
}
// ExecutingPowerAction checks if there is currently a power action being processed for the server.
type powerLocker struct {
mu sync.RWMutex
ch chan bool
}
func newPowerLocker() *powerLocker {
return &powerLocker{
ch: make(chan bool, 1),
}
}
type errPowerLockerLocked struct{}
func (e errPowerLockerLocked) Error() string {
return "cannot acquire a lock on the power state: already locked"
}
var ErrPowerLockerLocked error = errPowerLockerLocked{}
// IsLocked returns the current state of the locker channel. If there is
// currently a value in the channel, it is assumed to be locked.
func (pl *powerLocker) IsLocked() bool {
pl.mu.RLock()
defer pl.mu.RUnlock()
return len(pl.ch) == 1
}
// Acquire will acquire the power lock if it is not currently locked. If it is
// already locked, acquire will fail to acquire the lock, and will return false.
func (pl *powerLocker) Acquire() error {
pl.mu.Lock()
defer pl.mu.Unlock()
if len(pl.ch) == 1 {
return errors.WithStack(ErrPowerLockerLocked)
}
pl.ch <- true
return nil
}
// TryAcquire will attempt to acquire a power-lock until the context provided
// is canceled.
func (pl *powerLocker) TryAcquire(ctx context.Context) error {
select {
case pl.ch <- true:
return nil
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return errors.WithStack(err)
}
return nil
}
}
// Release will drain the locker channel so that we can properly re-acquire it
// at a later time.
func (pl *powerLocker) Release() {
pl.mu.Lock()
if len(pl.ch) == 1 {
<-pl.ch
}
pl.mu.Unlock()
}
// Destroy cleans up the power locker by closing the channel.
func (pl *powerLocker) Destroy() {
pl.mu.Lock()
if pl.ch != nil {
if len(pl.ch) == 1 {
<-pl.ch
}
close(pl.ch)
}
pl.mu.Unlock()
}
// ExecutingPowerAction checks if there is currently a power action being
// processed for the server.
func (s *Server) ExecutingPowerAction() bool {
if s.powerLock == nil {
return false
}
ok := s.powerLock.TryAcquire(1)
if ok {
s.powerLock.Release(1)
}
// Remember, if we acquired a lock it means nothing was running.
return !ok
return s.powerLock.IsLocked()
}
// HandlePowerAction is a helper function that can receive a power action and then process the
@ -63,22 +130,29 @@ func (s *Server) ExecutingPowerAction() bool {
// function rather than making direct calls to the start/stop/restart functions on the
// environment struct.
func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error {
if s.IsInstalling() {
if s.IsInstalling() || s.IsTransferring() || s.IsRestoring() {
if s.IsRestoring() {
return ErrServerIsRestoring
} else if s.IsTransferring() {
return ErrServerIsTransferring
}
return ErrServerIsInstalling
}
if s.IsTransferring() {
return ErrServerIsTransferring
lockId, _ := uuid.NewUUID()
log := s.Log().WithField("lock_id", lockId.String()).WithField("action", action)
cleanup := func() {
log.Info("releasing exclusive lock for power action")
s.powerLock.Release()
}
if s.IsRestoring() {
return ErrServerIsRestoring
}
if s.powerLock == nil {
s.powerLock = semaphore.NewWeighted(1)
var wait int
if len(waitSeconds) > 0 && waitSeconds[0] > 0 {
wait = waitSeconds[0]
}
log.WithField("wait_seconds", wait).Debug("acquiring power action lock for instance")
// Only attempt to acquire a lock on the process if this is not a termination event. We want to
// just allow those events to pass right through for good reason. If a server is currently trying
// to process a power action but has gotten stuck you still should be able to pass through the
@ -87,33 +161,38 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
if action != PowerActionTerminate {
// Determines if we should wait for the lock or not. If a value greater than 0 is passed
// into this function we will wait that long for a lock to be acquired.
if len(waitSeconds) > 0 && waitSeconds[0] != 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(waitSeconds[0]))
if wait > 0 {
ctx, cancel := context.WithTimeout(s.ctx, time.Second*time.Duration(wait))
defer cancel()
// Attempt to acquire a lock on the power action lock for up to 30 seconds. If more
// time than that passes an error will be propagated back up the chain and this
// request will be aborted.
if err := s.powerLock.Acquire(ctx, 1); err != nil {
return errors.WithMessage(err, "could not acquire lock on power state")
if err := s.powerLock.TryAcquire(ctx); err != nil {
return errors.Wrap(err, fmt.Sprintf("could not acquire lock on power action after %d seconds", wait))
}
} else {
// If no wait duration was provided we will attempt to immediately acquire the lock
// and bail out with a context deadline error if it is not acquired immediately.
if ok := s.powerLock.TryAcquire(1); !ok {
return errors.WithMessage(context.DeadlineExceeded, "could not acquire lock on power state")
if err := s.powerLock.Acquire(); err != nil {
return errors.Wrap(err, "failed to acquire exclusive lock for power actions")
}
}
// Release the lock once the process being requested has finished executing.
defer s.powerLock.Release(1)
log.Info("acquired exclusive lock on power actions, processing event...")
defer cleanup()
} else {
// Still try to acquire the lock if terminating, and it is available, just so that other power
// actions are blocked until it has completed. However, if it is unavailable we won't stop
// the entire process.
if ok := s.powerLock.TryAcquire(1); ok {
// If we managed to acquire the lock be sure to released it once this process is completed.
defer s.powerLock.Release(1)
// Still try to acquire the lock if terminating, and it is available, just so that
// other power actions are blocked until it has completed. However, if it cannot be
// acquired we won't stop the entire process.
//
// If we did successfully acquire the lock, make sure we release it once we're done
// executiong the power actions.
if err := s.powerLock.Acquire(); err == nil {
log.Info("acquired exclusive lock on power actions, processing event...")
defer cleanup()
} else {
log.Warn("failed to acquire exclusive lock, ignoring failure for termination event")
}
}

158
server/power_test.go Normal file
View File

@ -0,0 +1,158 @@
package server
import (
"context"
"testing"
"time"
"emperror.dev/errors"
. "github.com/franela/goblin"
)
func TestPower(t *testing.T) {
g := Goblin(t)
g.Describe("PowerLocker", func() {
var pl *powerLocker
g.BeforeEach(func() {
pl = newPowerLocker()
})
g.Describe("PowerLocker#IsLocked", func() {
g.It("should return false when the channel is empty", func() {
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(pl.IsLocked()).IsFalse()
})
g.It("should return true when the channel is at capacity", func() {
pl.ch <- true
g.Assert(pl.IsLocked()).IsTrue()
<-pl.ch
g.Assert(pl.IsLocked()).IsFalse()
// We don't care what the channel value is, just that there is
// something in it.
pl.ch <- false
g.Assert(pl.IsLocked()).IsTrue()
g.Assert(cap(pl.ch)).Equal(1)
})
})
g.Describe("PowerLocker#Acquire", func() {
g.It("should acquire a lock when channel is empty", func() {
err := pl.Acquire()
g.Assert(err).IsNil()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(1)
})
g.It("should return an error when the channel is full", func() {
pl.ch <- true
err := pl.Acquire()
g.Assert(err).IsNotNil()
g.Assert(errors.Is(err, ErrPowerLockerLocked)).IsTrue()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(1)
})
})
g.Describe("PowerLocker#TryAcquire", func() {
g.It("should acquire a lock when channel is empty", func() {
g.Timeout(time.Second)
err := pl.TryAcquire(context.Background())
g.Assert(err).IsNil()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(1)
g.Assert(pl.IsLocked()).IsTrue()
})
g.It("should block until context is canceled if channel is full", func() {
g.Timeout(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
pl.ch <- true
err := pl.TryAcquire(ctx)
g.Assert(err).IsNotNil()
g.Assert(errors.Is(err, context.DeadlineExceeded)).IsTrue()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(1)
g.Assert(pl.IsLocked()).IsTrue()
})
g.It("should block until lock can be acquired", func() {
g.Timeout(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
pl.Acquire()
go func() {
time.AfterFunc(time.Millisecond * 50, func() {
pl.Release()
})
}()
err := pl.TryAcquire(ctx)
g.Assert(err).IsNil()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(1)
g.Assert(pl.IsLocked()).IsTrue()
})
})
g.Describe("PowerLocker#Release", func() {
g.It("should release when channel is full", func() {
pl.Acquire()
g.Assert(pl.IsLocked()).IsTrue()
pl.Release()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(0)
g.Assert(pl.IsLocked()).IsFalse()
})
g.It("should release when channel is empty", func() {
g.Assert(pl.IsLocked()).IsFalse()
pl.Release()
g.Assert(cap(pl.ch)).Equal(1)
g.Assert(len(pl.ch)).Equal(0)
g.Assert(pl.IsLocked()).IsFalse()
})
})
g.Describe("PowerLocker#Destroy", func() {
g.It("should unlock and close the channel", func() {
pl.Acquire()
g.Assert(pl.IsLocked()).IsTrue()
pl.Destroy()
g.Assert(pl.IsLocked()).IsFalse()
defer func() {
r := recover()
g.Assert(r).IsNotNil()
g.Assert(r.(error).Error()).Equal("send on closed channel")
}()
pl.Acquire()
})
})
})
g.Describe("Server#ExecutingPowerAction", func() {
g.It("should return based on locker status", func() {
s := &Server{powerLock: newPowerLocker()}
g.Assert(s.ExecutingPowerAction()).IsFalse()
s.powerLock.Acquire()
g.Assert(s.ExecutingPowerAction()).IsTrue()
})
})
}

View File

@ -12,8 +12,6 @@ import (
"emperror.dev/errors"
"github.com/apex/log"
"github.com/creasty/defaults"
"golang.org/x/sync/semaphore"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events"
@ -32,7 +30,7 @@ type Server struct {
ctxCancel *context.CancelFunc
emitterLock sync.Mutex
powerLock *semaphore.Weighted
powerLock *powerLocker
throttleOnce sync.Once
// Maintains the configuration for the server. This is the data that gets returned by the Panel
@ -88,6 +86,7 @@ func New(client remote.Client) (*Server, error) {
installing: system.NewAtomicBool(false),
transferring: system.NewAtomicBool(false),
restoring: system.NewAtomicBool(false),
powerLock: newPowerLocker(),
sinks: map[SinkName]*sinkPool{
LogSink: newSinkPool(),
InstallSink: newSinkPool(),
@ -103,6 +102,17 @@ func New(client remote.Client) (*Server, error) {
return &s, nil
}
// CleanupForDestroy stops all running background tasks for this server that are
// using the context on the server struct. This will cancel any running install
// processes for the server as well.
func (s *Server) CleanupForDestroy() {
s.CtxCancel()
s.Events().Destroy()
s.DestroyAllSinks()
s.Websockets().CancelAll()
s.powerLock.Destroy()
}
// ID returns the UUID for the server instance.
func (s *Server) ID() string {
return s.Config().GetUuid()

View File

@ -16,7 +16,7 @@ func MutexLocked(m *sync.RWMutex) bool {
return state.Int()&1 == 1 || v.FieldByName("readerCount").Int() > 0
}
func Test(t *testing.T) {
func TestSink(t *testing.T) {
g := Goblin(t)
g.Describe("SinkPool#On", func() {