Merge develop into feature/file-uploads

This commit is contained in:
Matthew Penner 2020-07-31 16:31:06 -06:00
commit b1940426c3
38 changed files with 1288 additions and 625 deletions

View File

@ -23,7 +23,7 @@ jobs:
run: go test ./...
- name: Compress binary and make it executable
run: upx build/wings_linux_amd64 && chmod +x build/wings_linux_amd64
run: upx --brute build/wings_linux_amd64 && chmod +x build/wings_linux_amd64
- name: Extract changelog
env:
@ -32,7 +32,6 @@ jobs:
sed -n "/^## ${REF:10}/,/^## /{/^## /b;p}" CHANGELOG.md > ./RELEASE_CHANGELOG
echo ::set-output name=version_name::`sed -nr "s/^## (${REF:10} .*)$/\1/p" CHANGELOG.md`
- name: Create checksum and add to changelog
run: |
SUM=`cd build && sha256sum wings_linux_amd64`
@ -48,8 +47,8 @@ jobs:
git config --local user.name "Pterodactyl CI"
git checkout -b $BRANCH
git push -u origin $BRANCH
sed -i "s/ Version = \".*\"/ Version = \"${REF:11}\"/" config/app.php
git add config/app.php
sed -i "s/ Version = \".*\"/ Version = \"${REF:11}\"/" system/const.go
git add system/const.go
git commit -m "bump version for release"
git push

View File

@ -169,7 +169,7 @@ func rootCmdRun(*cobra.Command, []string) {
// Just for some nice log output.
for _, s := range server.GetServers().All() {
log.WithField("server", s.Uuid).Info("loaded configuration for server")
log.WithField("server", s.Id()).Info("loaded configuration for server")
}
// Create a new WaitGroup that limits us to 4 servers being bootstrapped at a time
@ -181,17 +181,13 @@ func rootCmdRun(*cobra.Command, []string) {
wg.Add()
go func(s *server.Server) {
// Required for tracing purposes.
var err error
defer wg.Done()
defer func() {
s.Log().Trace("ensuring server environment exists").Stop(&err)
wg.Done()
}()
s.Log().Info("ensuring server environment exists")
// Create a server environment if none exists currently. This allows us to recover from Docker
// being reinstalled on the host system for example.
if err = s.Environment.Create(); err != nil {
if err := s.Environment.Create(); err != nil {
s.Log().WithField("error", err).Error("failed to process environment")
}

1
go.mod
View File

@ -31,6 +31,7 @@ require (
github.com/docker/go-units v0.3.3 // indirect
github.com/fatih/color v1.9.0
github.com/gabriel-vasile/mimetype v0.1.4
github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753
github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0
github.com/gin-gonic/gin v1.6.3
github.com/golang/protobuf v1.3.5 // indirect

4
go.sum
View File

@ -79,6 +79,10 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v0.1.4 h1:5mcsq3+DXypREUkW+1juhjeKmE/XnWgs+paHMJn7lf8=
github.com/gabriel-vasile/mimetype v0.1.4/go.mod h1:kMJbg3SlWZCsj4R73F1WDzbT9AyGCOVmUtIxxwO5pmI=
github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8=
github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w=
github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753 h1:oSQ61LxZkz3Z4La0O5cbyVDvLWEfbNgiD43cSPdjPQQ=
github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs=
github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0 h1:7KeiSrO5puFH1+vdAdbpiie2TrNnkvFc/eOQzT60Z2k=
github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0/go.mod h1:D1+3UtCYAJ1os1PI+zhTVEj6Tb+IHJvXjXKz83OstmM=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=

View File

@ -29,12 +29,10 @@ func New(data []byte) (*Installer, error) {
return nil, NewValidationError("service egg provided was not in a valid format")
}
s := &server.Server{
cfg := &server.Configuration{
Uuid: getString(data, "uuid"),
Suspended: false,
State: server.ProcessOfflineState,
Invocation: getString(data, "invocation"),
EnvVars: make(server.EnvironmentVariables),
Build: server.BuildSettings{
MemoryLimit: getInt(data, "build", "memory"),
Swap: getInt(data, "build", "swap"),
@ -43,20 +41,18 @@ func New(data []byte) (*Installer, error) {
DiskSpace: getInt(data, "build", "disk"),
Threads: getString(data, "build", "threads"),
},
Allocations: server.Allocations{
Mappings: make(map[string][]int),
},
CrashDetectionEnabled: true,
}
s.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip")
s.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port"))
cfg.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip")
cfg.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port"))
// Unmarshal the environment variables from the request into the server struct.
if b, _, _, err := jsonparser.Get(data, "environment"); err != nil {
return nil, errors.WithStack(err)
} else {
s.EnvVars = make(server.EnvironmentVariables)
if err := json.Unmarshal(b, &s.EnvVars); err != nil {
cfg.EnvVars = make(server.EnvironmentVariables)
if err := json.Unmarshal(b, &cfg.EnvVars); err != nil {
return nil, errors.WithStack(err)
}
}
@ -65,15 +61,15 @@ func New(data []byte) (*Installer, error) {
if b, _, _, err := jsonparser.Get(data, "allocations", "mappings"); err != nil {
return nil, errors.WithStack(err)
} else {
s.Allocations.Mappings = make(map[string][]int)
if err := json.Unmarshal(b, &s.Allocations.Mappings); err != nil {
cfg.Allocations.Mappings = make(map[string][]int)
if err := json.Unmarshal(b, &cfg.Allocations.Mappings); err != nil {
return nil, errors.WithStack(err)
}
}
s.Container.Image = getString(data, "container", "image")
cfg.Container.Image = getString(data, "container", "image")
c, rerr, err := api.NewRequester().GetServerConfiguration(s.Uuid)
c, rerr, err := api.NewRequester().GetServerConfiguration(cfg.Uuid)
if err != nil || rerr != nil {
if err != nil {
return nil, errors.WithStack(err)
@ -82,21 +78,18 @@ func New(data []byte) (*Installer, error) {
return nil, errors.New(rerr.String())
}
// Destroy the temporary server instance.
s = nil
// Create a new server instance using the configuration we wrote to the disk
// so that everything gets instantiated correctly on the struct.
s2, err := server.FromConfiguration(c)
s, err := server.FromConfiguration(c)
return &Installer{
server: s2,
server: s,
}, err
}
// Returns the UUID associated with this installer instance.
func (i *Installer) Uuid() string {
return i.server.Uuid
return i.server.Id()
}
// Return the server instance.

View File

@ -81,7 +81,7 @@ func (h *Handler) HandleLog(e *log.Entry) error {
}
func getErrorStack(err error, i bool) errors.StackTrace {
e, ok := errors.Cause(err).(tracer)
e, ok := err.(tracer)
if !ok {
if i {
// Just abort out of this and return a stacktrace leading up to this point. It isn't perfect
@ -89,7 +89,7 @@ func getErrorStack(err error, i bool) errors.StackTrace {
return errors.Wrap(err, "failed to generate stacktrace for caught error").(tracer).StackTrace()
}
return getErrorStack(errors.New(err.Error()), true)
return getErrorStack(errors.Wrap(err, err.Error()), true)
}
st := e.StackTrace()

View File

@ -33,7 +33,7 @@ func TrackedError(err error) *RequestError {
// generated this server for the purposes of logging.
func TrackedServerError(err error, s *server.Server) *RequestError {
return &RequestError{
Err: err,
Err: errors.WithStack(err),
Uuid: uuid.Must(uuid.NewRandom()).String(),
Message: "",
server: s,

View File

@ -48,7 +48,7 @@ func AuthorizationMiddleware(c *gin.Context) {
// Helper function to fetch a server out of the servers collection stored in memory.
func GetServer(uuid string) *server.Server {
return server.GetServers().Find(func(s *server.Server) bool {
return uuid == s.Uuid
return uuid == s.Id()
})
}

View File

@ -85,6 +85,7 @@ func Configure() *gin.Engine {
files.POST("/create-directory", postServerCreateDirectory)
files.POST("/delete", postServerDeleteFiles)
files.POST("/compress", postServerCompressFiles)
files.POST("/decompress", postServerDecompressFiles)
}
backup := server.Group("/backup")

View File

@ -13,7 +13,9 @@ import (
// Returns a single server from the collection of servers.
func getServer(c *gin.Context) {
c.JSON(http.StatusOK, GetServer(c.Param("server")))
s := GetServer(c.Param("server"))
c.JSON(http.StatusOK, s.Proc())
}
// Returns the logs for a given server instance.
@ -64,7 +66,7 @@ func postServerPower(c *gin.Context) {
//
// We don't really care about any of the other actions at this point, they'll all result
// in the process being stopped, which should have happened anyways if the server is suspended.
if (data.Action == "start" || data.Action == "restart") && s.Suspended {
if (data.Action == "start" || data.Action == "restart") && s.IsSuspended() {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "Cannot start or restart a server that is suspended.",
})
@ -162,7 +164,7 @@ func deleteServer(c *gin.Context) {
// Immediately suspend the server to prevent a user from attempting
// to start it while this process is running.
s.Suspended = true
s.Config().SetSuspended(true)
// If the server is currently installing, abort it.
if s.IsInstalling() {
@ -200,9 +202,9 @@ func deleteServer(c *gin.Context) {
}
}(s.Filesystem.Path())
var uuid = s.Uuid
var uuid = s.Id()
server.GetServers().Remove(func(s2 *server.Server) bool {
return s2.Uuid == uuid
return s2.Id() == uuid
})
// Deallocate the reference to this server.

View File

@ -3,7 +3,6 @@ package router
import (
"bufio"
"context"
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/router/tokens"
@ -83,6 +82,13 @@ func getServerListDirectory(c *gin.Context) {
stats, err := s.Filesystem.ListDirectory(d)
if err != nil {
if err.Error() == "readdirent: not a directory" {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": "The requested directory does not exist.",
})
return
}
TrackedServerError(err, s).AbortWithServerError(c)
return
}
@ -175,7 +181,7 @@ func postServerDeleteFiles(c *gin.Context) {
if len(data.Files) == 0 {
c.AbortWithStatusJSON(http.StatusUnprocessableEntity, gin.H{
"error": "No files were specified for deletion.",
"error": "No files were specififed for deletion.",
})
return
}
@ -283,6 +289,39 @@ func postServerCompressFiles(c *gin.Context) {
})
}
func postServerDecompressFiles(c *gin.Context) {
s := GetServer(c.Param("server"))
var data struct {
RootPath string `json:"root"`
File string `json:"file"`
}
if err := c.BindJSON(&data); err != nil {
return
}
hasSpace, err := s.Filesystem.SpaceAvailableForDecompression(data.RootPath, data.File)
if err != nil {
TrackedServerError(err, s).AbortWithServerError(c)
return
}
if !hasSpace {
c.AbortWithStatusJSON(http.StatusConflict, gin.H{
"error": "This server does not have enough available disk space to decompress this archive.",
})
return
}
if err := s.Filesystem.DecompressFile(data.RootPath, data.File); err != nil {
TrackedServerError(err, s).AbortWithServerError(c)
return
}
c.Status(http.StatusNoContent)
}
func postServerUploadFiles(c *gin.Context) {
token := tokens.UploadPayload{}
if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil {
@ -313,10 +352,6 @@ func postServerUploadFiles(c *gin.Context) {
return
}
for i := range form.File {
log.Debug(i)
}
headers, ok := form.File["files"]
if !ok {
c.AbortWithStatusJSON(http.StatusNotModified, gin.H{
@ -335,7 +370,6 @@ func postServerUploadFiles(c *gin.Context) {
c.AbortWithError(http.StatusInternalServerError, err)
return
}
log.Debug(p)
// We run this in a different method so I can use defer without any of
// the consequences caused by calling it in a loop.

View File

@ -51,8 +51,10 @@ func getServerWebsocket(c *gin.Context) {
continue
}
if err := handler.HandleInbound(j); err != nil {
handler.SendErrorJson(j, err)
}
go func(msg websocket.Message) {
if err := handler.HandleInbound(msg); err != nil {
handler.SendErrorJson(msg, err)
}
}(j)
}
}

View File

@ -98,33 +98,33 @@ func postServerArchive(c *gin.Context) {
start := time.Now()
if err := server.Archiver.Archive(); err != nil {
zap.S().Errorw("failed to get archive for server", zap.String("server", s.Uuid), zap.Error(err))
zap.S().Errorw("failed to get archive for server", zap.String("server", server.Id()), zap.Error(err))
return
}
zap.S().Debugw(
"successfully created archive for server",
zap.String("server", server.Uuid),
zap.String("server", server.Id()),
zap.Duration("time", time.Now().Sub(start).Round(time.Microsecond)),
)
r := api.NewRequester()
rerr, err := r.SendArchiveStatus(server.Uuid, true)
rerr, err := r.SendArchiveStatus(server.Id(), true)
if rerr != nil || err != nil {
if err != nil {
zap.S().Errorw("failed to notify panel with archive status", zap.String("server", server.Uuid), zap.Error(err))
zap.S().Errorw("failed to notify panel with archive status", zap.String("server", server.Id()), zap.Error(err))
return
}
zap.S().Errorw(
"panel returned an error when sending the archive status",
zap.String("server", server.Uuid),
zap.String("server", server.Id()),
zap.Error(errors.New(rerr.String())),
)
return
}
zap.S().Debugw("successfully notified panel about archive status", zap.String("server", server.Uuid))
zap.S().Debugw("successfully notified panel about archive status", zap.String("server", server.Id()))
}(s)
c.Status(http.StatusAccepted)

View File

@ -4,10 +4,13 @@ import (
"encoding/json"
"github.com/gbrlsnchs/jwt/v3"
"strings"
"sync"
)
type WebsocketPayload struct {
jwt.Payload
sync.RWMutex
UserID json.Number `json:"user_id"`
ServerUUID string `json:"server_uuid"`
Permissions []string `json:"permissions"`
@ -15,11 +18,24 @@ type WebsocketPayload struct {
// Returns the JWT payload.
func (p *WebsocketPayload) GetPayload() *jwt.Payload {
p.RLock()
defer p.RUnlock()
return &p.Payload
}
func (p *WebsocketPayload) GetServerUuid() string {
p.RLock()
defer p.RUnlock()
return p.ServerUUID
}
// Checks if the given token payload has a permission string.
func (p *WebsocketPayload) HasPermission(permission string) bool {
p.RLock()
defer p.RUnlock()
for _, k := range p.Permissions {
if k == permission || (!strings.HasPrefix(permission, "admin") && k == "*") {
return true

View File

@ -43,6 +43,8 @@ func (h *Handler) ListenForServerEvents(ctx context.Context) {
server.StatusEvent,
server.ConsoleOutputEvent,
server.InstallOutputEvent,
server.InstallStartedEvent,
server.InstallCompletedEvent,
server.DaemonMessageEvent,
server.BackupCompletedEvent,
}
@ -52,16 +54,15 @@ func (h *Handler) ListenForServerEvents(ctx context.Context) {
h.server.Events().Subscribe(event, eventChannel)
}
select {
case <-ctx.Done():
for _, event := range events {
h.server.Events().Unsubscribe(event, eventChannel)
}
for d := range eventChannel {
select {
case <-ctx.Done():
for _, event := range events {
h.server.Events().Unsubscribe(event, eventChannel)
}
close(eventChannel)
default:
// Listen for different events emitted by the server and respond to them appropriately.
for d := range eventChannel {
close(eventChannel)
default:
h.SendJson(&Message{
Event: d.Topic,
Args: []string{d.Data},

View File

@ -127,7 +127,7 @@ func (h *Handler) TokenValid() error {
return errors.New("jwt does not have connect permission")
}
if h.server.Uuid != j.ServerUUID {
if h.server.Id() != j.GetServerUuid() {
return errors.New("jwt server uuid mismatch")
}
@ -247,16 +247,7 @@ func (h *Handler) HandleInbound(m Message) error {
if state == server.ProcessOfflineState {
_ = h.server.Filesystem.HasSpaceAvailable()
resources := server.ResourceUsage{
Memory: 0,
MemoryLimit: 0,
CpuAbsolute: 0.0,
Disk: h.server.Resources.Disk,
}
resources.Network.RxBytes = 0
resources.Network.TxBytes = 0
b, _ := json.Marshal(resources)
b, _ := json.Marshal(h.server.Proc())
h.SendJson(&Message{
Event: server.StatsEvent,
Args: []string{string(b)},
@ -280,11 +271,14 @@ func (h *Handler) HandleInbound(m Message) error {
break
case "restart":
if h.GetJwt().HasPermission(PermissionSendPowerRestart) {
if err := h.server.Environment.WaitForStop(60, false); err != nil {
return err
// If the server is alreay restarting don't do anything. Perhaps we send back an event
// in the future for this? For now no reason to knowingly trigger an error by trying to
// restart a process already restarting.
if h.server.Environment.IsRestarting() {
return nil
}
return h.server.Environment.Start()
return h.server.Environment.Restart()
}
break
case "kill":

17
server/allocations.go Normal file
View File

@ -0,0 +1,17 @@
package server
// Defines the allocations available for a given server. When using the Docker environment
// driver these correspond to mappings for the container that allow external connections.
type Allocations struct {
// Defines the default allocation that should be used for this server. This is
// what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration
// files or the startup arguments for a server.
DefaultMapping struct {
Ip string `json:"ip"`
Port int `json:"port"`
} `json:"default"`
// Mappings contains all of the ports that should be assigned to a given server
// attached to the IP they correspond to.
Mappings map[string][]int `json:"mappings"`
}

View File

@ -23,7 +23,7 @@ func (a *Archiver) ArchivePath() string {
// ArchiveName returns the name of the server's archive.
func (a *Archiver) ArchiveName() string {
return a.Server.Uuid + ".tar.gz"
return a.Server.Id() + ".tar.gz"
}
// Exists returns a boolean based off if the archive exists.
@ -52,7 +52,12 @@ func (a *Archiver) Archive() error {
}
for _, file := range fileInfo {
files = append(files, filepath.Join(path, file.Name()))
f, err := a.Server.Filesystem.SafeJoin(path, file)
if err != nil {
return err
}
files = append(files, f)
}
stat, err := a.Stat()

View File

@ -20,9 +20,9 @@ type Archive struct {
Files *IncludedFiles
}
// Creates an archive at dest with all of the files definied in the included files struct.
func (a *Archive) Create(dest string, ctx context.Context) (os.FileInfo, error) {
f, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
// Creates an archive at dst with all of the files defined in the included files struct.
func (a *Archive) Create(dst string, ctx context.Context) (os.FileInfo, error) {
f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return nil, err
}
@ -66,14 +66,17 @@ func (a *Archive) Create(dest string, ctx context.Context) (os.FileInfo, error)
// Attempt to remove the archive if there is an error, report that error to
// the logger if it fails.
if rerr := os.Remove(dest); rerr != nil && !os.IsNotExist(rerr) {
log.WithField("location", dest).Warn("failed to delete corrupted backup archive")
if rerr := os.Remove(dst); rerr != nil && !os.IsNotExist(rerr) {
log.WithField("location", dst).Warn("failed to delete corrupted backup archive")
}
return nil, err
}
st, _ := f.Stat()
st, err := f.Stat()
if err != nil {
return nil, err
}
return st, nil
}
@ -101,7 +104,7 @@ func (a *Archive) addToArchive(p string, s *os.FileInfo, w *tar.Writer) error {
a.Lock()
defer a.Unlock()
if err = w.WriteHeader(header); err != nil {
if err := w.WriteHeader(header); err != nil {
return err
}

72
server/build_settings.go Normal file
View File

@ -0,0 +1,72 @@
package server
import "math"
// The build settings for a given server that impact docker container creation and
// resource limits for a server instance.
type BuildSettings struct {
// The total amount of memory in megabytes that this server is allowed to
// use on the host system.
MemoryLimit int64 `json:"memory_limit"`
// The amount of additional swap space to be provided to a container instance.
Swap int64 `json:"swap"`
// The relative weight for IO operations in a container. This is relative to other
// containers on the system and should be a value between 10 and 1000.
IoWeight uint16 `json:"io_weight"`
// The percentage of CPU that this instance is allowed to consume relative to
// the host. A value of 200% represents complete utilization of two cores. This
// should be a value between 1 and THREAD_COUNT * 100.
CpuLimit int64 `json:"cpu_limit"`
// The amount of disk space in megabytes that a server is allowed to use.
DiskSpace int64 `json:"disk_space"`
// Sets which CPU threads can be used by the docker instance.
Threads string `json:"threads"`
}
func (s *Server) Build() *BuildSettings {
return &s.Config().Build
}
// Converts the CPU limit for a server build into a number that can be better understood
// by the Docker environment. If there is no limit set, return -1 which will indicate to
// Docker that it has unlimited CPU quota.
func (b *BuildSettings) ConvertedCpuLimit() int64 {
if b.CpuLimit == 0 {
return -1
}
return b.CpuLimit * 1000
}
// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to
// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use
// 15%. This avoids unexpected crashes from processes like Java which run over the limit.
func (b *BuildSettings) MemoryOverheadMultiplier() float64 {
if b.MemoryLimit <= 2048 {
return 1.15
} else if b.MemoryLimit <= 4096 {
return 1.10
}
return 1.05
}
func (b *BuildSettings) BoundedMemoryLimit() int64 {
return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000))
}
// Returns the amount of swap available as a total in bytes. This is returned as the amount
// of memory available to the server initially, PLUS the amount of additional swap to include
// which is the format used by Docker.
func (b *BuildSettings) ConvertedSwap() int64 {
if b.Swap < 0 {
return -1
}
return (b.Swap * 1_000_000) + b.BoundedMemoryLimit()
}

View File

@ -10,7 +10,8 @@ import (
func (s *Server) UpdateConfigurationFiles() {
wg := new(sync.WaitGroup)
for _, v := range s.processConfiguration.ConfigurationFiles {
files := s.ProcessConfiguration().ConfigurationFiles
for _, v := range files {
wg.Add(1)
go func(f parser.ConfigurationFile, server *Server) {

91
server/configuration.go Normal file
View File

@ -0,0 +1,91 @@
package server
import (
"fmt"
"strconv"
"sync"
)
type EnvironmentVariables map[string]interface{}
// Ugly hacky function to handle environment variables that get passed through as not-a-string
// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a
// string wasn't passed through you'd cause a crash or the server to become unavailable. For now
// try to handle the most likely values from the JSON and hope for the best.
func (ev EnvironmentVariables) Get(key string) string {
val, ok := ev[key]
if !ok {
return ""
}
switch val.(type) {
case int:
return strconv.Itoa(val.(int))
case int32:
return strconv.FormatInt(val.(int64), 10)
case int64:
return strconv.FormatInt(val.(int64), 10)
case float32:
return fmt.Sprintf("%f", val.(float32))
case float64:
return fmt.Sprintf("%f", val.(float64))
case bool:
return strconv.FormatBool(val.(bool))
}
return val.(string)
}
type Configuration struct {
mu sync.RWMutex
// The unique identifier for the server that should be used when referencing
// it against the Panel API (and internally). This will be used when naming
// docker containers as well as in log output.
Uuid string `json:"uuid"`
// Whether or not the server is in a suspended state. Suspended servers cannot
// be started or modified except in certain scenarios by an admin user.
Suspended bool `json:"suspended"`
// The command that should be used when booting up the server instance.
Invocation string `json:"invocation"`
// An array of environment variables that should be passed along to the running
// server process.
EnvVars EnvironmentVariables `json:"environment"`
Allocations Allocations `json:"allocations"`
Build BuildSettings `json:"build"`
CrashDetectionEnabled bool `default:"true" json:"enabled" yaml:"enabled"`
Mounts []Mount `json:"mounts"`
Resources ResourceUsage `json:"resources"`
Container struct {
// Defines the Docker image that will be used for this server
Image string `json:"image,omitempty"`
// If set to true, OOM killer will be disabled on the server's Docker container.
// If not present (nil) we will default to disabling it.
OomDisabled bool `default:"true" json:"oom_disabled"`
} `json:"container,omitempty"`
}
func (s *Server) Config() *Configuration {
s.cfg.mu.RLock()
defer s.cfg.mu.RUnlock()
return &s.cfg
}
func (c *Configuration) GetUuid() string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.Uuid
}
func (c *Configuration) SetSuspended(s bool) {
c.mu.Lock()
c.Suspended = s
c.mu.Unlock()
}

View File

@ -4,18 +4,32 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/config"
"sync"
"time"
)
type CrashDetection struct {
// If set to false, the system will not listen for crash detection events that
// can indicate that the server stopped unexpectedly.
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
type CrashHandler struct {
mu sync.RWMutex
// Tracks the time of the last server crash event.
lastCrash time.Time
}
// Returns the time of the last crash for this server instance.
func (cd *CrashHandler) LastCrashTime() time.Time {
cd.mu.RLock()
defer cd.mu.RUnlock()
return cd.lastCrash
}
// Sets the last crash time for a server.
func (cd *CrashHandler) SetLastCrash(t time.Time) {
cd.mu.Lock()
cd.lastCrash = t
cd.mu.Unlock()
}
// Looks at the environment exit state to determine if the process exited cleanly or
// if it was the result of an event that we should try to recover from.
//
@ -30,8 +44,8 @@ func (s *Server) handleServerCrash() error {
// No point in doing anything here if the server isn't currently offline, there
// is no reason to do a crash detection event. If the server crash detection is
// disabled we want to skip anything after this as well.
if s.GetState() != ProcessOfflineState || !s.CrashDetection.Enabled {
if !s.CrashDetection.Enabled {
if s.GetState() != ProcessOfflineState || !s.Config().CrashDetectionEnabled {
if !s.Config().CrashDetectionEnabled {
s.Log().Debug("server triggered crash detection but handler is disabled for server process")
s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.")
@ -57,7 +71,7 @@ func (s *Server) handleServerCrash() error {
s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode))
s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled))
c := s.CrashDetection.lastCrash
c := s.crasher.LastCrashTime()
// If the last crash time was within the last 60 seconds we do not want to perform
// an automatic reboot of the process. Return an error that can be handled.
if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) {
@ -66,7 +80,7 @@ func (s *Server) handleServerCrash() error {
return &crashTooFrequent{}
}
s.CrashDetection.lastCrash = time.Now()
s.crasher.SetLastCrash(time.Now())
return s.Environment.Start()
}

View File

@ -31,6 +31,13 @@ type Environment interface {
// not be returned.
Stop() error
// Restart a server instance. If already stopped the process will be started. This function
// will return an error if the server is already performing a restart process as to avoid
// unnecessary double/triple/quad looping issues if multiple people press restart or spam the
// button to restart.
Restart() error
IsRestarting() bool
// Waits for a server instance to stop gracefully. If the server is still detected
// as running after seconds, an error will be returned, or the server will be terminated
// depending on the value of the second argument.

View File

@ -16,16 +16,20 @@ import (
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"golang.org/x/sync/semaphore"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
// Defines the base environment for Docker instances running through Wings.
type DockerEnvironment struct {
sync.RWMutex
Server *Server
// The Docker client being used for this instance.
@ -43,6 +47,25 @@ type DockerEnvironment struct {
// Holds the stats stream used by the polling commands so that we can easily close
// it out.
stats io.ReadCloser
// Locks when we're performing a restart to avoid trying to restart a process that is already
// being restarted.
restartSem *semaphore.Weighted
}
// Set if this process is currently attached to the process.
func (d *DockerEnvironment) SetAttached(a bool) {
d.Lock()
d.attached = a
d.Unlock()
}
// Determine if the this process is currently attached to the container.
func (d *DockerEnvironment) IsAttached() bool {
d.RLock()
defer d.RUnlock()
return d.attached
}
// Creates a new base Docker environment. A server must still be attached to it.
@ -71,7 +94,7 @@ func (d *DockerEnvironment) Type() string {
// Determines if the container exists in this environment.
func (d *DockerEnvironment) Exists() (bool, error) {
_, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
_, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
if err != nil {
// If this error is because the container instance wasn't found via Docker we
@ -95,7 +118,7 @@ func (d *DockerEnvironment) Exists() (bool, error) {
//
// @see docker/client/errors.go
func (d *DockerEnvironment) IsRunning() (bool, error) {
c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
if err != nil {
return false, err
}
@ -107,7 +130,7 @@ func (d *DockerEnvironment) IsRunning() (bool, error) {
// making any changes to the operational state of the container. This allows memory, cpu,
// and IO limitations to be adjusted on the fly for individual instances.
func (d *DockerEnvironment) InSituUpdate() error {
if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err != nil {
if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err != nil {
// If the container doesn't exist for some reason there really isn't anything
// we can do to fix that in this process (it doesn't make sense at least). In those
// cases just return without doing anything since we still want to save the configuration
@ -129,7 +152,7 @@ func (d *DockerEnvironment) InSituUpdate() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if _, err := d.Client.ContainerUpdate(ctx, d.Server.Uuid, u); err != nil {
if _, err := d.Client.ContainerUpdate(ctx, d.Server.Id(), u); err != nil {
return errors.WithStack(err)
}
@ -155,7 +178,7 @@ func (d *DockerEnvironment) OnBeforeStart() error {
// Always destroy and re-create the server container to ensure that synced data from
// the Panel is used.
if err := d.Client.ContainerRemove(context.Background(), d.Server.Uuid, types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil {
if err := d.Client.ContainerRemove(context.Background(), d.Server.Id(), types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil {
if !client.IsErrNotFound(err) {
return err
}
@ -199,11 +222,11 @@ func (d *DockerEnvironment) Start() error {
// Theoretically you'd have the Panel handle all of this logic, but we cannot do that
// because we allow the websocket to control the server power state as well, so we'll
// need to handle that action in here.
if d.Server.Suspended {
if d.Server.IsSuspended() {
return &suspendedError{}
}
if c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err != nil {
if c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err != nil {
// Do nothing if the container is not found, we just don't want to continue
// to the next block of code here. This check was inlined here to guard againt
// a nil-pointer when checking c.State below.
@ -258,7 +281,7 @@ func (d *DockerEnvironment) Start() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := d.Client.ContainerStart(ctx, d.Server.Uuid, types.ContainerStartOptions{}); err != nil {
if err := d.Client.ContainerStart(ctx, d.Server.Id(), types.ContainerStartOptions{}); err != nil {
return errors.WithStack(err)
}
@ -271,19 +294,89 @@ func (d *DockerEnvironment) Start() error {
// Stops the container that the server is running in. This will allow up to 10
// seconds to pass before a failure occurs.
func (d *DockerEnvironment) Stop() error {
stop := d.Server.processConfiguration.Stop
stop := d.Server.ProcessConfiguration().Stop
if stop.Type == api.ProcessStopSignal {
return d.Terminate(os.Kill)
}
d.Server.SetState(ProcessStoppingState)
if stop.Type == api.ProcessStopCommand {
// Only attempt to send the stop command to the instance if we are actually attached to
// the instance. If we are not for some reason, just send the container stop event.
if d.IsAttached() && stop.Type == api.ProcessStopCommand {
return d.SendCommand(stop.Value)
}
t := time.Second * 10
return d.Client.ContainerStop(context.Background(), d.Server.Uuid, &t)
err := d.Client.ContainerStop(context.Background(), d.Server.Id(), &t)
if err != nil {
// If the container does not exist just mark the process as stopped and return without
// an error.
if client.IsErrNotFound(err) {
d.SetAttached(false)
d.Server.SetState(ProcessOfflineState)
return nil
}
return err
}
return nil
}
// Try to acquire a lock to restart the server. If one cannot be obtained within 5 seconds return
// an error to the caller. You should ideally be checking IsRestarting() before calling this function
// to avoid unnecessary delays since you can respond immediately from that.
func (d *DockerEnvironment) acquireRestartLock() error {
if d.restartSem == nil {
d.restartSem = semaphore.NewWeighted(1)
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
return d.restartSem.Acquire(ctx, 1)
}
// Restarts the server process by waiting for the process to gracefully stop and then triggering a
// start command. This will return an error if there is already a restart process executing for the
// server. The lock is released when the process is stopped and a start has begun.
func (d *DockerEnvironment) Restart() error {
d.Server.Log().Debug("attempting to acquire restart lock...")
if err := d.acquireRestartLock(); err != nil {
d.Server.Log().Warn("failed to acquire restart lock; already acquired by a different process")
return err
}
d.Server.Log().Debug("acquired restart lock")
err := d.WaitForStop(60, false)
if err != nil {
d.restartSem.Release(1)
return err
}
// Release the restart lock, it is now safe for someone to attempt restarting the server again.
d.restartSem.Release(1)
// Start the process.
return d.Start()
}
// Check if the server is currently running the restart process by checking if there is a semaphore
// allocated, and if so, if we can aquire a lock on it.
func (d *DockerEnvironment) IsRestarting() bool {
if d.restartSem == nil {
return false
}
if d.restartSem.TryAcquire(1) {
d.restartSem.Release(1)
return false
}
return true
}
// Attempts to gracefully stop a server using the defined stop command. If the server
@ -304,7 +397,7 @@ func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error {
// Block the return of this function until the container as been marked as no
// longer running. If this wait does not end by the time seconds have passed,
// attempt to terminate the container, or return an error.
ok, errChan := d.Client.ContainerWait(ctx, d.Server.Uuid, container.WaitConditionNotRunning)
ok, errChan := d.Client.ContainerWait(ctx, d.Server.Id(), container.WaitConditionNotRunning)
select {
case <-ctx.Done():
if ctxErr := ctx.Err(); ctxErr != nil {
@ -326,7 +419,7 @@ func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error {
// Forcefully terminates the container using the signal passed through.
func (d *DockerEnvironment) Terminate(signal os.Signal) error {
c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
if err != nil {
return errors.WithStack(err)
}
@ -338,7 +431,7 @@ func (d *DockerEnvironment) Terminate(signal os.Signal) error {
d.Server.SetState(ProcessStoppingState)
return d.Client.ContainerKill(
context.Background(), d.Server.Uuid, strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed"),
context.Background(), d.Server.Id(), strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed"),
)
}
@ -348,7 +441,7 @@ func (d *DockerEnvironment) Destroy() error {
// Avoid crash detection firing off.
d.Server.SetState(ProcessStoppingState)
err := d.Client.ContainerRemove(context.Background(), d.Server.Uuid, types.ContainerRemoveOptions{
err := d.Client.ContainerRemove(context.Background(), d.Server.Id(), types.ContainerRemoveOptions{
RemoveVolumes: true,
RemoveLinks: false,
Force: true,
@ -368,7 +461,7 @@ func (d *DockerEnvironment) Destroy() error {
// Determine the container exit state and return the exit code and wether or not
// the container was killed by the OOM killer.
func (d *DockerEnvironment) ExitState() (uint32, bool, error) {
c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
if err != nil {
// I'm not entirely sure how this can happen to be honest. I tried deleting a
// container _while_ a server was running and wings gracefully saw the crash and
@ -394,7 +487,7 @@ func (d *DockerEnvironment) ExitState() (uint32, bool, error) {
// miss important output at the beginning because of the time delay with attaching to the
// output.
func (d *DockerEnvironment) Attach() error {
if d.attached {
if d.IsAttached() {
return nil
}
@ -403,7 +496,7 @@ func (d *DockerEnvironment) Attach() error {
}
var err error
d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Uuid, types.ContainerAttachOptions{
d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Id(), types.ContainerAttachOptions{
Stdin: true,
Stdout: true,
Stderr: true,
@ -418,7 +511,7 @@ func (d *DockerEnvironment) Attach() error {
Server: d.Server,
}
d.attached = true
d.SetAttached(true)
go func() {
if err := d.EnableResourcePolling(); err != nil {
d.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to enable resource polling on server")
@ -429,7 +522,7 @@ func (d *DockerEnvironment) Attach() error {
defer d.stream.Close()
defer func() {
d.Server.SetState(ProcessOfflineState)
d.attached = false
d.SetAttached(false)
}()
io.Copy(console, d.stream.Reader)
@ -447,7 +540,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
return errors.WithStack(err)
}
return errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid))
return errors.New(fmt.Sprintf("no such container: %s", d.Server.Id()))
}
opts := types.ContainerLogsOptions{
@ -457,7 +550,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error {
Since: time.Now().Format(time.RFC3339),
}
reader, err := d.Client.ContainerLogs(context.Background(), d.Server.Uuid, opts)
reader, err := d.Client.ContainerLogs(context.Background(), d.Server.Id(), opts)
go func(r io.ReadCloser) {
defer r.Close()
@ -483,7 +576,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error {
return errors.New("cannot enable resource polling on a server that is not running")
}
stats, err := d.Client.ContainerStats(context.Background(), d.Server.Uuid, true)
stats, err := d.Client.ContainerStats(context.Background(), d.Server.Id(), true)
if err != nil {
return errors.WithStack(err)
}
@ -510,20 +603,16 @@ func (d *DockerEnvironment) EnableResourcePolling() error {
return
}
s.Resources.CpuAbsolute = s.Resources.CalculateAbsoluteCpu(&v.PreCPUStats, &v.CPUStats)
s.Resources.Memory = s.Resources.CalculateDockerMemory(v.MemoryStats)
s.Resources.MemoryLimit = v.MemoryStats.Limit
s.Proc().UpdateFromDocker(v)
for _, nw := range v.Networks {
s.Proc().UpdateNetworkBytes(&nw)
}
// Why you ask? This already has the logic for caching disk space in use and then
// also handles pushing that value to the resources object automatically.
s.Filesystem.HasSpaceAvailable()
for _, nw := range v.Networks {
s.Resources.Network.RxBytes += nw.RxBytes
s.Resources.Network.TxBytes += nw.TxBytes
}
b, _ := json.Marshal(s.Resources)
b, _ := json.Marshal(s.Proc())
s.Events().Publish(StatsEvent, string(b))
}
}(d.Server)
@ -538,15 +627,16 @@ func (d *DockerEnvironment) DisableResourcePolling() error {
}
err := d.stats.Close()
d.Server.Resources.CpuAbsolute = 0
d.Server.Resources.Memory = 0
d.Server.Resources.Network.TxBytes = 0
d.Server.Resources.Network.RxBytes = 0
d.Server.Proc().Empty()
return errors.WithStack(err)
}
// Returns the image to be used for the instance.
func (d *DockerEnvironment) Image() string {
return d.Server.Config().Container.Image
}
// Pulls the image from Docker. If there is an error while pulling the image from the source
// but the image already exists locally, we will report that error to the logger but continue
// with the process.
@ -564,7 +654,7 @@ func (d *DockerEnvironment) ensureImageExists() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15)
defer cancel()
out, err := d.Client.ImagePull(ctx, d.Server.Container.Image, types.ImagePullOptions{All: false})
out, err := d.Client.ImagePull(ctx, d.Image(), types.ImagePullOptions{All: false})
if err != nil {
images, ierr := d.Client.ImageList(ctx, types.ImageListOptions{})
if ierr != nil {
@ -575,12 +665,12 @@ func (d *DockerEnvironment) ensureImageExists() error {
for _, img := range images {
for _, t := range img.RepoTags {
if t != d.Server.Container.Image {
if t != d.Image() {
continue
}
d.Server.Log().WithFields(log.Fields{
"image": d.Server.Container.Image,
"image": d.Image(),
"error": errors.New(err.Error()),
}).Warn("unable to pull requested image from remote source, however the image exists locally")
@ -594,7 +684,7 @@ func (d *DockerEnvironment) ensureImageExists() error {
}
defer out.Close()
log.WithField("image", d.Server.Container.Image).Debug("pulling docker image... this could take a bit of time")
log.WithField("image", d.Image()).Debug("pulling docker image... this could take a bit of time")
// I'm not sure what the best approach here is, but this will block execution until the image
// is done being pulled, which is what we need.
@ -621,7 +711,7 @@ func (d *DockerEnvironment) Create() error {
// If the container already exists don't hit the user with an error, just return
// the current information about it which is what we would do when creating the
// container anyways.
if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err == nil {
if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err == nil {
return nil
} else if !client.IsErrNotFound(err) {
return errors.WithStack(err)
@ -633,7 +723,7 @@ func (d *DockerEnvironment) Create() error {
}
conf := &container.Config{
Hostname: d.Server.Uuid,
Hostname: d.Server.Id(),
Domainname: config.Get().Docker.Domainname,
User: strconv.Itoa(config.Get().System.User.Uid),
AttachStdin: true,
@ -641,12 +731,9 @@ func (d *DockerEnvironment) Create() error {
AttachStderr: true,
OpenStdin: true,
Tty: true,
ExposedPorts: d.exposedPorts(),
Image: d.Server.Container.Image,
Env: d.Server.GetEnvironmentVariables(),
Image: d.Image(),
Env: d.Server.GetEnvironmentVariables(),
Labels: map[string]string{
"Service": "Pterodactyl",
"ContainerType": "server_process",
@ -663,7 +750,7 @@ func (d *DockerEnvironment) Create() error {
}
var mounted bool
for _, m := range d.Server.Mounts {
for _, m := range d.Server.Config().Mounts {
mounted = false
source := filepath.Clean(m.Source)
target := filepath.Clean(m.Target)
@ -685,16 +772,17 @@ func (d *DockerEnvironment) Create() error {
break
}
log := log.WithFields(log.Fields{
"server": d.Server.Uuid,
logger := log.WithFields(log.Fields{
"server": d.Server.Id(),
"source_path": source,
"target_path": target,
"read_only": m.ReadOnly,
})
if mounted {
log.Debug("attaching mount to server's container")
logger.Debug("attaching mount to server's container")
} else {
log.Warn("skipping mount because it isn't allowed")
logger.Warn("skipping mount because it isn't allowed")
}
}
@ -738,7 +826,7 @@ func (d *DockerEnvironment) Create() error {
NetworkMode: container.NetworkMode(config.Get().Docker.Network.Mode),
}
if _, err := d.Client.ContainerCreate(context.Background(), conf, hostConf, nil, d.Server.Uuid); err != nil {
if _, err := d.Client.ContainerCreate(context.Background(), conf, hostConf, nil, d.Server.Id()); err != nil {
return errors.WithStack(err)
}
@ -748,7 +836,7 @@ func (d *DockerEnvironment) Create() error {
// Sends the specified command to the stdin of the running container instance. There is no
// confirmation that this data is sent successfully, only that it gets pushed into the stdin.
func (d *DockerEnvironment) SendCommand(c string) error {
if !d.attached {
if !d.IsAttached() {
return errors.New("attempting to send command to non-attached instance")
}
@ -760,7 +848,7 @@ func (d *DockerEnvironment) SendCommand(c string) error {
// Reads the log file for the server. This does not care if the server is running or not, it will
// simply try to read the last X bytes of the file and return them.
func (d *DockerEnvironment) Readlog(len int64) ([]string, error) {
j, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
j, err := d.Client.ContainerInspect(context.Background(), d.Server.Id())
if err != nil {
return nil, err
}
@ -837,7 +925,7 @@ func (d *DockerEnvironment) parseLogToStrings(b []byte) ([]string, error) {
func (d *DockerEnvironment) portBindings() nat.PortMap {
var out = nat.PortMap{}
for ip, ports := range d.Server.Allocations.Mappings {
for ip, ports := range d.Server.Config().Allocations.Mappings {
for _, port := range ports {
// Skip over invalid ports.
if port < 0 || port > 65535 {
@ -887,14 +975,14 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet {
// the same or higher than the memory limit.
func (d *DockerEnvironment) getResourcesForServer() container.Resources {
return container.Resources{
Memory: d.Server.Build.BoundedMemoryLimit(),
MemoryReservation: d.Server.Build.MemoryLimit * 1_000_000,
MemorySwap: d.Server.Build.ConvertedSwap(),
CPUQuota: d.Server.Build.ConvertedCpuLimit(),
Memory: d.Server.Build().BoundedMemoryLimit(),
MemoryReservation: d.Server.Build().MemoryLimit * 1_000_000,
MemorySwap: d.Server.Build().ConvertedSwap(),
CPUQuota: d.Server.Build().ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: d.Server.Build.IoWeight,
OomKillDisable: &d.Server.Container.OomDisabled,
CpusetCpus: d.Server.Build.Threads,
BlkioWeight: d.Server.Build().IoWeight,
OomKillDisable: &d.Server.Config().Container.OomDisabled,
CpusetCpus: d.Server.Build().Threads,
}
}

View File

@ -9,12 +9,14 @@ import (
// Defines all of the possible output events for a server.
// noinspection GoNameStartsWithPackageName
const (
DaemonMessageEvent = "daemon message"
InstallOutputEvent = "install output"
ConsoleOutputEvent = "console output"
StatusEvent = "status"
StatsEvent = "stats"
BackupCompletedEvent = "backup completed"
DaemonMessageEvent = "daemon message"
InstallOutputEvent = "install output"
InstallStartedEvent = "install started"
InstallCompletedEvent = "install completed"
ConsoleOutputEvent = "console output"
StatusEvent = "status"
StatsEvent = "stats"
BackupCompletedEvent = "backup completed"
)
type Event struct {
@ -30,6 +32,9 @@ type EventBus struct {
// Returns the server's emitter instance.
func (s *Server) Events() *EventBus {
s.emitterLock.Lock()
defer s.emitterLock.Unlock()
if s.emitter == nil {
s.emitter = &EventBus{
subscribers: map[string][]chan Event{},
@ -83,11 +88,27 @@ func (e *EventBus) Subscribe(topic string, ch chan Event) {
e.Lock()
defer e.Unlock()
if p, ok := e.subscribers[topic]; ok {
e.subscribers[topic] = append(p, ch)
} else {
p, ok := e.subscribers[topic]
// If there is nothing currently subscribed to this topic just go ahead and create
// the item and then return.
if !ok {
e.subscribers[topic] = append([]chan Event{}, ch)
return
}
// If this topic is already setup, first iterate over the event channels currently in there
// and confirm there is not a match. If there _is_ a match do nothing since that means this
// channel is already being tracked. This avoids registering two identical handlers for the
// same topic, and means the Unsubscribe function can safely assume there will only be a
// single match for an event.
for i := range e.subscribers[topic] {
if ch == e.subscribers[topic][i] {
return
}
}
e.subscribers[topic] = append(p, ch)
}
// Unsubscribe a channel from a topic.
@ -99,6 +120,10 @@ func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
for i := range e.subscribers[topic] {
if ch == e.subscribers[topic][i] {
e.subscribers[topic] = append(e.subscribers[topic][:i], e.subscribers[topic][i+1:]...)
// Subscribe enforces a unique event channel for the topic, so we can safely exit
// this loop once matched since there should not be any additional matches after
// this point.
break
}
}
}

View File

@ -26,18 +26,27 @@ import (
)
// Error returned when there is a bad path provided to one of the FS calls.
var InvalidPathResolution = errors.New("invalid path resolution")
type PathResolutionError struct{}
// Returns the error response in a string form that can be more easily consumed.
func (pre PathResolutionError) Error() string {
return "invalid path resolution"
}
func IsPathResolutionError(err error) bool {
_, ok := err.(PathResolutionError)
return ok
}
type Filesystem struct {
// The server object associated with this Filesystem.
Server *Server
Configuration *config.SystemConfiguration
Server *Server
cacheDiskMu sync.Mutex
}
// Returns the root path that contains all of a server's data.
func (fs *Filesystem) Path() string {
return filepath.Join(fs.Configuration.Data, fs.Server.Uuid)
return filepath.Join(config.Get().System.Data, fs.Server.Id())
}
// Normalizes a directory being passed in to ensure the user is not able to escape
@ -49,12 +58,8 @@ func (fs *Filesystem) Path() string {
func (fs *Filesystem) SafePath(p string) (string, error) {
var nonExistentPathResolution string
// Calling filpath.Clean on the joined directory will resolve it to the absolute path,
// removing any ../ type of resolution arguments, and leaving us with a direct path link.
//
// This will also trim the existing root path off the beginning of the path passed to
// the function since that can get a bit messy.
r := filepath.Clean(filepath.Join(fs.Path(), strings.TrimPrefix(p, fs.Path())))
// Start with a cleaned up path before checking the more complex bits.
r := fs.unsafeFilePath(p)
// At the same time, evaluate the symlink status and determine where this file or folder
// is truly pointing to.
@ -72,7 +77,7 @@ func (fs *Filesystem) SafePath(p string) (string, error) {
for k := range parts {
try = strings.Join(parts[:(len(parts)-k)], "/")
if !strings.HasPrefix(try, fs.Path()) {
if !fs.unsafeIsInDataDirectory(try) {
break
}
@ -87,8 +92,8 @@ func (fs *Filesystem) SafePath(p string) (string, error) {
// If the new path doesn't start with their root directory there is clearly an escape
// attempt going on, and we should NOT resolve this path for them.
if nonExistentPathResolution != "" {
if !strings.HasPrefix(nonExistentPathResolution, fs.Path()) {
return "", InvalidPathResolution
if !fs.unsafeIsInDataDirectory(nonExistentPathResolution) {
return "", PathResolutionError{}
}
// If the nonExistentPathResolution variable is not empty then the initial path requested
@ -101,11 +106,51 @@ func (fs *Filesystem) SafePath(p string) (string, error) {
// If the requested directory from EvalSymlinks begins with the server root directory go
// ahead and return it. If not we'll return an error which will block any further action
// on the file.
if strings.HasPrefix(p, fs.Path()) {
if fs.unsafeIsInDataDirectory(p) {
return p, nil
}
return "", InvalidPathResolution
return "", PathResolutionError{}
}
// Generate a path to the file by cleaning it up and appending the root server path to it. This
// DOES NOT gaurantee that the file resolves within the server data directory. You'll want to use
// the fs.unsafeIsInDataDirectory(p) function to confirm.
func (fs *Filesystem) unsafeFilePath(p string) string {
// Calling filpath.Clean on the joined directory will resolve it to the absolute path,
// removing any ../ type of resolution arguments, and leaving us with a direct path link.
//
// This will also trim the existing root path off the beginning of the path passed to
// the function since that can get a bit messy.
return filepath.Clean(filepath.Join(fs.Path(), strings.TrimPrefix(p, fs.Path())))
}
// Check that that path string starts with the server data directory path. This function DOES NOT
// validate that the rest of the path does not end up resolving out of this directory, or that the
// targeted file or folder is not a symlink doing the same thing.
func (fs *Filesystem) unsafeIsInDataDirectory(p string) bool {
return strings.HasPrefix(strings.TrimSuffix(p, "/")+"/", strings.TrimSuffix(fs.Path(), "/")+"/")
}
// Helper function to keep some of the codebase a little cleaner. Returns a "safe" version of the path
// joined with a file. This is important because you cannot just assume that appending a file to a cleaned
// path will result in a cleaned path to that file. For example, imagine you have the following scenario:
//
// my_bad_file -> symlink:/etc/passwd
//
// cleaned := SafePath("../../etc") -> "/"
// filepath.Join(cleaned, my_bad_file) -> "/my_bad_file"
//
// You might think that "/my_bad_file" is fine since it isn't pointing to the original "../../etc/my_bad_file".
// However, this doesn't account for symlinks where the file might be pointing outside of the directory, so
// calling a function such as Chown against it would chown the symlinked location, and not the file within the
// Wings daemon.
func (fs *Filesystem) SafeJoin(dir string, f os.FileInfo) (string, error) {
if f.Mode()&os.ModeSymlink != 0 {
return fs.SafePath(filepath.Join(dir, f.Name()))
}
return filepath.Join(dir, f.Name()), nil
}
// Executes the fs.SafePath function in parallel against an array of paths. If any of the calls
@ -162,18 +207,45 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) {
// Because determining the amount of space being used by a server is a taxing operation we
// will load it all up into a cache and pull from that as long as the key is not expired.
func (fs *Filesystem) HasSpaceAvailable() bool {
var space = fs.Server.Build.DiskSpace
space := fs.Server.Build().DiskSpace
size, err := fs.getCachedDiskUsage()
if err != nil {
fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
}
// Determine if their folder size, in bytes, is smaller than the amount of space they've
// been allocated.
fs.Server.Proc().SetDisk(size)
// If space is -1 or 0 just return true, means they're allowed unlimited.
//
// Technically we could skip disk space calculation because we don't need to check if the server exceeds it's limit
// but because this method caches the disk usage it would be best to calculate the disk usage and always
// return true.
if space <= 0 {
return true
}
// If we have a match in the cache, use that value in the return. No need to perform an expensive
// disk operation, even if this is an empty value.
if x, exists := fs.Server.Cache.Get("disk_used"); exists {
fs.Server.Resources.Disk = x.(int64)
return (x.(int64) / 1000.0 / 1000.0) <= space
return (size / 1000.0 / 1000.0) <= space
}
// Internal helper function to allow other parts of the codebase to check the total used disk space
// as needed without overly taxing the system. This will prioritize the value from the cache to avoid
// excessive IO usage. We will only walk the filesystem and determine the size of the directory if there
// is no longer a cached value.
func (fs *Filesystem) getCachedDiskUsage() (int64, error) {
// Obtain an exclusive lock on this process so that we don't unintentionally run it at the same
// time as another running process. Once the lock is available it'll read from the cache for the
// second call rather than hitting the disk in parallel.
//
// This effectively the same speed as running this call in parallel since this cache will return
// instantly on the second call.
fs.cacheDiskMu.Lock()
defer fs.cacheDiskMu.Unlock()
if x, exists := fs.Server.cache.Get("disk_used"); exists {
return x.(int64), nil
}
// If there is no size its either because there is no data (in which case running this function
@ -181,37 +253,30 @@ func (fs *Filesystem) HasSpaceAvailable() bool {
// grab the size of their data directory. This is a taxing operation, so we want to store it in
// the cache once we've gotten it.
size, err := fs.DirectorySize("/")
if err != nil {
fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size")
}
// Always cache the size, even if there is an error. We want to always return that value
// so that we don't cause an endless loop of determining the disk size if there is a temporary
// error encountered.
fs.Server.Cache.Set("disk_used", size, time.Second*60)
fs.Server.cache.Set("disk_used", size, time.Second*60)
// Determine if their folder size, in bytes, is smaller than the amount of space they've
// been allocated.
fs.Server.Resources.Disk = size
return (size / 1000.0 / 1000.0) <= space
return size, err
}
// Determines the directory size of a given location by running parallel tasks to iterate
// through all of the folders. Returns the size in bytes. This can be a fairly taxing operation
// on locations with tons of files, so it is recommended that you cache the output.
func (fs *Filesystem) DirectorySize(dir string) (int64, error) {
w := fs.NewWalker()
ctx := context.Background()
var size int64
err := w.Walk(dir, ctx, func(f os.FileInfo, _ string) bool {
// Only increment the size when we're dealing with a file specifically, otherwise
// just continue digging deeper until there are no more directories to iterate over.
err := fs.Walk(dir, func(_ string, f os.FileInfo, err error) error {
if err != nil {
return fs.handleWalkerError(err, f)
}
if !f.IsDir() {
atomic.AddInt64(&size, f.Size())
}
return true
return nil
})
return size, err
@ -293,7 +358,7 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
// Finally, chown the file to ensure the permissions don't end up out-of-whack
// if we had just created it.
return fs.Chown(p)
return fs.Chown(cleaned)
}
// Defines the stat struct object.
@ -409,7 +474,7 @@ func (fs *Filesystem) Chown(path string) error {
if s, err := os.Stat(cleaned); err != nil {
return errors.WithStack(err)
} else if !s.IsDir() {
return os.Chown(cleaned, fs.Configuration.User.Uid, fs.Configuration.User.Gid)
return os.Chown(cleaned, config.Get().System.User.Uid, config.Get().System.User.Gid)
}
return fs.chownDirectory(cleaned)
@ -435,16 +500,27 @@ func (fs *Filesystem) chownDirectory(path string) error {
}
for _, f := range files {
// Do not attempt to chmod a symlink. Go's os.Chown function will affect the symlink
// so if it points to a location outside the data directory the user would be able to
// (un)intentionally modify that files permissions.
if f.Mode()&os.ModeSymlink != 0 {
continue
}
p, err := fs.SafeJoin(cleaned, f)
if err != nil {
return err
}
if f.IsDir() {
wg.Add(1)
go func(p string) {
defer wg.Done()
fs.chownDirectory(p)
}(filepath.Join(cleaned, f.Name()))
}(p)
} else {
// Chown the file.
os.Chown(filepath.Join(cleaned, f.Name()), fs.Configuration.User.Uid, fs.Configuration.User.Gid)
os.Chown(p, config.Get().System.User.Uid, config.Get().System.User.Gid)
}
}
@ -536,17 +612,26 @@ func (fs *Filesystem) Copy(p string) error {
// Deletes a file or folder from the system. Prevents the user from accidentally
// (or maliciously) removing their root server data directory.
func (fs *Filesystem) Delete(p string) error {
cleaned, err := fs.SafePath(p)
if err != nil {
return errors.WithStack(err)
// This is one of the few (only?) places in the codebase where we're explictly not using
// the SafePath functionality when working with user provided input. If we did, you would
// not be able to delete a file that is a symlink pointing to a location outside of the data
// directory.
//
// We also want to avoid resolving a symlink that points _within_ the data directory and thus
// deleting the actual source file for the symlink rather than the symlink itself. For these
// purposes just resolve the actual file path using filepath.Join() and confirm that the path
// exists within the data directory.
resolved := fs.unsafeFilePath(p)
if !fs.unsafeIsInDataDirectory(resolved) {
return PathResolutionError{}
}
// Block any whoopsies.
if cleaned == fs.Path() {
if resolved == fs.Path() {
return errors.New("cannot delete root server directory")
}
return os.RemoveAll(cleaned)
return os.RemoveAll(resolved)
}
// Lists the contents of a given directory and returns stat information about each
@ -579,7 +664,14 @@ func (fs *Filesystem) ListDirectory(p string) ([]*Stat, error) {
var m = "inode/directory"
if !f.IsDir() {
m, _, _ = mimetype.DetectFile(filepath.Join(cleaned, f.Name()))
cleanedp, _ := fs.SafeJoin(cleaned, f)
if cleanedp != "" {
m, _, _ = mimetype.DetectFile(filepath.Join(cleaned, f.Name()))
} else {
// Just pass this for an unknown type because the file could not safely be resolved within
// the server data path.
m = "application/octet-stream"
}
}
out[idx] = &Stat{
@ -636,9 +728,6 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
return nil, err
}
w := fs.NewWalker()
ctx := context.Background()
i, err := ignore.CompileIgnoreLines(ignored...)
if err != nil {
return nil, err
@ -647,7 +736,12 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
// Walk through all of the files and directories on a server. This callback only returns
// files found, and will keep walking deeper and deeper into directories.
inc := new(backup.IncludedFiles)
if err := w.Walk(cleaned, ctx, func(f os.FileInfo, p string) bool {
if err := fs.Walk(cleaned, func(p string, f os.FileInfo, err error) error {
if err != nil {
return fs.handleWalkerError(err, f)
}
// Avoid unnecessary parsing if there are no ignored files, nothing will match anyways
// so no reason to call the function.
if len(ignored) == 0 || !i.MatchesPath(strings.TrimPrefix(p, fs.Path()+"/")) {
@ -657,7 +751,7 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
// We can't just abort if the path is technically ignored. It is possible there is a nested
// file or folder that should not be excluded, so in this case we need to just keep going
// until we get to a final state.
return true
return nil
}); err != nil {
return nil, err
}
@ -665,7 +759,7 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In
return inc, nil
}
// Compresses all of the files matching the given paths in the specififed directory. This function
// Compresses all of the files matching the given paths in the specified directory. This function
// also supports passing nested paths to only compress certain files and folders when working in
// a larger directory. This effectively creates a local backup, but rather than ignoring specific
// files and folders, it takes an allow-list of files and folders.
@ -688,46 +782,58 @@ func (fs *Filesystem) CompressFiles(dir string, paths []string) (os.FileInfo, er
return nil, err
}
w := fs.NewWalker()
wg := new(sync.WaitGroup)
inc := new(backup.IncludedFiles)
// Iterate over all of the cleaned paths and merge them into a large object of final file
// paths to pass into the archiver. As directories are encountered this will drop into them
// and look for all of the files.
for _, p := range cleaned {
wg.Add(1)
f, err := os.Stat(p)
if err != nil {
fs.Server.Log().WithField("error", err).WithField("path", p).Debug("failed to stat file or directory for compression")
continue
}
go func(pa string) {
defer wg.Done()
if f.IsDir() {
err := fs.Walk(p, func(s string, info os.FileInfo, err error) error {
if err != nil {
return fs.handleWalkerError(err, info)
}
if !info.IsDir() {
inc.Push(&info, s)
}
return nil
})
f, err := os.Stat(pa)
if err != nil {
fs.Server.Log().WithField("error", err).WithField("path", pa).Warn("failed to stat file or directory for compression")
return
return nil, err
}
if f.IsDir() {
// Recursively drop into directory and get all of the additional files and directories within
// it that should be included in this backup.
w.Walk(pa, context.Background(), func(info os.FileInfo, s string) bool {
if !info.IsDir() {
inc.Push(&info, s)
}
return true
})
} else {
inc.Push(&f, pa)
}
}(p)
} else {
inc.Push(&f, p)
}
}
wg.Wait()
a := &backup.Archive{TrimPrefix: fs.Path(), Files: inc}
d := path.Join(cleanedRootDir, fmt.Sprintf("archive-%s.tar.gz", strings.ReplaceAll(time.Now().Format(time.RFC3339), ":", "")))
return a.Create(d, context.Background())
}
// Handle errors encountered when walking through directories.
//
// If there is a path resolution error just skip the item entirely. Only return this for a
// directory, otherwise return nil. Returning this error for a file will stop the walking
// for the remainder of the directory. This is assuming an os.FileInfo struct was even returned.
func (fs *Filesystem) handleWalkerError(err error, f os.FileInfo) error {
if !IsPathResolutionError(err) {
return err
}
if f != nil && f.IsDir() {
return filepath.SkipDir
}
return nil
}

View File

@ -0,0 +1,130 @@
package server
import (
"archive/tar"
"archive/zip"
"compress/gzip"
"fmt"
"github.com/mholt/archiver/v3"
"github.com/pkg/errors"
"io"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"sync/atomic"
)
// Look through a given archive and determine if decompressing it would put the server over
// its allocated disk space limit.
func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (bool, error) {
// Don't waste time trying to determine this if we know the server will have the space for
// it since there is no limit.
if fs.Server.Build().DiskSpace <= 0 {
return true, nil
}
source, err := fs.SafePath(filepath.Join(dir, file))
if err != nil {
return false, err
}
wg := new(sync.WaitGroup)
var dirSize int64
var cErr error
// Get the cached size in a parallel process so that if it is not cached we are not
// waiting an unnecessary amount of time on this call.
go func() {
wg.Add(1)
defer wg.Done()
dirSize, cErr = fs.getCachedDiskUsage()
}()
var size int64
// In a seperate thread, walk over the archive and figure out just how large the final
// output would be from dearchiving it.
go func() {
wg.Add(1)
defer wg.Done()
// Walk all of the files and calculate the total decompressed size of this archive.
archiver.Walk(source, func(f archiver.File) error {
atomic.AddInt64(&size, f.Size())
return nil
})
}()
wg.Wait()
return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.Build().DiskSpace, cErr
}
// Decompress a file in a given directory by using the archiver tool to infer the file
// type and go from there. This will walk over all of the files within the given archive
// and ensure that there is not a zip-slip attack being attempted by validating that the
// final path is within the server data directory.
func (fs *Filesystem) DecompressFile(dir string, file string) error {
source, err := fs.SafePath(filepath.Join(dir, file))
if err != nil {
return errors.WithStack(err)
}
// Walk over all of the files spinning up an additional go-routine for each file we've encountered
// and then extract that file from the archive and write it to the disk. If any part of this process
// encounters an error the entire process will be stopped.
return archiver.Walk(source, func(f archiver.File) error {
// Don't waste time with directories, we don't need to create them if they have no contents, and
// we will ensure the directory exists when opening the file for writing anyways.
if f.IsDir() {
return nil
}
return fs.extractFileFromArchive(f)
})
}
// Extracts a single file from the archive and writes it to the disk after verifying that it will end
// up in the server data directory.
func (fs *Filesystem) extractFileFromArchive(f archiver.File) error {
var name string
switch s := f.Sys().(type) {
case *tar.Header:
name = s.Name
case *gzip.Header:
name = s.Name
case *zip.FileHeader:
name = s.Name
default:
return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String()))
}
// Guard against a zip-slip attack and prevent writing a file to a destination outside of
// the server root directory.
p, err := fs.SafePath(name)
if err != nil {
return err
}
// Ensure the directory structure for this file exists before trying to write the file
// to the disk, otherwise we'll have some unexpected fun.
if err := os.MkdirAll(strings.TrimSuffix(p, filepath.Base(p)), 0755); err != nil {
return err
}
// Open the file and truncate it if it already exists.
o, err := os.OpenFile(p, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer o.Close()
_, cerr := io.Copy(o, f)
return cerr
}

View File

@ -2,69 +2,140 @@ package server
import (
"context"
"golang.org/x/sync/errgroup"
"github.com/gammazero/workerpool"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sync"
)
type FileWalker struct {
*Filesystem
}
type PooledFileWalker struct {
wg sync.WaitGroup
pool *workerpool.WorkerPool
callback filepath.WalkFunc
cancel context.CancelFunc
err error
errOnce sync.Once
Filesystem *Filesystem
}
// Returns a new walker instance.
func (fs *Filesystem) NewWalker() *FileWalker {
return &FileWalker{fs}
}
// Iterate over all of the files and directories within a given directory. When a file is
// found the callback will be called with the file information. If a directory is encountered
// it will be recursively passed back through to this function.
func (fw *FileWalker) Walk(dir string, ctx context.Context, callback func (os.FileInfo, string) bool) error {
cleaned, err := fw.SafePath(dir)
// Creates a new pooled file walker that will concurrently walk over a given directory but limit itself
// to a worker pool as to not completely flood out the system or cause a process crash.
func newPooledWalker(fs *Filesystem) *PooledFileWalker {
return &PooledFileWalker{
Filesystem: fs,
// Create a worker pool that is the same size as the number of processors available on the
// system. Going much higher doesn't provide much of a performance boost, and is only more
// likely to lead to resource overloading anyways.
pool: workerpool.New(runtime.GOMAXPROCS(0)),
}
}
// Process a given path by calling the callback function for all of the files and directories within
// the path, and then dropping into any directories that we come across.
func (w *PooledFileWalker) process(path string) error {
p, err := w.Filesystem.SafePath(path)
if err != nil {
return err
}
// Get all of the files from this directory.
files, err := ioutil.ReadDir(cleaned)
files, err := ioutil.ReadDir(p)
if err != nil {
return err
}
// Create an error group that we can use to run processes in parallel while retaining
// the ability to cancel the entire process immediately should any of it fail.
g, ctx := errgroup.WithContext(ctx)
// Loop over all of the files and directories in the given directory and call the provided
// callback function. If we encounter a directory, push that directory onto the worker queue
// to be processed.
for _, f := range files {
if f.IsDir() {
fi := f
p := filepath.Join(cleaned, f.Name())
// Recursively call this function to continue digging through the directory tree within
// a seperate goroutine. If the context is canceled abort this process.
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// If the callback returns true, go ahead and keep walking deeper. This allows
// us to programatically continue deeper into directories, or stop digging
// if that pathway knows it needs nothing else.
if callback(fi, p) {
return fw.Walk(p, ctx, callback)
}
sp, err := w.Filesystem.SafeJoin(p, f)
if err != nil {
// Let the callback function handle what to do if there is a path resolution error because a
// dangerous path was resolved. If there is an error returned, return from this entire process
// otherwise just skip over this specific file. We don't care if its a file or a directory at
// this point since either way we're skipping it, however, still check for the SkipDir since that
// would be thrown otherwise.
if err = w.callback(sp, f, err); err != nil && err != filepath.SkipDir {
return err
}
return nil
}
})
} else {
// If this isn't a directory, go ahead and pass the file information into the
// callback. We don't care about the response since we won't be stepping into
// anything from here.
callback(f, filepath.Join(cleaned, f.Name()))
continue
}
i, err := os.Stat(sp)
// You might end up getting an error about a file or folder not existing if the given path
// if it is an invalid symlink. We can safely just skip over these files I believe.
if os.IsNotExist(err) {
continue
}
// Call the user-provided callback for this file or directory. If an error is returned that is
// not a SkipDir call, abort the entire process and bubble that error up.
if err = w.callback(sp, i, err); err != nil && err != filepath.SkipDir {
return err
}
// If this is a directory, and we didn't get a SkipDir error, continue through by pushing another
// job to the pool to handle it. If we requested a skip, don't do anything just continue on to the
// next item.
if i.IsDir() && err != filepath.SkipDir {
w.push(sp)
} else if !i.IsDir() && err == filepath.SkipDir {
// Per the spec for the callback, if we get a SkipDir error but it is returned for an item
// that is _not_ a directory, abort the remaining operations on the directory.
return nil
}
}
// Block until all of the routines finish and have returned a value.
return g.Wait()
}
return nil
}
// Push a new path into the worker pool and increment the waitgroup so that we do not return too
// early and cause panic's as internal directories attempt to submit to the pool.
func (w *PooledFileWalker) push(path string) {
w.wg.Add(1)
w.pool.Submit(func() {
defer w.wg.Done()
if err := w.process(path); err != nil {
w.errOnce.Do(func() {
w.err = err
if w.cancel != nil {
w.cancel()
}
})
}
})
}
// Walks the given directory and executes the callback function for all of the files and directories
// that are encountered.
func (fs *Filesystem) Walk(dir string, callback filepath.WalkFunc) error {
w := newPooledWalker(fs)
w.callback = callback
_, cancel := context.WithCancel(context.Background())
w.cancel = cancel
w.push(dir)
w.wg.Wait()
w.pool.StopWait()
if w.err != nil {
return w.err
}
return nil
}

View File

@ -36,6 +36,9 @@ func (s *Server) Install(sync bool) error {
}
}
// Send the start event so the Panel can automatically update.
s.Events().Publish(InstallStartedEvent, "")
err := s.internalInstall()
s.Log().Debug("notifying panel of server install state")
@ -52,6 +55,10 @@ func (s *Server) Install(sync bool) error {
l.Warn("failed to notify panel of server install state")
}
// Push an event to the websocket so we can auto-refresh the information in the panel once
// the install is completed.
s.Events().Publish(InstallCompletedEvent, "")
return err
}
@ -70,7 +77,7 @@ func (s *Server) Reinstall() error {
// Internal installation function used to simplify reporting back to the Panel.
func (s *Server) internalInstall() error {
script, rerr, err := api.NewRequester().GetInstallationScript(s.Uuid)
script, rerr, err := api.NewRequester().GetInstallationScript(s.Id())
if err != nil || rerr != nil {
if err != nil {
return err
@ -170,7 +177,7 @@ func (s *Server) AbortInstallation() {
// Removes the installer container for the server.
func (ip *InstallationProcess) RemoveContainer() {
err := ip.client.ContainerRemove(ip.context, ip.Server.Uuid+"_installer", types.ContainerRemoveOptions{
err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
})
@ -313,7 +320,7 @@ func (ip *InstallationProcess) BeforeExecute() (string, error) {
Force: true,
}
if err := ip.client.ContainerRemove(ip.context, ip.Server.Uuid+"_installer", opts); err != nil {
if err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", opts); err != nil {
if !client.IsErrNotFound(err) {
e = append(e, err)
}
@ -333,7 +340,7 @@ func (ip *InstallationProcess) BeforeExecute() (string, error) {
// Returns the log path for the installation process.
func (ip *InstallationProcess) GetLogPath() string {
return filepath.Join(config.Get().System.GetInstallLogPath(), ip.Server.Uuid+".log")
return filepath.Join(config.Get().System.GetInstallLogPath(), ip.Server.Id()+".log")
}
// Cleans up after the execution of the installation process. This grabs the logs from the
@ -369,7 +376,7 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error {
|
| Details
| ------------------------------
Server UUID: {{.Server.Uuid}}
Server UUID: {{.Server.Id}}
Container Image: {{.Script.ContainerImage}}
Container Entrypoint: {{.Script.Entrypoint}}
@ -448,7 +455,7 @@ func (ip *InstallationProcess) Execute(installPath string) (string, error) {
}
ip.Server.Log().WithField("install_script", installPath+"/install.sh").Info("creating install container for server process")
r, err := ip.client.ContainerCreate(ip.context, conf, hostConf, nil, ip.Server.Uuid+"_installer")
r, err := ip.client.ContainerCreate(ip.context, conf, hostConf, nil, ip.Server.Id()+"_installer")
if err != nil {
return "", errors.WithStack(err)
}
@ -516,7 +523,7 @@ func (ip *InstallationProcess) StreamOutput(id string) error {
func (s *Server) SyncInstallState(successful bool) error {
r := api.NewRequester()
rerr, err := r.SendInstallationStatus(s.Uuid, successful)
rerr, err := r.SendInstallationStatus(s.Id(), successful)
if rerr != nil || err != nil {
if err != nil {
return errors.WithStack(err)

View File

@ -27,9 +27,11 @@ func (s *Server) onConsoleOutput(data string) {
// If the specific line of output is one that would mark the server as started,
// set the server to that state. Only do this if the server is not currently stopped
// or stopping.
if s.GetState() == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) {
match := s.ProcessConfiguration().Startup.Done
if s.GetState() == ProcessStartingState && strings.Contains(data, match) {
s.Log().WithFields(log.Fields{
"match": s.processConfiguration.Startup.Done,
"match": match,
"against": data,
}).Debug("detected server in running state based on console line output")
@ -40,7 +42,8 @@ func (s *Server) onConsoleOutput(data string) {
// set the server to be in a stopping state, otherwise crash detection will kick in and
// cause the server to unexpectedly restart on the user.
if s.IsRunning() {
if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value {
stop := s.ProcessConfiguration().Stop
if stop.Type == api.ProcessStopCommand && data == stop.Value {
s.SetState(ProcessStoppingState)
}
}

120
server/loader.go Normal file
View File

@ -0,0 +1,120 @@
package server
import (
"github.com/apex/log"
"github.com/creasty/defaults"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/remeh/sizedwaitgroup"
"time"
)
var servers = NewCollection(nil)
func GetServers() *Collection {
return servers
}
// Iterates over a given directory and loads all of the servers listed before returning
// them to the calling function.
func LoadDirectory() error {
if len(servers.items) != 0 {
return errors.New("cannot call LoadDirectory with a non-nil collection")
}
// We could theoretically use a standard wait group here, however doing
// that introduces the potential to crash the program due to too many
// open files. This wouldn't happen on a small setup, but once the daemon is
// handling many servers you run that risk.
//
// For now just process 10 files at a time, that should be plenty fast to
// read and parse the YAML. We should probably make this configurable down
// the road to help big instances scale better.
wg := sizedwaitgroup.New(10)
configs, rerr, err := api.NewRequester().GetAllServerConfigurations()
if err != nil || rerr != nil {
if err != nil {
return errors.WithStack(err)
}
return errors.New(rerr.String())
}
log.Debug("retrieving cached server states from disk")
states, err := getServerStates()
if err != nil {
return errors.WithStack(err)
}
log.WithField("total_configs", len(configs)).Debug("looping over received configurations from API")
for uuid, data := range configs {
wg.Add()
go func(uuid string, data *api.ServerConfigurationResponse) {
defer wg.Done()
log.WithField("uuid", uuid).Debug("creating server object from configuration")
s, err := FromConfiguration(data)
if err != nil {
log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...")
return
}
if state, exists := states[s.Id()]; exists {
s.SetState(state)
s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file")
}
servers.Add(s)
}(uuid, data)
}
// Wait until we've processed all of the configuration files in the directory
// before continuing.
wg.Wait()
return nil
}
// Initializes a server using a data byte array. This will be marshaled into the
// given struct using a YAML marshaler. This will also configure the given environment
// for a server.
func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
cfg := Configuration{}
if err := defaults.Set(&cfg); err != nil {
return nil, err
}
s := new(Server)
s.cfg = cfg
if err := s.UpdateDataStructure(data.Settings, false); err != nil {
return nil, err
}
s.AddEventListeners()
// Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously.
if err := NewDockerEnvironment(s); err != nil {
return nil, err
}
s.cache = cache.New(time.Minute*10, time.Minute*15)
s.Archiver = Archiver{
Server: s,
}
s.Filesystem = Filesystem{
Server: s,
}
// Forces the configuration to be synced with the panel.
if err := s.SyncWithConfiguration(data); err != nil {
return nil, err
}
return s, nil
}

10
server/process.go Normal file
View File

@ -0,0 +1,10 @@
package server
import "github.com/pterodactyl/wings/api"
func (s *Server) ProcessConfiguration() *api.ProcessConfiguration {
s.RLock()
defer s.RUnlock()
return s.procConfig
}

View File

@ -3,27 +3,38 @@ package server
import (
"github.com/docker/docker/api/types"
"math"
"sync"
"sync/atomic"
)
// Defines the current resource usage for a given server instance. If a server is offline you
// should obviously expect memory and CPU usage to be 0. However, disk will always be returned
// since that is not dependent on the server being running to collect that data.
type ResourceUsage struct {
mu sync.RWMutex
// The current server status.
State string `json:"state" default:"offline"`
// The total amount of memory, in bytes, that this server instance is consuming. This is
// calculated slightly differently than just using the raw Memory field that the stats
// return from the container, so please check the code setting this value for how that
// is calculated.
Memory uint64 `json:"memory_bytes"`
// The total amount of memory this container or resource can use. Inside Docker this is
// going to be higher than you'd expect because we're automatically allocating overhead
// abilities for the container, so its not going to be a perfect match.
MemoryLimit uint64 `json:"memory_limit_bytes"`
// The absolute CPU usage is the amount of CPU used in relation to the entire system and
// does not take into account any limits on the server process itself.
CpuAbsolute float64 `json:"cpu_absolute"`
// The current disk space being used by the server. This is cached to prevent slow lookup
// issues on frequent refreshes.
Disk int64 `json:"disk_bytes"`
// Current network transmit in & out for a container.
Network struct {
RxBytes uint64 `json:"rx_bytes"`
@ -31,6 +42,66 @@ type ResourceUsage struct {
} `json:"network"`
}
// Returns the resource usage stats for the server instance. If the server is not running, only the
// disk space currently used will be returned. When the server is running all of the other stats will
// be returned.
//
// When a process is stopped all of the stats are zeroed out except for the disk.
func (s *Server) Proc() *ResourceUsage {
s.resources.mu.RLock()
defer s.resources.mu.RUnlock()
return &s.resources
}
// Returns the servers current state.
func (ru *ResourceUsage) getInternalState() string {
ru.mu.RLock()
defer ru.mu.RUnlock()
return ru.State
}
// Sets the new state for the server.
func (ru *ResourceUsage) setInternalState(state string) {
ru.mu.Lock()
ru.State = state
ru.mu.Unlock()
}
// Resets the usages values to zero, used when a server is stopped to ensure we don't hold
// onto any values incorrectly.
func (ru *ResourceUsage) Empty() {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.Memory = 0
ru.CpuAbsolute = 0
ru.Network.TxBytes = 0
ru.Network.RxBytes = 0
}
func (ru *ResourceUsage) SetDisk(i int64) {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.Disk = i
}
func (ru *ResourceUsage) UpdateFromDocker(v *types.StatsJSON) {
ru.mu.Lock()
defer ru.mu.Unlock()
ru.CpuAbsolute = ru.calculateDockerAbsoluteCpu(&v.PreCPUStats, &v.CPUStats)
ru.Memory = ru.calculateDockerMemory(v.MemoryStats)
ru.MemoryLimit = v.MemoryStats.Limit
}
func (ru *ResourceUsage) UpdateNetworkBytes(nw *types.NetworkStats) {
atomic.AddUint64(&ru.Network.RxBytes, nw.RxBytes)
atomic.AddUint64(&ru.Network.TxBytes, nw.TxBytes)
}
// The "docker stats" CLI call does not return the same value as the types.MemoryStats.Usage
// value which can be rather confusing to people trying to compare panel usage to
// their stats output.
@ -40,7 +111,7 @@ type ResourceUsage struct {
// correct memory value anyways.
//
// @see https://github.com/docker/cli/blob/96e1d1d6/cli/command/container/stats_helpers.go#L227-L249
func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 {
func (ru *ResourceUsage) calculateDockerMemory(stats types.MemoryStats) uint64 {
if v, ok := stats.Stats["total_inactive_file"]; ok && v < stats.Usage {
return stats.Usage - v
}
@ -56,7 +127,7 @@ func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 {
// by the defined CPU limits on the container.
//
// @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166
func (ru *ResourceUsage) CalculateAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 {
func (ru *ResourceUsage) calculateDockerAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 {
// Calculate the change in CPU usage between the current and previous reading.
cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(pStats.CPUUsage.TotalUsage)

View File

@ -4,99 +4,38 @@ import (
"context"
"fmt"
"github.com/apex/log"
"github.com/creasty/defaults"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/remeh/sizedwaitgroup"
"golang.org/x/sync/semaphore"
"math"
"os"
"strconv"
"strings"
"sync"
"time"
)
var servers *Collection
func GetServers() *Collection {
return servers
}
type EnvironmentVariables map[string]interface{}
// Ugly hacky function to handle environment variables that get passed through as not-a-string
// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a
// string wasn't passed through you'd cause a crash or the server to become unavailable. For now
// try to handle the most likely values from the JSON and hope for the best.
func (ev EnvironmentVariables) Get(key string) string {
val, ok := ev[key]
if !ok {
return ""
}
switch val.(type) {
case int:
return strconv.Itoa(val.(int))
case int32:
return strconv.FormatInt(val.(int64), 10)
case int64:
return strconv.FormatInt(val.(int64), 10)
case float32:
return fmt.Sprintf("%f", val.(float32))
case float64:
return fmt.Sprintf("%f", val.(float64))
case bool:
return strconv.FormatBool(val.(bool))
}
return val.(string)
}
// High level definition for a server instance being controlled by Wings.
type Server struct {
// The unique identifier for the server that should be used when referencing
// it against the Panel API (and internally). This will be used when naming
// docker containers as well as in log output.
Uuid string `json:"uuid"`
// Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk.
sync.RWMutex
emitterLock sync.Mutex
// Whether or not the server is in a suspended state. Suspended servers cannot
// be started or modified except in certain scenarios by an admin user.
Suspended bool `json:"suspended"`
// Maintains the configuration for the server. This is the data that gets returned by the Panel
// such as build settings and container images.
cfg Configuration
// The power state of the server.
State string `default:"offline" json:"state"`
// The command that should be used when booting up the server instance.
Invocation string `json:"invocation"`
// An array of environment variables that should be passed along to the running
// server process.
EnvVars EnvironmentVariables `json:"environment"`
Allocations Allocations `json:"allocations"`
Build BuildSettings `json:"build"`
CrashDetection CrashDetection `json:"crash_detection"`
Mounts []Mount `json:"mounts"`
Resources ResourceUsage `json:"resources"`
// The crash handler for this server instance.
crasher CrashHandler
resources ResourceUsage
Archiver Archiver `json:"-"`
Environment Environment `json:"-"`
Filesystem Filesystem `json:"-"`
Container struct {
// Defines the Docker image that will be used for this server
Image string `json:"image,omitempty"`
// If set to true, OOM killer will be disabled on the server's Docker container.
// If not present (nil) we will default to disabling it.
OomDisabled bool `default:"true" json:"oom_disabled"`
} `json:"container,omitempty"`
// Server cache used to store frequently requested information in memory and make
// certain long operations return faster. For example, FS disk space usage.
Cache *cache.Cache `json:"-"`
cache *cache.Cache
// Events emitted by the server instance.
emitter *EventBus
@ -104,17 +43,13 @@ type Server struct {
// Defines the process configuration for the server instance. This is dynamically
// fetched from the Pterodactyl Server instance each time the server process is
// started, and then cached here.
processConfiguration *api.ProcessConfiguration
procConfig *api.ProcessConfiguration
// Tracks the installation process for this server and prevents a server from running
// two installer processes at the same time. This also allows us to cancel a running
// installation process, for example when a server is deleted from the panel while the
// installer process is still running.
installer InstallerDetails
// Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk.
sync.RWMutex
}
type InstallerDetails struct {
@ -127,183 +62,9 @@ type InstallerDetails struct {
sem *semaphore.Weighted
}
// The build settings for a given server that impact docker container creation and
// resource limits for a server instance.
type BuildSettings struct {
// The total amount of memory in megabytes that this server is allowed to
// use on the host system.
MemoryLimit int64 `json:"memory_limit"`
// The amount of additional swap space to be provided to a container instance.
Swap int64 `json:"swap"`
// The relative weight for IO operations in a container. This is relative to other
// containers on the system and should be a value between 10 and 1000.
IoWeight uint16 `json:"io_weight"`
// The percentage of CPU that this instance is allowed to consume relative to
// the host. A value of 200% represents complete utilization of two cores. This
// should be a value between 1 and THREAD_COUNT * 100.
CpuLimit int64 `json:"cpu_limit"`
// The amount of disk space in megabytes that a server is allowed to use.
DiskSpace int64 `json:"disk_space"`
// Sets which CPU threads can be used by the docker instance.
Threads string `json:"threads"`
}
// Converts the CPU limit for a server build into a number that can be better understood
// by the Docker environment. If there is no limit set, return -1 which will indicate to
// Docker that it has unlimited CPU quota.
func (b *BuildSettings) ConvertedCpuLimit() int64 {
if b.CpuLimit == 0 {
return -1
}
return b.CpuLimit * 1000
}
// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to
// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use
// 15%. This avoids unexpected crashes from processes like Java which run over the limit.
func (b *BuildSettings) MemoryOverheadMultiplier() float64 {
if b.MemoryLimit <= 2048 {
return 1.15
} else if b.MemoryLimit <= 4096 {
return 1.10
}
return 1.05
}
func (b *BuildSettings) BoundedMemoryLimit() int64 {
return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000))
}
// Returns the amount of swap available as a total in bytes. This is returned as the amount
// of memory available to the server initially, PLUS the amount of additional swap to include
// which is the format used by Docker.
func (b *BuildSettings) ConvertedSwap() int64 {
if b.Swap < 0 {
return -1
}
return (b.Swap * 1_000_000) + b.BoundedMemoryLimit()
}
// Defines the allocations available for a given server. When using the Docker environment
// driver these correspond to mappings for the container that allow external connections.
type Allocations struct {
// Defines the default allocation that should be used for this server. This is
// what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration
// files or the startup arguments for a server.
DefaultMapping struct {
Ip string `json:"ip"`
Port int `json:"port"`
} `json:"default"`
// Mappings contains all of the ports that should be assigned to a given server
// attached to the IP they correspond to.
Mappings map[string][]int `json:"mappings"`
}
// Iterates over a given directory and loads all of the servers listed before returning
// them to the calling function.
func LoadDirectory() error {
// We could theoretically use a standard wait group here, however doing
// that introduces the potential to crash the program due to too many
// open files. This wouldn't happen on a small setup, but once the daemon is
// handling many servers you run that risk.
//
// For now just process 10 files at a time, that should be plenty fast to
// read and parse the YAML. We should probably make this configurable down
// the road to help big instances scale better.
wg := sizedwaitgroup.New(10)
configs, rerr, err := api.NewRequester().GetAllServerConfigurations()
if err != nil || rerr != nil {
if err != nil {
return errors.WithStack(err)
}
return errors.New(rerr.String())
}
states, err := getServerStates()
if err != nil {
return errors.WithStack(err)
}
servers = NewCollection(nil)
for uuid, data := range configs {
wg.Add()
go func(uuid string, data *api.ServerConfigurationResponse) {
defer wg.Done()
s, err := FromConfiguration(data)
if err != nil {
log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...")
return
}
if state, exists := states[s.Uuid]; exists {
s.SetState(state)
s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file")
}
servers.Add(s)
}(uuid, data)
}
// Wait until we've processed all of the configuration files in the directory
// before continuing.
wg.Wait()
return nil
}
// Initializes a server using a data byte array. This will be marshaled into the
// given struct using a YAML marshaler. This will also configure the given environment
// for a server.
func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
s := new(Server)
if err := defaults.Set(s); err != nil {
return nil, err
}
if err := s.UpdateDataStructure(data.Settings, false); err != nil {
return nil, err
}
s.AddEventListeners()
// Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously.
if err := NewDockerEnvironment(s); err != nil {
return nil, err
}
s.Cache = cache.New(time.Minute*10, time.Minute*15)
s.Archiver = Archiver{
Server: s,
}
s.Filesystem = Filesystem{
Configuration: &config.Get().System,
Server: s,
}
s.Resources = ResourceUsage{}
// Forces the configuration to be synced with the panel.
if err := s.SyncWithConfiguration(data); err != nil {
return nil, err
}
return s, nil
// Returns the UUID for the server instance.
func (s *Server) Id() string {
return s.Config().GetUuid()
}
// Returns all of the environment variables that should be assigned to a running
@ -313,28 +74,28 @@ func (s *Server) GetEnvironmentVariables() []string {
var out = []string{
fmt.Sprintf("TZ=%s", zone),
fmt.Sprintf("STARTUP=%s", s.Invocation),
fmt.Sprintf("SERVER_MEMORY=%d", s.Build.MemoryLimit),
fmt.Sprintf("SERVER_IP=%s", s.Allocations.DefaultMapping.Ip),
fmt.Sprintf("SERVER_PORT=%d", s.Allocations.DefaultMapping.Port),
fmt.Sprintf("STARTUP=%s", s.Config().Invocation),
fmt.Sprintf("SERVER_MEMORY=%d", s.Build().MemoryLimit),
fmt.Sprintf("SERVER_IP=%s", s.Config().Allocations.DefaultMapping.Ip),
fmt.Sprintf("SERVER_PORT=%d", s.Config().Allocations.DefaultMapping.Port),
}
eloop:
for k := range s.EnvVars {
for k := range s.Config().EnvVars {
for _, e := range out {
if strings.HasPrefix(e, strings.ToUpper(k)) {
continue eloop
}
}
out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.EnvVars.Get(k)))
out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.Config().EnvVars.Get(k)))
}
return out
}
func (s *Server) Log() *log.Entry {
return log.WithField("server", s.Uuid)
return log.WithField("server", s.Id())
}
// Syncs the state of the server on the Panel with Wings. This ensures that we're always
@ -366,7 +127,10 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) err
return errors.WithStack(err)
}
s.processConfiguration = cfg.ProcessConfiguration
s.Lock()
s.procConfig = cfg.ProcessConfiguration
s.Unlock()
return nil
}
@ -391,7 +155,7 @@ func (s *Server) CreateEnvironment() error {
// Gets the process configuration data for the server.
func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *api.RequestError, error) {
return api.NewRequester().GetServerConfiguration(s.Uuid)
return api.NewRequester().GetServerConfiguration(s.Id())
}
// Helper function that can receieve a power action and then process the
@ -401,11 +165,7 @@ func (s *Server) HandlePowerAction(action PowerAction) error {
case "start":
return s.Environment.Start()
case "restart":
if err := s.Environment.WaitForStop(60, false); err != nil {
return err
}
return s.Environment.Start()
return s.Environment.Restart()
case "stop":
return s.Environment.Stop()
case "kill":
@ -414,3 +174,8 @@ func (s *Server) HandlePowerAction(action PowerAction) error {
return errors.New("an invalid power action was provided")
}
}
// Checks if the server is marked as being suspended or not on the system.
func (s *Server) IsSuspended() bool {
return s.Config().Suspended
}

View File

@ -13,6 +13,13 @@ import (
var stateMutex sync.Mutex
const (
ProcessOfflineState = "offline"
ProcessStartingState = "starting"
ProcessRunningState = "running"
ProcessStoppingState = "stopping"
)
// Returns the state of the servers.
func getServerStates() (map[string]string, error) {
// Request a lock after we check if the file exists.
@ -40,7 +47,7 @@ func saveServerStates() error {
// Get the states of all servers on the daemon.
states := map[string]string{}
for _, s := range GetServers().All() {
states[s.Uuid] = s.GetState()
states[s.Id()] = s.GetState()
}
// Convert the map to a json object.
@ -60,13 +67,6 @@ func saveServerStates() error {
return nil
}
const (
ProcessOfflineState = "offline"
ProcessStartingState = "starting"
ProcessRunningState = "running"
ProcessStoppingState = "stopping"
)
// Sets the state of the server internally. This function handles crash detection as
// well as reporting to event listeners for the server.
func (s *Server) SetState(state string) error {
@ -76,16 +76,14 @@ func (s *Server) SetState(state string) error {
prevState := s.GetState()
// Obtain a mutex lock and update the current state of the server.
s.Lock()
s.State = state
// Update the currently tracked state for the server.
s.Proc().setInternalState(state)
// Emit the event to any listeners that are currently registered.
s.Log().WithField("status", s.State).Debug("saw server status change event")
s.Events().Publish(StatusEvent, s.State)
// Release the lock as it is no longer needed for the following actions.
s.Unlock()
if prevState != state {
s.Log().WithField("status", s.Proc().State).Debug("saw server status change event")
s.Events().Publish(StatusEvent, s.Proc().State)
}
// Persist this change to the disk immediately so that should the Daemon be stopped or
// crash we can immediately restore the server state.
@ -128,15 +126,14 @@ func (s *Server) SetState(state string) error {
// Returns the current state of the server in a race-safe manner.
func (s *Server) GetState() string {
s.RLock()
defer s.RUnlock()
return s.State
return s.Proc().getInternalState()
}
// Determines if the server state is running or not. This is different than the
// environment state, it is simply the tracked state from this daemon instance, and
// not the response from Docker.
func (s *Server) IsRunning() bool {
return s.GetState() == ProcessRunningState || s.GetState() == ProcessStartingState
st := s.GetState()
return st == ProcessRunningState || st == ProcessStartingState
}

View File

@ -15,7 +15,7 @@ import (
// it is up to the specific environment to determine what needs to happen when
// that is the case.
func (s *Server) UpdateDataStructure(data []byte, background bool) error {
src := new(Server)
src := new(Configuration)
if err := json.Unmarshal(data, src); err != nil {
return errors.WithStack(err)
}
@ -23,13 +23,29 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
// Don't allow obviously corrupted data to pass through into this function. If the UUID
// doesn't match something has gone wrong and the API is attempting to meld this server
// instance into a totally different one, which would be bad.
if src.Uuid != "" && s.Uuid != "" && src.Uuid != s.Uuid {
if src.Uuid != "" && s.Id() != "" && src.Uuid != s.Id() {
return errors.New("attempting to merge a data stack with an invalid UUID")
}
// Grab a copy of the configuration to work on.
c := *s.Config()
// Lock our copy of the configuration since the defered unlock will end up acting upon this
// new memory address rather than the old one. If we don't lock this, the defered unlock will
// cause a panic when it goes to run. However, since we only update s.cfg at the end, if there
// is an error before that point we'll still properly unlock the original configuration for the
// server.
c.mu.Lock()
// Lock the server configuration while we're doing this merge to avoid anything
// trying to overwrite it or make modifications while we're sorting out what we
// need to do.
s.cfg.mu.Lock()
defer s.cfg.mu.Unlock()
// Merge the new data object that we have received with the existing server data object
// and then save it to the disk so it is persistent.
if err := mergo.Merge(s, src, mergo.WithOverride); err != nil {
if err := mergo.Merge(&c, src, mergo.WithOverride); err != nil {
return errors.WithStack(err)
}
@ -39,9 +55,9 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
// backfiring at some point, but until then...
//
// We'll go ahead and do this with swap as well.
s.Build.CpuLimit = src.Build.CpuLimit
s.Build.Swap = src.Build.Swap
s.Build.DiskSpace = src.Build.DiskSpace
c.Build.CpuLimit = src.Build.CpuLimit
c.Build.Swap = src.Build.Swap
c.Build.DiskSpace = src.Build.DiskSpace
// Mergo can't quite handle this boolean value correctly, so for now we'll just
// handle this edge case manually since none of the other data passed through in this
@ -51,7 +67,7 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
return errors.WithStack(err)
}
} else {
s.Container.OomDisabled = v
c.Container.OomDisabled = v
}
// Mergo also cannot handle this boolean value.
@ -60,25 +76,28 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
return errors.WithStack(err)
}
} else {
s.Suspended = v
c.Suspended = v
}
// Environment and Mappings should be treated as a full update at all times, never a
// true patch, otherwise we can't know what we're passing along.
if src.EnvVars != nil && len(src.EnvVars) > 0 {
s.EnvVars = src.EnvVars
c.EnvVars = src.EnvVars
}
if src.Allocations.Mappings != nil && len(src.Allocations.Mappings) > 0 {
s.Allocations.Mappings = src.Allocations.Mappings
c.Allocations.Mappings = src.Allocations.Mappings
}
if src.Mounts != nil && len(src.Mounts) > 0 {
s.Mounts = src.Mounts
c.Mounts = src.Mounts
}
// Update the configuration once we have a lock on the configuration object.
s.cfg = c
if background {
s.runBackgroundActions()
go s.runBackgroundActions()
}
return nil
@ -91,24 +110,22 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error {
// These tasks run in independent threads where relevant to speed up any updates
// that need to happen.
func (s *Server) runBackgroundActions() {
// Update the environment in place, allowing memory and CPU usage to be adjusted
// on the fly without the user needing to reboot (theoretically).
go func(server *Server) {
server.Log().Info("performing server limit modification on-the-fly")
if err := server.Environment.InSituUpdate(); err != nil {
server.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment")
}
}(s)
// Check if the server is now suspended, and if so and the process is not terminated
// Check if the s is now suspended, and if so and the process is not terminated
// yet, do it immediately.
go func(server *Server) {
if server.Suspended && server.GetState() != ProcessOfflineState {
server.Log().Info("server suspended with running process state, terminating now")
if s.IsSuspended() && s.GetState() != ProcessOfflineState {
s.Log().Info("server suspended with running process state, terminating now")
if err := server.Environment.WaitForStop(10, true); err != nil {
server.Log().WithField("error", err).Warn("failed to terminate server environment after suspension")
}
if err := s.Environment.WaitForStop(10, true); err != nil {
s.Log().WithField("error", err).Warn("failed to terminate server environment after suspension")
}
}(s)
}
if !s.IsSuspended() {
// Update the environment in place, allowing memory and CPU usage to be adjusted
// on the fly without the user needing to reboot (theoretically).
s.Log().Info("performing server limit modification on-the-fly")
if err := s.Environment.InSituUpdate(); err != nil {
s.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment")
}
}
}

View File

@ -49,7 +49,7 @@ func Initialize(config *config.Configuration) error {
func validatePath(fs sftp_server.FileSystem, p string) (string, error) {
s := server.GetServers().Find(func(server *server.Server) bool {
return server.Uuid == fs.UUID
return server.Id() == fs.UUID
})
if s == nil {
@ -61,7 +61,7 @@ func validatePath(fs sftp_server.FileSystem, p string) (string, error) {
func validateDiskSpace(fs sftp_server.FileSystem) bool {
s := server.GetServers().Find(func(server *server.Server) bool {
return server.Uuid == fs.UUID
return server.Id() == fs.UUID
})
if s == nil {
@ -105,7 +105,7 @@ func validateCredentials(c sftp_server.AuthenticationRequest) (*sftp_server.Auth
}
s := server.GetServers().Find(func(server *server.Server) bool {
return server.Uuid == resp.Server
return server.Id() == resp.Server
})
if s == nil {