Completely re-work the server configuration to be separated out better

This commit is contained in:
Dane Everitt 2020-07-19 16:27:55 -07:00
parent a00288aa64
commit 0cbaad5c72
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
16 changed files with 495 additions and 409 deletions

View File

@ -181,17 +181,13 @@ func rootCmdRun(*cobra.Command, []string) {
wg.Add()
go func(s *server.Server) {
// Required for tracing purposes.
var err error
defer wg.Done()
defer func() {
s.Log().Trace("ensuring server environment exists").Stop(&err)
wg.Done()
}()
s.Log().Info("ensuring server environment exists")
// Create a server environment if none exists currently. This allows us to recover from Docker
// being reinstalled on the host system for example.
if err = s.Environment.Create(); err != nil {
if err := s.Environment.Create(); err != nil {
s.Log().WithField("error", err).Error("failed to process environment")
}

View File

@ -29,12 +29,10 @@ func New(data []byte) (*Installer, error) {
return nil, NewValidationError("service egg provided was not in a valid format")
}
s := &server.Server{
cfg := &server.Configuration{
Uuid: getString(data, "uuid"),
Suspended: false,
State: server.ProcessOfflineState,
Invocation: getString(data, "invocation"),
EnvVars: make(server.EnvironmentVariables),
Build: server.BuildSettings{
MemoryLimit: getInt(data, "build", "memory"),
Swap: getInt(data, "build", "swap"),
@ -43,20 +41,18 @@ func New(data []byte) (*Installer, error) {
DiskSpace: getInt(data, "build", "disk"),
Threads: getString(data, "build", "threads"),
},
Allocations: server.Allocations{
Mappings: make(map[string][]int),
},
CrashDetectionEnabled: true,
}
s.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip")
s.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port"))
cfg.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip")
cfg.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port"))
// Unmarshal the environment variables from the request into the server struct.
if b, _, _, err := jsonparser.Get(data, "environment"); err != nil {
return nil, errors.WithStack(err)
} else {
s.EnvVars = make(server.EnvironmentVariables)
if err := json.Unmarshal(b, &s.EnvVars); err != nil {
cfg.EnvVars = make(server.EnvironmentVariables)
if err := json.Unmarshal(b, &cfg.EnvVars); err != nil {
return nil, errors.WithStack(err)
}
}
@ -65,15 +61,15 @@ func New(data []byte) (*Installer, error) {
if b, _, _, err := jsonparser.Get(data, "allocations", "mappings"); err != nil {
return nil, errors.WithStack(err)
} else {
s.Allocations.Mappings = make(map[string][]int)
if err := json.Unmarshal(b, &s.Allocations.Mappings); err != nil {
cfg.Allocations.Mappings = make(map[string][]int)
if err := json.Unmarshal(b, &cfg.Allocations.Mappings); err != nil {
return nil, errors.WithStack(err)
}
}
s.Container.Image = getString(data, "container", "image")
cfg.Container.Image = getString(data, "container", "image")
c, rerr, err := api.NewRequester().GetServerConfiguration(s.Uuid)
c, rerr, err := api.NewRequester().GetServerConfiguration(cfg.Uuid)
if err != nil || rerr != nil {
if err != nil {
return nil, errors.WithStack(err)
@ -82,15 +78,12 @@ func New(data []byte) (*Installer, error) {
return nil, errors.New(rerr.String())
}
// Destroy the temporary server instance.
s = nil
// Create a new server instance using the configuration we wrote to the disk
// so that everything gets instantiated correctly on the struct.
s2, err := server.FromConfiguration(c)
s, err := server.FromConfiguration(c)
return &Installer{
server: s2,
server: s,
}, err
}

View File

@ -64,7 +64,7 @@ func postServerPower(c *gin.Context) {
//
// We don't really care about any of the other actions at this point, they'll all result
// in the process being stopped, which should have happened anyways if the server is suspended.
if (data.Action == "start" || data.Action == "restart") && s.Suspended {
if (data.Action == "start" || data.Action == "restart") && s.IsSuspended() {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "Cannot start or restart a server that is suspended.",
})
@ -162,7 +162,7 @@ func deleteServer(c *gin.Context) {
// Immediately suspend the server to prevent a user from attempting
// to start it while this process is running.
s.Suspended = true
s.Config().SetSuspended(true)
// If the server is currently installing, abort it.
if s.IsInstalling() {

View File

@ -247,19 +247,7 @@ func (h *Handler) HandleInbound(m Message) error {
if state == server.ProcessOfflineState {
_ = h.server.Filesystem.HasSpaceAvailable()
h.server.Resources.RLock()
defer h.server.Resources.RUnlock()
resources := server.ResourceUsage{
Memory: 0,
MemoryLimit: 0,
CpuAbsolute: 0.0,
Disk: h.server.Resources.Disk,
}
resources.Network.RxBytes = 0
resources.Network.TxBytes = 0
b, _ := json.Marshal(resources)
b, _ := json.Marshal(h.server.Proc())
h.SendJson(&Message{
Event: server.StatsEvent,
Args: []string{string(b)},

17
server/allocations.go Normal file
View File

@ -0,0 +1,17 @@
package server
// Defines the allocations available for a given server. When using the Docker environment
// driver these correspond to mappings for the container that allow external connections.
type Allocations struct {
// Defines the default allocation that should be used for this server. This is
// what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration
// files or the startup arguments for a server.
DefaultMapping struct {
Ip string `json:"ip"`
Port int `json:"port"`
} `json:"default"`
// Mappings contains all of the ports that should be assigned to a given server
// attached to the IP they correspond to.
Mappings map[string][]int `json:"mappings"`
}

72
server/build_settings.go Normal file
View File

@ -0,0 +1,72 @@
package server
import "math"
// The build settings for a given server that impact docker container creation and
// resource limits for a server instance.
type BuildSettings struct {
// The total amount of memory in megabytes that this server is allowed to
// use on the host system.
MemoryLimit int64 `json:"memory_limit"`
// The amount of additional swap space to be provided to a container instance.
Swap int64 `json:"swap"`
// The relative weight for IO operations in a container. This is relative to other
// containers on the system and should be a value between 10 and 1000.
IoWeight uint16 `json:"io_weight"`
// The percentage of CPU that this instance is allowed to consume relative to
// the host. A value of 200% represents complete utilization of two cores. This
// should be a value between 1 and THREAD_COUNT * 100.
CpuLimit int64 `json:"cpu_limit"`
// The amount of disk space in megabytes that a server is allowed to use.
DiskSpace int64 `json:"disk_space"`
// Sets which CPU threads can be used by the docker instance.
Threads string `json:"threads"`
}
func (s *Server) Build() *BuildSettings {
return &s.Config().Build
}
// Converts the CPU limit for a server build into a number that can be better understood
// by the Docker environment. If there is no limit set, return -1 which will indicate to
// Docker that it has unlimited CPU quota.
func (b *BuildSettings) ConvertedCpuLimit() int64 {
if b.CpuLimit == 0 {
return -1
}
return b.CpuLimit * 1000
}
// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to
// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use
// 15%. This avoids unexpected crashes from processes like Java which run over the limit.
func (b *BuildSettings) MemoryOverheadMultiplier() float64 {
if b.MemoryLimit <= 2048 {
return 1.15
} else if b.MemoryLimit <= 4096 {
return 1.10
}
return 1.05
}
func (b *BuildSettings) BoundedMemoryLimit() int64 {
return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000))
}
// Returns the amount of swap available as a total in bytes. This is returned as the amount
// of memory available to the server initially, PLUS the amount of additional swap to include
// which is the format used by Docker.
func (b *BuildSettings) ConvertedSwap() int64 {
if b.Swap < 0 {
return -1
}
return (b.Swap * 1_000_000) + b.BoundedMemoryLimit()
}

84
server/configuration.go Normal file
View File

@ -0,0 +1,84 @@
package server
import (
"fmt"
"strconv"
"sync"
)
type EnvironmentVariables map[string]interface{}
// Ugly hacky function to handle environment variables that get passed through as not-a-string
// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a
// string wasn't passed through you'd cause a crash or the server to become unavailable. For now
// try to handle the most likely values from the JSON and hope for the best.
func (ev EnvironmentVariables) Get(key string) string {
val, ok := ev[key]
if !ok {
return ""
}
switch val.(type) {
case int:
return strconv.Itoa(val.(int))
case int32:
return strconv.FormatInt(val.(int64), 10)
case int64:
return strconv.FormatInt(val.(int64), 10)
case float32:
return fmt.Sprintf("%f", val.(float32))
case float64:
return fmt.Sprintf("%f", val.(float64))
case bool:
return strconv.FormatBool(val.(bool))
}
return val.(string)
}
type Configuration struct {
mu sync.RWMutex
// The unique identifier for the server that should be used when referencing
// it against the Panel API (and internally). This will be used when naming
// docker containers as well as in log output.
Uuid string `json:"uuid"`
// Whether or not the server is in a suspended state. Suspended servers cannot
// be started or modified except in certain scenarios by an admin user.
Suspended bool `json:"suspended"`
// The command that should be used when booting up the server instance.
Invocation string `json:"invocation"`
// An array of environment variables that should be passed along to the running
// server process.
EnvVars EnvironmentVariables `json:"environment"`
Allocations Allocations `json:"allocations"`
Build BuildSettings `json:"build"`
CrashDetectionEnabled bool `default:"true" json:"enabled" yaml:"enabled"`
Mounts []Mount `json:"mounts"`
Resources ResourceUsage `json:"resources"`
Container struct {
// Defines the Docker image that will be used for this server
Image string `json:"image,omitempty"`
// If set to true, OOM killer will be disabled on the server's Docker container.
// If not present (nil) we will default to disabling it.
OomDisabled bool `default:"true" json:"oom_disabled"`
} `json:"container,omitempty"`
}
func (s *Server) Config() *Configuration {
s.cfg.mu.RLock()
defer s.cfg.mu.RUnlock()
return &s.cfg
}
func (c *Configuration) SetSuspended(s bool) {
c.mu.Lock()
c.Suspended = s
c.mu.Unlock()
}

View File

@ -4,18 +4,32 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/config"
"sync"
"time"
)
type CrashDetection struct {
// If set to false, the system will not listen for crash detection events that
// can indicate that the server stopped unexpectedly.
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
type CrashHandler struct {
mu sync.RWMutex
// Tracks the time of the last server crash event.
lastCrash time.Time
}
// Returns the time of the last crash for this server instance.
func (cd *CrashHandler) LastCrashTime() time.Time {
cd.mu.RLock()
defer cd.mu.RUnlock()
return cd.lastCrash
}
// Sets the last crash time for a server.
func (cd *CrashHandler) SetLastCrash(t time.Time) {
cd.mu.Lock()
cd.lastCrash = t
cd.mu.Unlock()
}
// Looks at the environment exit state to determine if the process exited cleanly or
// if it was the result of an event that we should try to recover from.
//
@ -30,8 +44,8 @@ func (s *Server) handleServerCrash() error {
// No point in doing anything here if the server isn't currently offline, there
// is no reason to do a crash detection event. If the server crash detection is
// disabled we want to skip anything after this as well.
if s.GetState() != ProcessOfflineState || !s.CrashDetection.Enabled {
if !s.CrashDetection.Enabled {
if s.GetState() != ProcessOfflineState || !s.Config().CrashDetectionEnabled {
if !s.Config().CrashDetectionEnabled {
s.Log().Debug("server triggered crash detection but handler is disabled for server process")
s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.")
@ -57,7 +71,7 @@ func (s *Server) handleServerCrash() error {
s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode))
s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled))
c := s.CrashDetection.lastCrash
c := s.crasher.LastCrashTime()
// If the last crash time was within the last 60 seconds we do not want to perform
// an automatic reboot of the process. Return an error that can be handled.
if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) {
@ -66,7 +80,7 @@ func (s *Server) handleServerCrash() error {
return &crashTooFrequent{}
}
s.CrashDetection.lastCrash = time.Now()
s.crasher.SetLastCrash(time.Now())
return s.Environment.Start()
}

View File

@ -23,7 +23,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -223,7 +222,7 @@ func (d *DockerEnvironment) Start() error {
// Theoretically you'd have the Panel handle all of this logic, but we cannot do that
// because we allow the websocket to control the server power state as well, so we'll
// need to handle that action in here.
if d.Server.Suspended {
if d.Server.IsSuspended() {
return &suspendedError{}
}
@ -604,22 +603,16 @@ func (d *DockerEnvironment) EnableResourcePolling() error {
return
}
s.Resources.Lock()
s.Resources.CpuAbsolute = s.Resources.CalculateAbsoluteCpu(&v.PreCPUStats, &v.CPUStats)
s.Resources.Memory = s.Resources.CalculateDockerMemory(v.MemoryStats)
s.Resources.MemoryLimit = v.MemoryStats.Limit
s.Resources.Unlock()
s.Proc().UpdateFromDocker(v)
for _, nw := range v.Networks {
s.Proc().UpdateNetworkBytes(&nw)
}
// Why you ask? This already has the logic for caching disk space in use and then
// also handles pushing that value to the resources object automatically.
s.Filesystem.HasSpaceAvailable()
for _, nw := range v.Networks {
atomic.AddUint64(&s.Resources.Network.RxBytes, nw.RxBytes)
atomic.AddUint64(&s.Resources.Network.TxBytes, nw.TxBytes)
}
b, _ := json.Marshal(s.Resources)
b, _ := json.Marshal(s.Proc())
s.Events().Publish(StatsEvent, string(b))
}
}(d.Server)
@ -634,12 +627,16 @@ func (d *DockerEnvironment) DisableResourcePolling() error {
}
err := d.stats.Close()
d.Server.Resources.Empty()
d.Server.Proc().Empty()
return errors.WithStack(err)
}
// Returns the image to be used for the instance.
func (d *DockerEnvironment) Image() string {
return d.Server.Config().Container.Image
}
// Pulls the image from Docker. If there is an error while pulling the image from the source
// but the image already exists locally, we will report that error to the logger but continue
// with the process.
@ -657,7 +654,7 @@ func (d *DockerEnvironment) ensureImageExists() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15)
defer cancel()
out, err := d.Client.ImagePull(ctx, d.Server.Container.Image, types.ImagePullOptions{All: false})
out, err := d.Client.ImagePull(ctx, d.Image(), types.ImagePullOptions{All: false})
if err != nil {
images, ierr := d.Client.ImageList(ctx, types.ImageListOptions{})
if ierr != nil {
@ -668,12 +665,12 @@ func (d *DockerEnvironment) ensureImageExists() error {
for _, img := range images {
for _, t := range img.RepoTags {
if t != d.Server.Container.Image {
if t != d.Image() {
continue
}
d.Server.Log().WithFields(log.Fields{
"image": d.Server.Container.Image,
"image": d.Image(),
"error": errors.New(err.Error()),
}).Warn("unable to pull requested image from remote source, however the image exists locally")
@ -687,7 +684,7 @@ func (d *DockerEnvironment) ensureImageExists() error {
}
defer out.Close()
log.WithField("image", d.Server.Container.Image).Debug("pulling docker image... this could take a bit of time")
log.WithField("image", d.Image()).Debug("pulling docker image... this could take a bit of time")
// I'm not sure what the best approach here is, but this will block execution until the image
// is done being pulled, which is what we need.
@ -734,12 +731,9 @@ func (d *DockerEnvironment) Create() error {
AttachStderr: true,
OpenStdin: true,
Tty: true,
ExposedPorts: d.exposedPorts(),
Image: d.Server.Container.Image,
Image: d.Image(),
Env: d.Server.GetEnvironmentVariables(),
Labels: map[string]string{
"Service": "Pterodactyl",
"ContainerType": "server_process",
@ -756,7 +750,7 @@ func (d *DockerEnvironment) Create() error {
}
var mounted bool
for _, m := range d.Server.Mounts {
for _, m := range d.Server.Config().Mounts {
mounted = false
source := filepath.Clean(m.Source)
target := filepath.Clean(m.Target)
@ -931,7 +925,7 @@ func (d *DockerEnvironment) parseLogToStrings(b []byte) ([]string, error) {
func (d *DockerEnvironment) portBindings() nat.PortMap {
var out = nat.PortMap{}
for ip, ports := range d.Server.Allocations.Mappings {
for ip, ports := range d.Server.Config().Allocations.Mappings {
for _, port := range ports {
// Skip over invalid ports.
if port < 0 || port > 65535 {
@ -981,14 +975,14 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet {
// the same or higher than the memory limit.
func (d *DockerEnvironment) getResourcesForServer() container.Resources {
return container.Resources{
Memory: d.Server.Build.BoundedMemoryLimit(),
MemoryReservation: d.Server.Build.MemoryLimit * 1_000_000,
MemorySwap: d.Server.Build.ConvertedSwap(),
CPUQuota: d.Server.Build.ConvertedCpuLimit(),
Memory: d.Server.Build().BoundedMemoryLimit(),
MemoryReservation: d.Server.Build().MemoryLimit * 1_000_000,
MemorySwap: d.Server.Build().ConvertedSwap(),
CPUQuota: d.Server.Build().ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: d.Server.Build.IoWeight,
OomKillDisable: &d.Server.Container.OomDisabled,
CpusetCpus: d.Server.Build.Threads,
BlkioWeight: d.Server.Build().IoWeight,
OomKillDisable: &d.Server.Config().Container.OomDisabled,
CpusetCpus: d.Server.Build().Threads,
}
}

View File

@ -208,7 +208,7 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) {
// Because determining the amount of space being used by a server is a taxing operation we
// will load it all up into a cache and pull from that as long as the key is not expired.
func (fs *Filesystem) HasSpaceAvailable() bool {
space := fs.Server.Build.DiskSpace
space := fs.Server.Build().DiskSpace
size, err := fs.getCachedDiskUsage()
if err != nil {
@ -217,9 +217,7 @@ func (fs *Filesystem) HasSpaceAvailable() bool {
// Determine if their folder size, in bytes, is smaller than the amount of space they've
// been allocated.
fs.Server.Resources.Lock()
fs.Server.Resources.Disk = size
fs.Server.Resources.Unlock()
fs.Server.Proc().SetDisk(size)
// If space is -1 or 0 just return true, means they're allowed unlimited.
//
@ -247,7 +245,7 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) {
fs.cacheDiskMu.Lock()
defer fs.cacheDiskMu.Unlock()
if x, exists := fs.Server.Cache.Get("disk_used"); exists {
if x, exists := fs.Server.cache.Get("disk_used"); exists {
return x.(int64), nil
}
@ -260,7 +258,7 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) {
// Always cache the size, even if there is an error. We want to always return that value
// so that we don't cause an endless loop of determining the disk size if there is a temporary
// error encountered.
fs.Server.Cache.Set("disk_used", size, time.Second*60)
fs.Server.cache.Set("disk_used", size, time.Second*60)
return size, err
}

View File

@ -21,7 +21,7 @@ import (
func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (bool, error) {
// Don't waste time trying to determine this if we know the server will have the space for
// it since there is no limit.
if fs.Server.Build.DiskSpace <= 0 {
if fs.Server.Build().DiskSpace <= 0 {
return true, nil
}
@ -60,7 +60,7 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
wg.Wait()
return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.Build.DiskSpace, cErr
return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.Build().DiskSpace, cErr
}
// Decompress a file in a given directory by using the archiver tool to infer the file

119
server/loader.go Normal file
View File

@ -0,0 +1,119 @@
package server
import (
"github.com/apex/log"
"github.com/creasty/defaults"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/remeh/sizedwaitgroup"
"time"
)
var servers = NewCollection(nil)
func GetServers() *Collection {
return servers
}
// Iterates over a given directory and loads all of the servers listed before returning
// them to the calling function.
func LoadDirectory() error {
if len(servers.items) != 0 {
return errors.New("cannot call LoadDirectory with a non-nil collection")
}
// We could theoretically use a standard wait group here, however doing
// that introduces the potential to crash the program due to too many
// open files. This wouldn't happen on a small setup, but once the daemon is
// handling many servers you run that risk.
//
// For now just process 10 files at a time, that should be plenty fast to
// read and parse the YAML. We should probably make this configurable down
// the road to help big instances scale better.
wg := sizedwaitgroup.New(10)
configs, rerr, err := api.NewRequester().GetAllServerConfigurations()
if err != nil || rerr != nil {
if err != nil {
return errors.WithStack(err)
}
return errors.New(rerr.String())
}
states, err := getServerStates()
if err != nil {
return errors.WithStack(err)
}
for uuid, data := range configs {
wg.Add()
go func(uuid string, data *api.ServerConfigurationResponse) {
defer wg.Done()
s, err := FromConfiguration(data)
if err != nil {
log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...")
return
}
if state, exists := states[s.Id()]; exists {
s.SetState(state)
s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file")
}
servers.Add(s)
}(uuid, data)
}
// Wait until we've processed all of the configuration files in the directory
// before continuing.
wg.Wait()
return nil
}
// Initializes a server using a data byte array. This will be marshaled into the
// given struct using a YAML marshaler. This will also configure the given environment
// for a server.
func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
cfg := Configuration{}
if err := defaults.Set(&cfg); err != nil {
return nil, err
}
s := new(Server)
s.cfg = cfg
if err := s.UpdateDataStructure(data.Settings, false); err != nil {
return nil, err
}
s.AddEventListeners()
// Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously.
if err := NewDockerEnvironment(s); err != nil {
return nil, err
}
s.cache = cache.New(time.Minute*10, time.Minute*15)
s.Archiver = Archiver{
Server: s,
}
s.Filesystem = Filesystem{
Configuration: &config.Get().System,
Server: s,
}
// Forces the configuration to be synced with the panel.
if err := s.SyncWithConfiguration(data); err != nil {
return nil, err
}
return s, nil
}

View File

@ -4,29 +4,37 @@ import (
"github.com/docker/docker/api/types"
"math"
"sync"
"sync/atomic"
)
// Defines the current resource usage for a given server instance. If a server is offline you
// should obviously expect memory and CPU usage to be 0. However, disk will always be returned
// since that is not dependent on the server being running to collect that data.
type ResourceUsage struct {
sync.RWMutex
mu sync.RWMutex
// The current server status.
State string `json:"state" default:"offline"`
// The total amount of memory, in bytes, that this server instance is consuming. This is
// calculated slightly differently than just using the raw Memory field that the stats
// return from the container, so please check the code setting this value for how that
// is calculated.
Memory uint64 `json:"memory_bytes"`
// The total amount of memory this container or resource can use. Inside Docker this is
// going to be higher than you'd expect because we're automatically allocating overhead
// abilities for the container, so its not going to be a perfect match.
MemoryLimit uint64 `json:"memory_limit_bytes"`
// The absolute CPU usage is the amount of CPU used in relation to the entire system and
// does not take into account any limits on the server process itself.
CpuAbsolute float64 `json:"cpu_absolute"`
// The current disk space being used by the server. This is cached to prevent slow lookup
// issues on frequent refreshes.
Disk int64 `json:"disk_bytes"`
// Current network transmit in & out for a container.
Network struct {
RxBytes uint64 `json:"rx_bytes"`
@ -34,11 +42,29 @@ type ResourceUsage struct {
} `json:"network"`
}
// Returns the resource usage stats for the server instance. If the server is not running, only the
// disk space currently used will be returned. When the server is running all of the other stats will
// be returned.
//
// When a process is stopped all of the stats are zeroed out except for the disk.
func (s *Server) Proc() *ResourceUsage {
s.resources.mu.RLock()
defer s.resources.mu.RUnlock()
return &s.resources
}
func (ru *ResourceUsage) setInternalState(state string) {
ru.mu.Lock()
ru.State = state
ru.mu.Unlock()
}
// Resets the usages values to zero, used when a server is stopped to ensure we don't hold
// onto any values incorrectly.
func (ru *ResourceUsage) Empty() {
ru.Lock()
defer ru.Unlock()
ru.mu.Lock()
defer ru.mu.Unlock()
ru.Memory = 0
ru.CpuAbsolute = 0
@ -46,6 +72,27 @@ func (ru *ResourceUsage) Empty() {
ru.Network.RxBytes = 0
}
func (ru *ResourceUsage) SetDisk(i int64) {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.Disk = i
}
func (ru *ResourceUsage) UpdateFromDocker(v *types.StatsJSON) {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.CpuAbsolute = ru.calculateDockerAbsoluteCpu(&v.PreCPUStats, &v.CPUStats)
ru.Memory = ru.calculateDockerMemory(v.MemoryStats)
ru.MemoryLimit = v.MemoryStats.Limit
}
func (ru *ResourceUsage) UpdateNetworkBytes(nw *types.NetworkStats) {
atomic.AddUint64(&ru.Network.RxBytes, nw.RxBytes)
atomic.AddUint64(&ru.Network.TxBytes, nw.TxBytes)
}
// The "docker stats" CLI call does not return the same value as the types.MemoryStats.Usage
// value which can be rather confusing to people trying to compare panel usage to
// their stats output.
@ -55,7 +102,7 @@ func (ru *ResourceUsage) Empty() {
// correct memory value anyways.
//
// @see https://github.com/docker/cli/blob/96e1d1d6/cli/command/container/stats_helpers.go#L227-L249
func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 {
func (ru *ResourceUsage) calculateDockerMemory(stats types.MemoryStats) uint64 {
if v, ok := stats.Stats["total_inactive_file"]; ok && v < stats.Usage {
return stats.Usage - v
}
@ -71,7 +118,7 @@ func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 {
// by the defined CPU limits on the container.
//
// @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166
func (ru *ResourceUsage) CalculateAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 {
func (ru *ResourceUsage) calculateDockerAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 {
// Calculate the change in CPU usage between the current and previous reading.
cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(pStats.CPUUsage.TotalUsage)

View File

@ -4,99 +4,42 @@ import (
"context"
"fmt"
"github.com/apex/log"
"github.com/creasty/defaults"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/remeh/sizedwaitgroup"
"golang.org/x/sync/semaphore"
"math"
"os"
"strconv"
"strings"
"sync"
"time"
)
var servers *Collection
func GetServers() *Collection {
return servers
}
type EnvironmentVariables map[string]interface{}
// Ugly hacky function to handle environment variables that get passed through as not-a-string
// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a
// string wasn't passed through you'd cause a crash or the server to become unavailable. For now
// try to handle the most likely values from the JSON and hope for the best.
func (ev EnvironmentVariables) Get(key string) string {
val, ok := ev[key]
if !ok {
return ""
}
switch val.(type) {
case int:
return strconv.Itoa(val.(int))
case int32:
return strconv.FormatInt(val.(int64), 10)
case int64:
return strconv.FormatInt(val.(int64), 10)
case float32:
return fmt.Sprintf("%f", val.(float32))
case float64:
return fmt.Sprintf("%f", val.(float64))
case bool:
return strconv.FormatBool(val.(bool))
}
return val.(string)
}
// High level definition for a server instance being controlled by Wings.
type Server struct {
// Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk.
sync.RWMutex
// The unique identifier for the server that should be used when referencing
// it against the Panel API (and internally). This will be used when naming
// docker containers as well as in log output.
Uuid string `json:"uuid"`
Uuid string `json:"-"`
// Whether or not the server is in a suspended state. Suspended servers cannot
// be started or modified except in certain scenarios by an admin user.
Suspended bool `json:"suspended"`
// Maintains the configuration for the server. This is the data that gets returned by the Panel
// such as build settings and container images.
cfg Configuration
// The power state of the server.
State string `default:"offline" json:"state"`
// The command that should be used when booting up the server instance.
Invocation string `json:"invocation"`
// An array of environment variables that should be passed along to the running
// server process.
EnvVars EnvironmentVariables `json:"environment"`
Allocations Allocations `json:"allocations"`
Build BuildSettings `json:"build"`
CrashDetection CrashDetection `json:"crash_detection"`
Mounts []Mount `json:"mounts"`
Resources ResourceUsage `json:"resources"`
// The crash handler for this server instance.
crasher CrashHandler
resources ResourceUsage
Archiver Archiver `json:"-"`
Environment Environment `json:"-"`
Filesystem Filesystem `json:"-"`
Container struct {
// Defines the Docker image that will be used for this server
Image string `json:"image,omitempty"`
// If set to true, OOM killer will be disabled on the server's Docker container.
// If not present (nil) we will default to disabling it.
OomDisabled bool `default:"true" json:"oom_disabled"`
} `json:"container,omitempty"`
// Server cache used to store frequently requested information in memory and make
// certain long operations return faster. For example, FS disk space usage.
Cache *cache.Cache `json:"-"`
cache *cache.Cache
// Events emitted by the server instance.
emitter *EventBus
@ -111,10 +54,6 @@ type Server struct {
// installation process, for example when a server is deleted from the panel while the
// installer process is still running.
installer InstallerDetails
// Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk.
sync.RWMutex
}
type InstallerDetails struct {
@ -127,190 +66,9 @@ type InstallerDetails struct {
sem *semaphore.Weighted
}
// The build settings for a given server that impact docker container creation and
// resource limits for a server instance.
type BuildSettings struct {
// The total amount of memory in megabytes that this server is allowed to
// use on the host system.
MemoryLimit int64 `json:"memory_limit"`
// The amount of additional swap space to be provided to a container instance.
Swap int64 `json:"swap"`
// The relative weight for IO operations in a container. This is relative to other
// containers on the system and should be a value between 10 and 1000.
IoWeight uint16 `json:"io_weight"`
// The percentage of CPU that this instance is allowed to consume relative to
// the host. A value of 200% represents complete utilization of two cores. This
// should be a value between 1 and THREAD_COUNT * 100.
CpuLimit int64 `json:"cpu_limit"`
// The amount of disk space in megabytes that a server is allowed to use.
DiskSpace int64 `json:"disk_space"`
// Sets which CPU threads can be used by the docker instance.
Threads string `json:"threads"`
}
// Returns the UUID for the server instance.
func (s *Server) Id() string {
s.RLock()
defer s.RUnlock()
return s.Uuid
}
// Converts the CPU limit for a server build into a number that can be better understood
// by the Docker environment. If there is no limit set, return -1 which will indicate to
// Docker that it has unlimited CPU quota.
func (b *BuildSettings) ConvertedCpuLimit() int64 {
if b.CpuLimit == 0 {
return -1
}
return b.CpuLimit * 1000
}
// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to
// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use
// 15%. This avoids unexpected crashes from processes like Java which run over the limit.
func (b *BuildSettings) MemoryOverheadMultiplier() float64 {
if b.MemoryLimit <= 2048 {
return 1.15
} else if b.MemoryLimit <= 4096 {
return 1.10
}
return 1.05
}
func (b *BuildSettings) BoundedMemoryLimit() int64 {
return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000))
}
// Returns the amount of swap available as a total in bytes. This is returned as the amount
// of memory available to the server initially, PLUS the amount of additional swap to include
// which is the format used by Docker.
func (b *BuildSettings) ConvertedSwap() int64 {
if b.Swap < 0 {
return -1
}
return (b.Swap * 1_000_000) + b.BoundedMemoryLimit()
}
// Defines the allocations available for a given server. When using the Docker environment
// driver these correspond to mappings for the container that allow external connections.
type Allocations struct {
// Defines the default allocation that should be used for this server. This is
// what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration
// files or the startup arguments for a server.
DefaultMapping struct {
Ip string `json:"ip"`
Port int `json:"port"`
} `json:"default"`
// Mappings contains all of the ports that should be assigned to a given server
// attached to the IP they correspond to.
Mappings map[string][]int `json:"mappings"`
}
// Iterates over a given directory and loads all of the servers listed before returning
// them to the calling function.
func LoadDirectory() error {
// We could theoretically use a standard wait group here, however doing
// that introduces the potential to crash the program due to too many
// open files. This wouldn't happen on a small setup, but once the daemon is
// handling many servers you run that risk.
//
// For now just process 10 files at a time, that should be plenty fast to
// read and parse the YAML. We should probably make this configurable down
// the road to help big instances scale better.
wg := sizedwaitgroup.New(10)
configs, rerr, err := api.NewRequester().GetAllServerConfigurations()
if err != nil || rerr != nil {
if err != nil {
return errors.WithStack(err)
}
return errors.New(rerr.String())
}
states, err := getServerStates()
if err != nil {
return errors.WithStack(err)
}
servers = NewCollection(nil)
for uuid, data := range configs {
wg.Add()
go func(uuid string, data *api.ServerConfigurationResponse) {
defer wg.Done()
s, err := FromConfiguration(data)
if err != nil {
log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...")
return
}
if state, exists := states[s.Uuid]; exists {
s.SetState(state)
s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file")
}
servers.Add(s)
}(uuid, data)
}
// Wait until we've processed all of the configuration files in the directory
// before continuing.
wg.Wait()
return nil
}
// Initializes a server using a data byte array. This will be marshaled into the
// given struct using a YAML marshaler. This will also configure the given environment
// for a server.
func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
s := new(Server)
if err := defaults.Set(s); err != nil {
return nil, err
}
if err := s.UpdateDataStructure(data.Settings, false); err != nil {
return nil, err
}
s.AddEventListeners()
// Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously.
if err := NewDockerEnvironment(s); err != nil {
return nil, err
}
s.Cache = cache.New(time.Minute*10, time.Minute*15)
s.Archiver = Archiver{
Server: s,
}
s.Filesystem = Filesystem{
Configuration: &config.Get().System,
Server: s,
}
s.Resources = ResourceUsage{}
// Forces the configuration to be synced with the panel.
if err := s.SyncWithConfiguration(data); err != nil {
return nil, err
}
return s, nil
return s.Config().Uuid
}
// Returns all of the environment variables that should be assigned to a running
@ -320,21 +78,21 @@ func (s *Server) GetEnvironmentVariables() []string {
var out = []string{
fmt.Sprintf("TZ=%s", zone),
fmt.Sprintf("STARTUP=%s", s.Invocation),
fmt.Sprintf("SERVER_MEMORY=%d", s.Build.MemoryLimit),
fmt.Sprintf("SERVER_IP=%s", s.Allocations.DefaultMapping.Ip),
fmt.Sprintf("SERVER_PORT=%d", s.Allocations.DefaultMapping.Port),
fmt.Sprintf("STARTUP=%s", s.Config().Invocation),
fmt.Sprintf("SERVER_MEMORY=%d", s.Build().MemoryLimit),
fmt.Sprintf("SERVER_IP=%s", s.Config().Allocations.DefaultMapping.Ip),
fmt.Sprintf("SERVER_PORT=%d", s.Config().Allocations.DefaultMapping.Port),
}
eloop:
for k := range s.EnvVars {
for k := range s.Config().EnvVars {
for _, e := range out {
if strings.HasPrefix(e, strings.ToUpper(k)) {
continue eloop
}
}
out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.EnvVars.Get(k)))
out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.Config().EnvVars.Get(k)))
}
return out
@ -420,3 +178,8 @@ func (s *Server) HandlePowerAction(action PowerAction) error {
return errors.New("an invalid power action was provided")
}
}
// Checks if the server is marked as being suspended or not on the system.
func (s *Server) IsSuspended() bool {
return s.Config().Suspended
}

View File

@ -13,6 +13,13 @@ import (
var stateMutex sync.Mutex
const (
ProcessOfflineState = "offline"
ProcessStartingState = "starting"
ProcessRunningState = "running"
ProcessStoppingState = "stopping"
)
// Returns the state of the servers.
func getServerStates() (map[string]string, error) {
// Request a lock after we check if the file exists.
@ -60,13 +67,6 @@ func saveServerStates() error {
return nil
}
const (
ProcessOfflineState = "offline"
ProcessStartingState = "starting"
ProcessRunningState = "running"
ProcessStoppingState = "stopping"
)
// Sets the state of the server internally. This function handles crash detection as
// well as reporting to event listeners for the server.
func (s *Server) SetState(state string) error {
@ -76,16 +76,14 @@ func (s *Server) SetState(state string) error {
prevState := s.GetState()
// Obtain a mutex lock and update the current state of the server.
s.Lock()
s.State = state
// Update the currently tracked state for the server.
s.Proc().setInternalState(state)
// Emit the event to any listeners that are currently registered.
s.Log().WithField("status", s.State).Debug("saw server status change event")
s.Events().Publish(StatusEvent, s.State)
// Release the lock as it is no longer needed for the following actions.
s.Unlock()
if prevState != state {
s.Log().WithField("status", s.Proc().State).Debug("saw server status change event")
s.Events().Publish(StatusEvent, s.Proc().State)
}
// Persist this change to the disk immediately so that should the Daemon be stopped or
// crash we can immediately restore the server state.
@ -128,10 +126,7 @@ func (s *Server) SetState(state string) error {
// Returns the current state of the server in a race-safe manner.
func (s *Server) GetState() string {
s.RLock()
defer s.RUnlock()
return s.State
return s.Proc().State
}
// Determines if the server state is running or not. This is different than the

View File

@ -15,7 +15,7 @@ import (
// it is up to the specific environment to determine what needs to happen when
// that is the case.
func (s *Server) UpdateDataStructure(data []byte, background bool) error {
src := new(Server)
src := new(Configuration)
if err := json.Unmarshal(data, src); err != nil {
return errors.WithStack(err)
}
@ -27,26 +27,30 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
return errors.New("attempting to merge a data stack with an invalid UUID")
}
s.Lock()
// Grab a copy of the configuration to work on.
c := s.Config()
// Lock the server configuration while we're doing this merge to avoid anything
// trying to overwrite it or make modifications while we're sorting out what we
// need to do.
s.cfg.mu.Lock()
defer s.cfg.mu.Unlock()
// Merge the new data object that we have received with the existing server data object
// and then save it to the disk so it is persistent.
if err := mergo.Merge(s, src, mergo.WithOverride); err != nil {
if err := mergo.Merge(c, src, mergo.WithOverride); err != nil {
return errors.WithStack(err)
}
// Mergo makes that previous lock disappear. Handle that by just re-locking the object.
s.Lock()
defer s.Unlock()
// Don't explode if we're setting CPU limits to 0. Mergo sees that as an empty value
// so it won't override the value we've passed through in the API call. However, we can
// safely assume that we're passing through valid data structures here. I foresee this
// backfiring at some point, but until then...
//
// We'll go ahead and do this with swap as well.
s.Build.CpuLimit = src.Build.CpuLimit
s.Build.Swap = src.Build.Swap
s.Build.DiskSpace = src.Build.DiskSpace
c.Build.CpuLimit = src.Build.CpuLimit
c.Build.Swap = src.Build.Swap
c.Build.DiskSpace = src.Build.DiskSpace
// Mergo can't quite handle this boolean value correctly, so for now we'll just
// handle this edge case manually since none of the other data passed through in this
@ -56,7 +60,7 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
return errors.WithStack(err)
}
} else {
s.Container.OomDisabled = v
c.Container.OomDisabled = v
}
// Mergo also cannot handle this boolean value.
@ -65,25 +69,29 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
return errors.WithStack(err)
}
} else {
s.Suspended = v
c.Suspended = v
}
// Environment and Mappings should be treated as a full update at all times, never a
// true patch, otherwise we can't know what we're passing along.
if src.EnvVars != nil && len(src.EnvVars) > 0 {
s.EnvVars = src.EnvVars
c.EnvVars = src.EnvVars
}
if src.Allocations.Mappings != nil && len(src.Allocations.Mappings) > 0 {
s.Allocations.Mappings = src.Allocations.Mappings
c.Allocations.Mappings = src.Allocations.Mappings
}
if src.Mounts != nil && len(src.Mounts) > 0 {
s.Mounts = src.Mounts
c.Mounts = src.Mounts
}
// Update the configuration once we have a lock on the configuration object.
s.cfg = *c
s.Uuid = c.Uuid
if background {
s.runBackgroundActions()
go s.runBackgroundActions()
}
return nil
@ -96,24 +104,22 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
// These tasks run in independent threads where relevant to speed up any updates
// that need to happen.
func (s *Server) runBackgroundActions() {
// Check if the s is now suspended, and if so and the process is not terminated
// yet, do it immediately.
if s.IsSuspended() && s.GetState() != ProcessOfflineState {
s.Log().Info("server suspended with running process state, terminating now")
if err := s.Environment.WaitForStop(10, true); err != nil {
s.Log().WithField("error", err).Warn("failed to terminate server environment after suspension")
}
}
if !s.IsSuspended() {
// Update the environment in place, allowing memory and CPU usage to be adjusted
// on the fly without the user needing to reboot (theoretically).
go func(server *Server) {
server.Log().Info("performing server limit modification on-the-fly")
if err := server.Environment.InSituUpdate(); err != nil {
server.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment")
}
}(s)
// Check if the server is now suspended, and if so and the process is not terminated
// yet, do it immediately.
go func(server *Server) {
if server.Suspended && server.GetState() != ProcessOfflineState {
server.Log().Info("server suspended with running process state, terminating now")
if err := server.Environment.WaitForStop(10, true); err != nil {
server.Log().WithField("error", err).Warn("failed to terminate server environment after suspension")
s.Log().Info("performing server limit modification on-the-fly")
if err := s.Environment.InSituUpdate(); err != nil {
s.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment")
}
}
}(s)
}