Fix use of atomics in codebase

This commit is contained in:
Dane Everitt 2020-12-25 17:04:18 -08:00
parent 3842f054a5
commit 59c30c2842
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
8 changed files with 109 additions and 110 deletions

View File

@ -47,7 +47,7 @@ type Environment struct {
emitter *events.EventBus emitter *events.EventBus
// Tracks the environment state. // Tracks the environment state.
st system.AtomicString st *system.AtomicString
} }
// Creates a new base Docker environment. The ID passed through will be the ID that is used to // Creates a new base Docker environment. The ID passed through will be the ID that is used to

View File

@ -26,7 +26,7 @@ type ConsoleThrottler struct {
// Wether or not the console output is being throttled. It is up to calling code to // Wether or not the console output is being throttled. It is up to calling code to
// determine what to do if it is. // determine what to do if it is.
isThrottled system.AtomicBool isThrottled *system.AtomicBool
// The total number of lines processed so far during the given time period. // The total number of lines processed so far during the given time period.
timerCancel *context.CancelFunc timerCancel *context.CancelFunc
@ -36,7 +36,7 @@ type ConsoleThrottler struct {
func (ct *ConsoleThrottler) Reset() { func (ct *ConsoleThrottler) Reset() {
atomic.StoreUint64(&ct.count, 0) atomic.StoreUint64(&ct.count, 0)
atomic.StoreUint64(&ct.activations, 0) atomic.StoreUint64(&ct.activations, 0)
ct.isThrottled.Set(false) ct.isThrottled.Store(false)
} }
// Triggers an activation for a server. You can also decrement the number of activations // Triggers an activation for a server. You can also decrement the number of activations
@ -57,7 +57,7 @@ func (ct *ConsoleThrottler) markActivation(increment bool) uint64 {
// Determines if the console is currently being throttled. Calls to this function can be used to // Determines if the console is currently being throttled. Calls to this function can be used to
// determine if output should be funneled along to the websocket processes. // determine if output should be funneled along to the websocket processes.
func (ct *ConsoleThrottler) Throttled() bool { func (ct *ConsoleThrottler) Throttled() bool {
return ct.isThrottled.Get() return ct.isThrottled.Load()
} }
// Starts a timer that runs in a seperate thread and will continually decrement the lines processed // Starts a timer that runs in a seperate thread and will continually decrement the lines processed
@ -65,7 +65,7 @@ func (ct *ConsoleThrottler) Throttled() bool {
// are canceled if the context passed through is canceled. // are canceled if the context passed through is canceled.
func (ct *ConsoleThrottler) StartTimer(ctx context.Context) { func (ct *ConsoleThrottler) StartTimer(ctx context.Context) {
system.Every(ctx, time.Duration(int64(ct.LineResetInterval))*time.Millisecond, func(_ time.Time) { system.Every(ctx, time.Duration(int64(ct.LineResetInterval))*time.Millisecond, func(_ time.Time) {
ct.isThrottled.Set(false) ct.isThrottled.Store(false)
atomic.StoreUint64(&ct.count, 0) atomic.StoreUint64(&ct.count, 0)
}) })
@ -99,7 +99,7 @@ func (ct *ConsoleThrottler) Increment(onTrigger func()) error {
// activation. Once the throttle is triggered and has passed the kill at value we will trigger a server // activation. Once the throttle is triggered and has passed the kill at value we will trigger a server
// stop automatically. // stop automatically.
if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() { if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() {
ct.isThrottled.Set(true) ct.isThrottled.Store(true)
if ct.markActivation(true) >= ct.MaximumTriggerCount { if ct.markActivation(true) >= ct.MaximumTriggerCount {
return ErrTooMuchConsoleData return ErrTooMuchConsoleData
} }
@ -112,15 +112,12 @@ func (ct *ConsoleThrottler) Increment(onTrigger func()) error {
// Returns the throttler instance for the server or creates a new one. // Returns the throttler instance for the server or creates a new one.
func (s *Server) Throttler() *ConsoleThrottler { func (s *Server) Throttler() *ConsoleThrottler {
s.throttleLock.Lock() s.throttleOnce.Do(func() {
defer s.throttleLock.Unlock()
if s.throttler == nil {
s.throttler = &ConsoleThrottler{ s.throttler = &ConsoleThrottler{
isThrottled: system.NewAtomicBool(false),
ConsoleThrottles: config.Get().Throttles, ConsoleThrottles: config.Get().Throttles,
} }
} })
return s.throttler return s.throttler
} }

View File

@ -107,7 +107,7 @@ func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
// value. This is a blocking operation to the calling process. // value. This is a blocking operation to the calling process.
if !allowStaleValue { if !allowStaleValue {
return fs.updateCachedDiskUsage() return fs.updateCachedDiskUsage()
} else if !fs.lookupInProgress.Get() { } else if !fs.lookupInProgress.Load() {
// Otherwise, if we allow a stale value and there isn't a valid item in the cache and we aren't // Otherwise, if we allow a stale value and there isn't a valid item in the cache and we aren't
// currently performing a lookup, just do the disk usage calculation in the background. // currently performing a lookup, just do the disk usage calculation in the background.
go func(fs *Filesystem) { go func(fs *Filesystem) {
@ -133,8 +133,8 @@ func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
// Signal that we're currently updating the disk size so that other calls to the disk checking // Signal that we're currently updating the disk size so that other calls to the disk checking
// functions can determine if they should queue up additional calls to this function. Ensure that // functions can determine if they should queue up additional calls to this function. Ensure that
// we always set this back to "false" when this process is done executing. // we always set this back to "false" when this process is done executing.
fs.lookupInProgress.Set(true) fs.lookupInProgress.Store(true)
defer fs.lookupInProgress.Set(false) defer fs.lookupInProgress.Store(false)
// If there is no size its either because there is no data (in which case running this function // If there is no size its either because there is no data (in which case running this function
// will have effectively no impact), or there is nothing in the cache, in which case we need to // will have effectively no impact), or there is nothing in the cache, in which case we need to

View File

@ -22,7 +22,7 @@ import (
type Filesystem struct { type Filesystem struct {
mu sync.RWMutex mu sync.RWMutex
lastLookupTime *usageLookupTime lastLookupTime *usageLookupTime
lookupInProgress system.AtomicBool lookupInProgress *system.AtomicBool
diskUsed int64 diskUsed int64
diskCheckInterval time.Duration diskCheckInterval time.Duration
@ -42,6 +42,7 @@ func New(root string, size int64) *Filesystem {
diskLimit: size, diskLimit: size,
diskCheckInterval: time.Duration(config.Get().System.DiskCheckInterval), diskCheckInterval: time.Duration(config.Get().System.DiskCheckInterval),
lastLookupTime: &usageLookupTime{}, lastLookupTime: &usageLookupTime{},
lookupInProgress: system.NewAtomicBool(false),
} }
} }

View File

@ -13,14 +13,13 @@ import (
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"golang.org/x/sync/semaphore" "github.com/pterodactyl/wings/system"
"html/template" "html/template"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time"
) )
// Executes the installation stack for a server process. Bubbles any errors up to the calling // Executes the installation stack for a server process. Bubbles any errors up to the calling
@ -137,56 +136,30 @@ func NewInstallationProcess(s *Server, script *api.InstallationScript) (*Install
return proc, nil return proc, nil
} }
// Try to obtain an exclusive lock on the installation process for the server. Waits up to 10
// seconds before aborting with a context timeout.
func (s *Server) acquireInstallationLock() error {
if s.installer.sem == nil {
s.installer.sem = semaphore.NewWeighted(1)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return s.installer.sem.Acquire(ctx, 1)
}
// Determines if the server is actively running the installation process by checking the status // Determines if the server is actively running the installation process by checking the status
// of the semaphore lock. // of the installer lock.
func (s *Server) IsInstalling() bool { func (s *Server) IsInstalling() bool {
if s.installer.sem == nil { return s.installing.Load()
return false
}
if s.installer.sem.TryAcquire(1) {
// If we made it into this block it means we were able to obtain an exclusive lock
// on the semaphore. In that case, go ahead and release that lock immediately, and
// return false.
s.installer.sem.Release(1)
return false
}
return true
} }
func (s *Server) IsTransferring() bool { func (s *Server) IsTransferring() bool {
return s.transferring.Get() return s.transferring.Load()
} }
func (s *Server) SetTransferring(state bool) { func (s *Server) SetTransferring(state bool) {
s.transferring.Set(state) s.transferring.Store(state)
} }
// Removes the installer container for the server. // Removes the installer container for the server.
func (ip *InstallationProcess) RemoveContainer() { func (ip *InstallationProcess) RemoveContainer() error {
err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{
RemoveVolumes: true, RemoveVolumes: true,
Force: true, Force: true,
}) })
if err != nil && !client.IsErrNotFound(err) { if err != nil && !client.IsErrNotFound(err) {
ip.Server.Log().WithField("error", err).Warn("failed to delete server install container") return err
} }
return nil
} }
// Runs the installation process, this is done as in a background thread. This will configure // Runs the installation process, this is done as in a background thread. This will configure
@ -196,8 +169,8 @@ func (ip *InstallationProcess) RemoveContainer() {
// log in the server's configuration directory. // log in the server's configuration directory.
func (ip *InstallationProcess) Run() error { func (ip *InstallationProcess) Run() error {
ip.Server.Log().Debug("acquiring installation process lock") ip.Server.Log().Debug("acquiring installation process lock")
if err := ip.Server.acquireInstallationLock(); err != nil { if !ip.Server.installing.SwapIf(true) {
return err return errors.New("install: cannot obtain installation lock")
} }
// We now have an exclusive lock on this installation process. Ensure that whenever this // We now have an exclusive lock on this installation process. Ensure that whenever this
@ -205,7 +178,7 @@ func (ip *InstallationProcess) Run() error {
// without encountering a wait timeout. // without encountering a wait timeout.
defer func() { defer func() {
ip.Server.Log().Debug("releasing installation process lock") ip.Server.Log().Debug("releasing installation process lock")
ip.Server.installer.sem.Release(1) ip.Server.installing.Store(false)
}() }()
if err := ip.BeforeExecute(); err != nil { if err := ip.BeforeExecute(); err != nil {
@ -215,7 +188,6 @@ func (ip *InstallationProcess) Run() error {
cid, err := ip.Execute() cid, err := ip.Execute()
if err != nil { if err != nil {
ip.RemoveContainer() ip.RemoveContainer()
return err return err
} }
@ -342,22 +314,12 @@ func (ip *InstallationProcess) BeforeExecute() error {
if err := ip.writeScriptToDisk(); err != nil { if err := ip.writeScriptToDisk(); err != nil {
return errors.WithMessage(err, "failed to write installation script to disk") return errors.WithMessage(err, "failed to write installation script to disk")
} }
if err := ip.pullInstallationImage(); err != nil { if err := ip.pullInstallationImage(); err != nil {
return errors.WithMessage(err, "failed to pull updated installation container image for server") return errors.WithMessage(err, "failed to pull updated installation container image for server")
} }
if err := ip.RemoveContainer(); err != nil {
opts := types.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
}
if err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", opts); err != nil {
if !client.IsErrNotFound(err) {
return errors.WithMessage(err, "failed to remove existing install container for server") return errors.WithMessage(err, "failed to remove existing install container for server")
} }
}
return nil return nil
} }
@ -430,6 +392,12 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error {
// Executes the installation process inside a specially created docker container. // Executes the installation process inside a specially created docker container.
func (ip *InstallationProcess) Execute() (string, error) { func (ip *InstallationProcess) Execute() (string, error) {
// Create a child context that is canceled once this function is done running. This
// will also be canceled if the parent context (from the Server struct) is canceled
// which occurs if the server is deleted.
ctx, cancel := context.WithCancel(ip.context)
defer cancel()
conf := &container.Config{ conf := &container.Config{
Hostname: "installer", Hostname: "installer",
AttachStdout: true, AttachStdout: true,
@ -488,28 +456,35 @@ func (ip *InstallationProcess) Execute() (string, error) {
} }
}() }()
r, err := ip.client.ContainerCreate(ip.context, conf, hostConf, nil, ip.Server.Id()+"_installer") r, err := ip.client.ContainerCreate(ctx, conf, hostConf, nil, ip.Server.Id()+"_installer")
if err != nil { if err != nil {
return "", err return "", err
} }
ip.Server.Log().WithField("container_id", r.ID).Info("running installation script for server in container") ip.Server.Log().WithField("container_id", r.ID).Info("running installation script for server in container")
if err := ip.client.ContainerStart(ip.context, r.ID, types.ContainerStartOptions{}); err != nil { if err := ip.client.ContainerStart(ctx, r.ID, types.ContainerStartOptions{}); err != nil {
return "", err return "", err
} }
// Process the install event in the background by listening to the stream output until the
// container has stopped, at which point we'll disconnect from it.
//
// If there is an error during the streaming output just report it and do nothing else, the
// install can still run, the console just won't have any output.
go func(id string) { go func(id string) {
ip.Server.Events().Publish(DaemonMessageEvent, "Starting installation process, this could take a few minutes...") ip.Server.Events().Publish(DaemonMessageEvent, "Starting installation process, this could take a few minutes...")
if err := ip.StreamOutput(id); err != nil { if err := ip.StreamOutput(ctx, id); err != nil {
ip.Server.Log().WithField("error", err).Error("error while handling output stream for server install process") ip.Server.Log().WithField("error", err).Warn("error connecting to server install stream output")
} }
ip.Server.Events().Publish(DaemonMessageEvent, "Installation process completed.")
}(r.ID) }(r.ID)
sChan, eChan := ip.client.ContainerWait(ip.context, r.ID, container.WaitConditionNotRunning) sChan, eChan := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning)
select { select {
case err := <-eChan: case err := <-eChan:
if err != nil { // Once the container has stopped running we can mark the install process as being completed.
if err == nil {
ip.Server.Events().Publish(DaemonMessageEvent, "Installation process completed.")
} else {
return "", err return "", err
} }
case <-sChan: case <-sChan:
@ -521,8 +496,8 @@ func (ip *InstallationProcess) Execute() (string, error) {
// Streams the output of the installation process to a log file in the server configuration // Streams the output of the installation process to a log file in the server configuration
// directory, as well as to a websocket listener so that the process can be viewed in // directory, as well as to a websocket listener so that the process can be viewed in
// the panel by administrators. // the panel by administrators.
func (ip *InstallationProcess) StreamOutput(id string) error { func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error {
reader, err := ip.client.ContainerLogs(ip.context, id, types.ContainerLogsOptions{ reader, err := ip.client.ContainerLogs(ctx, id, types.ContainerLogsOptions{
ShowStdout: true, ShowStdout: true,
ShowStderr: true, ShowStderr: true,
Follow: true, Follow: true,

View File

@ -17,7 +17,7 @@ type ResourceUsage struct {
environment.Stats environment.Stats
// The current server status. // The current server status.
State system.AtomicString `json:"state"` State *system.AtomicString `json:"state"`
// The current disk space being used by the server. This value is not guaranteed to be accurate // The current disk space being used by the server. This value is not guaranteed to be accurate
// at all times. It is "manually" set whenever server.Proc() is called. This is kind of just a // at all times. It is "manually" set whenever server.Proc() is called. This is kind of just a

View File

@ -28,7 +28,7 @@ type Server struct {
emitterLock sync.Mutex emitterLock sync.Mutex
powerLock *semaphore.Weighted powerLock *semaphore.Weighted
throttleLock sync.Mutex throttleOnce sync.Once
// Maintains the configuration for the server. This is the data that gets returned by the Panel // Maintains the configuration for the server. This is the data that gets returned by the Panel
// such as build settings and container images. // such as build settings and container images.
@ -55,9 +55,8 @@ type Server struct {
// two installer processes at the same time. This also allows us to cancel a running // two installer processes at the same time. This also allows us to cancel a running
// installation process, for example when a server is deleted from the panel while the // installation process, for example when a server is deleted from the panel while the
// installer process is still running. // installer process is still running.
installer InstallerDetails installing *system.AtomicBool
transferring *system.AtomicBool
transferring system.AtomicBool
// The console throttler instance used to control outputs. // The console throttler instance used to control outputs.
throttler *ConsoleThrottler throttler *ConsoleThrottler
@ -67,12 +66,6 @@ type Server struct {
wsBagLocker sync.Mutex wsBagLocker sync.Mutex
} }
type InstallerDetails struct {
// Installer lock. You should obtain an exclusive lock on this context while running
// the installation process and release it when finished.
sem *semaphore.Weighted
}
// Returns a new server instance with a context and all of the default values set on // Returns a new server instance with a context and all of the default values set on
// the instance. // the instance.
func New() (*Server, error) { func New() (*Server, error) {
@ -80,6 +73,8 @@ func New() (*Server, error) {
s := Server{ s := Server{
ctx: ctx, ctx: ctx,
ctxCancel: &cancel, ctxCancel: &cancel,
installing: system.NewAtomicBool(false),
transferring: system.NewAtomicBool(false),
} }
if err := defaults.Set(&s); err != nil { if err := defaults.Set(&s); err != nil {
return nil, err return nil, err
@ -87,7 +82,7 @@ func New() (*Server, error) {
if err := defaults.Set(&s.cfg); err != nil { if err := defaults.Set(&s.cfg); err != nil {
return nil, err return nil, err
} }
s.resources.State.Store(environment.ProcessOfflineState) s.resources.State = system.NewAtomicString(environment.ProcessOfflineState)
return &s, nil return &s, nil
} }

View File

@ -1,10 +1,14 @@
package system package system
import ( import (
"bufio"
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io"
"strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -37,20 +41,47 @@ func FormatBytes(b int64) string {
} }
type AtomicBool struct { type AtomicBool struct {
flag uint32 v bool
mu sync.RWMutex
} }
func (ab *AtomicBool) Set(v bool) { func NewAtomicBool(v bool) *AtomicBool {
i := 0 return &AtomicBool{v: v}
if v {
i = 1
} }
atomic.StoreUint32(&ab.flag, uint32(i)) func (ab *AtomicBool) Store(v bool) {
ab.mu.Lock()
ab.v = v
ab.mu.Unlock()
} }
func (ab *AtomicBool) Get() bool { // Stores the value "v" if the current value stored in the AtomicBool is the opposite
return atomic.LoadUint32(&ab.flag) == 1 // boolean value. If successfully swapped, the response is "true", otherwise "false"
// is returned.
func (ab *AtomicBool) SwapIf(v bool) bool {
ab.mu.Lock()
defer ab.mu.Unlock()
if ab.v != v {
ab.v = v
return true
}
return false
}
func (ab *AtomicBool) Load() bool {
ab.mu.RLock()
defer ab.mu.RUnlock()
return ab.v
}
func (ab *AtomicBool) UnmarshalJSON(b []byte) error {
ab.mu.Lock()
defer ab.mu.Unlock()
return json.Unmarshal(b, &ab.v)
}
func (ab *AtomicBool) MarshalJSON() ([]byte, error) {
return json.Marshal(ab.Load())
} }
// AtomicString allows for reading/writing to a given struct field without having to worry // AtomicString allows for reading/writing to a given struct field without having to worry
@ -61,8 +92,8 @@ type AtomicString struct {
mu sync.RWMutex mu sync.RWMutex
} }
func NewAtomicString(v string) AtomicString { func NewAtomicString(v string) *AtomicString {
return AtomicString{v: v} return &AtomicString{v: v}
} }
// Stores the string value passed atomically. // Stores the string value passed atomically.
@ -79,12 +110,12 @@ func (as *AtomicString) Load() string {
return as.v return as.v
} }
func (as *AtomicString) UnmarshalText(b []byte) error { func (as *AtomicString) UnmarshalJSON(b []byte) error {
as.Store(string(b)) as.mu.Lock()
return nil defer as.mu.Unlock()
return json.Unmarshal(b, &as.v)
} }
//goland:noinspection GoVetCopyLock func (as *AtomicString) MarshalJSON() ([]byte, error) {
func (as AtomicString) MarshalText() ([]byte, error) { return json.Marshal(as.Load())
return []byte(as.Load()), nil
} }