Better race condition prevention
This commit is contained in:
parent
17d204a631
commit
03045c94be
|
@ -134,7 +134,7 @@ func rootCmdRun(*cobra.Command, []string) {
|
||||||
//
|
//
|
||||||
// This will also validate that a server process is running if the last tracked state we have
|
// This will also validate that a server process is running if the last tracked state we have
|
||||||
// is that it was running, but we see that the container process is not currently running.
|
// is that it was running, but we see that the container process is not currently running.
|
||||||
if r || (!r && (s.State == server.ProcessRunningState || s.State == server.ProcessStartingState)) {
|
if r || (!r && s.IsRunning()) {
|
||||||
zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid))
|
zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid))
|
||||||
if err := s.Environment.Start(); err != nil {
|
if err := s.Environment.Start(); err != nil {
|
||||||
zap.S().Warnw(
|
zap.S().Warnw(
|
||||||
|
|
|
@ -227,7 +227,7 @@ func (h *Handler) HandleInbound(m Message) error {
|
||||||
|
|
||||||
// On every authentication event, send the current server status back
|
// On every authentication event, send the current server status back
|
||||||
// to the client. :)
|
// to the client. :)
|
||||||
h.server.Events().Publish(server.StatusEvent, h.server.State)
|
h.server.Events().Publish(server.StatusEvent, h.server.GetState())
|
||||||
|
|
||||||
h.unsafeSendJson(Message{
|
h.unsafeSendJson(Message{
|
||||||
Event: AuthenticationSuccessEvent,
|
Event: AuthenticationSuccessEvent,
|
||||||
|
@ -293,7 +293,7 @@ func (h *Handler) HandleInbound(m Message) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if h.server.State == server.ProcessOfflineState {
|
if h.server.GetState() == server.ProcessOfflineState {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (s *Server) handleServerCrash() error {
|
||||||
// No point in doing anything here if the server isn't currently offline, there
|
// No point in doing anything here if the server isn't currently offline, there
|
||||||
// is no reason to do a crash detection event. If the server crash detection is
|
// is no reason to do a crash detection event. If the server crash detection is
|
||||||
// disabled we want to skip anything after this as well.
|
// disabled we want to skip anything after this as well.
|
||||||
if s.State != ProcessOfflineState || !s.CrashDetection.Enabled {
|
if s.GetState() != ProcessOfflineState || !s.CrashDetection.Enabled {
|
||||||
if !s.CrashDetection.Enabled {
|
if !s.CrashDetection.Enabled {
|
||||||
zap.S().Debugw("server triggered crash detection but handler is disabled for server process", zap.String("server", s.Uuid))
|
zap.S().Debugw("server triggered crash detection but handler is disabled for server process", zap.String("server", s.Uuid))
|
||||||
|
|
||||||
|
|
|
@ -271,7 +271,7 @@ func (d *DockerEnvironment) Stop() error {
|
||||||
// does not stop after seconds have passed, an error will be returned, or the instance
|
// does not stop after seconds have passed, an error will be returned, or the instance
|
||||||
// will be terminated forcefully depending on the value of the second argument.
|
// will be terminated forcefully depending on the value of the second argument.
|
||||||
func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error {
|
func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error {
|
||||||
if d.Server.State == ProcessOfflineState {
|
if d.Server.GetState() == ProcessOfflineState {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,7 +444,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
|
||||||
// information, instead just sit there with an async process that lets Docker stream all of this data
|
// information, instead just sit there with an async process that lets Docker stream all of this data
|
||||||
// to us automatically.
|
// to us automatically.
|
||||||
func (d *DockerEnvironment) EnableResourcePolling() error {
|
func (d *DockerEnvironment) EnableResourcePolling() error {
|
||||||
if d.Server.State == ProcessOfflineState {
|
if d.Server.GetState() == ProcessOfflineState {
|
||||||
return errors.New("cannot enable resource polling on a server that is not running")
|
return errors.New("cannot enable resource polling on a server that is not running")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,7 +472,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error {
|
||||||
|
|
||||||
// Disable collection if the server is in an offline state and this process is
|
// Disable collection if the server is in an offline state and this process is
|
||||||
// still running.
|
// still running.
|
||||||
if s.State == ProcessOfflineState {
|
if s.GetState() == ProcessOfflineState {
|
||||||
d.DisableResourcePolling()
|
d.DisableResourcePolling()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ func (s *Server) Install() error {
|
||||||
// Reinstalls a server's software by utilizing the install script for the server egg. This
|
// Reinstalls a server's software by utilizing the install script for the server egg. This
|
||||||
// does not touch any existing files for the server, other than what the script modifies.
|
// does not touch any existing files for the server, other than what the script modifies.
|
||||||
func (s *Server) Reinstall() error {
|
func (s *Server) Reinstall() error {
|
||||||
if s.State != ProcessOfflineState {
|
if s.GetState() != ProcessOfflineState {
|
||||||
zap.S().Debugw("waiting for server instance to enter a stopped state", zap.String("server", s.Uuid))
|
zap.S().Debugw("waiting for server instance to enter a stopped state", zap.String("server", s.Uuid))
|
||||||
if err := s.Environment.WaitForStop(10, true); err != nil {
|
if err := s.Environment.WaitForStop(10, true); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -27,7 +27,7 @@ func (s *Server) onConsoleOutput(data string) {
|
||||||
// If the specific line of output is one that would mark the server as started,
|
// If the specific line of output is one that would mark the server as started,
|
||||||
// set the server to that state. Only do this if the server is not currently stopped
|
// set the server to that state. Only do this if the server is not currently stopped
|
||||||
// or stopping.
|
// or stopping.
|
||||||
if s.State == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) {
|
if s.GetState() == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) {
|
||||||
zap.S().Debugw(
|
zap.S().Debugw(
|
||||||
"detected server in running state based on line output", zap.String("match", s.processConfiguration.Startup.Done), zap.String("against", data),
|
"detected server in running state based on line output", zap.String("match", s.processConfiguration.Startup.Done), zap.String("against", data),
|
||||||
)
|
)
|
||||||
|
@ -38,7 +38,7 @@ func (s *Server) onConsoleOutput(data string) {
|
||||||
// If the command sent to the server is one that should stop the server we will need to
|
// If the command sent to the server is one that should stop the server we will need to
|
||||||
// set the server to be in a stopping state, otherwise crash detection will kick in and
|
// set the server to be in a stopping state, otherwise crash detection will kick in and
|
||||||
// cause the server to unexpectedly restart on the user.
|
// cause the server to unexpectedly restart on the user.
|
||||||
if s.State == ProcessStartingState || s.State == ProcessRunningState {
|
if s.IsRunning() {
|
||||||
if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value {
|
if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value {
|
||||||
s.SetState(ProcessStoppingState)
|
s.SetState(ProcessStoppingState)
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,8 +181,8 @@ func LoadDirectory() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if state, exists := states[s.Uuid]; exists {
|
if state, exists := states[s.Uuid]; exists {
|
||||||
s.State = state
|
s.SetState(state)
|
||||||
zap.S().Debugw("loaded server state from cache", zap.String("server", s.Uuid), zap.String("state", s.State))
|
zap.S().Debugw("loaded server state from cache", zap.String("server", s.Uuid), zap.String("state", s.GetState()))
|
||||||
}
|
}
|
||||||
|
|
||||||
servers.Add(s)
|
servers.Add(s)
|
||||||
|
@ -317,71 +317,6 @@ func (s *Server) CreateEnvironment() error {
|
||||||
return s.Environment.Create()
|
return s.Environment.Create()
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
ProcessOfflineState = "offline"
|
|
||||||
ProcessStartingState = "starting"
|
|
||||||
ProcessRunningState = "running"
|
|
||||||
ProcessStoppingState = "stopping"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Sets the state of the server internally. This function handles crash detection as
|
|
||||||
// well as reporting to event listeners for the server.
|
|
||||||
func (s *Server) SetState(state string) error {
|
|
||||||
if state != ProcessOfflineState && state != ProcessStartingState && state != ProcessRunningState && state != ProcessStoppingState {
|
|
||||||
return errors.New(fmt.Sprintf("invalid server state received: %s", state))
|
|
||||||
}
|
|
||||||
|
|
||||||
prevState := s.State
|
|
||||||
s.State = state
|
|
||||||
|
|
||||||
// Persist this change to the disk immediately so that should the Daemon be stopped or
|
|
||||||
// crash we can immediately restore the server state.
|
|
||||||
//
|
|
||||||
// This really only makes a difference if all of the Docker containers are also stopped,
|
|
||||||
// but this was a highly requested feature and isn't hard to work with, so lets do it.
|
|
||||||
//
|
|
||||||
// We also get the benefit of server status changes always propagating corrected configurations
|
|
||||||
// to the disk should we forget to do it elsewhere.
|
|
||||||
go func() {
|
|
||||||
/*if _, err := server.WriteConfigurationToDisk(); err != nil {
|
|
||||||
zap.S().Warnw("failed to write server state change to disk", zap.String("server", server.Uuid), zap.Error(err))
|
|
||||||
}*/
|
|
||||||
|
|
||||||
if err := SaveServerStates(); err != nil {
|
|
||||||
zap.S().Warnw("failed to write server states to disk", zap.Error(err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State))
|
|
||||||
|
|
||||||
// Emit the event to any listeners that are currently registered.
|
|
||||||
s.Events().Publish(StatusEvent, s.State)
|
|
||||||
|
|
||||||
// If server was in an online state, and is now in an offline state we should handle
|
|
||||||
// that as a crash event. In that scenario, check the last crash time, and the crash
|
|
||||||
// counter.
|
|
||||||
//
|
|
||||||
// In the event that we have passed the thresholds, don't do anything, otherwise
|
|
||||||
// automatically attempt to start the process back up for the user. This is done in a
|
|
||||||
// separate thread as to not block any actions currently taking place in the flow
|
|
||||||
// that called this function.
|
|
||||||
if (prevState == ProcessStartingState || prevState == ProcessRunningState) && s.State == ProcessOfflineState {
|
|
||||||
zap.S().Infow("detected server as entering a potentially crashed state; running handler", zap.String("server", s.Uuid))
|
|
||||||
|
|
||||||
go func(server *Server) {
|
|
||||||
if err := server.handleServerCrash(); err != nil {
|
|
||||||
if IsTooFrequentCrashError(err) {
|
|
||||||
zap.S().Infow("did not restart server after crash; occurred too soon after last", zap.String("server", server.Uuid))
|
|
||||||
} else {
|
|
||||||
zap.S().Errorw("failed to handle server crash state", zap.String("server", server.Uuid), zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the process configuration data for the server.
|
// Gets the process configuration data for the server.
|
||||||
func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *api.RequestError, error) {
|
func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *api.RequestError, error) {
|
||||||
return api.NewRequester().GetServerConfiguration(s.Uuid)
|
return api.NewRequester().GetServerConfiguration(s.Uuid)
|
||||||
|
|
|
@ -2,7 +2,9 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"go.uber.org/zap"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -61,12 +63,12 @@ func getServerStates() (map[string]string, error) {
|
||||||
return states, nil
|
return states, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveServerStates .
|
// saveServerStates collects the current state of every server on the daemon and writes them to disk.
|
||||||
func SaveServerStates() error {
|
func saveServerStates() error {
|
||||||
// Get the states of all servers on the daemon.
|
// Get the states of all servers on the daemon.
|
||||||
states := map[string]string{}
|
states := map[string]string{}
|
||||||
for _, s := range GetServers().All() {
|
for _, s := range GetServers().All() {
|
||||||
states[s.Uuid] = s.State
|
states[s.Uuid] = s.GetState()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the map to a json object.
|
// Convert the map to a json object.
|
||||||
|
@ -85,3 +87,84 @@ func SaveServerStates() error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process states the daemon tracks for a server. SetState rejects any value
// that is not one of these.
const (
	ProcessOfflineState  = "offline"
	ProcessStartingState = "starting"
	ProcessRunningState  = "running"
	ProcessStoppingState = "stopping"
)
|
||||||
|
|
||||||
|
// Sets the state of the server internally. This function handles crash detection as
|
||||||
|
// well as reporting to event listeners for the server.
|
||||||
|
func (s *Server) SetState(state string) error {
|
||||||
|
if state != ProcessOfflineState && state != ProcessStartingState && state != ProcessRunningState && state != ProcessStoppingState {
|
||||||
|
return errors.New(fmt.Sprintf("invalid server state received: %s", state))
|
||||||
|
}
|
||||||
|
|
||||||
|
prevState := s.GetState()
|
||||||
|
|
||||||
|
// Obtain a mutex lock and update the current state of the server.
|
||||||
|
s.Lock()
|
||||||
|
s.State = state
|
||||||
|
|
||||||
|
// Emit the event to any listeners that are currently registered.
|
||||||
|
zap.S().Debugw("saw server status change event", zap.String("server", s.Uuid), zap.String("status", s.State))
|
||||||
|
s.Events().Publish(StatusEvent, s.State)
|
||||||
|
|
||||||
|
// Release the lock as it is no longer needed for the following actions.
|
||||||
|
s.Unlock()
|
||||||
|
|
||||||
|
// Persist this change to the disk immediately so that should the Daemon be stopped or
|
||||||
|
// crash we can immediately restore the server state.
|
||||||
|
//
|
||||||
|
// This really only makes a difference if all of the Docker containers are also stopped,
|
||||||
|
// but this was a highly requested feature and isn't hard to work with, so lets do it.
|
||||||
|
//
|
||||||
|
// We also get the benefit of server status changes always propagating corrected configurations
|
||||||
|
// to the disk should we forget to do it elsewhere.
|
||||||
|
go func() {
|
||||||
|
if err := saveServerStates(); err != nil {
|
||||||
|
zap.S().Warnw("failed to write server states to disk", zap.Error(err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// If server was in an online state, and is now in an offline state we should handle
|
||||||
|
// that as a crash event. In that scenario, check the last crash time, and the crash
|
||||||
|
// counter.
|
||||||
|
//
|
||||||
|
// In the event that we have passed the thresholds, don't do anything, otherwise
|
||||||
|
// automatically attempt to start the process back up for the user. This is done in a
|
||||||
|
// separate thread as to not block any actions currently taking place in the flow
|
||||||
|
// that called this function.
|
||||||
|
if (prevState == ProcessStartingState || prevState == ProcessRunningState) && s.GetState() == ProcessOfflineState {
|
||||||
|
zap.S().Infow("detected server as entering a potentially crashed state; running handler", zap.String("server", s.Uuid))
|
||||||
|
|
||||||
|
go func(server *Server) {
|
||||||
|
if err := server.handleServerCrash(); err != nil {
|
||||||
|
if IsTooFrequentCrashError(err) {
|
||||||
|
zap.S().Infow("did not restart server after crash; occurred too soon after last", zap.String("server", server.Uuid))
|
||||||
|
} else {
|
||||||
|
zap.S().Errorw("failed to handle server crash state", zap.String("server", server.Uuid), zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the current state of the server in a race-safe manner.
|
||||||
|
func (s *Server) GetState() string {
|
||||||
|
s.RLock()
|
||||||
|
defer s.RUnlock()
|
||||||
|
|
||||||
|
return s.State
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determines if the server state is running or not. This is different than the
|
||||||
|
// environment state, it is simply the tracked state from this daemon instance, and
|
||||||
|
// not the response from Docker.
|
||||||
|
func (s *Server) IsRunning() bool {
|
||||||
|
return s.GetState() == ProcessRunningState || s.GetState() == ProcessStartingState
|
||||||
|
}
|
|
@ -102,7 +102,7 @@ func (s *Server) runBackgroundActions() {
|
||||||
// Check if the server is now suspended, and if so and the process is not terminated
|
// Check if the server is now suspended, and if so and the process is not terminated
|
||||||
// yet, do it immediately.
|
// yet, do it immediately.
|
||||||
go func(server *Server) {
|
go func(server *Server) {
|
||||||
if server.Suspended && server.State != ProcessOfflineState {
|
if server.Suspended && server.GetState() != ProcessOfflineState {
|
||||||
zap.S().Infow("server suspended with running process state, terminating now", zap.String("server", server.Uuid))
|
zap.S().Infow("server suspended with running process state, terminating now", zap.String("server", server.Uuid))
|
||||||
|
|
||||||
/*if err := server.Environment.Terminate(os.Kill); err != nil {
|
/*if err := server.Environment.Terminate(os.Kill); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user