From c8e6e29abcfd3cdb95189266c0f9bd57dead7ea9 Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sat, 17 Aug 2019 13:19:56 -0700 Subject: [PATCH] Add support for streaming usage stats back to the calling process --- server/environment.go | 8 +++++ server/environment_docker.go | 70 +++++++++++++++++++++++++++++++++++- server/resources.go | 51 ++++++++++++++++++++++++++ server/server.go | 3 ++ wings.go | 4 +-- 5 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 server/resources.go diff --git a/server/environment.go b/server/environment.go index 1e66a05..b7b9658 100644 --- a/server/environment.go +++ b/server/environment.go @@ -51,4 +51,12 @@ type Environment interface { // Reads the log file for the process from the end backwards until the provided // number of bytes is met. Readlog(int64) ([]string, error) + + // Polls the given environment for resource usage of the server when the process + // is running. + EnableResourcePolling() error + + // Disables the polling operation for resource usage and sets the required values + // to 0 in the server resource usage struct. + DisableResourcePolling() error } diff --git a/server/environment_docker.go b/server/environment_docker.go index 1bce5af..17445c4 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -43,6 +43,10 @@ type DockerEnvironment struct { // Controls the hijacked response stream which exists only when we're attached to // the running container instance. stream types.HijackedResponse + + // Holds the stats stream used by the polling commands so that we can easily close + // it out. + stats io.ReadCloser } // Creates a new base Docker environment. A server must still be attached to it. @@ -209,6 +213,7 @@ func (d *DockerEnvironment) Attach() error { Server: d.Server, } + d.EnableResourcePolling() d.attached = true go func() { @@ -261,6 +266,69 @@ func (d *DockerEnvironment) FollowConsoleOutput() error { return err } +// Enables resource polling on the docker instance. Except we aren't actually polling Docker for this +// information, instead just sit there with an async process that lets Docker stream all of this data +// to us automatically. +func (d *DockerEnvironment) EnableResourcePolling() error { + fmt.Println("called") + if d.Server.State == ProcessOfflineState { + fmt.Println("not running") + return errors.New("cannot enable resource polling on a server that is not running") + } + + ctx := context.Background() + + stats, err := d.Client.ContainerStats(ctx, d.Server.Uuid, true) + if err != nil { + return err + } + d.stats = stats.Body + + dec := json.NewDecoder(d.stats) + go func(s *Server) { + pCpu := 0.0 + pSystem := 0.0 + + for { + var v *types.StatsJSON + + if err := dec.Decode(&v); err != nil { + zap.S().Warnw("encountered error processing server stats; stopping collection", zap.Error(err)) + d.DisableResourcePolling() + return + } + + // Disable collection if the server is in an offline state and this process is + // still running. + if s.State == ProcessOfflineState { + d.DisableResourcePolling() + return + } + + s.Resources.CpuAbsolute = s.Resources.CalculateAbsoluteCpu(pCpu, pSystem, &v.CPUStats) + s.Resources.Memory = v.MemoryStats.Usage + s.Resources.MemoryLimit = v.MemoryStats.Limit + s.Resources.Disk = 0 + + for _, nw := range v.Networks { + s.Resources.Network.RxBytes += nw.RxBytes + s.Resources.Network.TxBytes += nw.TxBytes + } + } + }(d.Server) + + return nil +} + +// Closes the stats stream for a server process. +func (d *DockerEnvironment) DisableResourcePolling() error { + if d.stats == nil { + return nil + } + + return d.stats.Close() +} + // Creates a new container for the server using all of the data that is currently // available for it. If the container already exists it will be returned. func (d *DockerEnvironment) Create() error { @@ -529,4 +597,4 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet { } return out -} +} \ No newline at end of file diff --git a/server/resources.go b/server/resources.go new file mode 100644 index 0000000..83e8ba9 --- /dev/null +++ b/server/resources.go @@ -0,0 +1,51 @@ +package server + +import ( + "github.com/docker/docker/api/types" + "math" +) + +// Defines the current resource usage for a given server instance. If a server is offline you +// should obviously expect memory and CPU usage to be 0. However, disk will always be returned +// since that is not dependent on the server being running to collect that data. +type ResourceUsage struct { + // The total amount of memory, in bytes, that this server instance is consuming. + Memory uint64 `json:"memory_bytes"` + // The total amount of memory this container or resource can use. Inside Docker this is + // going to be higher than you'd expect because we're automatically allocating overhead + // abilities for the container, so its not going to be a perfect match. + MemoryLimit uint64 `json:"memory_limit_bytes"` + // The absolute CPU usage is the amount of CPU used in relation to the entire system and + // does not take into account any limits on the server process itself. + CpuAbsolute float64 `json:"cpu_absolute"` + // The current disk space being used by the server. This is cached to prevent slow lookup + // issues on frequent refreshes. + Disk uint64 `json:"disk_bytes"` + // Current network transmit in & out for a container. + Network struct { + RxBytes uint64 `json:"rx_bytes"` + TxBytes uint64 `json:"tx_bytes"` + } `json:"network"` +} + +// Calculates the absolute CPU usage used by the server process on the system, not constrained +// by the defined CPU limits on the container. +// +// @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166 +func (ru *ResourceUsage) CalculateAbsoluteCpu(previousCpu, previousSystem float64, stats *types.CPUStats) float64 { + // Calculate the change in CPU usage between the current and previous reading. + cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(previousCpu) + + // Calculate the change for the entire system's CPU usage between current and previous reading. + systemDelta := float64(stats.SystemUsage) - float64(previousSystem) + + // Calculate the total number of CPU cores being used. + cpus := float64(len(stats.CPUUsage.PercpuUsage)) + + percent := 0.0 + if systemDelta > 0.0 && cpuDelta > 0.0 { + percent = (cpuDelta / systemDelta) * cpus * 100.0 + } + + return math.Round(percent * 1000) / 1000 +} \ No newline at end of file diff --git a/server/server.go b/server/server.go index 0eabedb..dddadbb 100644 --- a/server/server.go +++ b/server/server.go @@ -48,6 +48,8 @@ type Server struct { Filesystem *Filesystem `json:"-"` + Resources *ResourceUsage `json:"resources"` + // Server cache used to store frequently requested information in memory and make // certain long operations return faster. For example, FS disk space usage. Cache *cache.Cache `json:"-"` @@ -203,6 +205,7 @@ func FromConfiguration(data []byte, cfg *config.SystemConfiguration) (*Server, e Configuration: cfg, Server: s, } + s.Resources = &ResourceUsage{} return s, nil } diff --git a/wings.go b/wings.go index 9845914..0f947a3 100644 --- a/wings.go +++ b/wings.go @@ -72,10 +72,10 @@ func main() { zap.S().Errorw("error checking server environment status", zap.String("server", s.Uuid), zap.Error(err)) } else if r { zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid)) + s.SetState(server.ProcessRunningState) if err := s.Environment.Attach(); err != nil { zap.S().Errorw("error attaching to server environment", zap.String("server", s.Uuid), zap.Error(err)) - } else { - s.SetState(server.ProcessRunningState) + s.SetState(server.ProcessOfflineState) } } }