Add support for streaming usage stats back to the calling process
This commit is contained in:
parent
a1987f3aef
commit
c8e6e29abc
|
@ -51,4 +51,12 @@ type Environment interface {
|
||||||
// Reads the log file for the process from the end backwards until the provided
|
// Reads the log file for the process from the end backwards until the provided
|
||||||
// number of bytes is met.
|
// number of bytes is met.
|
||||||
Readlog(int64) ([]string, error)
|
Readlog(int64) ([]string, error)
|
||||||
|
|
||||||
|
// Polls the given environment for resource usage of the server when the process
|
||||||
|
// is running.
|
||||||
|
EnableResourcePolling() error
|
||||||
|
|
||||||
|
// Disables the polling operation for resource usage and sets the required values
|
||||||
|
// to 0 in the server resource usage struct.
|
||||||
|
DisableResourcePolling() error
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,10 @@ type DockerEnvironment struct {
|
||||||
// Controls the hijacked response stream which exists only when we're attached to
|
// Controls the hijacked response stream which exists only when we're attached to
|
||||||
// the running container instance.
|
// the running container instance.
|
||||||
stream types.HijackedResponse
|
stream types.HijackedResponse
|
||||||
|
|
||||||
|
// Holds the stats stream used by the polling commands so that we can easily close
|
||||||
|
// it out.
|
||||||
|
stats io.ReadCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new base Docker environment. A server must still be attached to it.
|
// Creates a new base Docker environment. A server must still be attached to it.
|
||||||
|
@ -209,6 +213,7 @@ func (d *DockerEnvironment) Attach() error {
|
||||||
Server: d.Server,
|
Server: d.Server,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.EnableResourcePolling()
|
||||||
d.attached = true
|
d.attached = true
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -261,6 +266,69 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enables resource polling on the docker instance. Except we aren't actually polling Docker for this
|
||||||
|
// information, instead just sit there with an async process that lets Docker stream all of this data
|
||||||
|
// to us automatically.
|
||||||
|
func (d *DockerEnvironment) EnableResourcePolling() error {
|
||||||
|
fmt.Println("called")
|
||||||
|
if d.Server.State == ProcessOfflineState {
|
||||||
|
fmt.Println("not running")
|
||||||
|
return errors.New("cannot enable resource polling on a server that is not running")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
stats, err := d.Client.ContainerStats(ctx, d.Server.Uuid, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.stats = stats.Body
|
||||||
|
|
||||||
|
dec := json.NewDecoder(d.stats)
|
||||||
|
go func(s *Server) {
|
||||||
|
pCpu := 0.0
|
||||||
|
pSystem := 0.0
|
||||||
|
|
||||||
|
for {
|
||||||
|
var v *types.StatsJSON
|
||||||
|
|
||||||
|
if err := dec.Decode(&v); err != nil {
|
||||||
|
zap.S().Warnw("encountered error processing server stats; stopping collection", zap.Error(err))
|
||||||
|
d.DisableResourcePolling()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disable collection if the server is in an offline state and this process is
|
||||||
|
// still running.
|
||||||
|
if s.State == ProcessOfflineState {
|
||||||
|
d.DisableResourcePolling()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Resources.CpuAbsolute = s.Resources.CalculateAbsoluteCpu(pCpu, pSystem, &v.CPUStats)
|
||||||
|
s.Resources.Memory = v.MemoryStats.Usage
|
||||||
|
s.Resources.MemoryLimit = v.MemoryStats.Limit
|
||||||
|
s.Resources.Disk = 0
|
||||||
|
|
||||||
|
for _, nw := range v.Networks {
|
||||||
|
s.Resources.Network.RxBytes += nw.RxBytes
|
||||||
|
s.Resources.Network.TxBytes += nw.TxBytes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(d.Server)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Closes the stats stream for a server process.
|
||||||
|
func (d *DockerEnvironment) DisableResourcePolling() error {
|
||||||
|
if d.stats == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.stats.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// Creates a new container for the server using all of the data that is currently
|
// Creates a new container for the server using all of the data that is currently
|
||||||
// available for it. If the container already exists it will be returned.
|
// available for it. If the container already exists it will be returned.
|
||||||
func (d *DockerEnvironment) Create() error {
|
func (d *DockerEnvironment) Create() error {
|
||||||
|
@ -529,4 +597,4 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet {
|
||||||
}
|
}
|
||||||
|
|
||||||
return out
|
return out
|
||||||
}
|
}
|
51
server/resources.go
Normal file
51
server/resources.go
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/docker/docker/api/types"
|
||||||
|
"math"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Defines the current resource usage for a given server instance. If a server is offline you
|
||||||
|
// should obviously expect memory and CPU usage to be 0. However, disk will always be returned
|
||||||
|
// since that is not dependent on the server being running to collect that data.
|
||||||
|
type ResourceUsage struct {
|
||||||
|
// The total amount of memory, in bytes, that this server instance is consuming.
|
||||||
|
Memory uint64 `json:"memory_bytes"`
|
||||||
|
// The total amount of memory this container or resource can use. Inside Docker this is
|
||||||
|
// going to be higher than you'd expect because we're automatically allocating overhead
|
||||||
|
// abilities for the container, so its not going to be a perfect match.
|
||||||
|
MemoryLimit uint64 `json:"memory_limit_bytes"`
|
||||||
|
// The absolute CPU usage is the amount of CPU used in relation to the entire system and
|
||||||
|
// does not take into account any limits on the server process itself.
|
||||||
|
CpuAbsolute float64 `json:"cpu_absolute"`
|
||||||
|
// The current disk space being used by the server. This is cached to prevent slow lookup
|
||||||
|
// issues on frequent refreshes.
|
||||||
|
Disk uint64 `json:"disk_bytes"`
|
||||||
|
// Current network transmit in & out for a container.
|
||||||
|
Network struct {
|
||||||
|
RxBytes uint64 `json:"rx_bytes"`
|
||||||
|
TxBytes uint64 `json:"tx_bytes"`
|
||||||
|
} `json:"network"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculates the absolute CPU usage used by the server process on the system, not constrained
|
||||||
|
// by the defined CPU limits on the container.
|
||||||
|
//
|
||||||
|
// @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166
|
||||||
|
func (ru *ResourceUsage) CalculateAbsoluteCpu(previousCpu, previousSystem float64, stats *types.CPUStats) float64 {
|
||||||
|
// Calculate the change in CPU usage between the current and previous reading.
|
||||||
|
cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(previousCpu)
|
||||||
|
|
||||||
|
// Calculate the change for the entire system's CPU usage between current and previous reading.
|
||||||
|
systemDelta := float64(stats.SystemUsage) - float64(previousSystem)
|
||||||
|
|
||||||
|
// Calculate the total number of CPU cores being used.
|
||||||
|
cpus := float64(len(stats.CPUUsage.PercpuUsage))
|
||||||
|
|
||||||
|
percent := 0.0
|
||||||
|
if systemDelta > 0.0 && cpuDelta > 0.0 {
|
||||||
|
percent = (cpuDelta / systemDelta) * cpus * 100.0
|
||||||
|
}
|
||||||
|
|
||||||
|
return math.Round(percent * 1000) / 1000
|
||||||
|
}
|
|
@ -48,6 +48,8 @@ type Server struct {
|
||||||
|
|
||||||
Filesystem *Filesystem `json:"-"`
|
Filesystem *Filesystem `json:"-"`
|
||||||
|
|
||||||
|
Resources *ResourceUsage `json:"resources"`
|
||||||
|
|
||||||
// Server cache used to store frequently requested information in memory and make
|
// Server cache used to store frequently requested information in memory and make
|
||||||
// certain long operations return faster. For example, FS disk space usage.
|
// certain long operations return faster. For example, FS disk space usage.
|
||||||
Cache *cache.Cache `json:"-"`
|
Cache *cache.Cache `json:"-"`
|
||||||
|
@ -203,6 +205,7 @@ func FromConfiguration(data []byte, cfg *config.SystemConfiguration) (*Server, e
|
||||||
Configuration: cfg,
|
Configuration: cfg,
|
||||||
Server: s,
|
Server: s,
|
||||||
}
|
}
|
||||||
|
s.Resources = &ResourceUsage{}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
4
wings.go
4
wings.go
|
@ -72,10 +72,10 @@ func main() {
|
||||||
zap.S().Errorw("error checking server environment status", zap.String("server", s.Uuid), zap.Error(err))
|
zap.S().Errorw("error checking server environment status", zap.String("server", s.Uuid), zap.Error(err))
|
||||||
} else if r {
|
} else if r {
|
||||||
zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid))
|
zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid))
|
||||||
|
s.SetState(server.ProcessRunningState)
|
||||||
if err := s.Environment.Attach(); err != nil {
|
if err := s.Environment.Attach(); err != nil {
|
||||||
zap.S().Errorw("error attaching to server environment", zap.String("server", s.Uuid), zap.Error(err))
|
zap.S().Errorw("error attaching to server environment", zap.String("server", s.Uuid), zap.Error(err))
|
||||||
} else {
|
s.SetState(server.ProcessOfflineState)
|
||||||
s.SetState(server.ProcessRunningState)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user