Refactor environment handling logic to separate a server from the environment handler itself

This change makes the environment handling logic execute independent of the server itself and should make it much easier for people to contribute changes and additional environment handlers down the road without polluting the server object even more.

There is still a lot of work to do on this front to make things easier to work with, and there are some questionable design decisions at play I'm sure.

Welcome to additional modifications and cleanup to make this code easier to reason about and work with.
This commit is contained in:
Dane Everitt 2020-08-10 21:38:42 -07:00 committed by GitHub
parent 2c8cad2410
commit cc52954a2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1669 additions and 1350 deletions

View File

@ -47,20 +47,22 @@ func (olm *OutputLineMatcher) UnmarshalJSON(data []byte) error {
return nil return nil
} }
type ProcessStopConfiguration struct {
Type string `json:"type"`
Value string `json:"value"`
}
// Defines the process configuration for a given server instance. This sets what the // Defines the process configuration for a given server instance. This sets what the
// daemon is looking for to mark a server as done starting, what to do when stopping, // daemon is looking for to mark a server as done starting, what to do when stopping,
// and what changes to make to the configuration file for a server. // and what changes to make to the configuration file for a server.
type ProcessConfiguration struct { type ProcessConfiguration struct {
Startup struct { Startup struct {
Done []*OutputLineMatcher `json:"done"` Done []*OutputLineMatcher `json:"done"`
UserInteraction []string `json:"user_interaction"` UserInteraction []string `json:"user_interaction"`
StripAnsi bool `json:"strip_ansi"` StripAnsi bool `json:"strip_ansi"`
} `json:"startup"` } `json:"startup"`
Stop struct { Stop ProcessStopConfiguration `json:"stop"`
Type string `json:"type"`
Value string `json:"value"`
} `json:"stop"`
ConfigurationFiles []parser.ConfigurationFile `json:"configs"` ConfigurationFiles []parser.ConfigurationFile `json:"configs"`
} }

View File

@ -55,24 +55,7 @@ type Configuration struct {
// Defines internal throttling configurations for server processes to prevent // Defines internal throttling configurations for server processes to prevent
// someone from running an endless loop that spams data to logs. // someone from running an endless loop that spams data to logs.
Throttles struct { Throttles ConsoleThrottles
// The number of data overage warnings (inclusive) that can accumulate
// before a process is terminated.
KillAtCount int `default:"5" yaml:"kill_at_count"`
// The number of seconds that must elapse before the internal counter
// begins decrementing warnings assigned to a process that is outputting
// too much data.
DecaySeconds int `default:"10" json:"decay" yaml:"decay"`
// The total number of bytes allowed to be output by a server process
// per interval.
BytesPerInterval int `default:"4096" json:"bytes" yaml:"bytes"`
// The amount of time that should lapse between data output throttle
// checks. This should be defined in milliseconds.
CheckInterval int `default:"100" yaml:"check_interval"`
}
// The location where the panel is running that this daemon should connect to // The location where the panel is running that this daemon should connect to
// to collect data and send events. // to collect data and send events.

View File

@ -0,0 +1,23 @@
package config
type ConsoleThrottles struct {
// Wether or not the throttler is enabled for this instance.
Enabled bool `json:"enabled" yaml:"enabled" default:"true"`
// The total number of throttle activations that must accumulate before a server is
// forcibly stopped for violating these limits.
KillAtCount uint64 `json:"kill_at_count" yaml:"kill_at_count" default:"5"`
// The amount of time in milliseconds that a server process must go through without
// triggering an output warning before the throttle activation count begins decreasing.
// This time is measured in milliseconds.
Decay uint64 `json:"decay" yaml:"decay" default:"10000"`
// The total number of lines that can be output in a given CheckInterval period before
// a warning is triggered and counted against the server.
Lines uint64 `json:"lines" yaml:"lines" default:"1000"`
// The amount of time that must pass between intervals before the count is reset. This
// value is in milliseconds.
CheckInterval uint64 `json:"check_interval" yaml:"check_interval" default:"100"`
}

View File

@ -0,0 +1,67 @@
package environment
import (
"fmt"
"github.com/docker/go-connections/nat"
"strconv"
)
// Defines the allocations available for a given server. When using the Docker environment
// driver these correspond to mappings for the container that allow external connections.
type Allocations struct {
// Defines the default allocation that should be used for this server. This is
// what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration
// files or the startup arguments for a server.
DefaultMapping struct {
Ip string `json:"ip"`
Port int `json:"port"`
} `json:"default"`
// Mappings contains all of the ports that should be assigned to a given server
// attached to the IP they correspond to.
Mappings map[string][]int `json:"mappings"`
}
// Converts the server allocation mappings into a format that can be understood by Docker. While
// we do strive to support multiple environments, using Docker's standardized format for the
// bindings certainly makes life a little easier for managing things.
func (a *Allocations) Bindings() nat.PortMap {
var out = nat.PortMap{}
for ip, ports := range a.Mappings {
for _, port := range ports {
// Skip over invalid ports.
if port < 1 || port > 65535 {
continue
}
binding := []nat.PortBinding{
{
HostIP: ip,
HostPort: strconv.Itoa(port),
},
}
out[nat.Port(fmt.Sprintf("%d/tcp", port))] = binding
out[nat.Port(fmt.Sprintf("%d/udp", port))] = binding
}
}
return out
}
// Converts the server allocation mappings into a PortSet that can be understood
// by Docker. This formatting is slightly different than "Bindings" as it should
// return an empty struct rather than a binding.
//
// To accomplish this, we'll just get the values from "Bindings" and then set them
// to empty structs. Because why not.
func (a *Allocations) Exposed() nat.PortSet {
var out = nat.PortSet{}
for port := range a.Bindings() {
out[port] = struct{}{}
}
return out
}

84
environment/config.go Normal file
View File

@ -0,0 +1,84 @@
package environment
import (
"fmt"
"strings"
"sync"
"time"
)
type configurationSettings struct {
Mounts []Mount
Allocations Allocations
Limits Limits
Variables Variables
}
// Defines the actual configuration struct for the environment with all of the settings
// defined within it.
type Configuration struct {
mu sync.RWMutex
settings configurationSettings
}
func NewConfiguration(m []Mount, a Allocations, l Limits, v Variables) *Configuration {
return &Configuration{
settings: configurationSettings{
Mounts: m,
Allocations: a,
Limits: l,
Variables: v,
},
}
}
func (c *Configuration) Limits() Limits {
c.mu.RLock()
defer c.mu.RUnlock()
return c.settings.Limits
}
func (c *Configuration) Allocations() Allocations {
c.mu.RLock()
defer c.mu.RUnlock()
return c.settings.Allocations
}
func (c *Configuration) Mounts() []Mount {
c.mu.RLock()
defer c.mu.RUnlock()
return c.settings.Mounts
}
// Returns all of the environment variables that should be assigned to a running
// server instance.
func (c *Configuration) EnvironmentVariables() []string {
c.mu.RLock()
c.mu.RUnlock()
zone, _ := time.Now().In(time.Local).Zone()
var out = []string{
fmt.Sprintf("TZ=%s", zone),
fmt.Sprintf("SERVER_MEMORY=%d", c.settings.Limits.MemoryLimit),
fmt.Sprintf("SERVER_IP=%s", c.settings.Allocations.DefaultMapping.Ip),
fmt.Sprintf("SERVER_PORT=%d", c.settings.Allocations.DefaultMapping.Port),
}
eloop:
for k := range c.settings.Variables {
for _, e := range out {
if strings.HasPrefix(e, strings.ToUpper(k)) {
continue eloop
}
}
out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), c.settings.Variables.Get(k)))
}
return out
}

View File

@ -0,0 +1,20 @@
package docker
import "io"
type Console struct {
HandlerFunc *func(string)
}
var _ io.Writer = Console{}
func (c Console) Write(b []byte) (int, error) {
if c.HandlerFunc != nil {
l := make([]byte, len(b))
copy(l, b)
(*c.HandlerFunc)(string(l))
}
return len(b), nil
}

View File

@ -0,0 +1,367 @@
package docker
import (
"bufio"
"context"
"fmt"
"github.com/apex/log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/system"
"io"
"strconv"
"strings"
"time"
)
// Attaches to the docker container itself and ensures that we can pipe data in and out
// of the process stream. This should not be used for reading console data as you *will*
// miss important output at the beginning because of the time delay with attaching to the
// output.
func (e *Environment) Attach() error {
if e.IsAttached() {
return nil
}
if err := e.followOutput(); err != nil {
return errors.WithStack(err)
}
opts := types.ContainerAttachOptions{
Stdin: true,
Stdout: true,
Stderr: true,
Stream: true,
}
// Set the stream again with the container.
if st, err := e.client.ContainerAttach(context.Background(), e.Id, opts); err != nil {
return errors.WithStack(err)
} else {
e.SetStream(&st)
}
console := new(Console)
// TODO: resource polling should be handled by the server itself and just call a function
// on the environment that can return the data. Same for disabling polling.
go func() {
defer e.stream.Close()
defer func() {
e.setState(system.ProcessOfflineState)
e.SetStream(nil)
}()
_, _ = io.Copy(console, e.stream.Reader)
}()
return nil
}
func (e *Environment) resources() container.Resources {
l := e.Configuration.Limits()
return container.Resources{
Memory: l.BoundedMemoryLimit(),
MemoryReservation: l.MemoryLimit * 1_000_000,
MemorySwap: l.ConvertedSwap(),
CPUQuota: l.ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: l.IoWeight,
OomKillDisable: &l.OOMDisabled,
CpusetCpus: l.Threads,
}
}
// Performs an in-place update of the Docker container's resource limits without actually
// making any changes to the operational state of the container. This allows memory, cpu,
// and IO limitations to be adjusted on the fly for individual instances.
func (e *Environment) InSituUpdate() error {
if _, err := e.client.ContainerInspect(context.Background(), e.Id); err != nil {
// If the container doesn't exist for some reason there really isn't anything
// we can do to fix that in this process (it doesn't make sense at least). In those
// cases just return without doing anything since we still want to save the configuration
// to the disk.
//
// We'll let a boot process make modifications to the container if needed at this point.
if client.IsErrNotFound(err) {
return nil
}
return errors.WithStack(err)
}
u := container.UpdateConfig{
Resources: e.resources(),
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if _, err := e.client.ContainerUpdate(ctx, e.Id, u); err != nil {
return errors.WithStack(err)
}
return nil
}
// Creates a new container for the server using all of the data that is currently
// available for it. If the container already exists it will be returnee.
func (e *Environment) Create() error {
// If the container already exists don't hit the user with an error, just return
// the current information about it which is what we would do when creating the
// container anyways.
if _, err := e.client.ContainerInspect(context.Background(), e.Id); err == nil {
return nil
} else if !client.IsErrNotFound(err) {
return errors.WithStack(err)
}
// Try to pull the requested image before creating the container.
if err := e.ensureImageExists(e.meta.Image); err != nil {
return errors.WithStack(err)
}
a := e.Configuration.Allocations()
conf := &container.Config{
Hostname: e.Id,
Domainname: config.Get().Docker.Domainname,
User: strconv.Itoa(config.Get().System.User.Uid),
AttachStdin: true,
AttachStdout: true,
AttachStderr: true,
OpenStdin: true,
Tty: true,
ExposedPorts: a.Exposed(),
Image: e.meta.Image,
Env: e.variables(),
Labels: map[string]string{
"Service": "Pterodactyl",
"ContainerType": "server_process",
},
}
hostConf := &container.HostConfig{
PortBindings: a.Bindings(),
// Configure the mounts for this container. First mount the server data directory
// into the container as a r/w bine.
Mounts: e.convertMounts(),
// Configure the /tmp folder mapping in containers. This is necessary for some
// games that need to make use of it for downloads and other installation processes.
Tmpfs: map[string]string{
"/tmp": "rw,exec,nosuid,size=50M",
},
// Define resource limits for the container based on the data passed through
// from the Panel.
Resources: e.resources(),
DNS: config.Get().Docker.Network.Dns,
// Configure logging for the container to make it easier on the Daemon to grab
// the server output. Ensure that we don't use too much space on the host machine
// since we only need it for the last few hundred lines of output and don't care
// about anything else in it.
LogConfig: container.LogConfig{
Type: jsonfilelog.Name,
Config: map[string]string{
"max-size": "5m",
"max-file": "1",
},
},
SecurityOpt: []string{"no-new-privileges"},
ReadonlyRootfs: true,
CapDrop: []string{
"setpcap", "mknod", "audit_write", "net_raw", "dac_override",
"fowner", "fsetid", "net_bind_service", "sys_chroot", "setfcap",
},
NetworkMode: container.NetworkMode(config.Get().Docker.Network.Mode),
}
if _, err := e.client.ContainerCreate(context.Background(), conf, hostConf, nil, e.Id); err != nil {
return errors.WithStack(err)
}
return nil
}
func (e *Environment) variables() []string {
v := e.Configuration.EnvironmentVariables()
return append(v, fmt.Sprintf("STARTUP=%s", e.meta.Invocation))
}
func (e *Environment) convertMounts() []mount.Mount {
var out []mount.Mount
for _, m := range e.Configuration.Mounts() {
out = append(out, mount.Mount{
Type: mount.TypeBind,
Source: m.Source,
Target: m.Target,
ReadOnly: m.ReadOnly,
})
}
return out
}
// Remove the Docker container from the machine. If the container is currently running
// it will be forcibly stopped by Docker.
func (e *Environment) Destroy() error {
// We set it to stopping than offline to prevent crash detection from being triggeree.
e.setState(system.ProcessStoppingState)
err := e.client.ContainerRemove(context.Background(), e.Id, types.ContainerRemoveOptions{
RemoveVolumes: true,
RemoveLinks: false,
Force: true,
})
// Don't trigger a destroy failure if we try to delete a container that does not
// exist on the system. We're just a step ahead of ourselves in that case.
//
// @see https://github.com/pterodactyl/panel/issues/2001
if err != nil && client.IsErrNotFound(err) {
return nil
}
e.setState(system.ProcessOfflineState)
return err
}
// Attaches to the log for the container. This avoids us missing cruicial output that
// happens in the split seconds before the code moves from 'Starting' to 'Attaching'
// on the process.
func (e *Environment) followOutput() error {
if exists, err := e.Exists(); !exists {
if err != nil {
return errors.WithStack(err)
}
return errors.New(fmt.Sprintf("no such container: %s", e.Id))
}
opts := types.ContainerLogsOptions{
ShowStderr: true,
ShowStdout: true,
Follow: true,
Since: time.Now().Format(time.RFC3339),
}
reader, err := e.client.ContainerLogs(context.Background(), e.Id, opts)
go func(r io.ReadCloser) {
defer r.Close()
s := bufio.NewScanner(r)
for s.Scan() {
e.Events().Publish(environment.ConsoleOutputEvent, s.Text())
}
if err := s.Err(); err != nil {
log.WithField("error", err).WithField("container_id", e.Id).Warn("error processing scanner line in console output")
}
}(reader)
return errors.WithStack(err)
}
// Pulls the image from Docker. If there is an error while pulling the image from the source
// but the image already exists locally, we will report that error to the logger but continue
// with the process.
//
// The reasoning behind this is that Quay has had some serious outages as of late, and we don't
// need to block all of the servers from booting just because of that. I'd imagine in a lot of
// cases an outage shouldn't affect users too badly. It'll at least keep existing servers working
// correctly if anything.
//
// TODO: handle authorization & local images
func (e *Environment) ensureImageExists(image string) error {
// Give it up to 15 minutes to pull the image. I think this should cover 99.8% of cases where an
// image pull might fail. I can't imagine it will ever take more than 15 minutes to fully pull
// an image. Let me know when I am inevitably wrong here...
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15)
defer cancel()
// Get a registry auth configuration from the config.
var registryAuth *config.RegistryConfiguration
for registry, c := range config.Get().Docker.Registries {
if !strings.HasPrefix(image, registry) {
continue
}
log.WithField("registry", registry).Debug("using authentication for registry")
registryAuth = &c
break
}
// Get the ImagePullOptions.
imagePullOptions := types.ImagePullOptions{All: false}
if registryAuth != nil {
b64, err := registryAuth.Base64()
if err != nil {
log.WithError(err).Error("failed to get registry auth credentials")
}
// b64 is a string so if there is an error it will just be empty, not nil.
imagePullOptions.RegistryAuth = b64
}
out, err := e.client.ImagePull(ctx, image, imagePullOptions)
if err != nil {
images, ierr := e.client.ImageList(ctx, types.ImageListOptions{})
if ierr != nil {
// Well damn, something has gone really wrong here, just go ahead and abort there
// isn't much anything we can do to try and self-recover from this.
return ierr
}
for _, img := range images {
for _, t := range img.RepoTags {
if t != image {
continue
}
log.WithFields(log.Fields{
"image": image,
"container_id": e.Id,
"error": errors.New(err.Error()),
}).Warn("unable to pull requested image from remote source, however the image exists locally")
// Okay, we found a matching container image, in that case just go ahead and return
// from this function, since there is nothing else we need to do here.
return nil
}
}
return err
}
defer out.Close()
log.WithField("image", image).Debug("pulling docker image... this could take a bit of time")
// I'm not sure what the best approach here is, but this will block execution until the image
// is done being pulled, which is what we neee.
scanner := bufio.NewScanner(out)
for scanner.Scan() {
continue
}
if err := scanner.Err(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,169 @@
package docker
import (
"context"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events"
"io"
"sync"
)
type Metadata struct {
Invocation string
Image string
Stop *api.ProcessStopConfiguration
}
// Ensure that the Docker environment is always implementing all of the methods
// from the base environment interface.
var _ environment.ProcessEnvironment = (*Environment)(nil)
type Environment struct {
mu sync.RWMutex
eventMu sync.Mutex
// The public identifier for this environment. In this case it is the Docker container
// name that will be used for all instances created under it.
Id string
// The environment configuration.
Configuration *environment.Configuration
meta *Metadata
// The Docker client being used for this instance.
client *client.Client
// Controls the hijacked response stream which exists only when we're attached to
// the running container instance.
stream *types.HijackedResponse
// Holds the stats stream used by the polling commands so that we can easily close it out.
stats io.ReadCloser
emitter *events.EventBus
// Tracks the environment state.
st string
stMu sync.RWMutex
}
// Creates a new base Docker environment. The ID passed through will be the ID that is used to
// reference the container from here on out. This should be unique per-server (we use the UUID
// by default). The container does not need to exist at this point.
func New(id string, m *Metadata, c *environment.Configuration) (*Environment, error) {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return nil, err
}
e := &Environment{
Id: id,
Configuration: c,
meta: m,
client: cli,
}
return e, nil
}
func (e *Environment) SetStopConfiguration(c *api.ProcessStopConfiguration) {
e.mu.Lock()
e.meta.Stop = c
e.mu.Unlock()
}
func (e *Environment) Type() string {
return "docker"
}
// Set if this process is currently attached to the process.
func (e *Environment) SetStream(s *types.HijackedResponse) {
e.mu.Lock()
e.stream = s
e.mu.Unlock()
}
// Determine if the this process is currently attached to the container.
func (e *Environment) IsAttached() bool {
e.mu.RLock()
defer e.mu.RUnlock()
return e.stream != nil
}
func (e *Environment) Events() *events.EventBus {
e.eventMu.Lock()
defer e.eventMu.Unlock()
if e.emitter == nil {
e.emitter = events.New()
}
return e.emitter
}
// Determines if the container exists in this environment. The ID passed through should be the
// server UUID since containers are created utilizing the server UUID as the name and docker
// will work fine when using the container name as the lookup parameter in addition to the longer
// ID auto-assigned when the container is createe.
func (e *Environment) Exists() (bool, error) {
_, err := e.client.ContainerInspect(context.Background(), e.Id)
if err != nil {
// If this error is because the container instance wasn't found via Docker we
// can safely ignore the error and just return false.
if client.IsErrNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}
// Determines if the server's docker container is currently running. If there is no container
// present, an error will be raised (since this shouldn't be a case that ever happens under
// correctly developed circumstances).
//
// You can confirm if the instance wasn't found by using client.IsErrNotFound from the Docker
// API.
//
// @see docker/client/errors.go
func (e *Environment) IsRunning() (bool, error) {
c, err := e.client.ContainerInspect(context.Background(), e.Id)
if err != nil {
return false, err
}
return c.State.Running, nil
}
// Determine the container exit state and return the exit code and wether or not
// the container was killed by the OOM killer.
func (e *Environment) ExitState() (uint32, bool, error) {
c, err := e.client.ContainerInspect(context.Background(), e.Id)
if err != nil {
// I'm not entirely sure how this can happen to be honest. I tried deleting a
// container _while_ a server was running and wings gracefully saw the crash and
// created a new container for it.
//
// However, someone reported an error in Discord about this scenario happening,
// so I guess this should prevent it? They didn't tell me how they caused it though
// so that's a mystery that will have to go unsolvee.
//
// @see https://github.com/pterodactyl/panel/issues/2003
if client.IsErrNotFound(err) {
return 1, false, nil
}
return 0, false, errors.WithStack(err)
}
return uint32(c.State.ExitCode), c.State.OOMKilled, nil
}

243
environment/docker/power.go Normal file
View File

@ -0,0 +1,243 @@
package docker
import (
"context"
"github.com/apex/log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/system"
"os"
"strings"
"time"
)
// Run before the container starts and get the process configuration from the Panel.
// This is important since we use this to check configuration files as well as ensure
// we always have the latest version of an egg available for server processes.
//
// This process will also confirm that the server environment exists and is in a bootable
// state. This ensures that unexpected container deletion while Wings is running does
// not result in the server becoming unbootable.
func (e *Environment) OnBeforeStart() error {
// Always destroy and re-create the server container to ensure that synced data from
// the Panel is usee.
if err := e.client.ContainerRemove(context.Background(), e.Id, types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil {
if !client.IsErrNotFound(err) {
return err
}
}
// The Create() function will check if the container exists in the first place, and if
// so just silently return without an error. Otherwise, it will try to create the necessary
// container and data storage directory.
//
// This won't actually run an installation process however, it is just here to ensure the
// environment gets created properly if it is missing and the server is startee. We're making
// an assumption that all of the files will still exist at this point.
if err := e.Create(); err != nil {
return err
}
return nil
}
// Starts the server environment and begins piping output to the event listeners for the
// console. If a container does not exist, or needs to be rebuilt that will happen in the
// call to OnBeforeStart().
func (e *Environment) Start() error {
sawError := false
// If sawError is set to true there was an error somewhere in the pipeline that
// got passed up, but we also want to ensure we set the server to be offline at
// that point.
defer func() {
if sawError {
// If we don't set it to stopping first, you'll trigger crash detection which
// we don't want to do at this point since it'll just immediately try to do the
// exact same action that lead to it crashing in the first place...
e.setState(system.ProcessStoppingState)
e.setState(system.ProcessOfflineState)
}
}()
if c, err := e.client.ContainerInspect(context.Background(), e.Id); err != nil {
// Do nothing if the container is not found, we just don't want to continue
// to the next block of code here. This check was inlined here to guard againt
// a nil-pointer when checking c.State below.
//
// @see https://github.com/pterodactyl/panel/issues/2000
if !client.IsErrNotFound(err) {
return errors.WithStack(err)
}
} else {
// If the server is running update our internal state and continue on with the attach.
if c.State.Running {
e.setState(system.ProcessRunningState)
return e.Attach()
}
// Truncate the log file so we don't end up outputting a bunch of useless log information
// to the websocket and whatnot. Check first that the path and file exist before trying
// to truncate them.
if _, err := os.Stat(c.LogPath); err == nil {
if err := os.Truncate(c.LogPath, 0); err != nil {
return errors.WithStack(err)
}
}
}
e.setState(system.ProcessStartingState)
// Set this to true for now, we will set it to false once we reach the
// end of this chain.
sawError = true
// Run the before start function and wait for it to finish. This will validate that the container
// exists on the system, and rebuild the container if that is required for server booting to
// occur.
if err := e.OnBeforeStart(); err != nil {
return errors.WithStack(err)
}
// Update the configuration files defined for the server before beginning the boot process.
// This process executes a bunch of parallel updates, so we just block until that process
// is completee. Any errors as a result of this will just be bubbled out in the logger,
// we don't need to actively do anything about it at this point, worst comes to worst the
// server starts in a weird state and the user can manually adjust.
// e.Server.UpdateConfigurationFiles()
//
// // Reset the permissions on files for the server before actually trying
// // to start it.
// if err := e.Server.Filesystem.Chown("/"); err != nil {
// return errors.WithStack(err)
// }
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := e.client.ContainerStart(ctx, e.Id, types.ContainerStartOptions{}); err != nil {
return errors.WithStack(err)
}
// No errors, good to continue through.
sawError = false
return e.Attach()
}
// Restarts the server process by waiting for the process to gracefully stop and then triggering a
// start commane. This will return an error if there is already a restart process executing for the
// server. The lock is released when the process is stopped and a start has begun.
func (e *Environment) Restart() error {
err := e.WaitForStop(60, false)
if err != nil {
return err
}
// Start the process.
return e.Start()
}
// Stops the container that the server is running in. This will allow up to 10
// seconds to pass before a failure occurs.
func (e *Environment) Stop() error {
e.mu.RLock()
s := e.meta.Stop
e.mu.RUnlock()
if s == nil || s.Type == api.ProcessStopSignal {
if s == nil {
log.WithField("container_id", e.Id).Warn("no stop configuration detected for environment, using termination proceedure")
}
return e.Terminate(os.Kill)
}
e.setState(system.ProcessStoppingState)
// Only attempt to send the stop command to the instance if we are actually attached to
// the instance. If we are not for some reason, just send the container stop event.
if e.IsAttached() && s.Type == api.ProcessStopCommand {
return e.SendCommand(s.Value)
}
t := time.Second * 10
err := e.client.ContainerStop(context.Background(), e.Id, &t)
if err != nil {
// If the container does not exist just mark the process as stopped and return without
// an error.
if client.IsErrNotFound(err) {
e.SetStream(nil)
e.setState(system.ProcessOfflineState)
return nil
}
return err
}
return nil
}
// Attempts to gracefully stop a server using the defined stop commane. If the server
// does not stop after seconds have passed, an error will be returned, or the instance
// will be terminated forcefully depending on the value of the second argument.
func (e *Environment) WaitForStop(seconds int, terminate bool) error {
if err := e.Stop(); err != nil {
return errors.WithStack(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
defer cancel()
// Block the return of this function until the container as been marked as no
// longer running. If this wait does not end by the time seconds have passed,
// attempt to terminate the container, or return an error.
ok, errChan := e.client.ContainerWait(ctx, e.Id, container.WaitConditionNotRunning)
select {
case <-ctx.Done():
if ctxErr := ctx.Err(); ctxErr != nil {
if terminate {
return e.Terminate(os.Kill)
}
return errors.WithStack(ctxErr)
}
case err := <-errChan:
if err != nil {
return errors.WithStack(err)
}
case <-ok:
}
return nil
}
// Forcefully terminates the container using the signal passed through.
func (e *Environment) Terminate(signal os.Signal) error {
c, err := e.client.ContainerInspect(context.Background(), e.Id)
if err != nil {
return errors.WithStack(err)
}
if !c.State.Running {
return nil
}
// We set it to stopping than offline to prevent crash detection from being triggeree.
e.setState(system.ProcessStoppingState)
sig := strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed")
if err := e.client.ContainerKill(context.Background(), e.Id, sig); err != nil {
return err
}
e.setState(system.ProcessOfflineState)
return nil
}

View File

@ -0,0 +1,42 @@
package docker
import (
"fmt"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/system"
)
// Returns the current environment state.
func (e *Environment) State() string {
e.stMu.RLock()
defer e.stMu.RUnlock()
return e.st
}
// Sets the state of the environment. This emits an event that server's can hook into to
// take their own actions and track their own state based on the environment.
func (e *Environment) setState(state string) error {
if state != system.ProcessOfflineState &&
state != system.ProcessStartingState &&
state != system.ProcessRunningState &&
state != system.ProcessStoppingState {
return errors.New(fmt.Sprintf("invalid server state received: %s", state))
}
// Get the current state of the environment before changing it.
prevState := e.State()
// Emit the event to any listeners that are currently registeree.
if prevState != state {
// If the state changed make sure we update the internal tracking to note that.
e.stMu.Lock()
e.st = state
e.stMu.Unlock()
e.Events().Publish(environment.StateChangeEvent, e.State())
}
return nil
}

View File

@ -0,0 +1,108 @@
package docker
import (
"bufio"
"bytes"
"context"
"encoding/json"
"github.com/docker/docker/api/types"
"github.com/pkg/errors"
"io"
"os"
)
type dockerLogLine struct {
Log string `json:"log"`
}
func (e *Environment) setStream(s *types.HijackedResponse) {
e.mu.Lock()
e.stream = s
e.mu.Unlock()
}
// Sends the specified command to the stdin of the running container instance. There is no
// confirmation that this data is sent successfully, only that it gets pushed into the stdin.
func (e *Environment) SendCommand(c string) error {
e.mu.RLock()
defer e.mu.RUnlock()
if !e.IsAttached() {
return errors.New("attempting to send command to non-attached instance")
}
_, err := e.stream.Conn.Write([]byte(c + "\n"))
return errors.WithStack(err)
}
// Reads the log file for the server. This does not care if the server is running or not, it will
// simply try to read the last X bytes of the file and return them.
func (e *Environment) Readlog(len int64) ([]string, error) {
j, err := e.client.ContainerInspect(context.Background(), e.Id)
if err != nil {
return nil, err
}
if j.LogPath == "" {
return nil, errors.New("empty log path defined for server")
}
f, err := os.Open(j.LogPath)
if err != nil {
return nil, err
}
defer f.Close()
// Check if the length of the file is smaller than the amount of data that was requested
// for reading. If so, adjust the length to be the total length of the file. If this is not
// done an error is thrown since we're reading backwards, and not forwards.
if stat, err := os.Stat(j.LogPath); err != nil {
return nil, err
} else if stat.Size() < len {
len = stat.Size()
}
// Seed to the end of the file and then move backwards until the length is met to avoid
// reading the entirety of the file into memory.
if _, err := f.Seek(-len, io.SeekEnd); err != nil {
return nil, err
}
b := make([]byte, len)
if _, err := f.Read(b); err != nil && err != io.EOF {
return nil, err
}
return e.parseLogToStrings(b)
}
// Docker stores the logs for server output in a JSON format. This function will iterate over the JSON
// that was read from the log file and parse it into a more human readable format.
func (e *Environment) parseLogToStrings(b []byte) ([]string, error) {
var hasError = false
var out []string
scanner := bufio.NewScanner(bytes.NewReader(b))
for scanner.Scan() {
var l dockerLogLine
// Unmarshal the contents and allow up to a single error before bailing out of the process. We
// do this because if you're arbitrarily reading a length of the file you'll likely end up
// with the first line in the output being improperly formatted JSON. In those cases we want to
// just skip over it. However if we see another error we're going to bail out because that is an
// abnormal situation.
if err := json.Unmarshal([]byte(scanner.Text()), &l); err != nil {
if hasError {
return nil, err
}
hasError = true
continue
}
out = append(out, l.Log)
}
return out, nil
}

View File

@ -1,15 +1,31 @@
package server package environment
import ( import (
"github.com/pterodactyl/wings/events"
"os" "os"
) )
const (
ConsoleOutputEvent = "console output"
StateChangeEvent = "state change"
)
// Defines the basic interface that all environments need to implement so that // Defines the basic interface that all environments need to implement so that
// a server can be properly controlled. // a server can be properly controlled.
type Environment interface { type ProcessEnvironment interface {
// Returns the name of the environment. // Returns the name of the environment.
Type() string Type() string
// Returns an event emitter instance that can be hooked into to listen for different
// events that are fired by the environment. This should not allow someone to publish
// events, only subscribe to them.
Events() *events.EventBus
// Determines if the server instance exists. For example, in a docker environment
// this should confirm that the container is created and in a bootable state. In
// a basic CLI environment this can probably just return true right away.
Exists() (bool, error)
// Determines if the environment is currently active and running a server process // Determines if the environment is currently active and running a server process
// for this specific server instance. // for this specific server instance.
IsRunning() (bool, error) IsRunning() (bool, error)
@ -42,11 +58,6 @@ type Environment interface {
// depending on the value of the second argument. // depending on the value of the second argument.
WaitForStop(seconds int, terminate bool) error WaitForStop(seconds int, terminate bool) error
// Determines if the server instance exists. For example, in a docker environment
// this should confirm that the container is created and in a bootable state. In
// a basic CLI environment this can probably just return true right away.
Exists() (bool, error)
// Terminates a running server instance using the provided signal. If the server // Terminates a running server instance using the provided signal. If the server
// is not running no error should be returned. // is not running no error should be returned.
Terminate(signal os.Signal) error Terminate(signal os.Signal) error
@ -69,22 +80,10 @@ type Environment interface {
// send data into the environment's stdin. // send data into the environment's stdin.
Attach() error Attach() error
// Follows the output from the server console and will begin piping the output to
// the server's emitter.
FollowConsoleOutput() error
// Sends the provided command to the running server instance. // Sends the provided command to the running server instance.
SendCommand(string) error SendCommand(string) error
// Reads the log file for the process from the end backwards until the provided // Reads the log file for the process from the end backwards until the provided
// number of bytes is met. // number of bytes is met.
Readlog(int64) ([]string, error) Readlog(int64) ([]string, error)
// Polls the given environment for resource usage of the server when the process
// is running.
EnableResourcePolling() error
// Disables the polling operation for resource usage and sets the required values
// to 0 in the server resource usage struct.
DisableResourcePolling() error
} }

124
environment/settings.go Normal file
View File

@ -0,0 +1,124 @@
package environment
import (
"fmt"
"math"
"strconv"
)
type Mount struct {
// In Docker environments this makes no difference, however in a non-Docker environment you
// should treat the "Default" mount as the root directory for the server. All other mounts
// are just in addition to that one, and generally things like shared maps or timezone data.
Default bool `json:"-"`
// The target path on the system. This is "/home/container" for all server's Default mount
// but in non-container environments you can likely ignore the target and just work with the
// source.
Target string `json:"target"`
// The directory from which the files will be read. In Docker environments this is the directory
// that we're mounting into the container at the Target location.
Source string `json:"source"`
// Wether or not the directory is being mounted as read-only. It is up to the environment to
// handle this value correctly and ensure security expectations are met with its usage.
ReadOnly bool `json:"read_only"`
}
// The build settings for a given server that impact docker container creation and
// resource limits for a server instance.
type Limits struct {
// The total amount of memory in megabytes that this server is allowed to
// use on the host system.
MemoryLimit int64 `json:"memory_limit"`
// The amount of additional swap space to be provided to a container instance.
Swap int64 `json:"swap"`
// The relative weight for IO operations in a container. This is relative to other
// containers on the system and should be a value between 10 and 1000.
IoWeight uint16 `json:"io_weight"`
// The percentage of CPU that this instance is allowed to consume relative to
// the host. A value of 200% represents complete utilization of two cores. This
// should be a value between 1 and THREAD_COUNT * 100.
CpuLimit int64 `json:"cpu_limit"`
// The amount of disk space in megabytes that a server is allowed to use.
DiskSpace int64 `json:"disk_space"`
// Sets which CPU threads can be used by the docker instance.
Threads string `json:"threads"`
OOMDisabled bool `json:"oom_disabled"`
}
// Converts the CPU limit for a server build into a number that can be better understood
// by the Docker environment. If there is no limit set, return -1 which will indicate to
// Docker that it has unlimited CPU quota.
func (r *Limits) ConvertedCpuLimit() int64 {
if r.CpuLimit == 0 {
return -1
}
return r.CpuLimit * 1000
}
// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to
// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use
// 15%. This avoids unexpected crashes from processes like Java which run over the limit.
func (r *Limits) MemoryOverheadMultiplier() float64 {
if r.MemoryLimit <= 2048 {
return 1.15
} else if r.MemoryLimit <= 4096 {
return 1.10
}
return 1.05
}
func (r *Limits) BoundedMemoryLimit() int64 {
return int64(math.Round(float64(r.MemoryLimit) * r.MemoryOverheadMultiplier() * 1_000_000))
}
// Returns the amount of swap available as a total in bytes. This is returned as the amount
// of memory available to the server initially, PLUS the amount of additional swap to include
// which is the format used by Docker.
func (r *Limits) ConvertedSwap() int64 {
if r.Swap < 0 {
return -1
}
return (r.Swap * 1_000_000) + r.BoundedMemoryLimit()
}
type Variables map[string]interface{}
// Ugly hacky function to handle environment variables that get passed through as not-a-string
// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a
// string wasn't passed through you'd cause a crash or the server to become unavailable. For now
// try to handle the most likely values from the JSON and hope for the best.
func (v Variables) Get(key string) string {
val, ok := v[key]
if !ok {
return ""
}
switch val.(type) {
case int:
return strconv.Itoa(val.(int))
case int32:
return strconv.FormatInt(val.(int64), 10)
case int64:
return strconv.FormatInt(val.(int64), 10)
case float32:
return fmt.Sprintf("%f", val.(float32))
case float64:
return fmt.Sprintf("%f", val.(float64))
case bool:
return strconv.FormatBool(val.(bool))
}
return val.(string)
}

105
events/events.go Normal file
View File

@ -0,0 +1,105 @@
package events
import (
"encoding/json"
"strings"
"sync"
)
type Event struct {
Data string
Topic string
}
type EventBus struct {
sync.RWMutex
subscribers map[string]map[chan Event]struct{}
}
func New() *EventBus {
return &EventBus{
subscribers: make(map[string]map[chan Event]struct{}),
}
}
// Publish data to a given topic.
func (e *EventBus) Publish(topic string, data string) {
t := topic
// Some of our topics for the socket support passing a more specific namespace,
// such as "backup completed:1234" to indicate which specific backup was completed.
//
// In these cases, we still need to the send the event using the standard listener
// name of "backup completed".
if strings.Contains(topic, ":") {
parts := strings.SplitN(topic, ":", 2)
if len(parts) == 2 {
t = parts[0]
}
}
// Acquire a read lock and loop over all of the channels registered for the topic. This
// avoids a panic crash if the process tries to unregister the channel while this routine
// is running.
go func() {
e.RLock()
defer e.RUnlock()
if ch, ok := e.subscribers[t]; ok {
for channel := range ch {
channel <- Event{Data: data, Topic: topic}
}
}
}()
}
func (e *EventBus) PublishJson(topic string, data interface{}) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
e.Publish(topic, string(b))
return nil
}
// Subscribe to an emitter topic using a channel.
func (e *EventBus) Subscribe(topic string, ch chan Event) {
e.Lock()
defer e.Unlock()
if _, exists := e.subscribers[topic]; !exists {
e.subscribers[topic] = make(map[chan Event]struct{})
}
// Only set the channel if there is not currently a matching one for this topic. This
// avoids registering two identical listeners for the same topic and causing pain in
// the unsubscribe functionality as well.
if _, exists := e.subscribers[topic][ch]; !exists {
e.subscribers[topic][ch] = struct{}{}
}
}
// Unsubscribe a channel from a given topic.
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
e.Lock()
defer e.Unlock()
if _, exists := e.subscribers[topic][ch]; exists {
delete(e.subscribers[topic], ch)
}
}
// Removes all of the event listeners for the server. This is used when a server
// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously
// should also check elsewhere and handle a server reference going nil, but this
// won't hurt.
func (e *EventBus) UnsubscribeAll() {
e.Lock()
defer e.Unlock()
// Reset the entire struct into an empty map.
e.subscribers = make(map[string]map[chan Event]struct{})
}

View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
"os" "os"
"path" "path"
@ -33,7 +34,7 @@ func New(data []byte) (*Installer, error) {
Uuid: getString(data, "uuid"), Uuid: getString(data, "uuid"),
Suspended: false, Suspended: false,
Invocation: getString(data, "invocation"), Invocation: getString(data, "invocation"),
Build: server.BuildSettings{ Build: environment.Limits{
MemoryLimit: getInt(data, "build", "memory"), MemoryLimit: getInt(data, "build", "memory"),
Swap: getInt(data, "build", "swap"), Swap: getInt(data, "build", "swap"),
IoWeight: uint16(getInt(data, "build", "io")), IoWeight: uint16(getInt(data, "build", "io")),
@ -51,7 +52,7 @@ func New(data []byte) (*Installer, error) {
if b, _, _, err := jsonparser.Get(data, "environment"); err != nil { if b, _, _, err := jsonparser.Get(data, "environment"); err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} else { } else {
cfg.EnvVars = make(server.EnvironmentVariables) cfg.EnvVars = make(environment.Variables)
if err := json.Unmarshal(b, &cfg.EnvVars); err != nil { if err := json.Unmarshal(b, &cfg.EnvVars); err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
@ -116,6 +117,7 @@ func (i *Installer) Execute() {
} }
l.Debug("creating required environment for server instance") l.Debug("creating required environment for server instance")
// TODO: ensure data directory exists.
if err := i.server.Environment.Create(); err != nil { if err := i.server.Environment.Create(); err != nil {
l.WithField("error", err).Error("failed to create environment for server") l.WithField("error", err).Error("failed to create environment for server")
return return

View File

@ -2,6 +2,7 @@ package websocket
import ( import (
"context" "context"
"github.com/pterodactyl/wings/events"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
"time" "time"
) )
@ -38,7 +39,7 @@ func (h *Handler) ListenForExpiration(ctx context.Context) {
// Listens for different events happening on a server and sends them along // Listens for different events happening on a server and sends them along
// to the connected websocket. // to the connected websocket.
func (h *Handler) ListenForServerEvents(ctx context.Context) { func (h *Handler) ListenForServerEvents(ctx context.Context) {
events := []string{ e := []string{
server.StatsEvent, server.StatsEvent,
server.StatusEvent, server.StatusEvent,
server.ConsoleOutputEvent, server.ConsoleOutputEvent,
@ -49,15 +50,15 @@ func (h *Handler) ListenForServerEvents(ctx context.Context) {
server.BackupCompletedEvent, server.BackupCompletedEvent,
} }
eventChannel := make(chan server.Event) eventChannel := make(chan events.Event)
for _, event := range events { for _, event := range e {
h.server.Events().Subscribe(event, eventChannel) h.server.Events().Subscribe(event, eventChannel)
} }
for d := range eventChannel { for d := range eventChannel {
select { select {
case <-ctx.Done(): case <-ctx.Done():
for _, event := range events { for _, event := range e {
h.server.Events().Unsubscribe(event, eventChannel) h.server.Events().Unsubscribe(event, eventChannel)
} }

View File

@ -1,17 +0,0 @@
package server
// Defines the allocations available for a given server. When using the Docker environment
// driver these correspond to mappings for the container that allow external connections.
type Allocations struct {
// Defines the default allocation that should be used for this server. This is
// what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration
// files or the startup arguments for a server.
DefaultMapping struct {
Ip string `json:"ip"`
Port int `json:"port"`
} `json:"default"`
// Mappings contains all of the ports that should be assigned to a given server
// attached to the IP they correspond to.
Mappings map[string][]int `json:"mappings"`
}

View File

@ -1,72 +0,0 @@
package server
import "math"
// The build settings for a given server that impact docker container creation and
// resource limits for a server instance.
type BuildSettings struct {
// The total amount of memory in megabytes that this server is allowed to
// use on the host system.
MemoryLimit int64 `json:"memory_limit"`
// The amount of additional swap space to be provided to a container instance.
Swap int64 `json:"swap"`
// The relative weight for IO operations in a container. This is relative to other
// containers on the system and should be a value between 10 and 1000.
IoWeight uint16 `json:"io_weight"`
// The percentage of CPU that this instance is allowed to consume relative to
// the host. A value of 200% represents complete utilization of two cores. This
// should be a value between 1 and THREAD_COUNT * 100.
CpuLimit int64 `json:"cpu_limit"`
// The amount of disk space in megabytes that a server is allowed to use.
DiskSpace int64 `json:"disk_space"`
// Sets which CPU threads can be used by the docker instance.
Threads string `json:"threads"`
}
func (s *Server) Build() *BuildSettings {
return &s.Config().Build
}
// Converts the CPU limit for a server build into a number that can be better understood
// by the Docker environment. If there is no limit set, return -1 which will indicate to
// Docker that it has unlimited CPU quota.
func (b *BuildSettings) ConvertedCpuLimit() int64 {
if b.CpuLimit == 0 {
return -1
}
return b.CpuLimit * 1000
}
// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to
// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use
// 15%. This avoids unexpected crashes from processes like Java which run over the limit.
func (b *BuildSettings) MemoryOverheadMultiplier() float64 {
if b.MemoryLimit <= 2048 {
return 1.15
} else if b.MemoryLimit <= 4096 {
return 1.10
}
return 1.05
}
func (b *BuildSettings) BoundedMemoryLimit() int64 {
return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000))
}
// Returns the amount of swap available as a total in bytes. This is returned as the amount
// of memory available to the server initially, PLUS the amount of additional swap to include
// which is the format used by Docker.
func (b *BuildSettings) ConvertedSwap() int64 {
if b.Swap < 0 {
return -1
}
return (b.Swap * 1_000_000) + b.BoundedMemoryLimit()
}

View File

@ -1,41 +1,10 @@
package server package server
import ( import (
"fmt" "github.com/pterodactyl/wings/environment"
"strconv"
"sync" "sync"
) )
type EnvironmentVariables map[string]interface{}
// Ugly hacky function to handle environment variables that get passed through as not-a-string
// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a
// string wasn't passed through you'd cause a crash or the server to become unavailable. For now
// try to handle the most likely values from the JSON and hope for the best.
func (ev EnvironmentVariables) Get(key string) string {
val, ok := ev[key]
if !ok {
return ""
}
switch val.(type) {
case int:
return strconv.Itoa(val.(int))
case int32:
return strconv.FormatInt(val.(int64), 10)
case int64:
return strconv.FormatInt(val.(int64), 10)
case float32:
return fmt.Sprintf("%f", val.(float32))
case float64:
return fmt.Sprintf("%f", val.(float64))
case bool:
return strconv.FormatBool(val.(bool))
}
return val.(string)
}
type Configuration struct { type Configuration struct {
mu sync.RWMutex mu sync.RWMutex
@ -53,20 +22,17 @@ type Configuration struct {
// An array of environment variables that should be passed along to the running // An array of environment variables that should be passed along to the running
// server process. // server process.
EnvVars EnvironmentVariables `json:"environment"` EnvVars environment.Variables `json:"environment"`
Allocations Allocations `json:"allocations"` Allocations environment.Allocations `json:"allocations"`
Build BuildSettings `json:"build"` Build environment.Limits `json:"build"`
CrashDetectionEnabled bool `default:"true" json:"enabled" yaml:"enabled"` CrashDetectionEnabled bool `default:"true" json:"enabled" yaml:"enabled"`
Mounts []Mount `json:"mounts"` Mounts []Mount `json:"mounts"`
Resources ResourceUsage `json:"resources"` Resources ResourceUsage `json:"resources"`
Container struct { Container struct {
// Defines the Docker image that will be used for this server // Defines the Docker image that will be used for this server
Image string `json:"image,omitempty"` Image string `json:"image,omitempty"`
// If set to true, OOM killer will be disabled on the server's Docker container.
// If not present (nil) we will default to disabling it.
OomDisabled bool `default:"true" json:"oom_disabled"`
} `json:"container,omitempty"` } `json:"container,omitempty"`
} }

View File

@ -3,27 +3,100 @@ package server
import ( import (
"fmt" "fmt"
"github.com/mitchellh/colorstring" "github.com/mitchellh/colorstring"
"io" "github.com/pterodactyl/wings/config"
"sync"
"sync/atomic"
"time"
) )
type Console struct { type ConsoleThrottler struct {
Server *Server sync.RWMutex
HandlerFunc *func(string) config.ConsoleThrottles
// The total number of activations that have occurred thus far.
activations uint64
// The total number of lines processed so far during the given time period.
lines uint64
lastIntervalTime *time.Time
lastDecayTime *time.Time
} }
var _ io.Writer = Console{} // Increments the number of activations for a server.
func (ct *ConsoleThrottler) AddActivation() uint64 {
ct.Lock()
defer ct.Unlock()
func (c Console) Write(b []byte) (int, error) { ct.activations += 1
if c.HandlerFunc != nil {
l := make([]byte, len(b))
copy(l, b)
(*c.HandlerFunc)(string(l)) return ct.activations
}
// Decrements the number of activations for a server.
func (ct *ConsoleThrottler) RemoveActivation() uint64 {
ct.Lock()
defer ct.Unlock()
if ct.activations == 0 {
return 0
} }
return len(b), nil ct.activations -= 1
return ct.activations
} }
// Increment the total count of lines that we have processed so far.
func (ct *ConsoleThrottler) IncrementLineCount() uint64 {
return atomic.AddUint64(&ct.lines, 1)
}
// Reset the line count to zero.
func (ct *ConsoleThrottler) ResetLineCount() {
atomic.SwapUint64(&ct.lines, 0)
}
// Handles output from a server's console. This code ensures that a server is not outputting
// an excessive amount of data to the console that could indicate a malicious or run-away process
// and lead to performance issues for other users.
//
// This was much more of a problem for the NodeJS version of the daemon which struggled to handle
// large volumes of output. However, this code is much more performant so I generally feel a lot
// better about it's abilities.
//
// However, extreme output is still somewhat of a DoS attack vector against this software since we
// are still logging it to the disk temporarily and will want to avoid dumping a huge amount of
// data all at once. These values are all configurable via the wings configuration file, however the
// defaults have been in the wild for almost two years at the time of this writing, so I feel quite
// confident in them.
func (ct *ConsoleThrottler) Handle() {
}
// Returns the throttler instance for the server or creates a new one.
func (s *Server) Throttler() *ConsoleThrottler {
s.throttleLock.RLock()
if s.throttler == nil {
// Release the read lock so that we can acquire a normal lock on the process and
// make modifications to the throttler.
s.throttleLock.RUnlock()
s.throttleLock.Lock()
s.throttler = &ConsoleThrottler{
ConsoleThrottles: config.Get().Throttles,
}
s.throttleLock.Unlock()
return s.throttler
} else {
defer s.throttleLock.RUnlock()
return s.throttler
}
}
// Sends output to the server console formatted to appear correctly as being sent // Sends output to the server console formatted to appear correctly as being sent
// from Wings. // from Wings.
func (s *Server) PublishConsoleOutputFromDaemon(data string) { func (s *Server) PublishConsoleOutputFromDaemon(data string) {

File diff suppressed because it is too large Load Diff

View File

@ -1,9 +1,7 @@
package server package server
import ( import (
"encoding/json" "github.com/pterodactyl/wings/events"
"strings"
"sync"
) )
// Defines all of the possible output events for a server. // Defines all of the possible output events for a server.
@ -19,108 +17,14 @@ const (
BackupCompletedEvent = "backup completed" BackupCompletedEvent = "backup completed"
) )
type Event struct {
Data string
Topic string
}
type EventBus struct {
sync.RWMutex
subscribers map[string]map[chan Event]struct{}
}
// Returns the server's emitter instance. // Returns the server's emitter instance.
func (s *Server) Events() *EventBus { func (s *Server) Events() *events.EventBus {
s.emitterLock.Lock() s.emitterLock.Lock()
defer s.emitterLock.Unlock() defer s.emitterLock.Unlock()
if s.emitter == nil { if s.emitter == nil {
s.emitter = &EventBus{ s.emitter = events.New()
subscribers: make(map[string]map[chan Event]struct{}),
}
} }
return s.emitter return s.emitter
} }
// Publish data to a given topic.
func (e *EventBus) Publish(topic string, data string) {
t := topic
// Some of our topics for the socket support passing a more specific namespace,
// such as "backup completed:1234" to indicate which specific backup was completed.
//
// In these cases, we still need to the send the event using the standard listener
// name of "backup completed".
if strings.Contains(topic, ":") {
parts := strings.SplitN(topic, ":", 2)
if len(parts) == 2 {
t = parts[0]
}
}
// Acquire a read lock and loop over all of the channels registered for the topic. This
// avoids a panic crash if the process tries to unregister the channel while this routine
// is running.
go func() {
e.RLock()
defer e.RUnlock()
if ch, ok := e.subscribers[t]; ok {
for channel := range ch {
channel <- Event{Data: data, Topic: topic}
}
}
}()
}
func (e *EventBus) PublishJson(topic string, data interface{}) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
e.Publish(topic, string(b))
return nil
}
// Subscribe to an emitter topic using a channel.
func (e *EventBus) Subscribe(topic string, ch chan Event) {
e.Lock()
defer e.Unlock()
if _, exists := e.subscribers[topic]; !exists {
e.subscribers[topic] = make(map[chan Event]struct{})
}
// Only set the channel if there is not currently a matching one for this topic. This
// avoids registering two identical listeners for the same topic and causing pain in
// the unsubscribe functionality as well.
if _, exists := e.subscribers[topic][ch]; !exists {
e.subscribers[topic][ch] = struct{}{}
}
}
// Unsubscribe a channel from a given topic.
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
e.Lock()
defer e.Unlock()
if _, exists := e.subscribers[topic][ch]; exists {
delete(e.subscribers[topic], ch)
}
}
// Removes all of the event listeners for the server. This is used when a server
// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously
// should also check elsewhere and handle a server reference going nil, but this
// won't hurt.
func (e *EventBus) UnsubscribeAll() {
e.Lock()
defer e.Unlock()
// Reset the entire struct into an empty map.
e.subscribers = make(map[string]map[chan Event]struct{})
}

View File

@ -3,19 +3,34 @@ package server
import ( import (
"github.com/apex/log" "github.com/apex/log"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events"
"regexp" "regexp"
) )
// Adds all of the internal event listeners we want to use for a server. // Adds all of the internal event listeners we want to use for a server.
func (s *Server) AddEventListeners() { func (s *Server) StartEventListeners() {
consoleChannel := make(chan Event) consoleChannel := make(chan events.Event)
s.Events().Subscribe(ConsoleOutputEvent, consoleChannel) stateChannel := make(chan events.Event)
s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, consoleChannel)
s.Environment.Events().Subscribe(environment.StateChangeEvent, stateChannel)
// TODO: this is leaky I imagine since the routines aren't destroyed when the server is?
go func() { go func() {
for { for {
select { select {
case data := <-consoleChannel: case data := <-consoleChannel:
// Immediately emit this event back over the server event stream since it is
// being called from the environment event stream and things probably aren't
// listening to that event.
s.Events().Publish(ConsoleOutputEvent, data.Data)
// Also pass the data along to the console output channel.
s.onConsoleOutput(data.Data) s.onConsoleOutput(data.Data)
case data := <-stateChannel:
s.SetState(data.Data)
} }
} }
}() }()

View File

@ -7,6 +7,8 @@ import (
"github.com/patrickmn/go-cache" "github.com/patrickmn/go-cache"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/environment/docker"
"os" "os"
"runtime" "runtime"
"time" "time"
@ -87,21 +89,24 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
return nil, err return nil, err
} }
s.AddEventListeners() s.cache = cache.New(time.Minute*10, time.Minute*15)
s.Archiver = Archiver{Server: s}
s.Filesystem = Filesystem{Server: s}
// Right now we only support a Docker based environment, so I'm going to hard code // Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make // this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously. // some modifications here obviously.
if err := NewDockerEnvironment(s); err != nil { envCfg := environment.NewConfiguration(s.Mounts(), s.cfg.Allocations, s.cfg.Build, s.cfg.EnvVars)
return nil, err meta := docker.Metadata{
Invocation: s.Config().Invocation,
Image: s.Config().Container.Image,
} }
s.cache = cache.New(time.Minute*10, time.Minute*15) if env, err := docker.New(s.Id(), &meta, envCfg); err != nil {
s.Archiver = Archiver{ return nil, err
Server: s, } else {
} s.Environment = env
s.Filesystem = Filesystem{ s.StartEventListeners()
Server: s,
} }
// Forces the configuration to be synced with the panel. // Forces the configuration to be synced with the panel.

View File

@ -1,8 +0,0 @@
package server
// Mount represents a Server Mount.
type Mount struct {
Target string `json:"target"`
Source string `json:"source"`
ReadOnly bool `json:"read_only"`
}

98
server/mounts.go Normal file
View File

@ -0,0 +1,98 @@
package server
import (
"github.com/apex/log"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"os"
"path/filepath"
"strings"
)
// To avoid confusion when working with mounts, assume that a server.Mount has not been properly
// cleaned up and had the paths set. An environment.Mount should only be returned with valid paths
// that have been checked.
type Mount environment.Mount
// Returns the default container mounts for the server instance. This includes the data directory
// for the server as well as any timezone related files if they exist on the host system so that
// servers running within the container will use the correct time.
func (s *Server) Mounts() []environment.Mount {
var m []environment.Mount
m = append(m, environment.Mount{
Default: true,
Target: "/home/container",
Source: s.Filesystem.Path(),
ReadOnly: false,
})
// Try to mount in /etc/localtime and /etc/timezone if they exist on the host system.
if _, err := os.Stat("/etc/localtime"); err != nil {
if !os.IsNotExist(err) {
log.WithField("error", errors.WithStack(err)).Warn("failed to stat /etc/localtime due to an error")
}
} else {
m = append(m, environment.Mount{
Target: "/etc/localtime",
Source: "/etc/localtime",
ReadOnly: true,
})
}
if _, err := os.Stat("/etc/timezone"); err != nil {
if !os.IsNotExist(err) {
log.WithField("error", errors.WithStack(err)).Warn("failed to stat /etc/timezone due to an error")
}
} else {
m = append(m, environment.Mount{
Target: "/etc/timezone",
Source: "/etc/timezone",
ReadOnly: true,
})
}
// Also include any of this server's custom mounts when returning them.
return append(m, s.customMounts()...)
}
// Returns the custom mounts for a given server after verifying that they are within a list of
// allowed mount points for the node.
func (s *Server) customMounts() []environment.Mount {
var mounts []environment.Mount
// TODO: probably need to handle things trying to mount directories that do not exist.
for _, m := range s.Config().Mounts {
source := filepath.Clean(m.Source)
target := filepath.Clean(m.Target)
logger := s.Log().WithFields(log.Fields{
"source_path": source,
"target_path": target,
"read_only": m.ReadOnly,
})
mounted := false
for _, allowed := range config.Get().AllowedMounts {
if !strings.HasPrefix(source, allowed) {
continue
}
mounted = true
mounts = append(mounts, environment.Mount{
Source: source,
Target: target,
ReadOnly: m.ReadOnly,
})
break
}
if !mounted {
logger.Warn("skipping custom server mount, not in list of allowed mount points")
}
}
return mounts
}

View File

@ -32,6 +32,10 @@ func (pa PowerAction) IsValid() bool {
pa == PowerActionRestart pa == PowerActionRestart
} }
func (pa PowerAction) IsStart() bool {
return pa == PowerActionStart || pa == PowerActionRestart
}
// Helper function that can receive a power action and then process the actions that need // Helper function that can receive a power action and then process the actions that need
// to occur for it. This guards against someone calling Start() twice at the same time, or // to occur for it. This guards against someone calling Start() twice at the same time, or
// trying to restart while another restart process is currently running. // trying to restart while another restart process is currently running.
@ -40,6 +44,11 @@ func (pa PowerAction) IsValid() bool {
// function rather than making direct calls to the start/stop/restart functions on the // function rather than making direct calls to the start/stop/restart functions on the
// environment struct. // environment struct.
func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error { func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error {
// Disallow start & restart if the server is suspended.
if action.IsStart() && s.IsSuspended() {
return new(suspendedError)
}
if s.powerLock == nil { if s.powerLock == nil {
s.powerLock = semaphore.NewWeighted(1) s.powerLock = semaphore.NewWeighted(1)
} }
@ -65,6 +74,17 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
// Release the lock once the process being requested has finished executing. // Release the lock once the process being requested has finished executing.
defer s.powerLock.Release(1) defer s.powerLock.Release(1)
if action.IsStart() {
s.Log().Info("syncing server configuration with panel")
if err := s.Sync(); err != nil {
return errors.WithStack(err)
}
if !s.Filesystem.HasSpaceAvailable() {
return errors.New("cannot start server, not enough disk space available")
}
}
switch action { switch action {
case PowerActionStart: case PowerActionStart:
return s.Environment.Start() return s.Environment.Start()

View File

@ -7,6 +7,9 @@ import (
"github.com/patrickmn/go-cache" "github.com/patrickmn/go-cache"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/environment/docker"
"github.com/pterodactyl/wings/events"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"strings" "strings"
"sync" "sync"
@ -18,8 +21,9 @@ type Server struct {
// Internal mutex used to block actions that need to occur sequentially, such as // Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk. // writing the configuration to the disk.
sync.RWMutex sync.RWMutex
emitterLock sync.Mutex emitterLock sync.Mutex
powerLock *semaphore.Weighted powerLock *semaphore.Weighted
throttleLock sync.RWMutex
// Maintains the configuration for the server. This is the data that gets returned by the Panel // Maintains the configuration for the server. This is the data that gets returned by the Panel
// such as build settings and container images. // such as build settings and container images.
@ -29,16 +33,16 @@ type Server struct {
crasher CrashHandler crasher CrashHandler
resources ResourceUsage resources ResourceUsage
Archiver Archiver `json:"-"` Archiver Archiver `json:"-"`
Environment Environment `json:"-"` Environment environment.ProcessEnvironment `json:"-"`
Filesystem Filesystem `json:"-"` Filesystem Filesystem `json:"-"`
// Server cache used to store frequently requested information in memory and make // Server cache used to store frequently requested information in memory and make
// certain long operations return faster. For example, FS disk space usage. // certain long operations return faster. For example, FS disk space usage.
cache *cache.Cache cache *cache.Cache
// Events emitted by the server instance. // Events emitted by the server instance.
emitter *EventBus emitter *events.EventBus
// Defines the process configuration for the server instance. This is dynamically // Defines the process configuration for the server instance. This is dynamically
// fetched from the Pterodactyl Server instance each time the server process is // fetched from the Pterodactyl Server instance each time the server process is
@ -50,6 +54,9 @@ type Server struct {
// installation process, for example when a server is deleted from the panel while the // installation process, for example when a server is deleted from the panel while the
// installer process is still running. // installer process is still running.
installer InstallerDetails installer InstallerDetails
// The console throttler instance used to control outputs.
throttler *ConsoleThrottler
} }
type InstallerDetails struct { type InstallerDetails struct {
@ -131,6 +138,13 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) err
s.procConfig = cfg.ProcessConfiguration s.procConfig = cfg.ProcessConfiguration
s.Unlock() s.Unlock()
// If this is a Docker environment we need to sync the stop configuration with it so that
// the process isn't just terminated when a user requests it be stopped.
if e, ok := s.Environment.(*docker.Environment); ok {
s.Log().Debug("syncing stop configuration with configured docker environment")
e.SetStopConfiguration(&cfg.ProcessConfiguration.Stop)
}
return nil return nil
} }
@ -150,6 +164,11 @@ func (s *Server) IsBootable() bool {
// Initalizes a server instance. This will run through and ensure that the environment // Initalizes a server instance. This will run through and ensure that the environment
// for the server is setup, and that all of the necessary files are created. // for the server is setup, and that all of the necessary files are created.
func (s *Server) CreateEnvironment() error { func (s *Server) CreateEnvironment() error {
// Ensure the data directory exists before getting too far through this process.
if err := s.Filesystem.EnsureDataDirectory(); err != nil {
return errors.WithStack(err)
}
return s.Environment.Create() return s.Environment.Create()
} }
@ -162,3 +181,7 @@ func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *a
func (s *Server) IsSuspended() bool { func (s *Server) IsSuspended() bool {
return s.Config().Suspended return s.Config().Suspended
} }
func (s *Server) Build() *environment.Limits {
return &s.Config().Build
}

View File

@ -67,7 +67,7 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
return errors.WithStack(err) return errors.WithStack(err)
} }
} else { } else {
c.Container.OomDisabled = v c.Build.OOMDisabled = v
} }
// Mergo also cannot handle this boolean value. // Mergo also cannot handle this boolean value.

View File

@ -4,3 +4,10 @@ var (
// The current version of this software. // The current version of this software.
Version = "0.0.1" Version = "0.0.1"
) )
const (
ProcessOfflineState = "offline"
ProcessStartingState = "starting"
ProcessRunningState = "running"
ProcessStoppingState = "stopping"
)