Misc mutex locking things to avoid data races

Dane Everitt 2020-07-18 16:03:25 -07:00
parent 0b9d923d15
commit 8315ff8ae1
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
8 changed files with 98 additions and 40 deletions
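
Every change below follows the same pattern: embed a sync.RWMutex in the shared struct, read through accessors that take the read lock, and write under the write lock. A minimal, self-contained sketch of that pattern (a hypothetical test, not part of this commit) that passes `go test -race` with the locks in place and is flagged by the race detector without them:

    package server_test

    import (
        "sync"
        "testing"
    )

    // guarded mirrors the shape of the structs touched by this commit;
    // all names here are illustrative only.
    type guarded struct {
        sync.RWMutex
        uuid string
    }

    // Id returns the identifier under the read lock, like Server.Id() below.
    func (g *guarded) Id() string {
        g.RLock()
        defer g.RUnlock()

        return g.uuid
    }

    // SetId writes the identifier under the write lock.
    func (g *guarded) SetId(id string) {
        g.Lock()
        g.uuid = id
        g.Unlock()
    }

    func TestGuardedAccess(t *testing.T) {
        g := new(guarded)

        var wg sync.WaitGroup
        for i := 0; i < 50; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                g.SetId("some-uuid")
                _ = g.Id()
            }()
        }
        wg.Wait()
    }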

View File

@@ -4,10 +4,13 @@ import (
 	"encoding/json"
 	"github.com/gbrlsnchs/jwt/v3"
 	"strings"
+	"sync"
 )
 
 type WebsocketPayload struct {
 	jwt.Payload
+	sync.RWMutex
+
 	UserID      json.Number `json:"user_id"`
 	ServerUUID  string      `json:"server_uuid"`
 	Permissions []string    `json:"permissions"`
@@ -15,11 +18,24 @@ type WebsocketPayload struct {
 
 // Returns the JWT payload.
 func (p *WebsocketPayload) GetPayload() *jwt.Payload {
+	p.RLock()
+	defer p.RUnlock()
+
 	return &p.Payload
 }
 
+func (p *WebsocketPayload) GetServerUuid() string {
+	p.RLock()
+	defer p.RUnlock()
+
+	return p.ServerUUID
+}
+
 // Checks if the given token payload has a permission string.
 func (p *WebsocketPayload) HasPermission(permission string) bool {
+	p.RLock()
+	defer p.RUnlock()
+
 	for _, k := range p.Permissions {
 		if k == permission || (!strings.HasPrefix(permission, "admin") && k == "*") {
 			return true
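
These accessors pair with the handler change in the next file: callers copy values out under the payload's read lock instead of reaching into fields directly. A sketch of the calling pattern, with the permission string assumed for illustration:

    // Hypothetical caller: every payload read goes through an RLock'd
    // accessor, so a concurrent rewrite of the payload cannot race it.
    func validate(p *WebsocketPayload, serverId string) error {
        if !p.HasPermission("websocket.connect") { // assumed permission name
            return errors.New("jwt does not have connect permission")
        }

        if p.GetServerUuid() != serverId {
            return errors.New("jwt server uuid mismatch")
        }

        return nil
    }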

View File

@@ -127,10 +127,7 @@ func (h *Handler) TokenValid() error {
 		return errors.New("jwt does not have connect permission")
 	}
 
-	h.server.RLock()
-	defer h.server.RUnlock()
-
-	if h.server.Uuid != j.ServerUUID {
+	if h.server.Id() != j.GetServerUuid() {
 		return errors.New("jwt server uuid mismatch")
 	}

View File

@@ -10,7 +10,8 @@ import (
 func (s *Server) UpdateConfigurationFiles() {
 	wg := new(sync.WaitGroup)
 
-	for _, v := range s.processConfiguration.ConfigurationFiles {
+	files := s.ProcessConfiguration().ConfigurationFiles
+	for _, v := range files {
 		wg.Add(1)
 
 		go func(f parser.ConfigurationFile, server *Server) {
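
The local `files` copy is the load-bearing change here: the read lock inside ProcessConfiguration() is held only for the accessor call itself, never across the goroutine fan-out. The same shape as a standalone sketch (helper name hypothetical):

    // updateAll fans workers out over a snapshot taken through the locked
    // accessor; the workers never read the mutex-guarded field directly.
    func updateAll(files []parser.ConfigurationFile, s *Server) {
        wg := new(sync.WaitGroup)

        for _, v := range files {
            wg.Add(1)

            go func(f parser.ConfigurationFile, server *Server) {
                defer wg.Done()
                // per-file parsing and writing happens here, lock-free
            }(v, s)
        }

        wg.Wait()
    }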

View File

@@ -21,6 +21,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -44,6 +45,23 @@ type DockerEnvironment struct {
 	// Holds the stats stream used by the polling commands so that we can easily close
 	// it out.
 	stats io.ReadCloser
+
+	sync.RWMutex
+}
+
+// Sets whether this environment is currently attached to the container.
+func (d *DockerEnvironment) SetAttached(a bool) {
+	d.Lock()
+	d.attached = a
+	d.Unlock()
+}
+
+// Determines if this process is currently attached to the container.
+func (d *DockerEnvironment) IsAttached() bool {
+	d.RLock()
+	defer d.RUnlock()
+
+	return d.attached
 }
 
 // Creates a new base Docker environment. A server must still be attached to it.
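
Embedding sync.RWMutex promotes Lock and RLock onto DockerEnvironment, so the two accessors above are the only code that touches the attached flag. One observation, not something this commit changes: IsAttached followed by SetAttached is two separate critical sections, so the early-return check in Attach() further down is check-then-act rather than atomic. If that ever mattered, a compare-and-set variant could fold both into one write lock (hypothetical, not in the commit):

    // tryAttach atomically transitions from detached to attached, returning
    // false if some other goroutine got there first.
    func (d *DockerEnvironment) tryAttach() bool {
        d.Lock()
        defer d.Unlock()

        if d.attached {
            return false
        }

        d.attached = true
        return true
    }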
@@ -72,7 +90,7 @@ func (d *DockerEnvironment) Type() string {
 
 // Determines if the container exists in this environment.
 func (d *DockerEnvironment) Exists() (bool, error) {
-	_, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
+	_, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
 	if err != nil {
 		// If this error is because the container instance wasn't found via Docker we
@@ -96,7 +114,7 @@ func (d *DockerEnvironment) Exists() (bool, error) {
 //
 // @see docker/client/errors.go
 func (d *DockerEnvironment) IsRunning() (bool, error) {
-	c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
+	c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
 	if err != nil {
 		return false, err
 	}
@@ -108,7 +126,7 @@ func (d *DockerEnvironment) IsRunning() (bool, error) {
 // making any changes to the operational state of the container. This allows memory, cpu,
 // and IO limitations to be adjusted on the fly for individual instances.
 func (d *DockerEnvironment) InSituUpdate() error {
-	if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err != nil {
+	if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err != nil {
 		// If the container doesn't exist for some reason there really isn't anything
 		// we can do to fix that in this process (it doesn't make sense at least). In those
 		// cases just return without doing anything since we still want to save the configuration
@@ -130,7 +148,7 @@ func (d *DockerEnvironment) InSituUpdate() error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
-	if _, err := d.Client.ContainerUpdate(ctx, d.Server.Uuid, u); err != nil {
+	if _, err := d.Client.ContainerUpdate(ctx, d.Server.Id(), u); err != nil {
 		return errors.WithStack(err)
 	}
@@ -156,7 +174,7 @@ func (d *DockerEnvironment) OnBeforeStart() error {
 
 	// Always destroy and re-create the server container to ensure that synced data from
 	// the Panel is used.
-	if err := d.Client.ContainerRemove(context.Background(), d.Server.Uuid, types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil {
+	if err := d.Client.ContainerRemove(context.Background(), d.Server.Id(), types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil {
 		if !client.IsErrNotFound(err) {
 			return err
 		}
@@ -204,7 +222,7 @@ func (d *DockerEnvironment) Start() error {
 		return &suspendedError{}
 	}
 
-	if c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err != nil {
+	if c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err != nil {
 		// Do nothing if the container is not found, we just don't want to continue
 		// to the next block of code here. This check was inlined here to guard against
 		// a nil-pointer when checking c.State below.
@@ -259,7 +277,7 @@ func (d *DockerEnvironment) Start() error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
-	if err := d.Client.ContainerStart(ctx, d.Server.Uuid, types.ContainerStartOptions{}); err != nil {
+	if err := d.Client.ContainerStart(ctx, d.Server.Id(), types.ContainerStartOptions{}); err != nil {
 		return errors.WithStack(err)
 	}
@@ -272,7 +290,7 @@ func (d *DockerEnvironment) Start() error {
 // Stops the container that the server is running in. This will allow up to 10
 // seconds to pass before a failure occurs.
 func (d *DockerEnvironment) Stop() error {
-	stop := d.Server.processConfiguration.Stop
+	stop := d.Server.ProcessConfiguration().Stop
 	if stop.Type == api.ProcessStopSignal {
 		return d.Terminate(os.Kill)
 	}
@@ -284,7 +302,7 @@ func (d *DockerEnvironment) Stop() error {
 	t := time.Second * 10
 
-	return d.Client.ContainerStop(context.Background(), d.Server.Uuid, &t)
+	return d.Client.ContainerStop(context.Background(), d.Server.Id(), &t)
 }
 
 // Attempts to gracefully stop a server using the defined stop command. If the server
@@ -305,7 +323,7 @@ func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error {
 	// Block the return of this function until the container has been marked as no
 	// longer running. If this wait does not end by the time seconds have passed,
 	// attempt to terminate the container, or return an error.
-	ok, errChan := d.Client.ContainerWait(ctx, d.Server.Uuid, container.WaitConditionNotRunning)
+	ok, errChan := d.Client.ContainerWait(ctx, d.Server.Id(), container.WaitConditionNotRunning)
 	select {
 	case <-ctx.Done():
 		if ctxErr := ctx.Err(); ctxErr != nil {
@@ -327,7 +345,7 @@ func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error {
 
 // Forcefully terminates the container using the signal passed through.
 func (d *DockerEnvironment) Terminate(signal os.Signal) error {
-	c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
+	c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
 	if err != nil {
 		return errors.WithStack(err)
 	}
@@ -339,7 +357,7 @@ func (d *DockerEnvironment) Terminate(signal os.Signal) error {
 	d.Server.SetState(ProcessStoppingState)
 
 	return d.Client.ContainerKill(
-		context.Background(), d.Server.Uuid, strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed"),
+		context.Background(), d.Server.Id(), strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed"),
 	)
 }
@@ -349,7 +367,7 @@ func (d *DockerEnvironment) Destroy() error {
 	// Avoid crash detection firing off.
 	d.Server.SetState(ProcessStoppingState)
 
-	err := d.Client.ContainerRemove(context.Background(), d.Server.Uuid, types.ContainerRemoveOptions{
+	err := d.Client.ContainerRemove(context.Background(), d.Server.Id(), types.ContainerRemoveOptions{
 		RemoveVolumes: true,
 		RemoveLinks:   false,
 		Force:         true,
@@ -369,7 +387,7 @@ func (d *DockerEnvironment) Destroy() error {
 
 // Determine the container exit state and return the exit code and whether or not
 // the container was killed by the OOM killer.
 func (d *DockerEnvironment) ExitState() (uint32, bool, error) {
-	c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
+	c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
 	if err != nil {
 		// I'm not entirely sure how this can happen to be honest. I tried deleting a
 		// container _while_ a server was running and wings gracefully saw the crash and
@@ -395,7 +413,7 @@ func (d *DockerEnvironment) ExitState() (uint32, bool, error) {
 // miss important output at the beginning because of the time delay with attaching to the
 // output.
 func (d *DockerEnvironment) Attach() error {
-	if d.attached {
+	if d.IsAttached() {
 		return nil
 	}
@@ -404,7 +422,7 @@ func (d *DockerEnvironment) Attach() error {
 	}
 
 	var err error
-	d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Uuid, types.ContainerAttachOptions{
+	d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Id(), types.ContainerAttachOptions{
 		Stdin:  true,
 		Stdout: true,
 		Stderr: true,
@@ -419,7 +437,7 @@ func (d *DockerEnvironment) Attach() error {
 		Server: d.Server,
 	}
 
-	d.attached = true
+	d.SetAttached(true)
 	go func() {
 		if err := d.EnableResourcePolling(); err != nil {
 			d.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to enable resource polling on server")
@@ -430,7 +448,7 @@ func (d *DockerEnvironment) Attach() error {
 		defer d.stream.Close()
 		defer func() {
 			d.Server.SetState(ProcessOfflineState)
-			d.attached = false
+			d.SetAttached(false)
 		}()
 
 		io.Copy(console, d.stream.Reader)
@@ -448,7 +466,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
 			return errors.WithStack(err)
 		}
 
-		return errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid))
+		return errors.New(fmt.Sprintf("no such container: %s", d.Server.Id()))
 	}
 
 	opts := types.ContainerLogsOptions{
@@ -458,7 +476,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
 		Since:  time.Now().Format(time.RFC3339),
 	}
 
-	reader, err := d.Client.ContainerLogs(context.Background(), d.Server.Uuid, opts)
+	reader, err := d.Client.ContainerLogs(context.Background(), d.Server.Id(), opts)
 
 	go func(r io.ReadCloser) {
 		defer r.Close()
@@ -484,7 +502,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error {
 		return errors.New("cannot enable resource polling on a server that is not running")
 	}
 
-	stats, err := d.Client.ContainerStats(context.Background(), d.Server.Uuid, true)
+	stats, err := d.Client.ContainerStats(context.Background(), d.Server.Id(), true)
 	if err != nil {
 		return errors.WithStack(err)
 	}
@@ -621,7 +639,7 @@ func (d *DockerEnvironment) Create() error {
 	// If the container already exists don't hit the user with an error, just return
 	// the current information about it which is what we would do when creating the
 	// container anyways.
-	if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err == nil {
+	if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err == nil {
 		return nil
 	} else if !client.IsErrNotFound(err) {
 		return errors.WithStack(err)
@@ -633,7 +651,7 @@ func (d *DockerEnvironment) Create() error {
 	}
 
 	conf := &container.Config{
-		Hostname:     d.Server.Uuid,
+		Hostname:     d.Server.Id(),
 		Domainname:   config.Get().Docker.Domainname,
 		User:         strconv.Itoa(config.Get().System.User.Uid),
 		AttachStdin:  true,
@@ -685,16 +703,17 @@ func (d *DockerEnvironment) Create() error {
 			break
 		}
 
-		log := log.WithFields(log.Fields{
-			"server":      d.Server.Uuid,
+		logger := log.WithFields(log.Fields{
+			"server":      d.Server.Id(),
 			"source_path": source,
 			"target_path": target,
 			"read_only":   m.ReadOnly,
 		})
 
 		if mounted {
-			log.Debug("attaching mount to server's container")
+			logger.Debug("attaching mount to server's container")
 		} else {
-			log.Warn("skipping mount because it isn't allowed")
+			logger.Warn("skipping mount because it isn't allowed")
 		}
 	}
@@ -738,7 +757,7 @@ func (d *DockerEnvironment) Create() error {
 		NetworkMode: container.NetworkMode(config.Get().Docker.Network.Mode),
 	}
 
-	if _, err := d.Client.ContainerCreate(context.Background(), conf, hostConf, nil, d.Server.Uuid); err != nil {
+	if _, err := d.Client.ContainerCreate(context.Background(), conf, hostConf, nil, d.Server.Id()); err != nil {
 		return errors.WithStack(err)
 	}
@@ -748,7 +767,7 @@ func (d *DockerEnvironment) Create() error {
 
 // Sends the specified command to the stdin of the running container instance. There is no
 // confirmation that this data is sent successfully, only that it gets pushed into the stdin.
 func (d *DockerEnvironment) SendCommand(c string) error {
-	if !d.attached {
+	if !d.IsAttached() {
 		return errors.New("attempting to send command to non-attached instance")
 	}
@@ -760,7 +779,7 @@ func (d *DockerEnvironment) SendCommand(c string) error {
 
 // Reads the log file for the server. This does not care if the server is running or not, it will
 // simply try to read the last X bytes of the file and return them.
 func (d *DockerEnvironment) Readlog(len int64) ([]string, error) {
-	j, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
+	j, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
 	if err != nil {
 		return nil, err
 	}

View File

@@ -27,9 +27,11 @@ func (s *Server) onConsoleOutput(data string) {
 	// If the specific line of output is one that would mark the server as started,
 	// set the server to that state. Only do this if the server is not currently stopped
 	// or stopping.
-	if s.GetState() == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) {
+	match := s.ProcessConfiguration().Startup.Done
+	if s.GetState() == ProcessStartingState && strings.Contains(data, match) {
 		s.Log().WithFields(log.Fields{
-			"match":   s.processConfiguration.Startup.Done,
+			"match":   match,
 			"against": data,
 		}).Debug("detected server in running state based on console line output")
@@ -40,7 +42,8 @@ func (s *Server) onConsoleOutput(data string) {
 	// set the server to be in a stopping state, otherwise crash detection will kick in and
 	// cause the server to unexpectedly restart on the user.
 	if s.IsRunning() {
-		if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value {
+		stop := s.ProcessConfiguration().Stop
+		if stop.Type == api.ProcessStopCommand && data == stop.Value {
 			s.SetState(ProcessStoppingState)
 		}
 	}
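
Snapshotting `stop` into a local does double duty: it keeps the read lock inside ProcessConfiguration() held only briefly, and it guarantees that Stop.Type and Stop.Value come from the same configuration, which two separate accessor calls would not, since SyncWithConfiguration can swap the pointer between them. The two shapes side by side (sketch):

    // Racy shape: the two locked reads may observe different configurations
    // if a sync lands between them.
    //   if s.ProcessConfiguration().Stop.Type == api.ProcessStopCommand &&
    //       data == s.ProcessConfiguration().Stop.Value { ... }

    // Shape used by this commit: one read, one consistent copy.
    stop := s.ProcessConfiguration().Stop
    if stop.Type == api.ProcessStopCommand && data == stop.Value {
        s.SetState(ProcessStoppingState)
    }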

server/process.go (new file, 10 lines)

View File

@@ -0,0 +1,10 @@
+package server
+
+import "github.com/pterodactyl/wings/api"
+
+func (s *Server) ProcessConfiguration() *api.ProcessConfiguration {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.procConfig
+}
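
The getter hands back the pointer under the read lock; the matching write happens inline in SyncWithConfiguration (further down) rather than through a setter. Factored out, that write would look like this (hypothetical method, not part of the commit):

    func (s *Server) setProcessConfiguration(c *api.ProcessConfiguration) {
        s.Lock()
        s.procConfig = c
        s.Unlock()
    }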

View File

@@ -104,7 +104,7 @@ type Server struct {
 	// Defines the process configuration for the server instance. This is dynamically
 	// fetched from the Pterodactyl Server instance each time the server process is
 	// started, and then cached here.
-	processConfiguration *api.ProcessConfiguration
+	procConfig *api.ProcessConfiguration
 
 	// Tracks the installation process for this server and prevents a server from running
 	// two installer processes at the same time. This also allows us to cancel a running
@@ -153,6 +153,13 @@ type BuildSettings struct {
 	Threads string `json:"threads"`
 }
 
+func (s *Server) Id() string {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.Uuid
+}
+
 // Converts the CPU limit for a server build into a number that can be better understood
 // by the Docker environment. If there is no limit set, return -1 which will indicate to
 // Docker that it has unlimited CPU quota.
@@ -366,7 +373,10 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) error {
 		return errors.WithStack(err)
 	}
 
-	s.processConfiguration = cfg.ProcessConfiguration
+	s.Lock()
+	s.procConfig = cfg.ProcessConfiguration
+	s.Unlock()
+
 	return nil
 }

View File

@@ -27,12 +27,14 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
 		return errors.New("attempting to merge a data stack with an invalid UUID")
 	}
 
+	s.Lock()
 	// Merge the new data object that we have received with the existing server data object
 	// and then save it to the disk so it is persistent.
 	if err := mergo.Merge(s, src, mergo.WithOverride); err != nil {
 		return errors.WithStack(err)
 	}
 
+	// Mergo makes the previous lock disappear; handle that by just re-locking the object.
 	s.Lock()
 	defer s.Unlock()
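
Read as a whole, the sequence is: take the write lock, merge, then lock again because the merge replaces the embedded mutex, and let the deferred Unlock release that fresh lock. A condensed view with the implications spelled out (a sketch of the merged code, not a change to it):

    s.Lock() // guard the merge against concurrent readers of s
    if err := mergo.Merge(s, src, mergo.WithOverride); err != nil {
        // note: this early return exits with the lock above still held
        return errors.WithStack(err)
    }

    // Per the commit's comment, the merge wipes out the previous lock state,
    // so the object is locked again and the deferred Unlock releases this one.
    s.Lock()
    defer s.Unlock()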