From 03045c94bee9d8ed037ea20261a92b9afc4492b6 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Fri, 10 Apr 2020 18:22:18 -0700 Subject: [PATCH] Better race condition prevention --- cmd/root.go | 2 +- router/websocket/websocket.go | 4 +- server/crash.go | 2 +- server/environment_docker.go | 6 +-- server/install.go | 2 +- server/listeners.go | 4 +- server/server.go | 69 +-------------------------- server/state.go | 89 +++++++++++++++++++++++++++++++++-- server/update.go | 2 +- 9 files changed, 99 insertions(+), 81 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 874e35f..0d9c7d8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -134,7 +134,7 @@ func rootCmdRun(*cobra.Command, []string) { // // This will also validate that a server process is running if the last tracked state we have // is that it was running, but we see that the container process is not currently running. - if r || (!r && (s.State == server.ProcessRunningState || s.State == server.ProcessStartingState)) { + if r || (!r && s.IsRunning()) { zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid)) if err := s.Environment.Start(); err != nil { zap.S().Warnw( diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index 25c660a..979c500 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -227,7 +227,7 @@ func (h *Handler) HandleInbound(m Message) error { // On every authentication event, send the current server status back // to the client. 
:) - h.server.Events().Publish(server.StatusEvent, h.server.State) + h.server.Events().Publish(server.StatusEvent, h.server.GetState()) h.unsafeSendJson(Message{ Event: AuthenticationSuccessEvent, @@ -293,7 +293,7 @@ func (h *Handler) HandleInbound(m Message) error { return nil } - if h.server.State == server.ProcessOfflineState { + if h.server.GetState() == server.ProcessOfflineState { return nil } diff --git a/server/crash.go b/server/crash.go index 328193e..0f08079 100644 --- a/server/crash.go +++ b/server/crash.go @@ -33,7 +33,7 @@ func (s *Server) handleServerCrash() error { // No point in doing anything here if the server isn't currently offline, there // is no reason to do a crash detection event. If the server crash detection is // disabled we want to skip anything after this as well. - if s.State != ProcessOfflineState || !s.CrashDetection.Enabled { + if s.GetState() != ProcessOfflineState || !s.CrashDetection.Enabled { if !s.CrashDetection.Enabled { zap.S().Debugw("server triggered crash detection but handler is disabled for server process", zap.String("server", s.Uuid)) diff --git a/server/environment_docker.go b/server/environment_docker.go index 7f9e91e..36afe74 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -271,7 +271,7 @@ func (d *DockerEnvironment) Stop() error { // does not stop after seconds have passed, an error will be returned, or the instance // will be terminated forcefully depending on the value of the second argument. func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error { - if d.Server.State == ProcessOfflineState { + if d.Server.GetState() == ProcessOfflineState { return nil } @@ -444,7 +444,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error { // information, instead just sit there with an async process that lets Docker stream all of this data // to us automatically. 
func (d *DockerEnvironment) EnableResourcePolling() error { - if d.Server.State == ProcessOfflineState { + if d.Server.GetState() == ProcessOfflineState { return errors.New("cannot enable resource polling on a server that is not running") } @@ -472,7 +472,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error { // Disable collection if the server is in an offline state and this process is // still running. - if s.State == ProcessOfflineState { + if s.GetState() == ProcessOfflineState { d.DisableResourcePolling() return } diff --git a/server/install.go b/server/install.go index dd1206e..c2bcf2c 100644 --- a/server/install.go +++ b/server/install.go @@ -39,7 +39,7 @@ func (s *Server) Install() error { // Reinstalls a server's software by utilizing the install script for the server egg. This // does not touch any existing files for the server, other than what the script modifies. func (s *Server) Reinstall() error { - if s.State != ProcessOfflineState { + if s.GetState() != ProcessOfflineState { zap.S().Debugw("waiting for server instance to enter a stopped state", zap.String("server", s.Uuid)) if err := s.Environment.WaitForStop(10, true); err != nil { return err diff --git a/server/listeners.go b/server/listeners.go index 4fcc353..c164c99 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -27,7 +27,7 @@ func (s *Server) onConsoleOutput(data string) { // If the specific line of output is one that would mark the server as started, // set the server to that state. Only do this if the server is not currently stopped // or stopping. 
- if s.State == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) { + if s.GetState() == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) { zap.S().Debugw( "detected server in running state based on line output", zap.String("match", s.processConfiguration.Startup.Done), zap.String("against", data), ) @@ -38,7 +38,7 @@ func (s *Server) onConsoleOutput(data string) { // If the command sent to the server is one that should stop the server we will need to // set the server to be in a stopping state, otherwise crash detection will kick in and // cause the server to unexpectedly restart on the user. - if s.State == ProcessStartingState || s.State == ProcessRunningState { + if s.IsRunning() { if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value { s.SetState(ProcessStoppingState) } diff --git a/server/server.go b/server/server.go index 2977bb8..35c29a4 100644 --- a/server/server.go +++ b/server/server.go @@ -181,8 +181,8 @@ func LoadDirectory() error { } if state, exists := states[s.Uuid]; exists { - s.State = state - zap.S().Debugw("loaded server state from cache", zap.String("server", s.Uuid), zap.String("state", s.State)) + s.SetState(state) + zap.S().Debugw("loaded server state from cache", zap.String("server", s.Uuid), zap.String("state", s.GetState())) } servers.Add(s) @@ -317,71 +317,6 @@ func (s *Server) CreateEnvironment() error { return s.Environment.Create() } -const ( - ProcessOfflineState = "offline" - ProcessStartingState = "starting" - ProcessRunningState = "running" - ProcessStoppingState = "stopping" -) - -// Sets the state of the server internally. This function handles crash detection as -// well as reporting to event listeners for the server. 
-func (s *Server) SetState(state string) error { - if state != ProcessOfflineState && state != ProcessStartingState && state != ProcessRunningState && state != ProcessStoppingState { - return errors.New(fmt.Sprintf("invalid server state received: %s", state)) - } - - prevState := s.State - s.State = state - - // Persist this change to the disk immediately so that should the Daemon be stopped or - // crash we can immediately restore the server state. - // - // This really only makes a difference if all of the Docker containers are also stopped, - // but this was a highly requested feature and isn't hard to work with, so lets do it. - // - // We also get the benefit of server status changes always propagating corrected configurations - // to the disk should we forget to do it elsewhere. - go func() { - /*if _, err := server.WriteConfigurationToDisk(); err != nil { - zap.S().Warnw("failed to write server state change to disk", zap.String("server", server.Uuid), zap.Error(err)) - }*/ - - if err := SaveServerStates(); err != nil { - zap.S().Warnw("failed to write server states to disk", zap.Error(err)) - } - }() - - zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State)) - - // Emit the event to any listeners that are currently registered. - s.Events().Publish(StatusEvent, s.State) - - // If server was in an online state, and is now in an offline state we should handle - // that as a crash event. In that scenario, check the last crash time, and the crash - // counter. - // - // In the event that we have passed the thresholds, don't do anything, otherwise - // automatically attempt to start the process back up for the user. This is done in a - // separate thread as to not block any actions currently taking place in the flow - // that called this function. 
- if (prevState == ProcessStartingState || prevState == ProcessRunningState) && s.State == ProcessOfflineState { - zap.S().Infow("detected server as entering a potentially crashed state; running handler", zap.String("server", s.Uuid)) - - go func(server *Server) { - if err := server.handleServerCrash(); err != nil { - if IsTooFrequentCrashError(err) { - zap.S().Infow("did not restart server after crash; occurred too soon after last", zap.String("server", server.Uuid)) - } else { - zap.S().Errorw("failed to handle server crash state", zap.String("server", server.Uuid), zap.Error(err)) - } - } - }(s) - } - - return nil -} - // Gets the process configuration data for the server. func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *api.RequestError, error) { return api.NewRequester().GetServerConfiguration(s.Uuid) diff --git a/server/state.go b/server/state.go index d2bb00d..14ad182 100644 --- a/server/state.go +++ b/server/state.go @@ -2,7 +2,9 @@ package server import ( "encoding/json" + "fmt" "github.com/pkg/errors" + "go.uber.org/zap" "io/ioutil" "os" "sync" @@ -61,12 +63,12 @@ func getServerStates() (map[string]string, error) { return states, nil } -// SaveServerStates . -func SaveServerStates() error { +// saveServerStates . +func saveServerStates() error { // Get the states of all servers on the daemon. states := map[string]string{} for _, s := range GetServers().All() { - states[s.Uuid] = s.State + states[s.Uuid] = s.GetState() } // Convert the map to a json object. @@ -85,3 +87,84 @@ func SaveServerStates() error { return nil } + +const ( + ProcessOfflineState = "offline" + ProcessStartingState = "starting" + ProcessRunningState = "running" + ProcessStoppingState = "stopping" +) + +// Sets the state of the server internally. This function handles crash detection as +// well as reporting to event listeners for the server. 
+func (s *Server) SetState(state string) error { + if state != ProcessOfflineState && state != ProcessStartingState && state != ProcessRunningState && state != ProcessStoppingState { + return errors.New(fmt.Sprintf("invalid server state received: %s", state)) + } + + // Obtain the write lock, then read the previous state and store the new one as a single atomic step. + s.Lock() + prevState := s.State + s.State = state + + // Emit the event to any listeners that are currently registered. + zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State)) + s.Events().Publish(StatusEvent, s.State) + + // Release the lock as it is no longer needed for the following actions. + s.Unlock() + + // Persist this change to the disk immediately so that should the Daemon be stopped or + // crash we can immediately restore the server state. + // + // This really only makes a difference if all of the Docker containers are also stopped, + // but this was a highly requested feature and isn't hard to work with, so let's do it. + // + // We also get the benefit of server status changes always propagating corrected configurations + // to the disk should we forget to do it elsewhere. + go func() { + if err := saveServerStates(); err != nil { + zap.S().Warnw("failed to write server states to disk", zap.Error(err)) + } + }() + + // If server was in an online state, and is now in an offline state we should handle + // that as a crash event. In that scenario, check the last crash time, and the crash + // counter. + // + // In the event that we have passed the thresholds, don't do anything, otherwise + // automatically attempt to start the process back up for the user. This is done in a + // separate thread as to not block any actions currently taking place in the flow + // that called this function. The local state value is compared here (not re-read) so a concurrent SetState cannot skew the check. 
+ if (prevState == ProcessStartingState || prevState == ProcessRunningState) && state == ProcessOfflineState { + zap.S().Infow("detected server as entering a potentially crashed state; running handler", zap.String("server", s.Uuid)) + + go func(server *Server) { + if err := server.handleServerCrash(); err != nil { + if IsTooFrequentCrashError(err) { + zap.S().Infow("did not restart server after crash; occurred too soon after last", zap.String("server", server.Uuid)) + } else { + zap.S().Errorw("failed to handle server crash state", zap.String("server", server.Uuid), zap.Error(err)) + } + } + }(s) + } + + return nil +} + +// Returns the current state of the server in a race-safe manner. +func (s *Server) GetState() string { + s.RLock() + defer s.RUnlock() + + return s.State +} + +// Determines if the server state is running or not. This is different than the +// environment state, it is simply the tracked state from this daemon instance, and +// not the response from Docker. The state is read once so both comparisons see the same value. +func (s *Server) IsRunning() bool { + st := s.GetState() + return st == ProcessRunningState || st == ProcessStartingState +} \ No newline at end of file diff --git a/server/update.go b/server/update.go index f852d64..1f0f5f8 100644 --- a/server/update.go +++ b/server/update.go @@ -102,7 +102,7 @@ func (s *Server) runBackgroundActions() { // Check if the server is now suspended, and if so and the process is not terminated // yet, do it immediately. go func(server *Server) { - if server.Suspended && server.State != ProcessOfflineState { + if server.Suspended && server.GetState() != ProcessOfflineState { zap.S().Infow("server suspended with running process state, terminating now", zap.String("server", server.Uuid)) /*if err := server.Environment.Terminate(os.Kill); err != nil {