Correctly send stats & proc information back for servers

This commit is contained in:
Dane Everitt 2020-08-18 21:38:42 -07:00
parent 956e87eb93
commit 5b241fdf36
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
7 changed files with 225 additions and 110 deletions

View File

@ -47,19 +47,27 @@ func (e *Environment) Attach() error {
e.SetStream(&st) e.SetStream(&st)
} }
console := new(Console) c := new(Console)
go func(console *Console) {
ctx, cancel := context.WithCancel(context.Background())
// TODO: resource polling should be handled by the server itself and just call a function defer cancel()
// on the environment that can return the data. Same for disabling polling.
go func() {
defer e.stream.Close() defer e.stream.Close()
defer func() { defer func() {
e.setState(system.ProcessOfflineState) e.setState(system.ProcessOfflineState)
e.SetStream(nil) e.SetStream(nil)
}() }()
// Poll resources in a seperate thread since this will block the copy call below
// from being reached until it is completed if not run in a seperate process. However,
// we still want it to be stopped when the copy operation below is finished running which
// indicates that the container is no longer running.
go e.pollResources(ctx)
// Stream the reader output to the console which will then fire off events and handle console
// throttling and sending the output to the user.
_, _ = io.Copy(console, e.stream.Reader) _, _ = io.Copy(console, e.stream.Reader)
}() }(c)
return nil return nil
} }

120
environment/docker/stats.go Normal file
View File

@ -0,0 +1,120 @@
package docker
import (
"context"
"encoding/json"
"github.com/apex/log"
"github.com/docker/docker/api/types"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/system"
"io"
"math"
"sync/atomic"
)
// Attach to the instance and then automatically emit an event whenever the resource usage for the
// server process changes.
func (e *Environment) pollResources(ctx context.Context) error {
if e.State() == system.ProcessOfflineState {
return errors.New("attempting to enable resource polling on a stopped server instance")
}
stats, err := e.client.ContainerStats(context.Background(), e.Id, true)
if err != nil {
return errors.WithStack(err)
}
dec := json.NewDecoder(stats.Body)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
var v *types.StatsJSON
if err := dec.Decode(&v); err != nil {
if err != io.EOF {
log.WithField("container_id", e.Id).Warn("encountered error processing docker stats output, stopping collection")
}
return nil
}
// Disable collection if the server is in an offline state and this process is still running.
if e.State() == system.ProcessOfflineState {
return nil
}
var rx uint64
var tx uint64
for _, nw := range v.Networks {
atomic.AddUint64(&rx, nw.RxBytes)
atomic.AddUint64(&tx, nw.RxBytes)
}
st := &environment.Stats{
Memory: calculateDockerMemory(v.MemoryStats),
MemoryLimit: v.MemoryStats.Limit,
CpuAbsolute: calculateDockerAbsoluteCpu(&v.PreCPUStats, &v.CPUStats),
Network: struct {
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
}{
RxBytes: rx,
TxBytes: tx,
},
}
b, _ := json.Marshal(st)
e.Events().Publish(environment.ResourceEvent, string(b))
}
}
}
// The "docker stats" CLI call does not return the same value as the types.MemoryStats.Usage
// value which can be rather confusing to people trying to compare panel usage to
// their stats output.
//
// This math is straight up lifted from their CLI repository in order to show the same
// values to avoid people bothering me about it. It should also reflect a slightly more
// correct memory value anyways.
//
// @see https://github.com/docker/cli/blob/96e1d1d6/cli/command/container/stats_helpers.go#L227-L249
func calculateDockerMemory(stats types.MemoryStats) uint64 {
if v, ok := stats.Stats["total_inactive_file"]; ok && v < stats.Usage {
return stats.Usage - v
}
if v := stats.Stats["inactive_file"]; v < stats.Usage {
return stats.Usage - v
}
return stats.Usage
}
// Calculates the absolute CPU usage used by the server process on the system, not constrained
// by the defined CPU limits on the container.
//
// @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166
func calculateDockerAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 {
// Calculate the change in CPU usage between the current and previous reading.
cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(pStats.CPUUsage.TotalUsage)
// Calculate the change for the entire system's CPU usage between current and previous reading.
systemDelta := float64(stats.SystemUsage) - float64(pStats.SystemUsage)
// Calculate the total number of CPU cores being used.
cpus := float64(stats.OnlineCPUs)
if cpus == 0.0 {
cpus = float64(len(stats.CPUUsage.PercpuUsage))
}
percent := 0.0
if systemDelta > 0.0 && cpuDelta > 0.0 {
percent = (cpuDelta / systemDelta) * cpus * 100.0
}
return math.Round(percent*1000) / 1000
}

View File

@ -8,6 +8,7 @@ import (
const ( const (
ConsoleOutputEvent = "console output" ConsoleOutputEvent = "console output"
StateChangeEvent = "state change" StateChangeEvent = "state change"
ResourceEvent = "resources"
) )
// Defines the basic interface that all environments need to implement so that // Defines the basic interface that all environments need to implement so that

47
environment/stats.go Normal file
View File

@ -0,0 +1,47 @@
package environment
import "sync"
// Defines the current resource usage for a given server instance. If a server is offline you
// should obviously expect memory and CPU usage to be 0. However, disk will always be returned
// since that is not dependent on the server being running to collect that data.
type Stats struct {
mu sync.RWMutex
// The total amount of memory, in bytes, that this server instance is consuming. This is
// calculated slightly differently than just using the raw Memory field that the stats
// return from the container, so please check the code setting this value for how that
// is calculated.
Memory uint64 `json:"memory_bytes"`
// The total amount of memory this container or resource can use. Inside Docker this is
// going to be higher than you'd expect because we're automatically allocating overhead
// abilities for the container, so its not going to be a perfect match.
MemoryLimit uint64 `json:"memory_limit_bytes"`
// The absolute CPU usage is the amount of CPU used in relation to the entire system and
// does not take into account any limits on the server process itself.
CpuAbsolute float64 `json:"cpu_absolute"`
// The current disk space being used by the server. This is cached to prevent slow lookup
// issues on frequent refreshes.
// Disk int64 `json:"disk_bytes"`
// Current network transmit in & out for a container.
Network struct {
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
} `json:"network"`
}
// Resets the usages values to zero, used when a server is stopped to ensure we don't hold
// onto any values incorrectly.
func (s *Stats) Empty() {
s.mu.Lock()
defer s.mu.Unlock()
s.Memory = 0
s.CpuAbsolute = 0
s.Network.TxBytes = 0
s.Network.RxBytes = 0
}

View File

@ -1,7 +1,9 @@
package server package server
import ( import (
"encoding/json"
"github.com/apex/log" "github.com/apex/log"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/events"
@ -10,17 +12,19 @@ import (
// Adds all of the internal event listeners we want to use for a server. // Adds all of the internal event listeners we want to use for a server.
func (s *Server) StartEventListeners() { func (s *Server) StartEventListeners() {
consoleChannel := make(chan events.Event) console := make(chan events.Event)
stateChannel := make(chan events.Event) state := make(chan events.Event)
stats := make(chan events.Event)
s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, consoleChannel) s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console)
s.Environment.Events().Subscribe(environment.StateChangeEvent, stateChannel) s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
// TODO: this is leaky I imagine since the routines aren't destroyed when the server is? // TODO: this is leaky I imagine since the routines aren't destroyed when the server is?
go func() { go func() {
for { for {
select { select {
case data := <-consoleChannel: case data := <-console:
// Immediately emit this event back over the server event stream since it is // Immediately emit this event back over the server event stream since it is
// being called from the environment event stream and things probably aren't // being called from the environment event stream and things probably aren't
// listening to that event. // listening to that event.
@ -28,9 +32,28 @@ func (s *Server) StartEventListeners() {
// Also pass the data along to the console output channel. // Also pass the data along to the console output channel.
s.onConsoleOutput(data.Data) s.onConsoleOutput(data.Data)
case data := <-stateChannel: case data := <-state:
s.SetState(data.Data) s.SetState(data.Data)
case data := <-stats:
st := new(environment.Stats)
if err := json.Unmarshal([]byte(data.Data), st); err != nil {
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
continue
}
// Update the server resource tracking object with the resources we got here.
s.resources.mu.Lock()
s.resources.Stats = *st
s.resources.mu.Unlock()
// TODO: we'll need to handle this better since calling it in rapid succession will
// cause it to block until the first call is done calculating disk usage, which will
// case stat events to pile up for the server.
s.Filesystem.HasSpaceAvailable()
// Emit the event to the websocket.
b, _ := json.Marshal(s.Proc())
s.Events().Publish(StatsEvent, string(b))
} }
} }
}() }()

View File

@ -1,8 +1,7 @@
package server package server
import ( import (
"github.com/docker/docker/api/types" "github.com/pterodactyl/wings/environment"
"math"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@ -13,33 +12,15 @@ import (
type ResourceUsage struct { type ResourceUsage struct {
mu sync.RWMutex mu sync.RWMutex
// Embed the current environment stats into this server specific resource usage struct.
environment.Stats
// The current server status. // The current server status.
State string `json:"state" default:"offline"` State string `json:"state" default:"offline"`
// The total amount of memory, in bytes, that this server instance is consuming. This is
// calculated slightly differently than just using the raw Memory field that the stats
// return from the container, so please check the code setting this value for how that
// is calculated.
Memory uint64 `json:"memory_bytes"`
// The total amount of memory this container or resource can use. Inside Docker this is
// going to be higher than you'd expect because we're automatically allocating overhead
// abilities for the container, so its not going to be a perfect match.
MemoryLimit uint64 `json:"memory_limit_bytes"`
// The absolute CPU usage is the amount of CPU used in relation to the entire system and
// does not take into account any limits on the server process itself.
CpuAbsolute float64 `json:"cpu_absolute"`
// The current disk space being used by the server. This is cached to prevent slow lookup // The current disk space being used by the server. This is cached to prevent slow lookup
// issues on frequent refreshes. // issues on frequent refreshes.
Disk int64 `json:"disk_bytes"` Disk int64 `json:"disk_bytes"`
// Current network transmit in & out for a container.
Network struct {
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
} `json:"network"`
} }
// Returns the resource usage stats for the server instance. If the server is not running, only the // Returns the resource usage stats for the server instance. If the server is not running, only the
@ -69,81 +50,6 @@ func (ru *ResourceUsage) setInternalState(state string) {
ru.mu.Unlock() ru.mu.Unlock()
} }
// Resets the usages values to zero, used when a server is stopped to ensure we don't hold
// onto any values incorrectly.
func (ru *ResourceUsage) Empty() {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.Memory = 0
ru.CpuAbsolute = 0
ru.Network.TxBytes = 0
ru.Network.RxBytes = 0
}
func (ru *ResourceUsage) SetDisk(i int64) { func (ru *ResourceUsage) SetDisk(i int64) {
ru.mu.Lock() atomic.SwapInt64(&ru.Disk, i)
defer ru.mu.Unlock()
ru.Disk = i
}
func (ru *ResourceUsage) UpdateFromDocker(v *types.StatsJSON) {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.CpuAbsolute = ru.calculateDockerAbsoluteCpu(&v.PreCPUStats, &v.CPUStats)
ru.Memory = ru.calculateDockerMemory(v.MemoryStats)
ru.MemoryLimit = v.MemoryStats.Limit
}
func (ru *ResourceUsage) UpdateNetworkBytes(nw *types.NetworkStats) {
atomic.AddUint64(&ru.Network.RxBytes, nw.RxBytes)
atomic.AddUint64(&ru.Network.TxBytes, nw.TxBytes)
}
// The "docker stats" CLI call does not return the same value as the types.MemoryStats.Usage
// value which can be rather confusing to people trying to compare panel usage to
// their stats output.
//
// This math is straight up lifted from their CLI repository in order to show the same
// values to avoid people bothering me about it. It should also reflect a slightly more
// correct memory value anyways.
//
// @see https://github.com/docker/cli/blob/96e1d1d6/cli/command/container/stats_helpers.go#L227-L249
func (ru *ResourceUsage) calculateDockerMemory(stats types.MemoryStats) uint64 {
if v, ok := stats.Stats["total_inactive_file"]; ok && v < stats.Usage {
return stats.Usage - v
}
if v := stats.Stats["inactive_file"]; v < stats.Usage {
return stats.Usage - v
}
return stats.Usage
}
// Calculates the absolute CPU usage used by the server process on the system, not constrained
// by the defined CPU limits on the container.
//
// @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166
func (ru *ResourceUsage) calculateDockerAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 {
// Calculate the change in CPU usage between the current and previous reading.
cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(pStats.CPUUsage.TotalUsage)
// Calculate the change for the entire system's CPU usage between current and previous reading.
systemDelta := float64(stats.SystemUsage) - float64(pStats.SystemUsage)
// Calculate the total number of CPU cores being used.
cpus := float64(stats.OnlineCPUs)
if cpus == 0.0 {
cpus = float64(len(stats.CPUUsage.PercpuUsage))
}
percent := 0.0
if systemDelta > 0.0 && cpuDelta > 0.0 {
percent = (cpuDelta / systemDelta) * cpus * 100.0
}
return math.Round(percent*1000) / 1000
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/system"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
@ -99,6 +100,15 @@ func (s *Server) SetState(state string) error {
} }
}() }()
// Reset the resource usage to 0 when the process fully stops so that all of the UI
// views in the Panel correctly display 0.
if state == system.ProcessOfflineState {
s.resources.Empty()
b, _ := json.Marshal(s.Proc())
s.Events().Publish(StatsEvent, string(b))
}
// If server was in an online state, and is now in an offline state we should handle // If server was in an online state, and is now in an offline state we should handle
// that as a crash event. In that scenario, check the last crash time, and the crash // that as a crash event. In that scenario, check the last crash time, and the crash
// counter. // counter.