From 59c30c28427c3f6ed370ea29c00defafc4c5f5d2 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Fri, 25 Dec 2020 17:04:18 -0800 Subject: [PATCH] Fix use of atomics in codebase --- environment/docker/environment.go | 2 +- server/console.go | 23 ++++---- server/filesystem/disk_space.go | 6 +- server/filesystem/filesystem.go | 3 +- server/install.go | 95 ++++++++++++------------------- server/resources.go | 2 +- server/server.go | 21 +++---- system/utils.go | 67 ++++++++++++++++------ 8 files changed, 109 insertions(+), 110 deletions(-) diff --git a/environment/docker/environment.go b/environment/docker/environment.go index f4ea6ef..f481522 100644 --- a/environment/docker/environment.go +++ b/environment/docker/environment.go @@ -47,7 +47,7 @@ type Environment struct { emitter *events.EventBus // Tracks the environment state. - st system.AtomicString + st *system.AtomicString } // Creates a new base Docker environment. The ID passed through will be the ID that is used to diff --git a/server/console.go b/server/console.go index 9a43daf..2e6d7d7 100644 --- a/server/console.go +++ b/server/console.go @@ -26,7 +26,7 @@ type ConsoleThrottler struct { // Wether or not the console output is being throttled. It is up to calling code to // determine what to do if it is. - isThrottled system.AtomicBool + isThrottled *system.AtomicBool // The total number of lines processed so far during the given time period. timerCancel *context.CancelFunc @@ -36,7 +36,7 @@ type ConsoleThrottler struct { func (ct *ConsoleThrottler) Reset() { atomic.StoreUint64(&ct.count, 0) atomic.StoreUint64(&ct.activations, 0) - ct.isThrottled.Set(false) + ct.isThrottled.Store(false) } // Triggers an activation for a server. You can also decrement the number of activations @@ -57,19 +57,19 @@ func (ct *ConsoleThrottler) markActivation(increment bool) uint64 { // Determines if the console is currently being throttled. Calls to this function can be used to // determine if output should be funneled along to the websocket processes. func (ct *ConsoleThrottler) Throttled() bool { - return ct.isThrottled.Get() + return ct.isThrottled.Load() } // Starts a timer that runs in a seperate thread and will continually decrement the lines processed // and number of activations, regardless of the current console message volume. All of the timers // are canceled if the context passed through is canceled. func (ct *ConsoleThrottler) StartTimer(ctx context.Context) { - system.Every(ctx, time.Duration(int64(ct.LineResetInterval)) * time.Millisecond, func(_ time.Time) { - ct.isThrottled.Set(false) + system.Every(ctx, time.Duration(int64(ct.LineResetInterval))*time.Millisecond, func(_ time.Time) { + ct.isThrottled.Store(false) atomic.StoreUint64(&ct.count, 0) }) - system.Every(ctx, time.Duration(int64(ct.DecayInterval)) * time.Millisecond, func(_ time.Time) { + system.Every(ctx, time.Duration(int64(ct.DecayInterval))*time.Millisecond, func(_ time.Time) { ct.markActivation(false) }) } @@ -99,7 +99,7 @@ func (ct *ConsoleThrottler) Increment(onTrigger func()) error { // activation. Once the throttle is triggered and has passed the kill at value we will trigger a server // stop automatically. if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() { - ct.isThrottled.Set(true) + ct.isThrottled.Store(true) if ct.markActivation(true) >= ct.MaximumTriggerCount { return ErrTooMuchConsoleData } @@ -112,15 +112,12 @@ func (ct *ConsoleThrottler) Increment(onTrigger func()) error { // Returns the throttler instance for the server or creates a new one. func (s *Server) Throttler() *ConsoleThrottler { - s.throttleLock.Lock() - defer s.throttleLock.Unlock() - - if s.throttler == nil { + s.throttleOnce.Do(func() { s.throttler = &ConsoleThrottler{ + isThrottled: system.NewAtomicBool(false), ConsoleThrottles: config.Get().Throttles, } - } - + }) return s.throttler } diff --git a/server/filesystem/disk_space.go b/server/filesystem/disk_space.go index d1f6286..3e5619c 100644 --- a/server/filesystem/disk_space.go +++ b/server/filesystem/disk_space.go @@ -107,7 +107,7 @@ func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) { // value. This is a blocking operation to the calling process. if !allowStaleValue { return fs.updateCachedDiskUsage() - } else if !fs.lookupInProgress.Get() { + } else if !fs.lookupInProgress.Load() { // Otherwise, if we allow a stale value and there isn't a valid item in the cache and we aren't // currently performing a lookup, just do the disk usage calculation in the background. go func(fs *Filesystem) { @@ -133,8 +133,8 @@ func (fs *Filesystem) updateCachedDiskUsage() (int64, error) { // Signal that we're currently updating the disk size so that other calls to the disk checking // functions can determine if they should queue up additional calls to this function. Ensure that // we always set this back to "false" when this process is done executing. - fs.lookupInProgress.Set(true) - defer fs.lookupInProgress.Set(false) + fs.lookupInProgress.Store(true) + defer fs.lookupInProgress.Store(false) // If there is no size its either because there is no data (in which case running this function // will have effectively no impact), or there is nothing in the cache, in which case we need to diff --git a/server/filesystem/filesystem.go b/server/filesystem/filesystem.go index 01c8361..c814b20 100644 --- a/server/filesystem/filesystem.go +++ b/server/filesystem/filesystem.go @@ -22,7 +22,7 @@ import ( type Filesystem struct { mu sync.RWMutex lastLookupTime *usageLookupTime - lookupInProgress system.AtomicBool + lookupInProgress *system.AtomicBool diskUsed int64 diskCheckInterval time.Duration @@ -42,6 +42,7 @@ func New(root string, size int64) *Filesystem { diskLimit: size, diskCheckInterval: time.Duration(config.Get().System.DiskCheckInterval), lastLookupTime: &usageLookupTime{}, + lookupInProgress: system.NewAtomicBool(false), } } diff --git a/server/install.go b/server/install.go index 78e9799..2088233 100644 --- a/server/install.go +++ b/server/install.go @@ -13,14 +13,13 @@ import ( "github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/environment" - "golang.org/x/sync/semaphore" + "github.com/pterodactyl/wings/system" "html/template" "io" "os" "path/filepath" "strconv" "strings" - "time" ) // Executes the installation stack for a server process. Bubbles any errors up to the calling @@ -137,56 +136,30 @@ func NewInstallationProcess(s *Server, script *api.InstallationScript) (*Install return proc, nil } -// Try to obtain an exclusive lock on the installation process for the server. Waits up to 10 -// seconds before aborting with a context timeout. -func (s *Server) acquireInstallationLock() error { - if s.installer.sem == nil { - s.installer.sem = semaphore.NewWeighted(1) - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - return s.installer.sem.Acquire(ctx, 1) -} - // Determines if the server is actively running the installation process by checking the status -// of the semaphore lock. +// of the installer lock. func (s *Server) IsInstalling() bool { - if s.installer.sem == nil { - return false - } - - if s.installer.sem.TryAcquire(1) { - // If we made it into this block it means we were able to obtain an exclusive lock - // on the semaphore. In that case, go ahead and release that lock immediately, and - // return false. - s.installer.sem.Release(1) - - return false - } - - return true + return s.installing.Load() } func (s *Server) IsTransferring() bool { - return s.transferring.Get() + return s.transferring.Load() } func (s *Server) SetTransferring(state bool) { - s.transferring.Set(state) + s.transferring.Store(state) } // Removes the installer container for the server. -func (ip *InstallationProcess) RemoveContainer() { +func (ip *InstallationProcess) RemoveContainer() error { err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ RemoveVolumes: true, Force: true, }) - if err != nil && !client.IsErrNotFound(err) { - ip.Server.Log().WithField("error", err).Warn("failed to delete server install container") + return err } + return nil } // Runs the installation process, this is done as in a background thread. This will configure @@ -196,8 +169,8 @@ func (ip *InstallationProcess) RemoveContainer() { // log in the server's configuration directory. func (ip *InstallationProcess) Run() error { ip.Server.Log().Debug("acquiring installation process lock") - if err := ip.Server.acquireInstallationLock(); err != nil { - return err + if !ip.Server.installing.SwapIf(true) { + return errors.New("install: cannot obtain installation lock") } // We now have an exclusive lock on this installation process. Ensure that whenever this @@ -205,7 +178,7 @@ func (ip *InstallationProcess) Run() error { // without encountering a wait timeout. defer func() { ip.Server.Log().Debug("releasing installation process lock") - ip.Server.installer.sem.Release(1) + ip.Server.installing.Store(false) }() if err := ip.BeforeExecute(); err != nil { @@ -215,7 +188,6 @@ func (ip *InstallationProcess) Run() error { cid, err := ip.Execute() if err != nil { ip.RemoveContainer() - return err } @@ -342,22 +314,12 @@ func (ip *InstallationProcess) BeforeExecute() error { if err := ip.writeScriptToDisk(); err != nil { return errors.WithMessage(err, "failed to write installation script to disk") } - if err := ip.pullInstallationImage(); err != nil { return errors.WithMessage(err, "failed to pull updated installation container image for server") } - - opts := types.ContainerRemoveOptions{ - RemoveVolumes: true, - Force: true, + if err := ip.RemoveContainer(); err != nil { + return errors.WithMessage(err, "failed to remove existing install container for server") } - - if err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", opts); err != nil { - if !client.IsErrNotFound(err) { - return errors.WithMessage(err, "failed to remove existing install container for server") - } - } - return nil } @@ -430,6 +392,12 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error { // Executes the installation process inside a specially created docker container. func (ip *InstallationProcess) Execute() (string, error) { + // Create a child context that is canceled once this function is done running. This + // will also be canceled if the parent context (from the Server struct) is canceled + // which occurs if the server is deleted. + ctx, cancel := context.WithCancel(ip.context) + defer cancel() + conf := &container.Config{ Hostname: "installer", AttachStdout: true, @@ -488,28 +456,35 @@ func (ip *InstallationProcess) Execute() (string, error) { } }() - r, err := ip.client.ContainerCreate(ip.context, conf, hostConf, nil, ip.Server.Id()+"_installer") + r, err := ip.client.ContainerCreate(ctx, conf, hostConf, nil, ip.Server.Id()+"_installer") if err != nil { return "", err } ip.Server.Log().WithField("container_id", r.ID).Info("running installation script for server in container") - if err := ip.client.ContainerStart(ip.context, r.ID, types.ContainerStartOptions{}); err != nil { + if err := ip.client.ContainerStart(ctx, r.ID, types.ContainerStartOptions{}); err != nil { return "", err } + // Process the install event in the background by listening to the stream output until the + // container has stopped, at which point we'll disconnect from it. + // + // If there is an error during the streaming output just report it and do nothing else, the + // install can still run, the console just won't have any output. go func(id string) { ip.Server.Events().Publish(DaemonMessageEvent, "Starting installation process, this could take a few minutes...") - if err := ip.StreamOutput(id); err != nil { - ip.Server.Log().WithField("error", err).Error("error while handling output stream for server install process") + if err := ip.StreamOutput(ctx, id); err != nil { + ip.Server.Log().WithField("error", err).Warn("error connecting to server install stream output") } - ip.Server.Events().Publish(DaemonMessageEvent, "Installation process completed.") }(r.ID) - sChan, eChan := ip.client.ContainerWait(ip.context, r.ID, container.WaitConditionNotRunning) + sChan, eChan := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning) select { case err := <-eChan: - if err != nil { + // Once the container has stopped running we can mark the install process as being completed. + if err == nil { + ip.Server.Events().Publish(DaemonMessageEvent, "Installation process completed.") + } else { return "", err } case <-sChan: @@ -521,8 +496,8 @@ func (ip *InstallationProcess) Execute() (string, error) { // Streams the output of the installation process to a log file in the server configuration // directory, as well as to a websocket listener so that the process can be viewed in // the panel by administrators. -func (ip *InstallationProcess) StreamOutput(id string) error { - reader, err := ip.client.ContainerLogs(ip.context, id, types.ContainerLogsOptions{ +func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error { + reader, err := ip.client.ContainerLogs(ctx, id, types.ContainerLogsOptions{ ShowStdout: true, ShowStderr: true, Follow: true, diff --git a/server/resources.go b/server/resources.go index 9bc047c..8f873e9 100644 --- a/server/resources.go +++ b/server/resources.go @@ -17,7 +17,7 @@ type ResourceUsage struct { environment.Stats // The current server status. - State system.AtomicString `json:"state"` + State *system.AtomicString `json:"state"` // The current disk space being used by the server. This value is not guaranteed to be accurate // at all times. It is "manually" set whenever server.Proc() is called. This is kind of just a diff --git a/server/server.go b/server/server.go index 4d9c2f9..70efc1e 100644 --- a/server/server.go +++ b/server/server.go @@ -28,7 +28,7 @@ type Server struct { emitterLock sync.Mutex powerLock *semaphore.Weighted - throttleLock sync.Mutex + throttleOnce sync.Once // Maintains the configuration for the server. This is the data that gets returned by the Panel // such as build settings and container images. @@ -55,9 +55,8 @@ type Server struct { // two installer processes at the same time. This also allows us to cancel a running // installation process, for example when a server is deleted from the panel while the // installer process is still running. - installer InstallerDetails - - transferring system.AtomicBool + installing *system.AtomicBool + transferring *system.AtomicBool // The console throttler instance used to control outputs. throttler *ConsoleThrottler @@ -67,19 +66,15 @@ type Server struct { wsBagLocker sync.Mutex } -type InstallerDetails struct { - // Installer lock. You should obtain an exclusive lock on this context while running - // the installation process and release it when finished. - sem *semaphore.Weighted -} - // Returns a new server instance with a context and all of the default values set on // the instance. func New() (*Server, error) { ctx, cancel := context.WithCancel(context.Background()) s := Server{ - ctx: ctx, - ctxCancel: &cancel, + ctx: ctx, + ctxCancel: &cancel, + installing: system.NewAtomicBool(false), + transferring: system.NewAtomicBool(false), } if err := defaults.Set(&s); err != nil { return nil, err @@ -87,7 +82,7 @@ func New() (*Server, error) { if err := defaults.Set(&s.cfg); err != nil { return nil, err } - s.resources.State.Store(environment.ProcessOfflineState) + s.resources.State = system.NewAtomicString(environment.ProcessOfflineState) return &s, nil } diff --git a/system/utils.go b/system/utils.go index 5e97735..c550882 100644 --- a/system/utils.go +++ b/system/utils.go @@ -1,10 +1,14 @@ package system import ( + "bufio" + "bytes" "context" + "encoding/json" "fmt" + "io" + "strings" "sync" - "sync/atomic" "time" ) @@ -37,20 +41,47 @@ func FormatBytes(b int64) string { } type AtomicBool struct { - flag uint32 + v bool + mu sync.RWMutex } -func (ab *AtomicBool) Set(v bool) { - i := 0 - if v { - i = 1 +func NewAtomicBool(v bool) *AtomicBool { + return &AtomicBool{v: v} +} + +func (ab *AtomicBool) Store(v bool) { + ab.mu.Lock() + ab.v = v + ab.mu.Unlock() +} + +// Stores the value "v" if the current value stored in the AtomicBool is the opposite +// boolean value. If successfully swapped, the response is "true", otherwise "false" +// is returned. +func (ab *AtomicBool) SwapIf(v bool) bool { + ab.mu.Lock() + defer ab.mu.Unlock() + if ab.v != v { + ab.v = v + return true } - - atomic.StoreUint32(&ab.flag, uint32(i)) + return false } -func (ab *AtomicBool) Get() bool { - return atomic.LoadUint32(&ab.flag) == 1 +func (ab *AtomicBool) Load() bool { + ab.mu.RLock() + defer ab.mu.RUnlock() + return ab.v +} + +func (ab *AtomicBool) UnmarshalJSON(b []byte) error { + ab.mu.Lock() + defer ab.mu.Unlock() + return json.Unmarshal(b, &ab.v) +} + +func (ab *AtomicBool) MarshalJSON() ([]byte, error) { + return json.Marshal(ab.Load()) } // AtomicString allows for reading/writing to a given struct field without having to worry @@ -61,8 +92,8 @@ type AtomicString struct { mu sync.RWMutex } -func NewAtomicString(v string) AtomicString { - return AtomicString{v: v} +func NewAtomicString(v string) *AtomicString { + return &AtomicString{v: v} } // Stores the string value passed atomically. @@ -79,12 +110,12 @@ func (as *AtomicString) Load() string { return as.v } -func (as *AtomicString) UnmarshalText(b []byte) error { - as.Store(string(b)) - return nil +func (as *AtomicString) UnmarshalJSON(b []byte) error { + as.mu.Lock() + defer as.mu.Unlock() + return json.Unmarshal(b, &as.v) } -//goland:noinspection GoVetCopyLock -func (as AtomicString) MarshalText() ([]byte, error) { - return []byte(as.Load()), nil +func (as *AtomicString) MarshalJSON() ([]byte, error) { + return json.Marshal(as.Load()) }