From 0cbaad5c729b6177aad356caa67151ac2b6ff49b Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sun, 19 Jul 2020 16:27:55 -0700 Subject: [PATCH] Completely re-work the server configuration to be separated out better --- cmd/root.go | 10 +- installer/installer.go | 31 ++-- router/router_server.go | 4 +- router/websocket/websocket.go | 14 +- server/allocations.go | 17 ++ server/build_settings.go | 72 +++++++++ server/configuration.go | 84 ++++++++++ server/crash.go | 30 +++- server/environment_docker.go | 60 ++++--- server/filesystem.go | 10 +- server/filesystem_unarchive.go | 4 +- server/loader.go | 119 ++++++++++++++ server/resources.go | 57 ++++++- server/server.go | 287 +++------------------------------ server/state.go | 33 ++-- server/update.go | 72 +++++---- 16 files changed, 495 insertions(+), 409 deletions(-) create mode 100644 server/allocations.go create mode 100644 server/build_settings.go create mode 100644 server/configuration.go create mode 100644 server/loader.go diff --git a/cmd/root.go b/cmd/root.go index 60b5d31..d992e19 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -181,17 +181,13 @@ func rootCmdRun(*cobra.Command, []string) { wg.Add() go func(s *server.Server) { - // Required for tracing purposes. - var err error + defer wg.Done() - defer func() { - s.Log().Trace("ensuring server environment exists").Stop(&err) - wg.Done() - }() + s.Log().Info("ensuring server environment exists") // Create a server environment if none exists currently. This allows us to recover from Docker // being reinstalled on the host system for example. - if err = s.Environment.Create(); err != nil { + if err := s.Environment.Create(); err != nil { s.Log().WithField("error", err).Error("failed to process environment") } diff --git a/installer/installer.go b/installer/installer.go index 37ce0cb..6ca95fd 100644 --- a/installer/installer.go +++ b/installer/installer.go @@ -29,12 +29,10 @@ func New(data []byte) (*Installer, error) { return nil, NewValidationError("service egg provided was not in a valid format") } - s := &server.Server{ + cfg := &server.Configuration{ Uuid: getString(data, "uuid"), Suspended: false, - State: server.ProcessOfflineState, Invocation: getString(data, "invocation"), - EnvVars: make(server.EnvironmentVariables), Build: server.BuildSettings{ MemoryLimit: getInt(data, "build", "memory"), Swap: getInt(data, "build", "swap"), @@ -43,20 +41,18 @@ func New(data []byte) (*Installer, error) { DiskSpace: getInt(data, "build", "disk"), Threads: getString(data, "build", "threads"), }, - Allocations: server.Allocations{ - Mappings: make(map[string][]int), - }, + CrashDetectionEnabled: true, } - s.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip") - s.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port")) + cfg.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip") + cfg.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port")) // Unmarshal the environment variables from the request into the server struct. if b, _, _, err := jsonparser.Get(data, "environment"); err != nil { return nil, errors.WithStack(err) } else { - s.EnvVars = make(server.EnvironmentVariables) - if err := json.Unmarshal(b, &s.EnvVars); err != nil { + cfg.EnvVars = make(server.EnvironmentVariables) + if err := json.Unmarshal(b, &cfg.EnvVars); err != nil { return nil, errors.WithStack(err) } } @@ -65,15 +61,15 @@ func New(data []byte) (*Installer, error) { if b, _, _, err := jsonparser.Get(data, "allocations", "mappings"); err != nil { return nil, errors.WithStack(err) } else { - s.Allocations.Mappings = make(map[string][]int) - if err := json.Unmarshal(b, &s.Allocations.Mappings); err != nil { + cfg.Allocations.Mappings = make(map[string][]int) + if err := json.Unmarshal(b, &cfg.Allocations.Mappings); err != nil { return nil, errors.WithStack(err) } } - s.Container.Image = getString(data, "container", "image") + cfg.Container.Image = getString(data, "container", "image") - c, rerr, err := api.NewRequester().GetServerConfiguration(s.Uuid) + c, rerr, err := api.NewRequester().GetServerConfiguration(cfg.Uuid) if err != nil || rerr != nil { if err != nil { return nil, errors.WithStack(err) @@ -82,15 +78,12 @@ func New(data []byte) (*Installer, error) { return nil, errors.New(rerr.String()) } - // Destroy the temporary server instance. - s = nil - // Create a new server instance using the configuration we wrote to the disk // so that everything gets instantiated correctly on the struct. - s2, err := server.FromConfiguration(c) + s, err := server.FromConfiguration(c) return &Installer{ - server: s2, + server: s, }, err } diff --git a/router/router_server.go b/router/router_server.go index 87c0b67..d8c47a4 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -64,7 +64,7 @@ func postServerPower(c *gin.Context) { // // We don't really care about any of the other actions at this point, they'll all result // in the process being stopped, which should have happened anyways if the server is suspended. - if (data.Action == "start" || data.Action == "restart") && s.Suspended { + if (data.Action == "start" || data.Action == "restart") && s.IsSuspended() { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "Cannot start or restart a server that is suspended.", }) @@ -162,7 +162,7 @@ func deleteServer(c *gin.Context) { // Immediately suspend the server to prevent a user from attempting // to start it while this process is running. - s.Suspended = true + s.Config().SetSuspended(true) // If the server is currently installing, abort it. if s.IsInstalling() { diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index b321119..5b0cf3c 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -247,19 +247,7 @@ func (h *Handler) HandleInbound(m Message) error { if state == server.ProcessOfflineState { _ = h.server.Filesystem.HasSpaceAvailable() - h.server.Resources.RLock() - defer h.server.Resources.RUnlock() - - resources := server.ResourceUsage{ - Memory: 0, - MemoryLimit: 0, - CpuAbsolute: 0.0, - Disk: h.server.Resources.Disk, - } - resources.Network.RxBytes = 0 - resources.Network.TxBytes = 0 - - b, _ := json.Marshal(resources) + b, _ := json.Marshal(h.server.Proc()) h.SendJson(&Message{ Event: server.StatsEvent, Args: []string{string(b)}, diff --git a/server/allocations.go b/server/allocations.go new file mode 100644 index 0000000..ac51a0d --- /dev/null +++ b/server/allocations.go @@ -0,0 +1,17 @@ +package server + +// Defines the allocations available for a given server. When using the Docker environment +// driver these correspond to mappings for the container that allow external connections. +type Allocations struct { + // Defines the default allocation that should be used for this server. This is + // what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration + // files or the startup arguments for a server. + DefaultMapping struct { + Ip string `json:"ip"` + Port int `json:"port"` + } `json:"default"` + + // Mappings contains all of the ports that should be assigned to a given server + // attached to the IP they correspond to. + Mappings map[string][]int `json:"mappings"` +} \ No newline at end of file diff --git a/server/build_settings.go b/server/build_settings.go new file mode 100644 index 0000000..357aecf --- /dev/null +++ b/server/build_settings.go @@ -0,0 +1,72 @@ +package server + +import "math" + +// The build settings for a given server that impact docker container creation and +// resource limits for a server instance. +type BuildSettings struct { + // The total amount of memory in megabytes that this server is allowed to + // use on the host system. + MemoryLimit int64 `json:"memory_limit"` + + // The amount of additional swap space to be provided to a container instance. + Swap int64 `json:"swap"` + + // The relative weight for IO operations in a container. This is relative to other + // containers on the system and should be a value between 10 and 1000. + IoWeight uint16 `json:"io_weight"` + + // The percentage of CPU that this instance is allowed to consume relative to + // the host. A value of 200% represents complete utilization of two cores. This + // should be a value between 1 and THREAD_COUNT * 100. + CpuLimit int64 `json:"cpu_limit"` + + // The amount of disk space in megabytes that a server is allowed to use. + DiskSpace int64 `json:"disk_space"` + + // Sets which CPU threads can be used by the docker instance. + Threads string `json:"threads"` +} + +func (s *Server) Build() *BuildSettings { + return &s.Config().Build +} + +// Converts the CPU limit for a server build into a number that can be better understood +// by the Docker environment. If there is no limit set, return -1 which will indicate to +// Docker that it has unlimited CPU quota. +func (b *BuildSettings) ConvertedCpuLimit() int64 { + if b.CpuLimit == 0 { + return -1 + } + + return b.CpuLimit * 1000 +} + +// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to +// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use +// 15%. This avoids unexpected crashes from processes like Java which run over the limit. +func (b *BuildSettings) MemoryOverheadMultiplier() float64 { + if b.MemoryLimit <= 2048 { + return 1.15 + } else if b.MemoryLimit <= 4096 { + return 1.10 + } + + return 1.05 +} + +func (b *BuildSettings) BoundedMemoryLimit() int64 { + return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000)) +} + +// Returns the amount of swap available as a total in bytes. This is returned as the amount +// of memory available to the server initially, PLUS the amount of additional swap to include +// which is the format used by Docker. +func (b *BuildSettings) ConvertedSwap() int64 { + if b.Swap < 0 { + return -1 + } + + return (b.Swap * 1_000_000) + b.BoundedMemoryLimit() +} diff --git a/server/configuration.go b/server/configuration.go new file mode 100644 index 0000000..3cf39b4 --- /dev/null +++ b/server/configuration.go @@ -0,0 +1,84 @@ +package server + +import ( + "fmt" + "strconv" + "sync" +) + +type EnvironmentVariables map[string]interface{} + +// Ugly hacky function to handle environment variables that get passed through as not-a-string +// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a +// string wasn't passed through you'd cause a crash or the server to become unavailable. For now +// try to handle the most likely values from the JSON and hope for the best. +func (ev EnvironmentVariables) Get(key string) string { + val, ok := ev[key] + if !ok { + return "" + } + + switch val.(type) { + case int: + return strconv.Itoa(val.(int)) + case int32: + return strconv.FormatInt(val.(int64), 10) + case int64: + return strconv.FormatInt(val.(int64), 10) + case float32: + return fmt.Sprintf("%f", val.(float32)) + case float64: + return fmt.Sprintf("%f", val.(float64)) + case bool: + return strconv.FormatBool(val.(bool)) + } + + return val.(string) +} + +type Configuration struct { + mu sync.RWMutex + + // The unique identifier for the server that should be used when referencing + // it against the Panel API (and internally). This will be used when naming + // docker containers as well as in log output. + Uuid string `json:"uuid"` + + // Whether or not the server is in a suspended state. Suspended servers cannot + // be started or modified except in certain scenarios by an admin user. + Suspended bool `json:"suspended"` + + // The command that should be used when booting up the server instance. + Invocation string `json:"invocation"` + + // An array of environment variables that should be passed along to the running + // server process. + EnvVars EnvironmentVariables `json:"environment"` + + Allocations Allocations `json:"allocations"` + Build BuildSettings `json:"build"` + CrashDetectionEnabled bool `default:"true" json:"enabled" yaml:"enabled"` + Mounts []Mount `json:"mounts"` + Resources ResourceUsage `json:"resources"` + + Container struct { + // Defines the Docker image that will be used for this server + Image string `json:"image,omitempty"` + // If set to true, OOM killer will be disabled on the server's Docker container. + // If not present (nil) we will default to disabling it. + OomDisabled bool `default:"true" json:"oom_disabled"` + } `json:"container,omitempty"` +} + +func (s *Server) Config() *Configuration { + s.cfg.mu.RLock() + defer s.cfg.mu.RUnlock() + + return &s.cfg +} + +func (c *Configuration) SetSuspended(s bool) { + c.mu.Lock() + c.Suspended = s + c.mu.Unlock() +} diff --git a/server/crash.go b/server/crash.go index 649580f..3eb2ba4 100644 --- a/server/crash.go +++ b/server/crash.go @@ -4,18 +4,32 @@ import ( "fmt" "github.com/pkg/errors" "github.com/pterodactyl/wings/config" + "sync" "time" ) -type CrashDetection struct { - // If set to false, the system will not listen for crash detection events that - // can indicate that the server stopped unexpectedly. - Enabled bool `default:"true" json:"enabled" yaml:"enabled"` +type CrashHandler struct { + mu sync.RWMutex // Tracks the time of the last server crash event. lastCrash time.Time } +// Returns the time of the last crash for this server instance. +func (cd *CrashHandler) LastCrashTime() time.Time { + cd.mu.RLock() + defer cd.mu.RUnlock() + + return cd.lastCrash +} + +// Sets the last crash time for a server. +func (cd *CrashHandler) SetLastCrash(t time.Time) { + cd.mu.Lock() + cd.lastCrash = t + cd.mu.Unlock() +} + // Looks at the environment exit state to determine if the process exited cleanly or // if it was the result of an event that we should try to recover from. // @@ -30,8 +44,8 @@ func (s *Server) handleServerCrash() error { // No point in doing anything here if the server isn't currently offline, there // is no reason to do a crash detection event. If the server crash detection is // disabled we want to skip anything after this as well. - if s.GetState() != ProcessOfflineState || !s.CrashDetection.Enabled { - if !s.CrashDetection.Enabled { + if s.GetState() != ProcessOfflineState || !s.Config().CrashDetectionEnabled { + if !s.Config().CrashDetectionEnabled { s.Log().Debug("server triggered crash detection but handler is disabled for server process") s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.") @@ -57,7 +71,7 @@ func (s *Server) handleServerCrash() error { s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode)) s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled)) - c := s.CrashDetection.lastCrash + c := s.crasher.LastCrashTime() // If the last crash time was within the last 60 seconds we do not want to perform // an automatic reboot of the process. Return an error that can be handled. if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) { @@ -66,7 +80,7 @@ func (s *Server) handleServerCrash() error { return &crashTooFrequent{} } - s.CrashDetection.lastCrash = time.Now() + s.crasher.SetLastCrash(time.Now()) return s.Environment.Start() } \ No newline at end of file diff --git a/server/environment_docker.go b/server/environment_docker.go index 27a702e..fd2dda6 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -23,7 +23,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" ) @@ -223,7 +222,7 @@ func (d *DockerEnvironment) Start() error { // Theoretically you'd have the Panel handle all of this logic, but we cannot do that // because we allow the websocket to control the server power state as well, so we'll // need to handle that action in here. - if d.Server.Suspended { + if d.Server.IsSuspended() { return &suspendedError{} } @@ -604,22 +603,16 @@ func (d *DockerEnvironment) EnableResourcePolling() error { return } - s.Resources.Lock() - s.Resources.CpuAbsolute = s.Resources.CalculateAbsoluteCpu(&v.PreCPUStats, &v.CPUStats) - s.Resources.Memory = s.Resources.CalculateDockerMemory(v.MemoryStats) - s.Resources.MemoryLimit = v.MemoryStats.Limit - s.Resources.Unlock() + s.Proc().UpdateFromDocker(v) + for _, nw := range v.Networks { + s.Proc().UpdateNetworkBytes(&nw) + } // Why you ask? This already has the logic for caching disk space in use and then // also handles pushing that value to the resources object automatically. s.Filesystem.HasSpaceAvailable() - for _, nw := range v.Networks { - atomic.AddUint64(&s.Resources.Network.RxBytes, nw.RxBytes) - atomic.AddUint64(&s.Resources.Network.TxBytes, nw.TxBytes) - } - - b, _ := json.Marshal(s.Resources) + b, _ := json.Marshal(s.Proc()) s.Events().Publish(StatsEvent, string(b)) } }(d.Server) @@ -634,12 +627,16 @@ func (d *DockerEnvironment) DisableResourcePolling() error { } err := d.stats.Close() - - d.Server.Resources.Empty() + d.Server.Proc().Empty() return errors.WithStack(err) } +// Returns the image to be used for the instance. +func (d *DockerEnvironment) Image() string { + return d.Server.Config().Container.Image +} + // Pulls the image from Docker. If there is an error while pulling the image from the source // but the image already exists locally, we will report that error to the logger but continue // with the process. @@ -657,7 +654,7 @@ func (d *DockerEnvironment) ensureImageExists() error { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) defer cancel() - out, err := d.Client.ImagePull(ctx, d.Server.Container.Image, types.ImagePullOptions{All: false}) + out, err := d.Client.ImagePull(ctx, d.Image(), types.ImagePullOptions{All: false}) if err != nil { images, ierr := d.Client.ImageList(ctx, types.ImageListOptions{}) if ierr != nil { @@ -668,12 +665,12 @@ func (d *DockerEnvironment) ensureImageExists() error { for _, img := range images { for _, t := range img.RepoTags { - if t != d.Server.Container.Image { + if t != d.Image() { continue } d.Server.Log().WithFields(log.Fields{ - "image": d.Server.Container.Image, + "image": d.Image(), "error": errors.New(err.Error()), }).Warn("unable to pull requested image from remote source, however the image exists locally") @@ -687,7 +684,7 @@ func (d *DockerEnvironment) ensureImageExists() error { } defer out.Close() - log.WithField("image", d.Server.Container.Image).Debug("pulling docker image... this could take a bit of time") + log.WithField("image", d.Image()).Debug("pulling docker image... this could take a bit of time") // I'm not sure what the best approach here is, but this will block execution until the image // is done being pulled, which is what we need. @@ -734,12 +731,9 @@ func (d *DockerEnvironment) Create() error { AttachStderr: true, OpenStdin: true, Tty: true, - ExposedPorts: d.exposedPorts(), - - Image: d.Server.Container.Image, - Env: d.Server.GetEnvironmentVariables(), - + Image: d.Image(), + Env: d.Server.GetEnvironmentVariables(), Labels: map[string]string{ "Service": "Pterodactyl", "ContainerType": "server_process", @@ -756,7 +750,7 @@ func (d *DockerEnvironment) Create() error { } var mounted bool - for _, m := range d.Server.Mounts { + for _, m := range d.Server.Config().Mounts { mounted = false source := filepath.Clean(m.Source) target := filepath.Clean(m.Target) @@ -931,7 +925,7 @@ func (d *DockerEnvironment) parseLogToStrings(b []byte) ([]string, error) { func (d *DockerEnvironment) portBindings() nat.PortMap { var out = nat.PortMap{} - for ip, ports := range d.Server.Allocations.Mappings { + for ip, ports := range d.Server.Config().Allocations.Mappings { for _, port := range ports { // Skip over invalid ports. if port < 0 || port > 65535 { @@ -981,14 +975,14 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet { // the same or higher than the memory limit. func (d *DockerEnvironment) getResourcesForServer() container.Resources { return container.Resources{ - Memory: d.Server.Build.BoundedMemoryLimit(), - MemoryReservation: d.Server.Build.MemoryLimit * 1_000_000, - MemorySwap: d.Server.Build.ConvertedSwap(), - CPUQuota: d.Server.Build.ConvertedCpuLimit(), + Memory: d.Server.Build().BoundedMemoryLimit(), + MemoryReservation: d.Server.Build().MemoryLimit * 1_000_000, + MemorySwap: d.Server.Build().ConvertedSwap(), + CPUQuota: d.Server.Build().ConvertedCpuLimit(), CPUPeriod: 100_000, CPUShares: 1024, - BlkioWeight: d.Server.Build.IoWeight, - OomKillDisable: &d.Server.Container.OomDisabled, - CpusetCpus: d.Server.Build.Threads, + BlkioWeight: d.Server.Build().IoWeight, + OomKillDisable: &d.Server.Config().Container.OomDisabled, + CpusetCpus: d.Server.Build().Threads, } } diff --git a/server/filesystem.go b/server/filesystem.go index 545fe87..b2115cd 100644 --- a/server/filesystem.go +++ b/server/filesystem.go @@ -208,7 +208,7 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) { // Because determining the amount of space being used by a server is a taxing operation we // will load it all up into a cache and pull from that as long as the key is not expired. func (fs *Filesystem) HasSpaceAvailable() bool { - space := fs.Server.Build.DiskSpace + space := fs.Server.Build().DiskSpace size, err := fs.getCachedDiskUsage() if err != nil { @@ -217,9 +217,7 @@ func (fs *Filesystem) HasSpaceAvailable() bool { // Determine if their folder size, in bytes, is smaller than the amount of space they've // been allocated. - fs.Server.Resources.Lock() - fs.Server.Resources.Disk = size - fs.Server.Resources.Unlock() + fs.Server.Proc().SetDisk(size) // If space is -1 or 0 just return true, means they're allowed unlimited. // @@ -247,7 +245,7 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) { fs.cacheDiskMu.Lock() defer fs.cacheDiskMu.Unlock() - if x, exists := fs.Server.Cache.Get("disk_used"); exists { + if x, exists := fs.Server.cache.Get("disk_used"); exists { return x.(int64), nil } @@ -260,7 +258,7 @@ func (fs *Filesystem) getCachedDiskUsage() (int64, error) { // Always cache the size, even if there is an error. We want to always return that value // so that we don't cause an endless loop of determining the disk size if there is a temporary // error encountered. - fs.Server.Cache.Set("disk_used", size, time.Second*60) + fs.Server.cache.Set("disk_used", size, time.Second*60) return size, err } diff --git a/server/filesystem_unarchive.go b/server/filesystem_unarchive.go index 7197554..9e40c28 100644 --- a/server/filesystem_unarchive.go +++ b/server/filesystem_unarchive.go @@ -21,7 +21,7 @@ import ( func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (bool, error) { // Don't waste time trying to determine this if we know the server will have the space for // it since there is no limit. - if fs.Server.Build.DiskSpace <= 0 { + if fs.Server.Build().DiskSpace <= 0 { return true, nil } @@ -60,7 +60,7 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b wg.Wait() - return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.Build.DiskSpace, cErr + return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.Build().DiskSpace, cErr } // Decompress a file in a given directory by using the archiver tool to infer the file diff --git a/server/loader.go b/server/loader.go new file mode 100644 index 0000000..55b1a32 --- /dev/null +++ b/server/loader.go @@ -0,0 +1,119 @@ +package server + +import ( + "github.com/apex/log" + "github.com/creasty/defaults" + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + "github.com/pterodactyl/wings/api" + "github.com/pterodactyl/wings/config" + "github.com/remeh/sizedwaitgroup" + "time" +) + +var servers = NewCollection(nil) + +func GetServers() *Collection { + return servers +} + +// Iterates over a given directory and loads all of the servers listed before returning +// them to the calling function. +func LoadDirectory() error { + if len(servers.items) != 0 { + return errors.New("cannot call LoadDirectory with a non-nil collection") + } + + // We could theoretically use a standard wait group here, however doing + // that introduces the potential to crash the program due to too many + // open files. This wouldn't happen on a small setup, but once the daemon is + // handling many servers you run that risk. + // + // For now just process 10 files at a time, that should be plenty fast to + // read and parse the YAML. We should probably make this configurable down + // the road to help big instances scale better. + wg := sizedwaitgroup.New(10) + + configs, rerr, err := api.NewRequester().GetAllServerConfigurations() + if err != nil || rerr != nil { + if err != nil { + return errors.WithStack(err) + } + + return errors.New(rerr.String()) + } + + states, err := getServerStates() + if err != nil { + return errors.WithStack(err) + } + + for uuid, data := range configs { + wg.Add() + + go func(uuid string, data *api.ServerConfigurationResponse) { + defer wg.Done() + + s, err := FromConfiguration(data) + if err != nil { + log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...") + return + } + + if state, exists := states[s.Id()]; exists { + s.SetState(state) + s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file") + } + + servers.Add(s) + }(uuid, data) + } + + // Wait until we've processed all of the configuration files in the directory + // before continuing. + wg.Wait() + + return nil +} + +// Initializes a server using a data byte array. This will be marshaled into the +// given struct using a YAML marshaler. This will also configure the given environment +// for a server. +func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { + cfg := Configuration{} + if err := defaults.Set(&cfg); err != nil { + return nil, err + } + + s := new(Server) + s.cfg = cfg + + if err := s.UpdateDataStructure(data.Settings, false); err != nil { + return nil, err + } + + s.AddEventListeners() + + // Right now we only support a Docker based environment, so I'm going to hard code + // this logic in. When we're ready to support other environment we'll need to make + // some modifications here obviously. + if err := NewDockerEnvironment(s); err != nil { + return nil, err + } + + s.cache = cache.New(time.Minute*10, time.Minute*15) + s.Archiver = Archiver{ + Server: s, + } + s.Filesystem = Filesystem{ + Configuration: &config.Get().System, + Server: s, + } + + // Forces the configuration to be synced with the panel. + if err := s.SyncWithConfiguration(data); err != nil { + return nil, err + } + + return s, nil +} diff --git a/server/resources.go b/server/resources.go index b42258f..5ded93c 100644 --- a/server/resources.go +++ b/server/resources.go @@ -4,29 +4,37 @@ import ( "github.com/docker/docker/api/types" "math" "sync" + "sync/atomic" ) // Defines the current resource usage for a given server instance. If a server is offline you // should obviously expect memory and CPU usage to be 0. However, disk will always be returned // since that is not dependent on the server being running to collect that data. type ResourceUsage struct { - sync.RWMutex + mu sync.RWMutex + + // The current server status. + State string `json:"state" default:"offline"` // The total amount of memory, in bytes, that this server instance is consuming. This is // calculated slightly differently than just using the raw Memory field that the stats // return from the container, so please check the code setting this value for how that // is calculated. Memory uint64 `json:"memory_bytes"` + // The total amount of memory this container or resource can use. Inside Docker this is // going to be higher than you'd expect because we're automatically allocating overhead // abilities for the container, so its not going to be a perfect match. MemoryLimit uint64 `json:"memory_limit_bytes"` + // The absolute CPU usage is the amount of CPU used in relation to the entire system and // does not take into account any limits on the server process itself. CpuAbsolute float64 `json:"cpu_absolute"` + // The current disk space being used by the server. This is cached to prevent slow lookup // issues on frequent refreshes. Disk int64 `json:"disk_bytes"` + // Current network transmit in & out for a container. Network struct { RxBytes uint64 `json:"rx_bytes"` @@ -34,11 +42,29 @@ type ResourceUsage struct { } `json:"network"` } +// Returns the resource usage stats for the server instance. If the server is not running, only the +// disk space currently used will be returned. When the server is running all of the other stats will +// be returned. +// +// When a process is stopped all of the stats are zeroed out except for the disk. +func (s *Server) Proc() *ResourceUsage { + s.resources.mu.RLock() + defer s.resources.mu.RUnlock() + + return &s.resources +} + +func (ru *ResourceUsage) setInternalState(state string) { + ru.mu.Lock() + ru.State = state + ru.mu.Unlock() +} + // Resets the usages values to zero, used when a server is stopped to ensure we don't hold // onto any values incorrectly. func (ru *ResourceUsage) Empty() { - ru.Lock() - defer ru.Unlock() + ru.mu.Lock() + defer ru.mu.Unlock() ru.Memory = 0 ru.CpuAbsolute = 0 @@ -46,6 +72,27 @@ func (ru *ResourceUsage) Empty() { ru.Network.RxBytes = 0 } +func (ru *ResourceUsage) SetDisk(i int64) { + ru.mu.Lock() + defer ru.mu.Unlock() + + ru.Disk = i +} + +func (ru *ResourceUsage) UpdateFromDocker(v *types.StatsJSON) { + ru.mu.Lock() + defer ru.mu.Unlock() + + ru.CpuAbsolute = ru.calculateDockerAbsoluteCpu(&v.PreCPUStats, &v.CPUStats) + ru.Memory = ru.calculateDockerMemory(v.MemoryStats) + ru.MemoryLimit = v.MemoryStats.Limit +} + +func (ru *ResourceUsage) UpdateNetworkBytes(nw *types.NetworkStats) { + atomic.AddUint64(&ru.Network.RxBytes, nw.RxBytes) + atomic.AddUint64(&ru.Network.TxBytes, nw.TxBytes) +} + // The "docker stats" CLI call does not return the same value as the types.MemoryStats.Usage // value which can be rather confusing to people trying to compare panel usage to // their stats output. @@ -55,7 +102,7 @@ func (ru *ResourceUsage) Empty() { // correct memory value anyways. // // @see https://github.com/docker/cli/blob/96e1d1d6/cli/command/container/stats_helpers.go#L227-L249 -func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 { +func (ru *ResourceUsage) calculateDockerMemory(stats types.MemoryStats) uint64 { if v, ok := stats.Stats["total_inactive_file"]; ok && v < stats.Usage { return stats.Usage - v } @@ -71,7 +118,7 @@ func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 { // by the defined CPU limits on the container. // // @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166 -func (ru *ResourceUsage) CalculateAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 { +func (ru *ResourceUsage) calculateDockerAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 { // Calculate the change in CPU usage between the current and previous reading. cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(pStats.CPUUsage.TotalUsage) diff --git a/server/server.go b/server/server.go index 85eff71..df0ae60 100644 --- a/server/server.go +++ b/server/server.go @@ -4,99 +4,42 @@ import ( "context" "fmt" "github.com/apex/log" - "github.com/creasty/defaults" "github.com/patrickmn/go-cache" "github.com/pkg/errors" "github.com/pterodactyl/wings/api" - "github.com/pterodactyl/wings/config" - "github.com/remeh/sizedwaitgroup" "golang.org/x/sync/semaphore" - "math" "os" - "strconv" "strings" "sync" "time" ) -var servers *Collection - -func GetServers() *Collection { - return servers -} - -type EnvironmentVariables map[string]interface{} - -// Ugly hacky function to handle environment variables that get passed through as not-a-string -// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a -// string wasn't passed through you'd cause a crash or the server to become unavailable. For now -// try to handle the most likely values from the JSON and hope for the best. -func (ev EnvironmentVariables) Get(key string) string { - val, ok := ev[key] - if !ok { - return "" - } - - switch val.(type) { - case int: - return strconv.Itoa(val.(int)) - case int32: - return strconv.FormatInt(val.(int64), 10) - case int64: - return strconv.FormatInt(val.(int64), 10) - case float32: - return fmt.Sprintf("%f", val.(float32)) - case float64: - return fmt.Sprintf("%f", val.(float64)) - case bool: - return strconv.FormatBool(val.(bool)) - } - - return val.(string) -} - // High level definition for a server instance being controlled by Wings. type Server struct { + // Internal mutex used to block actions that need to occur sequentially, such as + // writing the configuration to the disk. + sync.RWMutex + // The unique identifier for the server that should be used when referencing // it against the Panel API (and internally). This will be used when naming // docker containers as well as in log output. - Uuid string `json:"uuid"` + Uuid string `json:"-"` - // Whether or not the server is in a suspended state. Suspended servers cannot - // be started or modified except in certain scenarios by an admin user. - Suspended bool `json:"suspended"` + // Maintains the configuration for the server. This is the data that gets returned by the Panel + // such as build settings and container images. + cfg Configuration - // The power state of the server. - State string `default:"offline" json:"state"` - - // The command that should be used when booting up the server instance. - Invocation string `json:"invocation"` - - // An array of environment variables that should be passed along to the running - // server process. - EnvVars EnvironmentVariables `json:"environment"` - - Allocations Allocations `json:"allocations"` - Build BuildSettings `json:"build"` - CrashDetection CrashDetection `json:"crash_detection"` - Mounts []Mount `json:"mounts"` - Resources ResourceUsage `json:"resources"` + // The crash handler for this server instance. + crasher CrashHandler + resources ResourceUsage Archiver Archiver `json:"-"` Environment Environment `json:"-"` Filesystem Filesystem `json:"-"` - Container struct { - // Defines the Docker image that will be used for this server - Image string `json:"image,omitempty"` - // If set to true, OOM killer will be disabled on the server's Docker container. - // If not present (nil) we will default to disabling it. - OomDisabled bool `default:"true" json:"oom_disabled"` - } `json:"container,omitempty"` - // Server cache used to store frequently requested information in memory and make // certain long operations return faster. For example, FS disk space usage. - Cache *cache.Cache `json:"-"` + cache *cache.Cache // Events emitted by the server instance. emitter *EventBus @@ -111,10 +54,6 @@ type Server struct { // installation process, for example when a server is deleted from the panel while the // installer process is still running. installer InstallerDetails - - // Internal mutex used to block actions that need to occur sequentially, such as - // writing the configuration to the disk. - sync.RWMutex } type InstallerDetails struct { @@ -127,190 +66,9 @@ type InstallerDetails struct { sem *semaphore.Weighted } -// The build settings for a given server that impact docker container creation and -// resource limits for a server instance. -type BuildSettings struct { - // The total amount of memory in megabytes that this server is allowed to - // use on the host system. - MemoryLimit int64 `json:"memory_limit"` - - // The amount of additional swap space to be provided to a container instance. - Swap int64 `json:"swap"` - - // The relative weight for IO operations in a container. This is relative to other - // containers on the system and should be a value between 10 and 1000. - IoWeight uint16 `json:"io_weight"` - - // The percentage of CPU that this instance is allowed to consume relative to - // the host. A value of 200% represents complete utilization of two cores. This - // should be a value between 1 and THREAD_COUNT * 100. - CpuLimit int64 `json:"cpu_limit"` - - // The amount of disk space in megabytes that a server is allowed to use. - DiskSpace int64 `json:"disk_space"` - - // Sets which CPU threads can be used by the docker instance. - Threads string `json:"threads"` -} - +// Returns the UUID for the server instance. func (s *Server) Id() string { - s.RLock() - defer s.RUnlock() - - return s.Uuid -} - -// Converts the CPU limit for a server build into a number that can be better understood -// by the Docker environment. If there is no limit set, return -1 which will indicate to -// Docker that it has unlimited CPU quota. -func (b *BuildSettings) ConvertedCpuLimit() int64 { - if b.CpuLimit == 0 { - return -1 - } - - return b.CpuLimit * 1000 -} - -// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to -// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use -// 15%. This avoids unexpected crashes from processes like Java which run over the limit. -func (b *BuildSettings) MemoryOverheadMultiplier() float64 { - if b.MemoryLimit <= 2048 { - return 1.15 - } else if b.MemoryLimit <= 4096 { - return 1.10 - } - - return 1.05 -} - -func (b *BuildSettings) BoundedMemoryLimit() int64 { - return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000)) -} - -// Returns the amount of swap available as a total in bytes. This is returned as the amount -// of memory available to the server initially, PLUS the amount of additional swap to include -// which is the format used by Docker. -func (b *BuildSettings) ConvertedSwap() int64 { - if b.Swap < 0 { - return -1 - } - - return (b.Swap * 1_000_000) + b.BoundedMemoryLimit() -} - -// Defines the allocations available for a given server. When using the Docker environment -// driver these correspond to mappings for the container that allow external connections. -type Allocations struct { - // Defines the default allocation that should be used for this server. This is - // what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration - // files or the startup arguments for a server. - DefaultMapping struct { - Ip string `json:"ip"` - Port int `json:"port"` - } `json:"default"` - - // Mappings contains all of the ports that should be assigned to a given server - // attached to the IP they correspond to. - Mappings map[string][]int `json:"mappings"` -} - -// Iterates over a given directory and loads all of the servers listed before returning -// them to the calling function. -func LoadDirectory() error { - // We could theoretically use a standard wait group here, however doing - // that introduces the potential to crash the program due to too many - // open files. This wouldn't happen on a small setup, but once the daemon is - // handling many servers you run that risk. - // - // For now just process 10 files at a time, that should be plenty fast to - // read and parse the YAML. We should probably make this configurable down - // the road to help big instances scale better. - wg := sizedwaitgroup.New(10) - - configs, rerr, err := api.NewRequester().GetAllServerConfigurations() - if err != nil || rerr != nil { - if err != nil { - return errors.WithStack(err) - } - - return errors.New(rerr.String()) - } - - states, err := getServerStates() - if err != nil { - return errors.WithStack(err) - } - - servers = NewCollection(nil) - - for uuid, data := range configs { - wg.Add() - - go func(uuid string, data *api.ServerConfigurationResponse) { - defer wg.Done() - - s, err := FromConfiguration(data) - if err != nil { - log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...") - return - } - - if state, exists := states[s.Uuid]; exists { - s.SetState(state) - s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file") - } - - servers.Add(s) - }(uuid, data) - } - - // Wait until we've processed all of the configuration files in the directory - // before continuing. - wg.Wait() - - return nil -} - -// Initializes a server using a data byte array. This will be marshaled into the -// given struct using a YAML marshaler. This will also configure the given environment -// for a server. -func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { - s := new(Server) - - if err := defaults.Set(s); err != nil { - return nil, err - } - - if err := s.UpdateDataStructure(data.Settings, false); err != nil { - return nil, err - } - - s.AddEventListeners() - - // Right now we only support a Docker based environment, so I'm going to hard code - // this logic in. When we're ready to support other environment we'll need to make - // some modifications here obviously. - if err := NewDockerEnvironment(s); err != nil { - return nil, err - } - - s.Cache = cache.New(time.Minute*10, time.Minute*15) - s.Archiver = Archiver{ - Server: s, - } - s.Filesystem = Filesystem{ - Configuration: &config.Get().System, - Server: s, - } - s.Resources = ResourceUsage{} - - // Forces the configuration to be synced with the panel. - if err := s.SyncWithConfiguration(data); err != nil { - return nil, err - } - - return s, nil + return s.Config().Uuid } // Returns all of the environment variables that should be assigned to a running @@ -320,21 +78,21 @@ func (s *Server) GetEnvironmentVariables() []string { var out = []string{ fmt.Sprintf("TZ=%s", zone), - fmt.Sprintf("STARTUP=%s", s.Invocation), - fmt.Sprintf("SERVER_MEMORY=%d", s.Build.MemoryLimit), - fmt.Sprintf("SERVER_IP=%s", s.Allocations.DefaultMapping.Ip), - fmt.Sprintf("SERVER_PORT=%d", s.Allocations.DefaultMapping.Port), + fmt.Sprintf("STARTUP=%s", s.Config().Invocation), + fmt.Sprintf("SERVER_MEMORY=%d", s.Build().MemoryLimit), + fmt.Sprintf("SERVER_IP=%s", s.Config().Allocations.DefaultMapping.Ip), + fmt.Sprintf("SERVER_PORT=%d", s.Config().Allocations.DefaultMapping.Port), } eloop: - for k := range s.EnvVars { + for k := range s.Config().EnvVars { for _, e := range out { if strings.HasPrefix(e, strings.ToUpper(k)) { continue eloop } } - out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.EnvVars.Get(k))) + out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.Config().EnvVars.Get(k))) } return out @@ -420,3 +178,8 @@ func (s *Server) HandlePowerAction(action PowerAction) error { return errors.New("an invalid power action was provided") } } + +// Checks if the server is marked as being suspended or not on the system. +func (s *Server) IsSuspended() bool { + return s.Config().Suspended +} diff --git a/server/state.go b/server/state.go index 033e267..aa24306 100644 --- a/server/state.go +++ b/server/state.go @@ -13,6 +13,13 @@ import ( var stateMutex sync.Mutex +const ( + ProcessOfflineState = "offline" + ProcessStartingState = "starting" + ProcessRunningState = "running" + ProcessStoppingState = "stopping" +) + // Returns the state of the servers. func getServerStates() (map[string]string, error) { // Request a lock after we check if the file exists. @@ -60,13 +67,6 @@ func saveServerStates() error { return nil } -const ( - ProcessOfflineState = "offline" - ProcessStartingState = "starting" - ProcessRunningState = "running" - ProcessStoppingState = "stopping" -) - // Sets the state of the server internally. This function handles crash detection as // well as reporting to event listeners for the server. func (s *Server) SetState(state string) error { @@ -76,16 +76,14 @@ func (s *Server) SetState(state string) error { prevState := s.GetState() - // Obtain a mutex lock and update the current state of the server. - s.Lock() - s.State = state + // Update the currently tracked state for the server. + s.Proc().setInternalState(state) // Emit the event to any listeners that are currently registered. - s.Log().WithField("status", s.State).Debug("saw server status change event") - s.Events().Publish(StatusEvent, s.State) - - // Release the lock as it is no longer needed for the following actions. - s.Unlock() + if prevState != state { + s.Log().WithField("status", s.Proc().State).Debug("saw server status change event") + s.Events().Publish(StatusEvent, s.Proc().State) + } // Persist this change to the disk immediately so that should the Daemon be stopped or // crash we can immediately restore the server state. @@ -128,10 +126,7 @@ func (s *Server) SetState(state string) error { // Returns the current state of the server in a race-safe manner. func (s *Server) GetState() string { - s.RLock() - defer s.RUnlock() - - return s.State + return s.Proc().State } // Determines if the server state is running or not. This is different than the diff --git a/server/update.go b/server/update.go index 00d9eed..750da12 100644 --- a/server/update.go +++ b/server/update.go @@ -15,7 +15,7 @@ import ( // it is up to the specific environment to determine what needs to happen when // that is the case. func (s *Server) UpdateDataStructure(data []byte, background bool) error { - src := new(Server) + src := new(Configuration) if err := json.Unmarshal(data, src); err != nil { return errors.WithStack(err) } @@ -27,26 +27,30 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { return errors.New("attempting to merge a data stack with an invalid UUID") } - s.Lock() + // Grab a copy of the configuration to work on. + c := s.Config() + + // Lock the server configuration while we're doing this merge to avoid anything + // trying to overwrite it or make modifications while we're sorting out what we + // need to do. + s.cfg.mu.Lock() + defer s.cfg.mu.Unlock() + // Merge the new data object that we have received with the existing server data object // and then save it to the disk so it is persistent. - if err := mergo.Merge(s, src, mergo.WithOverride); err != nil { + if err := mergo.Merge(c, src, mergo.WithOverride); err != nil { return errors.WithStack(err) } - // Mergo makes that previous lock disappear. Handle that by just re-locking the object. - s.Lock() - defer s.Unlock() - // Don't explode if we're setting CPU limits to 0. Mergo sees that as an empty value // so it won't override the value we've passed through in the API call. However, we can // safely assume that we're passing through valid data structures here. I foresee this // backfiring at some point, but until then... // // We'll go ahead and do this with swap as well. - s.Build.CpuLimit = src.Build.CpuLimit - s.Build.Swap = src.Build.Swap - s.Build.DiskSpace = src.Build.DiskSpace + c.Build.CpuLimit = src.Build.CpuLimit + c.Build.Swap = src.Build.Swap + c.Build.DiskSpace = src.Build.DiskSpace // Mergo can't quite handle this boolean value correctly, so for now we'll just // handle this edge case manually since none of the other data passed through in this @@ -56,7 +60,7 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { return errors.WithStack(err) } } else { - s.Container.OomDisabled = v + c.Container.OomDisabled = v } // Mergo also cannot handle this boolean value. @@ -65,25 +69,29 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { return errors.WithStack(err) } } else { - s.Suspended = v + c.Suspended = v } // Environment and Mappings should be treated as a full update at all times, never a // true patch, otherwise we can't know what we're passing along. if src.EnvVars != nil && len(src.EnvVars) > 0 { - s.EnvVars = src.EnvVars + c.EnvVars = src.EnvVars } if src.Allocations.Mappings != nil && len(src.Allocations.Mappings) > 0 { - s.Allocations.Mappings = src.Allocations.Mappings + c.Allocations.Mappings = src.Allocations.Mappings } if src.Mounts != nil && len(src.Mounts) > 0 { - s.Mounts = src.Mounts + c.Mounts = src.Mounts } + // Update the configuration once we have a lock on the configuration object. + s.cfg = *c + s.Uuid = c.Uuid + if background { - s.runBackgroundActions() + go s.runBackgroundActions() } return nil @@ -96,24 +104,22 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { // These tasks run in independent threads where relevant to speed up any updates // that need to happen. func (s *Server) runBackgroundActions() { - // Update the environment in place, allowing memory and CPU usage to be adjusted - // on the fly without the user needing to reboot (theoretically). - go func(server *Server) { - server.Log().Info("performing server limit modification on-the-fly") - if err := server.Environment.InSituUpdate(); err != nil { - server.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment") - } - }(s) - - // Check if the server is now suspended, and if so and the process is not terminated + // Check if the s is now suspended, and if so and the process is not terminated // yet, do it immediately. - go func(server *Server) { - if server.Suspended && server.GetState() != ProcessOfflineState { - server.Log().Info("server suspended with running process state, terminating now") + if s.IsSuspended() && s.GetState() != ProcessOfflineState { + s.Log().Info("server suspended with running process state, terminating now") - if err := server.Environment.WaitForStop(10, true); err != nil { - server.Log().WithField("error", err).Warn("failed to terminate server environment after suspension") - } + if err := s.Environment.WaitForStop(10, true); err != nil { + s.Log().WithField("error", err).Warn("failed to terminate server environment after suspension") } - }(s) + } + + if !s.IsSuspended() { + // Update the environment in place, allowing memory and CPU usage to be adjusted + // on the fly without the user needing to reboot (theoretically). + s.Log().Info("performing server limit modification on-the-fly") + if err := s.Environment.InSituUpdate(); err != nil { + s.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment") + } + } }