From c0523df6963732511f238c74df2e2de93de4f1ec Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Fri, 25 Dec 2020 11:21:09 -0800 Subject: [PATCH] Add a server context that gets canceled when a server is deleted --- cmd/root.go | 6 ++++ environment/docker/environment.go | 2 +- router/router_server.go | 25 ++++++-------- server/console.go | 54 ++++++------------------------- server/install.go | 21 +----------- server/loader.go | 20 +++--------- server/resources.go | 2 +- server/server.go | 42 +++++++++++++++++++++--- system/utils.go | 25 ++++++++++++-- 9 files changed, 93 insertions(+), 104 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index b6ffe35..83616d9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -356,6 +356,12 @@ func rootCmdRun(*cobra.Command, []string) { if err := s.ListenAndServe(); err != nil { log.WithField("error", err).Fatal("failed to configure HTTP server") } + + // Cancel the context on all of the running servers at this point, even though the + // program is just shutting down. + for _, s := range server.GetServers().All() { + s.CtxCancel() + } } // Execute calls cobra to handle cli commands diff --git a/environment/docker/environment.go b/environment/docker/environment.go index f481522..f4ea6ef 100644 --- a/environment/docker/environment.go +++ b/environment/docker/environment.go @@ -47,7 +47,7 @@ type Environment struct { emitter *events.EventBus // Tracks the environment state. - st *system.AtomicString + st system.AtomicString } // Creates a new base Docker environment. The ID passed through will be the ID that is used to diff --git a/router/router_server.go b/router/router_server.go index b7e1525..cfd682f 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -189,16 +189,18 @@ func postServerReinstall(c *gin.Context) { // Deletes a server from the wings daemon and dissociate it's objects. func deleteServer(c *gin.Context) { - s := GetServer(c.Param("server")) + s := ExtractServer(c) // Immediately suspend the server to prevent a user from attempting // to start it while this process is running. s.Config().SetSuspended(true) - // If the server is currently installing, abort it. - if s.IsInstalling() { - s.AbortInstallation() - } + // Stop all running background tasks for this server that are using the context on + // the server struct. This will cancel any running install processes for the server + // as well. + s.CtxCancel() + s.Events().Destroy() + s.Websockets().CancelAll() // Delete the server's archive if it exists. We intentionally don't return // here, if the archive fails to delete, the server can still be removed. @@ -206,11 +208,6 @@ func deleteServer(c *gin.Context) { s.Log().WithField("error", err).Warn("failed to delete server archive during deletion process") } - // Unsubscribe all of the event listeners. - s.Events().Destroy() - s.Throttler().StopTimer() - s.Websockets().CancelAll() - // Remove any pending remote file downloads for the server. for _, dl := range downloader.ByServer(s.Id()) { dl.Cancel() @@ -220,7 +217,8 @@ func deleteServer(c *gin.Context) { // forcibly terminate it before removing the container, so we do not need to handle // that here. if err := s.Environment.Destroy(); err != nil { - NewServerError(err, s).Abort(c) + WithError(c, err) + return } // Once the environment is terminated, remove the server files from the system. This is @@ -231,10 +229,7 @@ func deleteServer(c *gin.Context) { // so we don't want to block the HTTP call while waiting on this. go func(p string) { if err := os.RemoveAll(p); err != nil { - log.WithFields(log.Fields{ - "path": p, - "error": err, - }).Warn("failed to remove server files during deletion process") + log.WithFields(log.Fields{"path": p, "error": err}).Warn("failed to remove server files during deletion process") } }(s.Filesystem().Path()) diff --git a/server/console.go b/server/console.go index 8fb19d3..9a43daf 100644 --- a/server/console.go +++ b/server/console.go @@ -61,51 +61,17 @@ func (ct *ConsoleThrottler) Throttled() bool { } // Starts a timer that runs in a seperate thread and will continually decrement the lines processed -// and number of activations, regardless of the current console message volume. -func (ct *ConsoleThrottler) StartTimer() { - ctx, cancel := context.WithCancel(context.Background()) +// and number of activations, regardless of the current console message volume. All of the timers +// are canceled if the context passed through is canceled. +func (ct *ConsoleThrottler) StartTimer(ctx context.Context) { + system.Every(ctx, time.Duration(int64(ct.LineResetInterval)) * time.Millisecond, func(_ time.Time) { + ct.isThrottled.Set(false) + atomic.StoreUint64(&ct.count, 0) + }) - reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond) - decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond) - - go func() { - for { - select { - case <-ctx.Done(): - reset.Stop() - return - case <-reset.C: - ct.isThrottled.Set(false) - atomic.StoreUint64(&ct.count, 0) - } - } - }() - - go func() { - for { - select { - case <-ctx.Done(): - decay.Stop() - return - case <-decay.C: - ct.markActivation(false) - } - } - }() - - ct.timerCancel = &cancel -} - -// Stops a running timer processes if one exists. This is only called when the server is deleted since -// we want this to always be running. If there is no process currently running nothing will really happen. -func (ct *ConsoleThrottler) StopTimer() { - ct.mu.Lock() - defer ct.mu.Unlock() - if ct.timerCancel != nil { - c := *ct.timerCancel - c() - ct.timerCancel = nil - } + system.Every(ctx, time.Duration(int64(ct.DecayInterval)) * time.Millisecond, func(_ time.Time) { + ct.markActivation(false) + }) } // Handles output from a server's console. This code ensures that a server is not outputting diff --git a/server/install.go b/server/install.go index 007b3fd..394eb69 100644 --- a/server/install.go +++ b/server/install.go @@ -127,14 +127,11 @@ func NewInstallationProcess(s *Server, script *api.InstallationScript) (*Install Server: s, } - ctx, cancel := context.WithCancel(context.Background()) - s.installer.cancel = &cancel - if c, err := environment.DockerClient(); err != nil { return nil, err } else { proc.client = c - proc.context = ctx + proc.context = s.Context() } return proc, nil @@ -171,21 +168,6 @@ func (s *Server) IsInstalling() bool { return true } -// Aborts the server installation process by calling the cancel function on the installer -// context. -func (s *Server) AbortInstallation() { - if !s.IsInstalling() { - return - } - - if s.installer.cancel != nil { - cancel := *s.installer.cancel - - s.Log().Warn("aborting running installation process") - cancel() - } -} - // Removes the installer container for the server. func (ip *InstallationProcess) RemoveContainer() { err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ @@ -215,7 +197,6 @@ func (ip *InstallationProcess) Run() error { defer func() { ip.Server.Log().Debug("releasing installation process lock") ip.Server.installer.sem.Release(1) - ip.Server.installer.cancel = nil }() if err := ip.BeforeExecute(); err != nil { diff --git a/server/loader.go b/server/loader.go index b05a379..101987d 100644 --- a/server/loader.go +++ b/server/loader.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "github.com/apex/log" - "github.com/creasty/defaults" "github.com/gammazero/workerpool" "github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/config" @@ -87,25 +86,14 @@ func LoadDirectory() error { // given struct using a YAML marshaler. This will also configure the given environment // for a server. func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) { - cfg := Configuration{} - if err := defaults.Set(&cfg); err != nil { - return nil, errors.WithMessage(err, "failed to set struct defaults for server configuration") + s, err := New() + if err != nil { + return nil, errors.WithMessage(err, "loader: failed to instantiate empty server struct") } - - s := new(Server) - if err := defaults.Set(s); err != nil { - return nil, errors.WithMessage(err, "failed to set struct defaults for server") - } - - s.cfg = cfg if err := s.UpdateDataStructure(data.Settings); err != nil { return nil, err } - s.resources = ResourceUsage{} - defaults.Set(&s.resources) - s.resources.State.Store(environment.ProcessOfflineState) - s.Archiver = Archiver{Server: s} s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace()) @@ -128,7 +116,7 @@ func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) { } else { s.Environment = env s.StartEventListeners() - s.Throttler().StartTimer() + s.Throttler().StartTimer(s.Context()) } // Forces the configuration to be synced with the panel. diff --git a/server/resources.go b/server/resources.go index 45c4292..9bc047c 100644 --- a/server/resources.go +++ b/server/resources.go @@ -17,7 +17,7 @@ type ResourceUsage struct { environment.Stats // The current server status. - State *system.AtomicString `json:"state" default:"{}"` + State system.AtomicString `json:"state"` // The current disk space being used by the server. This value is not guaranteed to be accurate // at all times. It is "manually" set whenever server.Proc() is called. This is kind of just a diff --git a/server/server.go b/server/server.go index 10945a2..52cb03f 100644 --- a/server/server.go +++ b/server/server.go @@ -5,6 +5,7 @@ import ( "emperror.dev/errors" "fmt" "github.com/apex/log" + "github.com/creasty/defaults" "github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/environment" @@ -21,6 +22,9 @@ type Server struct { // Internal mutex used to block actions that need to occur sequentially, such as // writing the configuration to the disk. sync.RWMutex + ctx context.Context + ctxCancel *context.CancelFunc + emitterLock sync.Mutex powerLock *semaphore.Weighted throttleLock sync.Mutex @@ -61,20 +65,50 @@ type Server struct { } type InstallerDetails struct { - // The cancel function for the installer. This will be a non-nil value while there - // is an installer running for the server. - cancel *context.CancelFunc - // Installer lock. You should obtain an exclusive lock on this context while running // the installation process and release it when finished. sem *semaphore.Weighted } +// Returns a new server instance with a context and all of the default values set on +// the instance. +func New() (*Server, error) { + ctx, cancel := context.WithCancel(context.Background()) + s := Server{ + ctx: ctx, + ctxCancel: &cancel, + } + if err := defaults.Set(&s); err != nil { + return nil, err + } + if err := defaults.Set(&s.cfg); err != nil { + return nil, err + } + s.resources.State.Store(environment.ProcessOfflineState) + return &s, nil +} + // Returns the UUID for the server instance. func (s *Server) Id() string { return s.Config().GetUuid() } +// Cancels the context assigned to this server instance. Assuming background tasks +// are using this server's context for things, all of the background tasks will be +// stopped as a result. +func (s *Server) CtxCancel() { + if s.ctxCancel != nil { + (*s.ctxCancel)() + } +} + +// Returns a context instance for the server. This should be used to allow background +// tasks to be canceled if the server is removed. It will only be canceled when the +// application is stopped or if the server gets deleted. +func (s *Server) Context() context.Context { + return s.ctx +} + // Returns all of the environment variables that should be assigned to a running // server instance. func (s *Server) GetEnvironmentVariables() []string { diff --git a/system/utils.go b/system/utils.go index 5862a0c..2a7fe67 100644 --- a/system/utils.go +++ b/system/utils.go @@ -1,10 +1,28 @@ package system import ( + "context" "sync" "sync/atomic" + "time" ) +// Runs a given work function every "d" duration until the provided context is canceled. +func Every(ctx context.Context, d time.Duration, work func(t time.Time)) { + ticker := time.NewTicker(d) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case t := <-ticker.C: + work(t) + } + } + }() +} + type AtomicBool struct { flag uint32 } @@ -30,8 +48,8 @@ type AtomicString struct { mu sync.RWMutex } -func NewAtomicString(v string) *AtomicString { - return &AtomicString{v: v} +func NewAtomicString(v string) AtomicString { + return AtomicString{v: v} } // Stores the string value passed atomically. @@ -53,6 +71,7 @@ func (as *AtomicString) UnmarshalText(b []byte) error { return nil } -func (as *AtomicString) MarshalText() ([]byte, error) { +//goland:noinspection GoVetCopyLock +func (as AtomicString) MarshalText() ([]byte, error) { return []byte(as.Load()), nil }