avoid deadlocks while processing tons of data over server event listeners; closes pterodactyl/panel#2298
This commit is contained in:
		
							parent
							
								
									a31e805c5a
								
							
						
					
					
						commit
						0cd8dc2b5f
					
				| 
						 | 
					@ -15,11 +15,13 @@ import (
 | 
				
			||||||
// Attach to the instance and then automatically emit an event whenever the resource usage for the
 | 
					// Attach to the instance and then automatically emit an event whenever the resource usage for the
 | 
				
			||||||
// server process changes.
 | 
					// server process changes.
 | 
				
			||||||
func (e *Environment) pollResources(ctx context.Context) error {
 | 
					func (e *Environment) pollResources(ctx context.Context) error {
 | 
				
			||||||
	log.WithField("container_id", e.Id).Debug("starting resource polling..")
 | 
						l := log.WithField("container_id", e.Id)
 | 
				
			||||||
	defer log.WithField("container_id", e.Id).Debug("resource polling stopped")
 | 
					
 | 
				
			||||||
 | 
						l.Debug("starting resource polling for container")
 | 
				
			||||||
 | 
						defer l.Debug("stopped resource polling for container")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if e.State() == environment.ProcessOfflineState {
 | 
						if e.State() == environment.ProcessOfflineState {
 | 
				
			||||||
		return errors.New("attempting to enable resource polling on a stopped server instance")
 | 
							return errors.New("cannot enable resource polling on a stopped server")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stats, err := e.client.ContainerStats(context.Background(), e.Id, true)
 | 
						stats, err := e.client.ContainerStats(context.Background(), e.Id, true)
 | 
				
			||||||
| 
						 | 
					@ -34,22 +36,22 @@ func (e *Environment) pollResources(ctx context.Context) error {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-ctx.Done():
 | 
							case <-ctx.Done():
 | 
				
			||||||
			return ctx.Err()
 | 
								return ctx.Err()
 | 
				
			||||||
 | 
					 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
			var v *types.StatsJSON
 | 
								var v *types.StatsJSON
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if err := dec.Decode(&v); err != nil {
 | 
								if err := dec.Decode(&v); err != nil {
 | 
				
			||||||
				if err != io.EOF {
 | 
									if err != io.EOF {
 | 
				
			||||||
					log.WithField("container_id", e.Id).Warn("encountered error processing docker stats output, stopping collection")
 | 
										l.WithField("error", errors.WithStack(err)).Warn("error while processing Docker stats output for container")
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										l.Debug("io.EOF encountered during stats decode, stopping polling...")
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				log.WithField("container_id", e.Id).Debug("detected io.EOF, stopping resource polling")
 | 
					 | 
				
			||||||
				return nil
 | 
									return nil
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Disable collection if the server is in an offline state and this process is still running.
 | 
								// Disable collection if the server is in an offline state and this process is still running.
 | 
				
			||||||
			if e.State() == environment.ProcessOfflineState {
 | 
								if e.State() == environment.ProcessOfflineState {
 | 
				
			||||||
				log.WithField("container_id", e.Id).Debug("process in offline state while resource polling is still active; stopping poll")
 | 
									l.Debug("process in offline state while resource polling is still active; stopping poll")
 | 
				
			||||||
				return nil
 | 
									return nil
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -74,7 +76,7 @@ func (e *Environment) pollResources(ctx context.Context) error {
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if b, err := json.Marshal(st); err != nil {
 | 
								if b, err := json.Marshal(st); err != nil {
 | 
				
			||||||
				log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while marshaling stats object for environment")
 | 
									l.WithField("error", errors.WithStack(err)).Warn("error while marshaling stats object for environment")
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
				e.Events().Publish(environment.ResourceEvent, string(b))
 | 
									e.Events().Publish(environment.ResourceEvent, string(b))
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -41,7 +41,8 @@ func IsPathResolutionError(err error) bool {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Filesystem struct {
 | 
					type Filesystem struct {
 | 
				
			||||||
	mu sync.Mutex
 | 
						mu           sync.Mutex
 | 
				
			||||||
 | 
						lookupTimeMu sync.RWMutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	lastLookupTime   time.Time
 | 
						lastLookupTime   time.Time
 | 
				
			||||||
	lookupInProgress int32
 | 
						lookupInProgress int32
 | 
				
			||||||
| 
						 | 
					@ -254,17 +255,25 @@ func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
 | 
				
			||||||
// This is primarily to avoid a bunch of I/O operations from piling up on the server, especially on servers
 | 
					// This is primarily to avoid a bunch of I/O operations from piling up on the server, especially on servers
 | 
				
			||||||
// with a large amount of files.
 | 
					// with a large amount of files.
 | 
				
			||||||
func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
 | 
					func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
 | 
				
			||||||
	// Check if cache is expired...
 | 
						// Check if cache is expired.
 | 
				
			||||||
	if !fs.lastLookupTime.After(time.Now().Add(time.Second * -150)) {
 | 
						fs.lookupTimeMu.RLock()
 | 
				
			||||||
		// If we are now allowing a stale response, or there is no lookup currently in progress, go ahead
 | 
						isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * -10))
 | 
				
			||||||
		// and perform the lookup and return the fresh value. This is a blocking operation to the calling
 | 
						fs.lookupTimeMu.RUnlock()
 | 
				
			||||||
		// process.
 | 
					 | 
				
			||||||
		if !allowStaleValue || atomic.LoadInt32(&fs.lookupInProgress) == 0 {
 | 
					 | 
				
			||||||
			return fs.updateCachedDiskUsage()
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Otherwise, just go ahead and perform the cached disk usage update.
 | 
						if !isValidInCache {
 | 
				
			||||||
		go fs.updateCachedDiskUsage()
 | 
							// If we are now allowing a stale response go ahead  and perform the lookup and return the fresh
 | 
				
			||||||
 | 
							// value. This is a blocking operation to the calling process.
 | 
				
			||||||
 | 
							if !allowStaleValue {
 | 
				
			||||||
 | 
								return fs.updateCachedDiskUsage()
 | 
				
			||||||
 | 
							} else if atomic.LoadInt32(&fs.lookupInProgress) == 0 {
 | 
				
			||||||
 | 
								// Otherwise, if we allow a stale value and there isn't a valid item in the cache and we aren't
 | 
				
			||||||
 | 
								// currently performing a lookup, just do the disk usage calculation in the background.
 | 
				
			||||||
 | 
								go func(fs *Filesystem) {
 | 
				
			||||||
 | 
									if _, err := fs.updateCachedDiskUsage(); err != nil {
 | 
				
			||||||
 | 
										fs.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to determine disk usage in go-routine")
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}(fs)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Return the currently cached value back to the calling function.
 | 
						// Return the currently cached value back to the calling function.
 | 
				
			||||||
| 
						 | 
					@ -279,11 +288,11 @@ func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
 | 
				
			||||||
	fs.mu.Lock()
 | 
						fs.mu.Lock()
 | 
				
			||||||
	defer fs.mu.Unlock()
 | 
						defer fs.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Always clear the in progress flag when this process finishes.
 | 
						// Signal that we're currently updating the disk size so that other calls to the disk checking
 | 
				
			||||||
	defer atomic.StoreInt32(&fs.lookupInProgress, 0)
 | 
						// functions can determine if they should queue up additional calls to this function. Ensure that
 | 
				
			||||||
 | 
						// we always set this back to 0 when this process is done executing.
 | 
				
			||||||
	// Signal that we're currently updating the disk size, to prevent other routines to block on this.
 | 
					 | 
				
			||||||
	atomic.StoreInt32(&fs.lookupInProgress, 1)
 | 
						atomic.StoreInt32(&fs.lookupInProgress, 1)
 | 
				
			||||||
 | 
						defer atomic.StoreInt32(&fs.lookupInProgress, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// If there is no size its either because there is no data (in which case running this function
 | 
						// If there is no size its either because there is no data (in which case running this function
 | 
				
			||||||
	// will have effectively no impact), or there is nothing in the cache, in which case we need to
 | 
						// will have effectively no impact), or there is nothing in the cache, in which case we need to
 | 
				
			||||||
| 
						 | 
					@ -294,7 +303,10 @@ func (fs *Filesystem) updateCachedDiskUsage() (int64, error) {
 | 
				
			||||||
	// Always cache the size, even if there is an error. We want to always return that value
 | 
						// Always cache the size, even if there is an error. We want to always return that value
 | 
				
			||||||
	// so that we don't cause an endless loop of determining the disk size if there is a temporary
 | 
						// so that we don't cause an endless loop of determining the disk size if there is a temporary
 | 
				
			||||||
	// error encountered.
 | 
						// error encountered.
 | 
				
			||||||
 | 
						fs.lookupTimeMu.Lock()
 | 
				
			||||||
	fs.lastLookupTime = time.Now()
 | 
						fs.lastLookupTime = time.Now()
 | 
				
			||||||
 | 
						fs.lookupTimeMu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	atomic.StoreInt64(&fs.disk, size)
 | 
						atomic.StoreInt64(&fs.disk, size)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return size, err
 | 
						return size, err
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -21,40 +21,42 @@ func (s *Server) StartEventListeners() {
 | 
				
			||||||
	s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
 | 
						s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
 | 
				
			||||||
	s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
 | 
						s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// TODO: this is leaky I imagine since the routines aren't destroyed when the server is?
 | 
						go func(console chan events.Event) {
 | 
				
			||||||
	go func() {
 | 
							for data := range console {
 | 
				
			||||||
		for {
 | 
								// Immediately emit this event back over the server event stream since it is
 | 
				
			||||||
			select {
 | 
								// being called from the environment event stream and things probably aren't
 | 
				
			||||||
			case data := <-console:
 | 
								// listening to that event.
 | 
				
			||||||
				// Immediately emit this event back over the server event stream since it is
 | 
								s.Events().Publish(ConsoleOutputEvent, data.Data)
 | 
				
			||||||
				// being called from the environment event stream and things probably aren't
 | 
					 | 
				
			||||||
				// listening to that event.
 | 
					 | 
				
			||||||
				s.Events().Publish(ConsoleOutputEvent, data.Data)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
				// Also pass the data along to the console output channel.
 | 
								// Also pass the data along to the console output channel.
 | 
				
			||||||
				s.onConsoleOutput(data.Data)
 | 
								s.onConsoleOutput(data.Data)
 | 
				
			||||||
 | 
					 | 
				
			||||||
			case data := <-state:
 | 
					 | 
				
			||||||
				s.SetState(data.Data)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			case data := <-stats:
 | 
					 | 
				
			||||||
				st := new(environment.Stats)
 | 
					 | 
				
			||||||
				if err := json.Unmarshal([]byte(data.Data), st); err != nil {
 | 
					 | 
				
			||||||
					s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
 | 
					 | 
				
			||||||
					continue
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// Update the server resource tracking object with the resources we got here.
 | 
					 | 
				
			||||||
				s.resources.mu.Lock()
 | 
					 | 
				
			||||||
				s.resources.Stats = *st
 | 
					 | 
				
			||||||
				s.resources.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				s.Filesystem.HasSpaceAvailable(true)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				s.emitProcUsage()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}(console)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go func(state chan events.Event) {
 | 
				
			||||||
 | 
							for data := range state {
 | 
				
			||||||
 | 
								s.SetState(data.Data)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}(state)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go func(stats chan events.Event) {
 | 
				
			||||||
 | 
							for data := range stats {
 | 
				
			||||||
 | 
								st := new(environment.Stats)
 | 
				
			||||||
 | 
								if err := json.Unmarshal([]byte(data.Data), st); err != nil {
 | 
				
			||||||
 | 
									s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Update the server resource tracking object with the resources we got here.
 | 
				
			||||||
 | 
								s.resources.mu.Lock()
 | 
				
			||||||
 | 
								s.resources.Stats = *st
 | 
				
			||||||
 | 
								s.resources.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								s.Filesystem.HasSpaceAvailable(true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								s.emitProcUsage()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}(stats)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
 | 
					var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -118,7 +118,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		s.Environment = env
 | 
							s.Environment = env
 | 
				
			||||||
		s.StartEventListeners()
 | 
							go s.StartEventListeners()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Forces the configuration to be synced with the panel.
 | 
						// Forces the configuration to be synced with the panel.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user