Add a server context that gets canceled when a server is deleted

This commit is contained in:
Dane Everitt 2020-12-25 11:21:09 -08:00
parent f7f5623c71
commit c0523df696
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
9 changed files with 93 additions and 104 deletions

View File

@ -356,6 +356,12 @@ func rootCmdRun(*cobra.Command, []string) {
if err := s.ListenAndServe(); err != nil { if err := s.ListenAndServe(); err != nil {
log.WithField("error", err).Fatal("failed to configure HTTP server") log.WithField("error", err).Fatal("failed to configure HTTP server")
} }
// Cancel the context on all of the running servers at this point, even though the
// program is just shutting down.
for _, s := range server.GetServers().All() {
s.CtxCancel()
}
} }
// Execute calls cobra to handle cli commands // Execute calls cobra to handle cli commands

View File

@ -47,7 +47,7 @@ type Environment struct {
emitter *events.EventBus emitter *events.EventBus
// Tracks the environment state. // Tracks the environment state.
st *system.AtomicString st system.AtomicString
} }
// Creates a new base Docker environment. The ID passed through will be the ID that is used to // Creates a new base Docker environment. The ID passed through will be the ID that is used to

View File

@ -189,16 +189,18 @@ func postServerReinstall(c *gin.Context) {
// Deletes a server from the wings daemon and dissociate it's objects. // Deletes a server from the wings daemon and dissociate it's objects.
func deleteServer(c *gin.Context) { func deleteServer(c *gin.Context) {
s := GetServer(c.Param("server")) s := ExtractServer(c)
// Immediately suspend the server to prevent a user from attempting // Immediately suspend the server to prevent a user from attempting
// to start it while this process is running. // to start it while this process is running.
s.Config().SetSuspended(true) s.Config().SetSuspended(true)
// If the server is currently installing, abort it. // Stop all running background tasks for this server that are using the context on
if s.IsInstalling() { // the server struct. This will cancel any running install processes for the server
s.AbortInstallation() // as well.
} s.CtxCancel()
s.Events().Destroy()
s.Websockets().CancelAll()
// Delete the server's archive if it exists. We intentionally don't return // Delete the server's archive if it exists. We intentionally don't return
// here, if the archive fails to delete, the server can still be removed. // here, if the archive fails to delete, the server can still be removed.
@ -206,11 +208,6 @@ func deleteServer(c *gin.Context) {
s.Log().WithField("error", err).Warn("failed to delete server archive during deletion process") s.Log().WithField("error", err).Warn("failed to delete server archive during deletion process")
} }
// Unsubscribe all of the event listeners.
s.Events().Destroy()
s.Throttler().StopTimer()
s.Websockets().CancelAll()
// Remove any pending remote file downloads for the server. // Remove any pending remote file downloads for the server.
for _, dl := range downloader.ByServer(s.Id()) { for _, dl := range downloader.ByServer(s.Id()) {
dl.Cancel() dl.Cancel()
@ -220,7 +217,8 @@ func deleteServer(c *gin.Context) {
// forcibly terminate it before removing the container, so we do not need to handle // forcibly terminate it before removing the container, so we do not need to handle
// that here. // that here.
if err := s.Environment.Destroy(); err != nil { if err := s.Environment.Destroy(); err != nil {
NewServerError(err, s).Abort(c) WithError(c, err)
return
} }
// Once the environment is terminated, remove the server files from the system. This is // Once the environment is terminated, remove the server files from the system. This is
@ -231,10 +229,7 @@ func deleteServer(c *gin.Context) {
// so we don't want to block the HTTP call while waiting on this. // so we don't want to block the HTTP call while waiting on this.
go func(p string) { go func(p string) {
if err := os.RemoveAll(p); err != nil { if err := os.RemoveAll(p); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{"path": p, "error": err}).Warn("failed to remove server files during deletion process")
"path": p,
"error": err,
}).Warn("failed to remove server files during deletion process")
} }
}(s.Filesystem().Path()) }(s.Filesystem().Path())

View File

@ -61,51 +61,17 @@ func (ct *ConsoleThrottler) Throttled() bool {
} }
// Starts a timer that runs in a seperate thread and will continually decrement the lines processed // Starts a timer that runs in a seperate thread and will continually decrement the lines processed
// and number of activations, regardless of the current console message volume. // and number of activations, regardless of the current console message volume. All of the timers
func (ct *ConsoleThrottler) StartTimer() { // are canceled if the context passed through is canceled.
ctx, cancel := context.WithCancel(context.Background()) func (ct *ConsoleThrottler) StartTimer(ctx context.Context) {
system.Every(ctx, time.Duration(int64(ct.LineResetInterval)) * time.Millisecond, func(_ time.Time) {
reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond)
decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond)
go func() {
for {
select {
case <-ctx.Done():
reset.Stop()
return
case <-reset.C:
ct.isThrottled.Set(false) ct.isThrottled.Set(false)
atomic.StoreUint64(&ct.count, 0) atomic.StoreUint64(&ct.count, 0)
} })
}
}()
go func() { system.Every(ctx, time.Duration(int64(ct.DecayInterval)) * time.Millisecond, func(_ time.Time) {
for {
select {
case <-ctx.Done():
decay.Stop()
return
case <-decay.C:
ct.markActivation(false) ct.markActivation(false)
} })
}
}()
ct.timerCancel = &cancel
}
// Stops a running timer processes if one exists. This is only called when the server is deleted since
// we want this to always be running. If there is no process currently running nothing will really happen.
func (ct *ConsoleThrottler) StopTimer() {
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.timerCancel != nil {
c := *ct.timerCancel
c()
ct.timerCancel = nil
}
} }
// Handles output from a server's console. This code ensures that a server is not outputting // Handles output from a server's console. This code ensures that a server is not outputting

View File

@ -127,14 +127,11 @@ func NewInstallationProcess(s *Server, script *api.InstallationScript) (*Install
Server: s, Server: s,
} }
ctx, cancel := context.WithCancel(context.Background())
s.installer.cancel = &cancel
if c, err := environment.DockerClient(); err != nil { if c, err := environment.DockerClient(); err != nil {
return nil, err return nil, err
} else { } else {
proc.client = c proc.client = c
proc.context = ctx proc.context = s.Context()
} }
return proc, nil return proc, nil
@ -171,21 +168,6 @@ func (s *Server) IsInstalling() bool {
return true return true
} }
// Aborts the server installation process by calling the cancel function on the installer
// context.
func (s *Server) AbortInstallation() {
if !s.IsInstalling() {
return
}
if s.installer.cancel != nil {
cancel := *s.installer.cancel
s.Log().Warn("aborting running installation process")
cancel()
}
}
// Removes the installer container for the server. // Removes the installer container for the server.
func (ip *InstallationProcess) RemoveContainer() { func (ip *InstallationProcess) RemoveContainer() {
err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{
@ -215,7 +197,6 @@ func (ip *InstallationProcess) Run() error {
defer func() { defer func() {
ip.Server.Log().Debug("releasing installation process lock") ip.Server.Log().Debug("releasing installation process lock")
ip.Server.installer.sem.Release(1) ip.Server.installer.sem.Release(1)
ip.Server.installer.cancel = nil
}() }()
if err := ip.BeforeExecute(); err != nil { if err := ip.BeforeExecute(); err != nil {

View File

@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/apex/log" "github.com/apex/log"
"github.com/creasty/defaults"
"github.com/gammazero/workerpool" "github.com/gammazero/workerpool"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
@ -87,25 +86,14 @@ func LoadDirectory() error {
// given struct using a YAML marshaler. This will also configure the given environment // given struct using a YAML marshaler. This will also configure the given environment
// for a server. // for a server.
func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) { func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) {
cfg := Configuration{} s, err := New()
if err := defaults.Set(&cfg); err != nil { if err != nil {
return nil, errors.WithMessage(err, "failed to set struct defaults for server configuration") return nil, errors.WithMessage(err, "loader: failed to instantiate empty server struct")
} }
s := new(Server)
if err := defaults.Set(s); err != nil {
return nil, errors.WithMessage(err, "failed to set struct defaults for server")
}
s.cfg = cfg
if err := s.UpdateDataStructure(data.Settings); err != nil { if err := s.UpdateDataStructure(data.Settings); err != nil {
return nil, err return nil, err
} }
s.resources = ResourceUsage{}
defaults.Set(&s.resources)
s.resources.State.Store(environment.ProcessOfflineState)
s.Archiver = Archiver{Server: s} s.Archiver = Archiver{Server: s}
s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace()) s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace())
@ -128,7 +116,7 @@ func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) {
} else { } else {
s.Environment = env s.Environment = env
s.StartEventListeners() s.StartEventListeners()
s.Throttler().StartTimer() s.Throttler().StartTimer(s.Context())
} }
// Forces the configuration to be synced with the panel. // Forces the configuration to be synced with the panel.

View File

@ -17,7 +17,7 @@ type ResourceUsage struct {
environment.Stats environment.Stats
// The current server status. // The current server status.
State *system.AtomicString `json:"state" default:"{}"` State system.AtomicString `json:"state"`
// The current disk space being used by the server. This value is not guaranteed to be accurate // The current disk space being used by the server. This value is not guaranteed to be accurate
// at all times. It is "manually" set whenever server.Proc() is called. This is kind of just a // at all times. It is "manually" set whenever server.Proc() is called. This is kind of just a

View File

@ -5,6 +5,7 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
"fmt" "fmt"
"github.com/apex/log" "github.com/apex/log"
"github.com/creasty/defaults"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
@ -21,6 +22,9 @@ type Server struct {
// Internal mutex used to block actions that need to occur sequentially, such as // Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk. // writing the configuration to the disk.
sync.RWMutex sync.RWMutex
ctx context.Context
ctxCancel *context.CancelFunc
emitterLock sync.Mutex emitterLock sync.Mutex
powerLock *semaphore.Weighted powerLock *semaphore.Weighted
throttleLock sync.Mutex throttleLock sync.Mutex
@ -61,20 +65,50 @@ type Server struct {
} }
type InstallerDetails struct { type InstallerDetails struct {
// The cancel function for the installer. This will be a non-nil value while there
// is an installer running for the server.
cancel *context.CancelFunc
// Installer lock. You should obtain an exclusive lock on this context while running // Installer lock. You should obtain an exclusive lock on this context while running
// the installation process and release it when finished. // the installation process and release it when finished.
sem *semaphore.Weighted sem *semaphore.Weighted
} }
// Returns a new server instance with a context and all of the default values set on
// the instance.
func New() (*Server, error) {
ctx, cancel := context.WithCancel(context.Background())
s := Server{
ctx: ctx,
ctxCancel: &cancel,
}
if err := defaults.Set(&s); err != nil {
return nil, err
}
if err := defaults.Set(&s.cfg); err != nil {
return nil, err
}
s.resources.State.Store(environment.ProcessOfflineState)
return &s, nil
}
// Returns the UUID for the server instance. // Returns the UUID for the server instance.
func (s *Server) Id() string { func (s *Server) Id() string {
return s.Config().GetUuid() return s.Config().GetUuid()
} }
// Cancels the context assigned to this server instance. Assuming background tasks
// are using this server's context for things, all of the background tasks will be
// stopped as a result.
func (s *Server) CtxCancel() {
if s.ctxCancel != nil {
(*s.ctxCancel)()
}
}
// Returns a context instance for the server. This should be used to allow background
// tasks to be canceled if the server is removed. It will only be canceled when the
// application is stopped or if the server gets deleted.
func (s *Server) Context() context.Context {
return s.ctx
}
// Returns all of the environment variables that should be assigned to a running // Returns all of the environment variables that should be assigned to a running
// server instance. // server instance.
func (s *Server) GetEnvironmentVariables() []string { func (s *Server) GetEnvironmentVariables() []string {

View File

@ -1,10 +1,28 @@
package system package system
import ( import (
"context"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
// Runs a given work function every "d" duration until the provided context is canceled.
func Every(ctx context.Context, d time.Duration, work func(t time.Time)) {
ticker := time.NewTicker(d)
go func() {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case t := <-ticker.C:
work(t)
}
}
}()
}
type AtomicBool struct { type AtomicBool struct {
flag uint32 flag uint32
} }
@ -30,8 +48,8 @@ type AtomicString struct {
mu sync.RWMutex mu sync.RWMutex
} }
func NewAtomicString(v string) *AtomicString { func NewAtomicString(v string) AtomicString {
return &AtomicString{v: v} return AtomicString{v: v}
} }
// Stores the string value passed atomically. // Stores the string value passed atomically.
@ -53,6 +71,7 @@ func (as *AtomicString) UnmarshalText(b []byte) error {
return nil return nil
} }
func (as *AtomicString) MarshalText() ([]byte, error) { //goland:noinspection GoVetCopyLock
func (as AtomicString) MarshalText() ([]byte, error) {
return []byte(as.Load()), nil return []byte(as.Load()), nil
} }