diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 20fe3b7..4360d3f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,7 +23,7 @@ jobs: run: go test ./... - name: Compress binary and make it executable - run: upx build/wings_linux_amd64 && chmod +x build/wings_linux_amd64 + run: upx --brute build/wings_linux_amd64 && chmod +x build/wings_linux_amd64 - name: Extract changelog env: @@ -32,7 +32,6 @@ jobs: sed -n "/^## ${REF:10}/,/^## /{/^## /b;p}" CHANGELOG.md > ./RELEASE_CHANGELOG echo ::set-output name=version_name::`sed -nr "s/^## (${REF:10} .*)$/\1/p" CHANGELOG.md` - - name: Create checksum and add to changelog run: | SUM=`cd build && sha256sum wings_linux_amd64` @@ -48,8 +47,8 @@ jobs: git config --local user.name "Pterodactyl CI" git checkout -b $BRANCH git push -u origin $BRANCH - sed -i "s/ Version = \".*\"/ Version = \"${REF:11}\"/" config/app.php - git add config/app.php + sed -i "s/ Version = \".*\"/ Version = \"${REF:11}\"/" system/const.go + git add system/const.go git commit -m "bump version for release" git push diff --git a/cmd/root.go b/cmd/root.go index 60b5d31..ffbf96f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -169,7 +169,7 @@ func rootCmdRun(*cobra.Command, []string) { // Just for some nice log output. for _, s := range server.GetServers().All() { - log.WithField("server", s.Uuid).Info("loaded configuration for server") + log.WithField("server", s.Id()).Info("loaded configuration for server") } // Create a new WaitGroup that limits us to 4 servers being bootstrapped at a time @@ -181,17 +181,13 @@ func rootCmdRun(*cobra.Command, []string) { wg.Add() go func(s *server.Server) { - // Required for tracing purposes. - var err error + defer wg.Done() - defer func() { - s.Log().Trace("ensuring server environment exists").Stop(&err) - wg.Done() - }() + s.Log().Info("ensuring server environment exists") // Create a server environment if none exists currently. This allows us to recover from Docker // being reinstalled on the host system for example. - if err = s.Environment.Create(); err != nil { + if err := s.Environment.Create(); err != nil { s.Log().WithField("error", err).Error("failed to process environment") } diff --git a/go.mod b/go.mod index 8f61d89..0de71c2 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/docker/go-units v0.3.3 // indirect github.com/fatih/color v1.9.0 github.com/gabriel-vasile/mimetype v0.1.4 + github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753 github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0 github.com/gin-gonic/gin v1.6.3 github.com/golang/protobuf v1.3.5 // indirect diff --git a/go.sum b/go.sum index 889a55f..2ac6433 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,10 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v0.1.4 h1:5mcsq3+DXypREUkW+1juhjeKmE/XnWgs+paHMJn7lf8= github.com/gabriel-vasile/mimetype v0.1.4/go.mod h1:kMJbg3SlWZCsj4R73F1WDzbT9AyGCOVmUtIxxwO5pmI= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= +github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753 h1:oSQ61LxZkz3Z4La0O5cbyVDvLWEfbNgiD43cSPdjPQQ= +github.com/gammazero/workerpool v0.0.0-20200608033439-1a5ca90a5753/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0 h1:7KeiSrO5puFH1+vdAdbpiie2TrNnkvFc/eOQzT60Z2k= github.com/gbrlsnchs/jwt/v3 v3.0.0-rc.0/go.mod h1:D1+3UtCYAJ1os1PI+zhTVEj6Tb+IHJvXjXKz83OstmM= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/installer/installer.go b/installer/installer.go index 37ce0cb..92ba655 100644 --- a/installer/installer.go +++ b/installer/installer.go @@ -29,12 +29,10 @@ func New(data []byte) (*Installer, error) { return nil, NewValidationError("service egg provided was not in a valid format") } - s := &server.Server{ + cfg := &server.Configuration{ Uuid: getString(data, "uuid"), Suspended: false, - State: server.ProcessOfflineState, Invocation: getString(data, "invocation"), - EnvVars: make(server.EnvironmentVariables), Build: server.BuildSettings{ MemoryLimit: getInt(data, "build", "memory"), Swap: getInt(data, "build", "swap"), @@ -43,20 +41,18 @@ func New(data []byte) (*Installer, error) { DiskSpace: getInt(data, "build", "disk"), Threads: getString(data, "build", "threads"), }, - Allocations: server.Allocations{ - Mappings: make(map[string][]int), - }, + CrashDetectionEnabled: true, } - s.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip") - s.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port")) + cfg.Allocations.DefaultMapping.Ip = getString(data, "allocations", "default", "ip") + cfg.Allocations.DefaultMapping.Port = int(getInt(data, "allocations", "default", "port")) // Unmarshal the environment variables from the request into the server struct. if b, _, _, err := jsonparser.Get(data, "environment"); err != nil { return nil, errors.WithStack(err) } else { - s.EnvVars = make(server.EnvironmentVariables) - if err := json.Unmarshal(b, &s.EnvVars); err != nil { + cfg.EnvVars = make(server.EnvironmentVariables) + if err := json.Unmarshal(b, &cfg.EnvVars); err != nil { return nil, errors.WithStack(err) } } @@ -65,15 +61,15 @@ func New(data []byte) (*Installer, error) { if b, _, _, err := jsonparser.Get(data, "allocations", "mappings"); err != nil { return nil, errors.WithStack(err) } else { - s.Allocations.Mappings = make(map[string][]int) - if err := json.Unmarshal(b, &s.Allocations.Mappings); err != nil { + cfg.Allocations.Mappings = make(map[string][]int) + if err := json.Unmarshal(b, &cfg.Allocations.Mappings); err != nil { return nil, errors.WithStack(err) } } - s.Container.Image = getString(data, "container", "image") + cfg.Container.Image = getString(data, "container", "image") - c, rerr, err := api.NewRequester().GetServerConfiguration(s.Uuid) + c, rerr, err := api.NewRequester().GetServerConfiguration(cfg.Uuid) if err != nil || rerr != nil { if err != nil { return nil, errors.WithStack(err) @@ -82,21 +78,18 @@ func New(data []byte) (*Installer, error) { return nil, errors.New(rerr.String()) } - // Destroy the temporary server instance. - s = nil - // Create a new server instance using the configuration we wrote to the disk // so that everything gets instantiated correctly on the struct. - s2, err := server.FromConfiguration(c) + s, err := server.FromConfiguration(c) return &Installer{ - server: s2, + server: s, }, err } // Returns the UUID associated with this installer instance. func (i *Installer) Uuid() string { - return i.server.Uuid + return i.server.Id() } // Return the server instance. diff --git a/loggers/cli/cli.go b/loggers/cli/cli.go index bb673b0..af71c31 100644 --- a/loggers/cli/cli.go +++ b/loggers/cli/cli.go @@ -81,7 +81,7 @@ func (h *Handler) HandleLog(e *log.Entry) error { } func getErrorStack(err error, i bool) errors.StackTrace { - e, ok := errors.Cause(err).(tracer) + e, ok := err.(tracer) if !ok { if i { // Just abort out of this and return a stacktrace leading up to this point. It isn't perfect @@ -89,7 +89,7 @@ func getErrorStack(err error, i bool) errors.StackTrace { return errors.Wrap(err, "failed to generate stacktrace for caught error").(tracer).StackTrace() } - return getErrorStack(errors.New(err.Error()), true) + return getErrorStack(errors.Wrap(err, err.Error()), true) } st := e.StackTrace() diff --git a/router/error.go b/router/error.go index 5755b7f..0ad58b8 100644 --- a/router/error.go +++ b/router/error.go @@ -33,7 +33,7 @@ func TrackedError(err error) *RequestError { // generated this server for the purposes of logging. func TrackedServerError(err error, s *server.Server) *RequestError { return &RequestError{ - Err: err, + Err: errors.WithStack(err), Uuid: uuid.Must(uuid.NewRandom()).String(), Message: "", server: s, diff --git a/router/middleware.go b/router/middleware.go index 45c50f8..102d1aa 100644 --- a/router/middleware.go +++ b/router/middleware.go @@ -48,7 +48,7 @@ func AuthorizationMiddleware(c *gin.Context) { // Helper function to fetch a server out of the servers collection stored in memory. func GetServer(uuid string) *server.Server { return server.GetServers().Find(func(s *server.Server) bool { - return uuid == s.Uuid + return uuid == s.Id() }) } diff --git a/router/router.go b/router/router.go index 8c534ed..ed3a5a9 100644 --- a/router/router.go +++ b/router/router.go @@ -85,6 +85,7 @@ func Configure() *gin.Engine { files.POST("/create-directory", postServerCreateDirectory) files.POST("/delete", postServerDeleteFiles) files.POST("/compress", postServerCompressFiles) + files.POST("/decompress", postServerDecompressFiles) } backup := server.Group("/backup") diff --git a/router/router_server.go b/router/router_server.go index 87c0b67..6af62d0 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -13,7 +13,9 @@ import ( // Returns a single server from the collection of servers. func getServer(c *gin.Context) { - c.JSON(http.StatusOK, GetServer(c.Param("server"))) + s := GetServer(c.Param("server")) + + c.JSON(http.StatusOK, s.Proc()) } // Returns the logs for a given server instance. @@ -64,7 +66,7 @@ func postServerPower(c *gin.Context) { // // We don't really care about any of the other actions at this point, they'll all result // in the process being stopped, which should have happened anyways if the server is suspended. - if (data.Action == "start" || data.Action == "restart") && s.Suspended { + if (data.Action == "start" || data.Action == "restart") && s.IsSuspended() { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "Cannot start or restart a server that is suspended.", }) @@ -162,7 +164,7 @@ func deleteServer(c *gin.Context) { // Immediately suspend the server to prevent a user from attempting // to start it while this process is running. - s.Suspended = true + s.Config().SetSuspended(true) // If the server is currently installing, abort it. if s.IsInstalling() { @@ -200,9 +202,9 @@ func deleteServer(c *gin.Context) { } }(s.Filesystem.Path()) - var uuid = s.Uuid + var uuid = s.Id() server.GetServers().Remove(func(s2 *server.Server) bool { - return s2.Uuid == uuid + return s2.Id() == uuid }) // Deallocate the reference to this server. diff --git a/router/router_server_files.go b/router/router_server_files.go index c1ac534..311703a 100644 --- a/router/router_server_files.go +++ b/router/router_server_files.go @@ -3,7 +3,6 @@ package router import ( "bufio" "context" - "github.com/apex/log" "github.com/gin-gonic/gin" "github.com/pkg/errors" "github.com/pterodactyl/wings/router/tokens" @@ -83,6 +82,13 @@ func getServerListDirectory(c *gin.Context) { stats, err := s.Filesystem.ListDirectory(d) if err != nil { + if err.Error() == "readdirent: not a directory" { + c.AbortWithStatusJSON(http.StatusNotFound, gin.H{ + "error": "The requested directory does not exist.", + }) + return + } + TrackedServerError(err, s).AbortWithServerError(c) return } @@ -175,7 +181,7 @@ func postServerDeleteFiles(c *gin.Context) { if len(data.Files) == 0 { c.AbortWithStatusJSON(http.StatusUnprocessableEntity, gin.H{ - "error": "No files were specified for deletion.", + "error": "No files were specififed for deletion.", }) return } @@ -283,6 +289,39 @@ func postServerCompressFiles(c *gin.Context) { }) } +func postServerDecompressFiles(c *gin.Context) { + s := GetServer(c.Param("server")) + + var data struct { + RootPath string `json:"root"` + File string `json:"file"` + } + + if err := c.BindJSON(&data); err != nil { + return + } + + hasSpace, err := s.Filesystem.SpaceAvailableForDecompression(data.RootPath, data.File) + if err != nil { + TrackedServerError(err, s).AbortWithServerError(c) + return + } + + if !hasSpace { + c.AbortWithStatusJSON(http.StatusConflict, gin.H{ + "error": "This server does not have enough available disk space to decompress this archive.", + }) + return + } + + if err := s.Filesystem.DecompressFile(data.RootPath, data.File); err != nil { + TrackedServerError(err, s).AbortWithServerError(c) + return + } + + c.Status(http.StatusNoContent) +} + func postServerUploadFiles(c *gin.Context) { token := tokens.UploadPayload{} if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil { @@ -313,10 +352,6 @@ func postServerUploadFiles(c *gin.Context) { return } - for i := range form.File { - log.Debug(i) - } - headers, ok := form.File["files"] if !ok { c.AbortWithStatusJSON(http.StatusNotModified, gin.H{ @@ -335,7 +370,6 @@ func postServerUploadFiles(c *gin.Context) { c.AbortWithError(http.StatusInternalServerError, err) return } - log.Debug(p) // We run this in a different method so I can use defer without any of // the consequences caused by calling it in a loop. diff --git a/router/router_server_ws.go b/router/router_server_ws.go index 4f60f93..f630240 100644 --- a/router/router_server_ws.go +++ b/router/router_server_ws.go @@ -51,8 +51,10 @@ func getServerWebsocket(c *gin.Context) { continue } - if err := handler.HandleInbound(j); err != nil { - handler.SendErrorJson(j, err) - } + go func(msg websocket.Message) { + if err := handler.HandleInbound(msg); err != nil { + handler.SendErrorJson(msg, err) + } + }(j) } } diff --git a/router/router_transfer.go b/router/router_transfer.go index 0a1778f..cf3bece 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -98,33 +98,33 @@ func postServerArchive(c *gin.Context) { start := time.Now() if err := server.Archiver.Archive(); err != nil { - zap.S().Errorw("failed to get archive for server", zap.String("server", s.Uuid), zap.Error(err)) + zap.S().Errorw("failed to get archive for server", zap.String("server", server.Id()), zap.Error(err)) return } zap.S().Debugw( "successfully created archive for server", - zap.String("server", server.Uuid), + zap.String("server", server.Id()), zap.Duration("time", time.Now().Sub(start).Round(time.Microsecond)), ) r := api.NewRequester() - rerr, err := r.SendArchiveStatus(server.Uuid, true) + rerr, err := r.SendArchiveStatus(server.Id(), true) if rerr != nil || err != nil { if err != nil { - zap.S().Errorw("failed to notify panel with archive status", zap.String("server", server.Uuid), zap.Error(err)) + zap.S().Errorw("failed to notify panel with archive status", zap.String("server", server.Id()), zap.Error(err)) return } zap.S().Errorw( "panel returned an error when sending the archive status", - zap.String("server", server.Uuid), + zap.String("server", server.Id()), zap.Error(errors.New(rerr.String())), ) return } - zap.S().Debugw("successfully notified panel about archive status", zap.String("server", server.Uuid)) + zap.S().Debugw("successfully notified panel about archive status", zap.String("server", server.Id())) }(s) c.Status(http.StatusAccepted) diff --git a/router/tokens/websocket.go b/router/tokens/websocket.go index cc03306..d9d710f 100644 --- a/router/tokens/websocket.go +++ b/router/tokens/websocket.go @@ -4,10 +4,13 @@ import ( "encoding/json" "github.com/gbrlsnchs/jwt/v3" "strings" + "sync" ) type WebsocketPayload struct { jwt.Payload + sync.RWMutex + UserID json.Number `json:"user_id"` ServerUUID string `json:"server_uuid"` Permissions []string `json:"permissions"` @@ -15,11 +18,24 @@ type WebsocketPayload struct { // Returns the JWT payload. func (p *WebsocketPayload) GetPayload() *jwt.Payload { + p.RLock() + defer p.RUnlock() + return &p.Payload } +func (p *WebsocketPayload) GetServerUuid() string { + p.RLock() + defer p.RUnlock() + + return p.ServerUUID +} + // Checks if the given token payload has a permission string. func (p *WebsocketPayload) HasPermission(permission string) bool { + p.RLock() + defer p.RUnlock() + for _, k := range p.Permissions { if k == permission || (!strings.HasPrefix(permission, "admin") && k == "*") { return true diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index bf872c6..b1a99ac 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -43,6 +43,8 @@ func (h *Handler) ListenForServerEvents(ctx context.Context) { server.StatusEvent, server.ConsoleOutputEvent, server.InstallOutputEvent, + server.InstallStartedEvent, + server.InstallCompletedEvent, server.DaemonMessageEvent, server.BackupCompletedEvent, } @@ -52,16 +54,15 @@ func (h *Handler) ListenForServerEvents(ctx context.Context) { h.server.Events().Subscribe(event, eventChannel) } - select { - case <-ctx.Done(): - for _, event := range events { - h.server.Events().Unsubscribe(event, eventChannel) - } + for d := range eventChannel { + select { + case <-ctx.Done(): + for _, event := range events { + h.server.Events().Unsubscribe(event, eventChannel) + } - close(eventChannel) - default: - // Listen for different events emitted by the server and respond to them appropriately. - for d := range eventChannel { + close(eventChannel) + default: h.SendJson(&Message{ Event: d.Topic, Args: []string{d.Data}, diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index 7a0507d..5b0cf3c 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -127,7 +127,7 @@ func (h *Handler) TokenValid() error { return errors.New("jwt does not have connect permission") } - if h.server.Uuid != j.ServerUUID { + if h.server.Id() != j.GetServerUuid() { return errors.New("jwt server uuid mismatch") } @@ -247,16 +247,7 @@ func (h *Handler) HandleInbound(m Message) error { if state == server.ProcessOfflineState { _ = h.server.Filesystem.HasSpaceAvailable() - resources := server.ResourceUsage{ - Memory: 0, - MemoryLimit: 0, - CpuAbsolute: 0.0, - Disk: h.server.Resources.Disk, - } - resources.Network.RxBytes = 0 - resources.Network.TxBytes = 0 - - b, _ := json.Marshal(resources) + b, _ := json.Marshal(h.server.Proc()) h.SendJson(&Message{ Event: server.StatsEvent, Args: []string{string(b)}, @@ -280,11 +271,14 @@ func (h *Handler) HandleInbound(m Message) error { break case "restart": if h.GetJwt().HasPermission(PermissionSendPowerRestart) { - if err := h.server.Environment.WaitForStop(60, false); err != nil { - return err + // If the server is alreay restarting don't do anything. Perhaps we send back an event + // in the future for this? For now no reason to knowingly trigger an error by trying to + // restart a process already restarting. + if h.server.Environment.IsRestarting() { + return nil } - return h.server.Environment.Start() + return h.server.Environment.Restart() } break case "kill": diff --git a/server/allocations.go b/server/allocations.go new file mode 100644 index 0000000..ac51a0d --- /dev/null +++ b/server/allocations.go @@ -0,0 +1,17 @@ +package server + +// Defines the allocations available for a given server. When using the Docker environment +// driver these correspond to mappings for the container that allow external connections. +type Allocations struct { + // Defines the default allocation that should be used for this server. This is + // what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration + // files or the startup arguments for a server. + DefaultMapping struct { + Ip string `json:"ip"` + Port int `json:"port"` + } `json:"default"` + + // Mappings contains all of the ports that should be assigned to a given server + // attached to the IP they correspond to. + Mappings map[string][]int `json:"mappings"` +} \ No newline at end of file diff --git a/server/archiver.go b/server/archiver.go index 5c12976..d177d76 100644 --- a/server/archiver.go +++ b/server/archiver.go @@ -23,7 +23,7 @@ func (a *Archiver) ArchivePath() string { // ArchiveName returns the name of the server's archive. func (a *Archiver) ArchiveName() string { - return a.Server.Uuid + ".tar.gz" + return a.Server.Id() + ".tar.gz" } // Exists returns a boolean based off if the archive exists. @@ -52,7 +52,12 @@ func (a *Archiver) Archive() error { } for _, file := range fileInfo { - files = append(files, filepath.Join(path, file.Name())) + f, err := a.Server.Filesystem.SafeJoin(path, file) + if err != nil { + return err + } + + files = append(files, f) } stat, err := a.Stat() diff --git a/server/backup/archiver.go b/server/backup/archiver.go index e2c1ec3..5330e6f 100644 --- a/server/backup/archiver.go +++ b/server/backup/archiver.go @@ -20,9 +20,9 @@ type Archive struct { Files *IncludedFiles } -// Creates an archive at dest with all of the files definied in the included files struct. -func (a *Archive) Create(dest string, ctx context.Context) (os.FileInfo, error) { - f, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) +// Creates an archive at dst with all of the files defined in the included files struct. +func (a *Archive) Create(dst string, ctx context.Context) (os.FileInfo, error) { + f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { return nil, err } @@ -66,14 +66,17 @@ func (a *Archive) Create(dest string, ctx context.Context) (os.FileInfo, error) // Attempt to remove the archive if there is an error, report that error to // the logger if it fails. - if rerr := os.Remove(dest); rerr != nil && !os.IsNotExist(rerr) { - log.WithField("location", dest).Warn("failed to delete corrupted backup archive") + if rerr := os.Remove(dst); rerr != nil && !os.IsNotExist(rerr) { + log.WithField("location", dst).Warn("failed to delete corrupted backup archive") } return nil, err } - st, _ := f.Stat() + st, err := f.Stat() + if err != nil { + return nil, err + } return st, nil } @@ -101,7 +104,7 @@ func (a *Archive) addToArchive(p string, s *os.FileInfo, w *tar.Writer) error { a.Lock() defer a.Unlock() - if err = w.WriteHeader(header); err != nil { + if err := w.WriteHeader(header); err != nil { return err } diff --git a/server/build_settings.go b/server/build_settings.go new file mode 100644 index 0000000..357aecf --- /dev/null +++ b/server/build_settings.go @@ -0,0 +1,72 @@ +package server + +import "math" + +// The build settings for a given server that impact docker container creation and +// resource limits for a server instance. +type BuildSettings struct { + // The total amount of memory in megabytes that this server is allowed to + // use on the host system. + MemoryLimit int64 `json:"memory_limit"` + + // The amount of additional swap space to be provided to a container instance. + Swap int64 `json:"swap"` + + // The relative weight for IO operations in a container. This is relative to other + // containers on the system and should be a value between 10 and 1000. + IoWeight uint16 `json:"io_weight"` + + // The percentage of CPU that this instance is allowed to consume relative to + // the host. A value of 200% represents complete utilization of two cores. This + // should be a value between 1 and THREAD_COUNT * 100. + CpuLimit int64 `json:"cpu_limit"` + + // The amount of disk space in megabytes that a server is allowed to use. + DiskSpace int64 `json:"disk_space"` + + // Sets which CPU threads can be used by the docker instance. + Threads string `json:"threads"` +} + +func (s *Server) Build() *BuildSettings { + return &s.Config().Build +} + +// Converts the CPU limit for a server build into a number that can be better understood +// by the Docker environment. If there is no limit set, return -1 which will indicate to +// Docker that it has unlimited CPU quota. +func (b *BuildSettings) ConvertedCpuLimit() int64 { + if b.CpuLimit == 0 { + return -1 + } + + return b.CpuLimit * 1000 +} + +// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to +// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use +// 15%. This avoids unexpected crashes from processes like Java which run over the limit. +func (b *BuildSettings) MemoryOverheadMultiplier() float64 { + if b.MemoryLimit <= 2048 { + return 1.15 + } else if b.MemoryLimit <= 4096 { + return 1.10 + } + + return 1.05 +} + +func (b *BuildSettings) BoundedMemoryLimit() int64 { + return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000)) +} + +// Returns the amount of swap available as a total in bytes. This is returned as the amount +// of memory available to the server initially, PLUS the amount of additional swap to include +// which is the format used by Docker. +func (b *BuildSettings) ConvertedSwap() int64 { + if b.Swap < 0 { + return -1 + } + + return (b.Swap * 1_000_000) + b.BoundedMemoryLimit() +} diff --git a/server/config_parser.go b/server/config_parser.go index a0f1f11..5b64099 100644 --- a/server/config_parser.go +++ b/server/config_parser.go @@ -10,7 +10,8 @@ import ( func (s *Server) UpdateConfigurationFiles() { wg := new(sync.WaitGroup) - for _, v := range s.processConfiguration.ConfigurationFiles { + files := s.ProcessConfiguration().ConfigurationFiles + for _, v := range files { wg.Add(1) go func(f parser.ConfigurationFile, server *Server) { diff --git a/server/configuration.go b/server/configuration.go new file mode 100644 index 0000000..11ac4bb --- /dev/null +++ b/server/configuration.go @@ -0,0 +1,91 @@ +package server + +import ( + "fmt" + "strconv" + "sync" +) + +type EnvironmentVariables map[string]interface{} + +// Ugly hacky function to handle environment variables that get passed through as not-a-string +// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a +// string wasn't passed through you'd cause a crash or the server to become unavailable. For now +// try to handle the most likely values from the JSON and hope for the best. +func (ev EnvironmentVariables) Get(key string) string { + val, ok := ev[key] + if !ok { + return "" + } + + switch val.(type) { + case int: + return strconv.Itoa(val.(int)) + case int32: + return strconv.FormatInt(val.(int64), 10) + case int64: + return strconv.FormatInt(val.(int64), 10) + case float32: + return fmt.Sprintf("%f", val.(float32)) + case float64: + return fmt.Sprintf("%f", val.(float64)) + case bool: + return strconv.FormatBool(val.(bool)) + } + + return val.(string) +} + +type Configuration struct { + mu sync.RWMutex + + // The unique identifier for the server that should be used when referencing + // it against the Panel API (and internally). This will be used when naming + // docker containers as well as in log output. + Uuid string `json:"uuid"` + + // Whether or not the server is in a suspended state. Suspended servers cannot + // be started or modified except in certain scenarios by an admin user. + Suspended bool `json:"suspended"` + + // The command that should be used when booting up the server instance. + Invocation string `json:"invocation"` + + // An array of environment variables that should be passed along to the running + // server process. + EnvVars EnvironmentVariables `json:"environment"` + + Allocations Allocations `json:"allocations"` + Build BuildSettings `json:"build"` + CrashDetectionEnabled bool `default:"true" json:"enabled" yaml:"enabled"` + Mounts []Mount `json:"mounts"` + Resources ResourceUsage `json:"resources"` + + Container struct { + // Defines the Docker image that will be used for this server + Image string `json:"image,omitempty"` + // If set to true, OOM killer will be disabled on the server's Docker container. + // If not present (nil) we will default to disabling it. + OomDisabled bool `default:"true" json:"oom_disabled"` + } `json:"container,omitempty"` +} + +func (s *Server) Config() *Configuration { + s.cfg.mu.RLock() + defer s.cfg.mu.RUnlock() + + return &s.cfg +} + +func (c *Configuration) GetUuid() string { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.Uuid +} + +func (c *Configuration) SetSuspended(s bool) { + c.mu.Lock() + c.Suspended = s + c.mu.Unlock() +} diff --git a/server/crash.go b/server/crash.go index 649580f..3eb2ba4 100644 --- a/server/crash.go +++ b/server/crash.go @@ -4,18 +4,32 @@ import ( "fmt" "github.com/pkg/errors" "github.com/pterodactyl/wings/config" + "sync" "time" ) -type CrashDetection struct { - // If set to false, the system will not listen for crash detection events that - // can indicate that the server stopped unexpectedly. - Enabled bool `default:"true" json:"enabled" yaml:"enabled"` +type CrashHandler struct { + mu sync.RWMutex // Tracks the time of the last server crash event. lastCrash time.Time } +// Returns the time of the last crash for this server instance. +func (cd *CrashHandler) LastCrashTime() time.Time { + cd.mu.RLock() + defer cd.mu.RUnlock() + + return cd.lastCrash +} + +// Sets the last crash time for a server. +func (cd *CrashHandler) SetLastCrash(t time.Time) { + cd.mu.Lock() + cd.lastCrash = t + cd.mu.Unlock() +} + // Looks at the environment exit state to determine if the process exited cleanly or // if it was the result of an event that we should try to recover from. // @@ -30,8 +44,8 @@ func (s *Server) handleServerCrash() error { // No point in doing anything here if the server isn't currently offline, there // is no reason to do a crash detection event. If the server crash detection is // disabled we want to skip anything after this as well. - if s.GetState() != ProcessOfflineState || !s.CrashDetection.Enabled { - if !s.CrashDetection.Enabled { + if s.GetState() != ProcessOfflineState || !s.Config().CrashDetectionEnabled { + if !s.Config().CrashDetectionEnabled { s.Log().Debug("server triggered crash detection but handler is disabled for server process") s.PublishConsoleOutputFromDaemon("Server detected as crashed; crash detection is disabled for this instance.") @@ -57,7 +71,7 @@ func (s *Server) handleServerCrash() error { s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Exit code: %d", exitCode)) s.PublishConsoleOutputFromDaemon(fmt.Sprintf("Out of memory: %t", oomKilled)) - c := s.CrashDetection.lastCrash + c := s.crasher.LastCrashTime() // If the last crash time was within the last 60 seconds we do not want to perform // an automatic reboot of the process. Return an error that can be handled. if !c.IsZero() && c.Add(time.Second * 60).After(time.Now()) { @@ -66,7 +80,7 @@ func (s *Server) handleServerCrash() error { return &crashTooFrequent{} } - s.CrashDetection.lastCrash = time.Now() + s.crasher.SetLastCrash(time.Now()) return s.Environment.Start() } \ No newline at end of file diff --git a/server/environment.go b/server/environment.go index 4573fdd..ce6eed0 100644 --- a/server/environment.go +++ b/server/environment.go @@ -31,6 +31,13 @@ type Environment interface { // not be returned. Stop() error + // Restart a server instance. If already stopped the process will be started. This function + // will return an error if the server is already performing a restart process as to avoid + // unnecessary double/triple/quad looping issues if multiple people press restart or spam the + // button to restart. + Restart() error + IsRestarting() bool + // Waits for a server instance to stop gracefully. If the server is still detected // as running after seconds, an error will be returned, or the server will be terminated // depending on the value of the second argument. diff --git a/server/environment_docker.go b/server/environment_docker.go index 3fc30fa..fd2dda6 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -16,16 +16,20 @@ import ( "github.com/pkg/errors" "github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/config" + "golang.org/x/sync/semaphore" "io" "os" "path/filepath" "strconv" "strings" + "sync" "time" ) // Defines the base environment for Docker instances running through Wings. type DockerEnvironment struct { + sync.RWMutex + Server *Server // The Docker client being used for this instance. @@ -43,6 +47,25 @@ type DockerEnvironment struct { // Holds the stats stream used by the polling commands so that we can easily close // it out. stats io.ReadCloser + + // Locks when we're performing a restart to avoid trying to restart a process that is already + // being restarted. + restartSem *semaphore.Weighted +} + +// Set if this process is currently attached to the process. +func (d *DockerEnvironment) SetAttached(a bool) { + d.Lock() + d.attached = a + d.Unlock() +} + +// Determine if the this process is currently attached to the container. +func (d *DockerEnvironment) IsAttached() bool { + d.RLock() + defer d.RUnlock() + + return d.attached } // Creates a new base Docker environment. A server must still be attached to it. @@ -71,7 +94,7 @@ func (d *DockerEnvironment) Type() string { // Determines if the container exists in this environment. func (d *DockerEnvironment) Exists() (bool, error) { - _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) + _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()) if err != nil { // If this error is because the container instance wasn't found via Docker we @@ -95,7 +118,7 @@ func (d *DockerEnvironment) Exists() (bool, error) { // // @see docker/client/errors.go func (d *DockerEnvironment) IsRunning() (bool, error) { - c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) + c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()) if err != nil { return false, err } @@ -107,7 +130,7 @@ func (d *DockerEnvironment) IsRunning() (bool, error) { // making any changes to the operational state of the container. This allows memory, cpu, // and IO limitations to be adjusted on the fly for individual instances. func (d *DockerEnvironment) InSituUpdate() error { - if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err != nil { + if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err != nil { // If the container doesn't exist for some reason there really isn't anything // we can do to fix that in this process (it doesn't make sense at least). In those // cases just return without doing anything since we still want to save the configuration @@ -129,7 +152,7 @@ func (d *DockerEnvironment) InSituUpdate() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - if _, err := d.Client.ContainerUpdate(ctx, d.Server.Uuid, u); err != nil { + if _, err := d.Client.ContainerUpdate(ctx, d.Server.Id(), u); err != nil { return errors.WithStack(err) } @@ -155,7 +178,7 @@ func (d *DockerEnvironment) OnBeforeStart() error { // Always destroy and re-create the server container to ensure that synced data from // the Panel is used. - if err := d.Client.ContainerRemove(context.Background(), d.Server.Uuid, types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil { + if err := d.Client.ContainerRemove(context.Background(), d.Server.Id(), types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil { if !client.IsErrNotFound(err) { return err } @@ -199,11 +222,11 @@ func (d *DockerEnvironment) Start() error { // Theoretically you'd have the Panel handle all of this logic, but we cannot do that // because we allow the websocket to control the server power state as well, so we'll // need to handle that action in here. - if d.Server.Suspended { + if d.Server.IsSuspended() { return &suspendedError{} } - if c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err != nil { + if c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err != nil { // Do nothing if the container is not found, we just don't want to continue // to the next block of code here. This check was inlined here to guard againt // a nil-pointer when checking c.State below. @@ -258,7 +281,7 @@ func (d *DockerEnvironment) Start() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - if err := d.Client.ContainerStart(ctx, d.Server.Uuid, types.ContainerStartOptions{}); err != nil { + if err := d.Client.ContainerStart(ctx, d.Server.Id(), types.ContainerStartOptions{}); err != nil { return errors.WithStack(err) } @@ -271,19 +294,89 @@ func (d *DockerEnvironment) Start() error { // Stops the container that the server is running in. This will allow up to 10 // seconds to pass before a failure occurs. func (d *DockerEnvironment) Stop() error { - stop := d.Server.processConfiguration.Stop + stop := d.Server.ProcessConfiguration().Stop if stop.Type == api.ProcessStopSignal { return d.Terminate(os.Kill) } d.Server.SetState(ProcessStoppingState) - if stop.Type == api.ProcessStopCommand { + // Only attempt to send the stop command to the instance if we are actually attached to + // the instance. If we are not for some reason, just send the container stop event. + if d.IsAttached() && stop.Type == api.ProcessStopCommand { return d.SendCommand(stop.Value) } t := time.Second * 10 - return d.Client.ContainerStop(context.Background(), d.Server.Uuid, &t) + err := d.Client.ContainerStop(context.Background(), d.Server.Id(), &t) + if err != nil { + // If the container does not exist just mark the process as stopped and return without + // an error. + if client.IsErrNotFound(err) { + d.SetAttached(false) + d.Server.SetState(ProcessOfflineState) + + return nil + } + + return err + } + + return nil +} + +// Try to acquire a lock to restart the server. If one cannot be obtained within 5 seconds return +// an error to the caller. You should ideally be checking IsRestarting() before calling this function +// to avoid unnecessary delays since you can respond immediately from that. +func (d *DockerEnvironment) acquireRestartLock() error { + if d.restartSem == nil { + d.restartSem = semaphore.NewWeighted(1) + } + + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + + return d.restartSem.Acquire(ctx, 1) +} + +// Restarts the server process by waiting for the process to gracefully stop and then triggering a +// start command. This will return an error if there is already a restart process executing for the +// server. The lock is released when the process is stopped and a start has begun. +func (d *DockerEnvironment) Restart() error { + d.Server.Log().Debug("attempting to acquire restart lock...") + if err := d.acquireRestartLock(); err != nil { + d.Server.Log().Warn("failed to acquire restart lock; already acquired by a different process") + return err + } + + d.Server.Log().Debug("acquired restart lock") + + err := d.WaitForStop(60, false) + if err != nil { + d.restartSem.Release(1) + return err + } + + // Release the restart lock, it is now safe for someone to attempt restarting the server again. + d.restartSem.Release(1) + + // Start the process. + return d.Start() +} + +// Check if the server is currently running the restart process by checking if there is a semaphore +// allocated, and if so, if we can aquire a lock on it. +func (d *DockerEnvironment) IsRestarting() bool { + if d.restartSem == nil { + return false + } + + if d.restartSem.TryAcquire(1) { + d.restartSem.Release(1) + + return false + } + + return true } // Attempts to gracefully stop a server using the defined stop command. If the server @@ -304,7 +397,7 @@ func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error { // Block the return of this function until the container as been marked as no // longer running. If this wait does not end by the time seconds have passed, // attempt to terminate the container, or return an error. - ok, errChan := d.Client.ContainerWait(ctx, d.Server.Uuid, container.WaitConditionNotRunning) + ok, errChan := d.Client.ContainerWait(ctx, d.Server.Id(), container.WaitConditionNotRunning) select { case <-ctx.Done(): if ctxErr := ctx.Err(); ctxErr != nil { @@ -326,7 +419,7 @@ func (d *DockerEnvironment) WaitForStop(seconds int, terminate bool) error { // Forcefully terminates the container using the signal passed through. func (d *DockerEnvironment) Terminate(signal os.Signal) error { - c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) + c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()) if err != nil { return errors.WithStack(err) } @@ -338,7 +431,7 @@ func (d *DockerEnvironment) Terminate(signal os.Signal) error { d.Server.SetState(ProcessStoppingState) return d.Client.ContainerKill( - context.Background(), d.Server.Uuid, strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed"), + context.Background(), d.Server.Id(), strings.TrimSuffix(strings.TrimPrefix(signal.String(), "signal "), "ed"), ) } @@ -348,7 +441,7 @@ func (d *DockerEnvironment) Destroy() error { // Avoid crash detection firing off. d.Server.SetState(ProcessStoppingState) - err := d.Client.ContainerRemove(context.Background(), d.Server.Uuid, types.ContainerRemoveOptions{ + err := d.Client.ContainerRemove(context.Background(), d.Server.Id(), types.ContainerRemoveOptions{ RemoveVolumes: true, RemoveLinks: false, Force: true, @@ -368,7 +461,7 @@ func (d *DockerEnvironment) Destroy() error { // Determine the container exit state and return the exit code and wether or not // the container was killed by the OOM killer. func (d *DockerEnvironment) ExitState() (uint32, bool, error) { - c, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) + c, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()) if err != nil { // I'm not entirely sure how this can happen to be honest. I tried deleting a // container _while_ a server was running and wings gracefully saw the crash and @@ -394,7 +487,7 @@ func (d *DockerEnvironment) ExitState() (uint32, bool, error) { // miss important output at the beginning because of the time delay with attaching to the // output. func (d *DockerEnvironment) Attach() error { - if d.attached { + if d.IsAttached() { return nil } @@ -403,7 +496,7 @@ func (d *DockerEnvironment) Attach() error { } var err error - d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Uuid, types.ContainerAttachOptions{ + d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Id(), types.ContainerAttachOptions{ Stdin: true, Stdout: true, Stderr: true, @@ -418,7 +511,7 @@ func (d *DockerEnvironment) Attach() error { Server: d.Server, } - d.attached = true + d.SetAttached(true) go func() { if err := d.EnableResourcePolling(); err != nil { d.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to enable resource polling on server") @@ -429,7 +522,7 @@ func (d *DockerEnvironment) Attach() error { defer d.stream.Close() defer func() { d.Server.SetState(ProcessOfflineState) - d.attached = false + d.SetAttached(false) }() io.Copy(console, d.stream.Reader) @@ -447,7 +540,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error { return errors.WithStack(err) } - return errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid)) + return errors.New(fmt.Sprintf("no such container: %s", d.Server.Id())) } opts := types.ContainerLogsOptions{ @@ -457,7 +550,7 @@ func (d *DockerEnvironment) FollowConsoleOutput() error { Since: time.Now().Format(time.RFC3339), } - reader, err := d.Client.ContainerLogs(context.Background(), d.Server.Uuid, opts) + reader, err := d.Client.ContainerLogs(context.Background(), d.Server.Id(), opts) go func(r io.ReadCloser) { defer r.Close() @@ -483,7 +576,7 @@ func (d *DockerEnvironment) EnableResourcePolling() error { return errors.New("cannot enable resource polling on a server that is not running") } - stats, err := d.Client.ContainerStats(context.Background(), d.Server.Uuid, true) + stats, err := d.Client.ContainerStats(context.Background(), d.Server.Id(), true) if err != nil { return errors.WithStack(err) } @@ -510,20 +603,16 @@ func (d *DockerEnvironment) EnableResourcePolling() error { return } - s.Resources.CpuAbsolute = s.Resources.CalculateAbsoluteCpu(&v.PreCPUStats, &v.CPUStats) - s.Resources.Memory = s.Resources.CalculateDockerMemory(v.MemoryStats) - s.Resources.MemoryLimit = v.MemoryStats.Limit + s.Proc().UpdateFromDocker(v) + for _, nw := range v.Networks { + s.Proc().UpdateNetworkBytes(&nw) + } // Why you ask? This already has the logic for caching disk space in use and then // also handles pushing that value to the resources object automatically. s.Filesystem.HasSpaceAvailable() - for _, nw := range v.Networks { - s.Resources.Network.RxBytes += nw.RxBytes - s.Resources.Network.TxBytes += nw.TxBytes - } - - b, _ := json.Marshal(s.Resources) + b, _ := json.Marshal(s.Proc()) s.Events().Publish(StatsEvent, string(b)) } }(d.Server) @@ -538,15 +627,16 @@ func (d *DockerEnvironment) DisableResourcePolling() error { } err := d.stats.Close() - - d.Server.Resources.CpuAbsolute = 0 - d.Server.Resources.Memory = 0 - d.Server.Resources.Network.TxBytes = 0 - d.Server.Resources.Network.RxBytes = 0 + d.Server.Proc().Empty() return errors.WithStack(err) } +// Returns the image to be used for the instance. +func (d *DockerEnvironment) Image() string { + return d.Server.Config().Container.Image +} + // Pulls the image from Docker. If there is an error while pulling the image from the source // but the image already exists locally, we will report that error to the logger but continue // with the process. @@ -564,7 +654,7 @@ func (d *DockerEnvironment) ensureImageExists() error { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) defer cancel() - out, err := d.Client.ImagePull(ctx, d.Server.Container.Image, types.ImagePullOptions{All: false}) + out, err := d.Client.ImagePull(ctx, d.Image(), types.ImagePullOptions{All: false}) if err != nil { images, ierr := d.Client.ImageList(ctx, types.ImageListOptions{}) if ierr != nil { @@ -575,12 +665,12 @@ func (d *DockerEnvironment) ensureImageExists() error { for _, img := range images { for _, t := range img.RepoTags { - if t != d.Server.Container.Image { + if t != d.Image() { continue } d.Server.Log().WithFields(log.Fields{ - "image": d.Server.Container.Image, + "image": d.Image(), "error": errors.New(err.Error()), }).Warn("unable to pull requested image from remote source, however the image exists locally") @@ -594,7 +684,7 @@ func (d *DockerEnvironment) ensureImageExists() error { } defer out.Close() - log.WithField("image", d.Server.Container.Image).Debug("pulling docker image... this could take a bit of time") + log.WithField("image", d.Image()).Debug("pulling docker image... this could take a bit of time") // I'm not sure what the best approach here is, but this will block execution until the image // is done being pulled, which is what we need. @@ -621,7 +711,7 @@ func (d *DockerEnvironment) Create() error { // If the container already exists don't hit the user with an error, just return // the current information about it which is what we would do when creating the // container anyways. - if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid); err == nil { + if _, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()); err == nil { return nil } else if !client.IsErrNotFound(err) { return errors.WithStack(err) @@ -633,7 +723,7 @@ func (d *DockerEnvironment) Create() error { } conf := &container.Config{ - Hostname: d.Server.Uuid, + Hostname: d.Server.Id(), Domainname: config.Get().Docker.Domainname, User: strconv.Itoa(config.Get().System.User.Uid), AttachStdin: true, @@ -641,12 +731,9 @@ func (d *DockerEnvironment) Create() error { AttachStderr: true, OpenStdin: true, Tty: true, - ExposedPorts: d.exposedPorts(), - - Image: d.Server.Container.Image, - Env: d.Server.GetEnvironmentVariables(), - + Image: d.Image(), + Env: d.Server.GetEnvironmentVariables(), Labels: map[string]string{ "Service": "Pterodactyl", "ContainerType": "server_process", @@ -663,7 +750,7 @@ func (d *DockerEnvironment) Create() error { } var mounted bool - for _, m := range d.Server.Mounts { + for _, m := range d.Server.Config().Mounts { mounted = false source := filepath.Clean(m.Source) target := filepath.Clean(m.Target) @@ -685,16 +772,17 @@ func (d *DockerEnvironment) Create() error { break } - log := log.WithFields(log.Fields{ - "server": d.Server.Uuid, + logger := log.WithFields(log.Fields{ + "server": d.Server.Id(), "source_path": source, "target_path": target, "read_only": m.ReadOnly, }) + if mounted { - log.Debug("attaching mount to server's container") + logger.Debug("attaching mount to server's container") } else { - log.Warn("skipping mount because it isn't allowed") + logger.Warn("skipping mount because it isn't allowed") } } @@ -738,7 +826,7 @@ func (d *DockerEnvironment) Create() error { NetworkMode: container.NetworkMode(config.Get().Docker.Network.Mode), } - if _, err := d.Client.ContainerCreate(context.Background(), conf, hostConf, nil, d.Server.Uuid); err != nil { + if _, err := d.Client.ContainerCreate(context.Background(), conf, hostConf, nil, d.Server.Id()); err != nil { return errors.WithStack(err) } @@ -748,7 +836,7 @@ func (d *DockerEnvironment) Create() error { // Sends the specified command to the stdin of the running container instance. There is no // confirmation that this data is sent successfully, only that it gets pushed into the stdin. func (d *DockerEnvironment) SendCommand(c string) error { - if !d.attached { + if !d.IsAttached() { return errors.New("attempting to send command to non-attached instance") } @@ -760,7 +848,7 @@ func (d *DockerEnvironment) SendCommand(c string) error { // Reads the log file for the server. This does not care if the server is running or not, it will // simply try to read the last X bytes of the file and return them. func (d *DockerEnvironment) Readlog(len int64) ([]string, error) { - j, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) + j, err := d.Client.ContainerInspect(context.Background(), d.Server.Id()) if err != nil { return nil, err } @@ -837,7 +925,7 @@ func (d *DockerEnvironment) parseLogToStrings(b []byte) ([]string, error) { func (d *DockerEnvironment) portBindings() nat.PortMap { var out = nat.PortMap{} - for ip, ports := range d.Server.Allocations.Mappings { + for ip, ports := range d.Server.Config().Allocations.Mappings { for _, port := range ports { // Skip over invalid ports. if port < 0 || port > 65535 { @@ -887,14 +975,14 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet { // the same or higher than the memory limit. func (d *DockerEnvironment) getResourcesForServer() container.Resources { return container.Resources{ - Memory: d.Server.Build.BoundedMemoryLimit(), - MemoryReservation: d.Server.Build.MemoryLimit * 1_000_000, - MemorySwap: d.Server.Build.ConvertedSwap(), - CPUQuota: d.Server.Build.ConvertedCpuLimit(), + Memory: d.Server.Build().BoundedMemoryLimit(), + MemoryReservation: d.Server.Build().MemoryLimit * 1_000_000, + MemorySwap: d.Server.Build().ConvertedSwap(), + CPUQuota: d.Server.Build().ConvertedCpuLimit(), CPUPeriod: 100_000, CPUShares: 1024, - BlkioWeight: d.Server.Build.IoWeight, - OomKillDisable: &d.Server.Container.OomDisabled, - CpusetCpus: d.Server.Build.Threads, + BlkioWeight: d.Server.Build().IoWeight, + OomKillDisable: &d.Server.Config().Container.OomDisabled, + CpusetCpus: d.Server.Build().Threads, } } diff --git a/server/events.go b/server/events.go index 5ea6ceb..71da55b 100644 --- a/server/events.go +++ b/server/events.go @@ -9,12 +9,14 @@ import ( // Defines all of the possible output events for a server. // noinspection GoNameStartsWithPackageName const ( - DaemonMessageEvent = "daemon message" - InstallOutputEvent = "install output" - ConsoleOutputEvent = "console output" - StatusEvent = "status" - StatsEvent = "stats" - BackupCompletedEvent = "backup completed" + DaemonMessageEvent = "daemon message" + InstallOutputEvent = "install output" + InstallStartedEvent = "install started" + InstallCompletedEvent = "install completed" + ConsoleOutputEvent = "console output" + StatusEvent = "status" + StatsEvent = "stats" + BackupCompletedEvent = "backup completed" ) type Event struct { @@ -30,6 +32,9 @@ type EventBus struct { // Returns the server's emitter instance. func (s *Server) Events() *EventBus { + s.emitterLock.Lock() + defer s.emitterLock.Unlock() + if s.emitter == nil { s.emitter = &EventBus{ subscribers: map[string][]chan Event{}, @@ -83,11 +88,27 @@ func (e *EventBus) Subscribe(topic string, ch chan Event) { e.Lock() defer e.Unlock() - if p, ok := e.subscribers[topic]; ok { - e.subscribers[topic] = append(p, ch) - } else { + p, ok := e.subscribers[topic] + + // If there is nothing currently subscribed to this topic just go ahead and create + // the item and then return. + if !ok { e.subscribers[topic] = append([]chan Event{}, ch) + return } + + // If this topic is already setup, first iterate over the event channels currently in there + // and confirm there is not a match. If there _is_ a match do nothing since that means this + // channel is already being tracked. This avoids registering two identical handlers for the + // same topic, and means the Unsubscribe function can safely assume there will only be a + // single match for an event. + for i := range e.subscribers[topic] { + if ch == e.subscribers[topic][i] { + return + } + } + + e.subscribers[topic] = append(p, ch) } // Unsubscribe a channel from a topic. @@ -99,6 +120,10 @@ func (e *EventBus) Unsubscribe(topic string, ch chan Event) { for i := range e.subscribers[topic] { if ch == e.subscribers[topic][i] { e.subscribers[topic] = append(e.subscribers[topic][:i], e.subscribers[topic][i+1:]...) + // Subscribe enforces a unique event channel for the topic, so we can safely exit + // this loop once matched since there should not be any additional matches after + // this point. + break } } } diff --git a/server/filesystem.go b/server/filesystem.go index 01a7b48..4c1ea69 100644 --- a/server/filesystem.go +++ b/server/filesystem.go @@ -26,18 +26,27 @@ import ( ) // Error returned when there is a bad path provided to one of the FS calls. -var InvalidPathResolution = errors.New("invalid path resolution") +type PathResolutionError struct{} + +// Returns the error response in a string form that can be more easily consumed. +func (pre PathResolutionError) Error() string { + return "invalid path resolution" +} + +func IsPathResolutionError(err error) bool { + _, ok := err.(PathResolutionError) + + return ok +} type Filesystem struct { - // The server object associated with this Filesystem. - Server *Server - - Configuration *config.SystemConfiguration + Server *Server + cacheDiskMu sync.Mutex } // Returns the root path that contains all of a server's data. func (fs *Filesystem) Path() string { - return filepath.Join(fs.Configuration.Data, fs.Server.Uuid) + return filepath.Join(config.Get().System.Data, fs.Server.Id()) } // Normalizes a directory being passed in to ensure the user is not able to escape @@ -49,12 +58,8 @@ func (fs *Filesystem) Path() string { func (fs *Filesystem) SafePath(p string) (string, error) { var nonExistentPathResolution string - // Calling filpath.Clean on the joined directory will resolve it to the absolute path, - // removing any ../ type of resolution arguments, and leaving us with a direct path link. - // - // This will also trim the existing root path off the beginning of the path passed to - // the function since that can get a bit messy. - r := filepath.Clean(filepath.Join(fs.Path(), strings.TrimPrefix(p, fs.Path()))) + // Start with a cleaned up path before checking the more complex bits. + r := fs.unsafeFilePath(p) // At the same time, evaluate the symlink status and determine where this file or folder // is truly pointing to. @@ -72,7 +77,7 @@ func (fs *Filesystem) SafePath(p string) (string, error) { for k := range parts { try = strings.Join(parts[:(len(parts)-k)], "/") - if !strings.HasPrefix(try, fs.Path()) { + if !fs.unsafeIsInDataDirectory(try) { break } @@ -87,8 +92,8 @@ func (fs *Filesystem) SafePath(p string) (string, error) { // If the new path doesn't start with their root directory there is clearly an escape // attempt going on, and we should NOT resolve this path for them. if nonExistentPathResolution != "" { - if !strings.HasPrefix(nonExistentPathResolution, fs.Path()) { - return "", InvalidPathResolution + if !fs.unsafeIsInDataDirectory(nonExistentPathResolution) { + return "", PathResolutionError{} } // If the nonExistentPathResolution variable is not empty then the initial path requested @@ -101,11 +106,51 @@ func (fs *Filesystem) SafePath(p string) (string, error) { // If the requested directory from EvalSymlinks begins with the server root directory go // ahead and return it. If not we'll return an error which will block any further action // on the file. - if strings.HasPrefix(p, fs.Path()) { + if fs.unsafeIsInDataDirectory(p) { return p, nil } - return "", InvalidPathResolution + return "", PathResolutionError{} +} + +// Generate a path to the file by cleaning it up and appending the root server path to it. This +// DOES NOT gaurantee that the file resolves within the server data directory. You'll want to use +// the fs.unsafeIsInDataDirectory(p) function to confirm. +func (fs *Filesystem) unsafeFilePath(p string) string { + // Calling filpath.Clean on the joined directory will resolve it to the absolute path, + // removing any ../ type of resolution arguments, and leaving us with a direct path link. + // + // This will also trim the existing root path off the beginning of the path passed to + // the function since that can get a bit messy. + return filepath.Clean(filepath.Join(fs.Path(), strings.TrimPrefix(p, fs.Path()))) +} + +// Check that that path string starts with the server data directory path. This function DOES NOT +// validate that the rest of the path does not end up resolving out of this directory, or that the +// targeted file or folder is not a symlink doing the same thing. +func (fs *Filesystem) unsafeIsInDataDirectory(p string) bool { + return strings.HasPrefix(strings.TrimSuffix(p, "/")+"/", strings.TrimSuffix(fs.Path(), "/")+"/") +} + +// Helper function to keep some of the codebase a little cleaner. Returns a "safe" version of the path +// joined with a file. This is important because you cannot just assume that appending a file to a cleaned +// path will result in a cleaned path to that file. For example, imagine you have the following scenario: +// +// my_bad_file -> symlink:/etc/passwd +// +// cleaned := SafePath("../../etc") -> "/" +// filepath.Join(cleaned, my_bad_file) -> "/my_bad_file" +// +// You might think that "/my_bad_file" is fine since it isn't pointing to the original "../../etc/my_bad_file". +// However, this doesn't account for symlinks where the file might be pointing outside of the directory, so +// calling a function such as Chown against it would chown the symlinked location, and not the file within the +// Wings daemon. +func (fs *Filesystem) SafeJoin(dir string, f os.FileInfo) (string, error) { + if f.Mode()&os.ModeSymlink != 0 { + return fs.SafePath(filepath.Join(dir, f.Name())) + } + + return filepath.Join(dir, f.Name()), nil } // Executes the fs.SafePath function in parallel against an array of paths. If any of the calls @@ -162,18 +207,45 @@ func (fs *Filesystem) ParallelSafePath(paths []string) ([]string, error) { // Because determining the amount of space being used by a server is a taxing operation we // will load it all up into a cache and pull from that as long as the key is not expired. func (fs *Filesystem) HasSpaceAvailable() bool { - var space = fs.Server.Build.DiskSpace + space := fs.Server.Build().DiskSpace + + size, err := fs.getCachedDiskUsage() + if err != nil { + fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size") + } + + // Determine if their folder size, in bytes, is smaller than the amount of space they've + // been allocated. + fs.Server.Proc().SetDisk(size) // If space is -1 or 0 just return true, means they're allowed unlimited. + // + // Technically we could skip disk space calculation because we don't need to check if the server exceeds it's limit + // but because this method caches the disk usage it would be best to calculate the disk usage and always + // return true. if space <= 0 { return true } - // If we have a match in the cache, use that value in the return. No need to perform an expensive - // disk operation, even if this is an empty value. - if x, exists := fs.Server.Cache.Get("disk_used"); exists { - fs.Server.Resources.Disk = x.(int64) - return (x.(int64) / 1000.0 / 1000.0) <= space + return (size / 1000.0 / 1000.0) <= space +} + +// Internal helper function to allow other parts of the codebase to check the total used disk space +// as needed without overly taxing the system. This will prioritize the value from the cache to avoid +// excessive IO usage. We will only walk the filesystem and determine the size of the directory if there +// is no longer a cached value. +func (fs *Filesystem) getCachedDiskUsage() (int64, error) { + // Obtain an exclusive lock on this process so that we don't unintentionally run it at the same + // time as another running process. Once the lock is available it'll read from the cache for the + // second call rather than hitting the disk in parallel. + // + // This effectively the same speed as running this call in parallel since this cache will return + // instantly on the second call. + fs.cacheDiskMu.Lock() + defer fs.cacheDiskMu.Unlock() + + if x, exists := fs.Server.cache.Get("disk_used"); exists { + return x.(int64), nil } // If there is no size its either because there is no data (in which case running this function @@ -181,37 +253,30 @@ func (fs *Filesystem) HasSpaceAvailable() bool { // grab the size of their data directory. This is a taxing operation, so we want to store it in // the cache once we've gotten it. size, err := fs.DirectorySize("/") - if err != nil { - fs.Server.Log().WithField("error", err).Warn("failed to determine root server directory size") - } // Always cache the size, even if there is an error. We want to always return that value // so that we don't cause an endless loop of determining the disk size if there is a temporary // error encountered. - fs.Server.Cache.Set("disk_used", size, time.Second*60) + fs.Server.cache.Set("disk_used", size, time.Second*60) - // Determine if their folder size, in bytes, is smaller than the amount of space they've - // been allocated. - fs.Server.Resources.Disk = size - - return (size / 1000.0 / 1000.0) <= space + return size, err } // Determines the directory size of a given location by running parallel tasks to iterate // through all of the folders. Returns the size in bytes. This can be a fairly taxing operation // on locations with tons of files, so it is recommended that you cache the output. func (fs *Filesystem) DirectorySize(dir string) (int64, error) { - w := fs.NewWalker() - ctx := context.Background() - var size int64 - err := w.Walk(dir, ctx, func(f os.FileInfo, _ string) bool { - // Only increment the size when we're dealing with a file specifically, otherwise - // just continue digging deeper until there are no more directories to iterate over. + err := fs.Walk(dir, func(_ string, f os.FileInfo, err error) error { + if err != nil { + return fs.handleWalkerError(err, f) + } + if !f.IsDir() { atomic.AddInt64(&size, f.Size()) } - return true + + return nil }) return size, err @@ -293,7 +358,7 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error { // Finally, chown the file to ensure the permissions don't end up out-of-whack // if we had just created it. - return fs.Chown(p) + return fs.Chown(cleaned) } // Defines the stat struct object. @@ -409,7 +474,7 @@ func (fs *Filesystem) Chown(path string) error { if s, err := os.Stat(cleaned); err != nil { return errors.WithStack(err) } else if !s.IsDir() { - return os.Chown(cleaned, fs.Configuration.User.Uid, fs.Configuration.User.Gid) + return os.Chown(cleaned, config.Get().System.User.Uid, config.Get().System.User.Gid) } return fs.chownDirectory(cleaned) @@ -435,16 +500,27 @@ func (fs *Filesystem) chownDirectory(path string) error { } for _, f := range files { + // Do not attempt to chmod a symlink. Go's os.Chown function will affect the symlink + // so if it points to a location outside the data directory the user would be able to + // (un)intentionally modify that files permissions. + if f.Mode()&os.ModeSymlink != 0 { + continue + } + + p, err := fs.SafeJoin(cleaned, f) + if err != nil { + return err + } + if f.IsDir() { wg.Add(1) go func(p string) { defer wg.Done() fs.chownDirectory(p) - }(filepath.Join(cleaned, f.Name())) + }(p) } else { - // Chown the file. - os.Chown(filepath.Join(cleaned, f.Name()), fs.Configuration.User.Uid, fs.Configuration.User.Gid) + os.Chown(p, config.Get().System.User.Uid, config.Get().System.User.Gid) } } @@ -536,17 +612,26 @@ func (fs *Filesystem) Copy(p string) error { // Deletes a file or folder from the system. Prevents the user from accidentally // (or maliciously) removing their root server data directory. func (fs *Filesystem) Delete(p string) error { - cleaned, err := fs.SafePath(p) - if err != nil { - return errors.WithStack(err) + // This is one of the few (only?) places in the codebase where we're explictly not using + // the SafePath functionality when working with user provided input. If we did, you would + // not be able to delete a file that is a symlink pointing to a location outside of the data + // directory. + // + // We also want to avoid resolving a symlink that points _within_ the data directory and thus + // deleting the actual source file for the symlink rather than the symlink itself. For these + // purposes just resolve the actual file path using filepath.Join() and confirm that the path + // exists within the data directory. + resolved := fs.unsafeFilePath(p) + if !fs.unsafeIsInDataDirectory(resolved) { + return PathResolutionError{} } // Block any whoopsies. - if cleaned == fs.Path() { + if resolved == fs.Path() { return errors.New("cannot delete root server directory") } - return os.RemoveAll(cleaned) + return os.RemoveAll(resolved) } // Lists the contents of a given directory and returns stat information about each @@ -579,7 +664,14 @@ func (fs *Filesystem) ListDirectory(p string) ([]*Stat, error) { var m = "inode/directory" if !f.IsDir() { - m, _, _ = mimetype.DetectFile(filepath.Join(cleaned, f.Name())) + cleanedp, _ := fs.SafeJoin(cleaned, f) + if cleanedp != "" { + m, _, _ = mimetype.DetectFile(filepath.Join(cleaned, f.Name())) + } else { + // Just pass this for an unknown type because the file could not safely be resolved within + // the server data path. + m = "application/octet-stream" + } } out[idx] = &Stat{ @@ -636,9 +728,6 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In return nil, err } - w := fs.NewWalker() - ctx := context.Background() - i, err := ignore.CompileIgnoreLines(ignored...) if err != nil { return nil, err @@ -647,7 +736,12 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In // Walk through all of the files and directories on a server. This callback only returns // files found, and will keep walking deeper and deeper into directories. inc := new(backup.IncludedFiles) - if err := w.Walk(cleaned, ctx, func(f os.FileInfo, p string) bool { + + if err := fs.Walk(cleaned, func(p string, f os.FileInfo, err error) error { + if err != nil { + return fs.handleWalkerError(err, f) + } + // Avoid unnecessary parsing if there are no ignored files, nothing will match anyways // so no reason to call the function. if len(ignored) == 0 || !i.MatchesPath(strings.TrimPrefix(p, fs.Path()+"/")) { @@ -657,7 +751,7 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In // We can't just abort if the path is technically ignored. It is possible there is a nested // file or folder that should not be excluded, so in this case we need to just keep going // until we get to a final state. - return true + return nil }); err != nil { return nil, err } @@ -665,7 +759,7 @@ func (fs *Filesystem) GetIncludedFiles(dir string, ignored []string) (*backup.In return inc, nil } -// Compresses all of the files matching the given paths in the specififed directory. This function +// Compresses all of the files matching the given paths in the specified directory. This function // also supports passing nested paths to only compress certain files and folders when working in // a larger directory. This effectively creates a local backup, but rather than ignoring specific // files and folders, it takes an allow-list of files and folders. @@ -688,46 +782,58 @@ func (fs *Filesystem) CompressFiles(dir string, paths []string) (os.FileInfo, er return nil, err } - w := fs.NewWalker() - wg := new(sync.WaitGroup) - inc := new(backup.IncludedFiles) // Iterate over all of the cleaned paths and merge them into a large object of final file // paths to pass into the archiver. As directories are encountered this will drop into them // and look for all of the files. for _, p := range cleaned { - wg.Add(1) + f, err := os.Stat(p) + if err != nil { + fs.Server.Log().WithField("error", err).WithField("path", p).Debug("failed to stat file or directory for compression") + continue + } - go func(pa string) { - defer wg.Done() + if f.IsDir() { + err := fs.Walk(p, func(s string, info os.FileInfo, err error) error { + if err != nil { + return fs.handleWalkerError(err, info) + } + + if !info.IsDir() { + inc.Push(&info, s) + } + + return nil + }) - f, err := os.Stat(pa) if err != nil { - fs.Server.Log().WithField("error", err).WithField("path", pa).Warn("failed to stat file or directory for compression") - return + return nil, err } - - if f.IsDir() { - // Recursively drop into directory and get all of the additional files and directories within - // it that should be included in this backup. - w.Walk(pa, context.Background(), func(info os.FileInfo, s string) bool { - if !info.IsDir() { - inc.Push(&info, s) - } - - return true - }) - } else { - inc.Push(&f, pa) - } - }(p) + } else { + inc.Push(&f, p) + } } - wg.Wait() - a := &backup.Archive{TrimPrefix: fs.Path(), Files: inc} d := path.Join(cleanedRootDir, fmt.Sprintf("archive-%s.tar.gz", strings.ReplaceAll(time.Now().Format(time.RFC3339), ":", ""))) return a.Create(d, context.Background()) } + +// Handle errors encountered when walking through directories. +// +// If there is a path resolution error just skip the item entirely. Only return this for a +// directory, otherwise return nil. Returning this error for a file will stop the walking +// for the remainder of the directory. This is assuming an os.FileInfo struct was even returned. +func (fs *Filesystem) handleWalkerError(err error, f os.FileInfo) error { + if !IsPathResolutionError(err) { + return err + } + + if f != nil && f.IsDir() { + return filepath.SkipDir + } + + return nil +} diff --git a/server/filesystem_unarchive.go b/server/filesystem_unarchive.go new file mode 100644 index 0000000..9e40c28 --- /dev/null +++ b/server/filesystem_unarchive.go @@ -0,0 +1,130 @@ +package server + +import ( + "archive/tar" + "archive/zip" + "compress/gzip" + "fmt" + "github.com/mholt/archiver/v3" + "github.com/pkg/errors" + "io" + "os" + "path/filepath" + "reflect" + "strings" + "sync" + "sync/atomic" +) + +// Look through a given archive and determine if decompressing it would put the server over +// its allocated disk space limit. +func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (bool, error) { + // Don't waste time trying to determine this if we know the server will have the space for + // it since there is no limit. + if fs.Server.Build().DiskSpace <= 0 { + return true, nil + } + + source, err := fs.SafePath(filepath.Join(dir, file)) + if err != nil { + return false, err + } + + wg := new(sync.WaitGroup) + + var dirSize int64 + var cErr error + // Get the cached size in a parallel process so that if it is not cached we are not + // waiting an unnecessary amount of time on this call. + go func() { + wg.Add(1) + defer wg.Done() + + dirSize, cErr = fs.getCachedDiskUsage() + }() + + var size int64 + // In a seperate thread, walk over the archive and figure out just how large the final + // output would be from dearchiving it. + go func() { + wg.Add(1) + defer wg.Done() + + // Walk all of the files and calculate the total decompressed size of this archive. + archiver.Walk(source, func(f archiver.File) error { + atomic.AddInt64(&size, f.Size()) + + return nil + }) + }() + + wg.Wait() + + return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.Build().DiskSpace, cErr +} + +// Decompress a file in a given directory by using the archiver tool to infer the file +// type and go from there. This will walk over all of the files within the given archive +// and ensure that there is not a zip-slip attack being attempted by validating that the +// final path is within the server data directory. +func (fs *Filesystem) DecompressFile(dir string, file string) error { + source, err := fs.SafePath(filepath.Join(dir, file)) + if err != nil { + return errors.WithStack(err) + } + + // Walk over all of the files spinning up an additional go-routine for each file we've encountered + // and then extract that file from the archive and write it to the disk. If any part of this process + // encounters an error the entire process will be stopped. + return archiver.Walk(source, func(f archiver.File) error { + // Don't waste time with directories, we don't need to create them if they have no contents, and + // we will ensure the directory exists when opening the file for writing anyways. + if f.IsDir() { + return nil + } + + return fs.extractFileFromArchive(f) + }) +} + +// Extracts a single file from the archive and writes it to the disk after verifying that it will end +// up in the server data directory. +func (fs *Filesystem) extractFileFromArchive(f archiver.File) error { + var name string + + switch s := f.Sys().(type) { + case *tar.Header: + name = s.Name + case *gzip.Header: + name = s.Name + case *zip.FileHeader: + name = s.Name + default: + return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String())) + } + + // Guard against a zip-slip attack and prevent writing a file to a destination outside of + // the server root directory. + p, err := fs.SafePath(name) + if err != nil { + return err + } + + // Ensure the directory structure for this file exists before trying to write the file + // to the disk, otherwise we'll have some unexpected fun. + if err := os.MkdirAll(strings.TrimSuffix(p, filepath.Base(p)), 0755); err != nil { + return err + } + + // Open the file and truncate it if it already exists. + o, err := os.OpenFile(p, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + + defer o.Close() + + _, cerr := io.Copy(o, f) + + return cerr +} diff --git a/server/filesystem_walker.go b/server/filesystem_walker.go index 0db761a..7594ffe 100644 --- a/server/filesystem_walker.go +++ b/server/filesystem_walker.go @@ -2,69 +2,140 @@ package server import ( "context" - "golang.org/x/sync/errgroup" + "github.com/gammazero/workerpool" "io/ioutil" "os" "path/filepath" + "runtime" + "sync" ) type FileWalker struct { *Filesystem } +type PooledFileWalker struct { + wg sync.WaitGroup + pool *workerpool.WorkerPool + callback filepath.WalkFunc + cancel context.CancelFunc + + err error + errOnce sync.Once + + Filesystem *Filesystem +} + // Returns a new walker instance. func (fs *Filesystem) NewWalker() *FileWalker { return &FileWalker{fs} } -// Iterate over all of the files and directories within a given directory. When a file is -// found the callback will be called with the file information. If a directory is encountered -// it will be recursively passed back through to this function. -func (fw *FileWalker) Walk(dir string, ctx context.Context, callback func (os.FileInfo, string) bool) error { - cleaned, err := fw.SafePath(dir) +// Creates a new pooled file walker that will concurrently walk over a given directory but limit itself +// to a worker pool as to not completely flood out the system or cause a process crash. +func newPooledWalker(fs *Filesystem) *PooledFileWalker { + return &PooledFileWalker{ + Filesystem: fs, + // Create a worker pool that is the same size as the number of processors available on the + // system. Going much higher doesn't provide much of a performance boost, and is only more + // likely to lead to resource overloading anyways. + pool: workerpool.New(runtime.GOMAXPROCS(0)), + } +} + +// Process a given path by calling the callback function for all of the files and directories within +// the path, and then dropping into any directories that we come across. +func (w *PooledFileWalker) process(path string) error { + p, err := w.Filesystem.SafePath(path) if err != nil { return err } - // Get all of the files from this directory. - files, err := ioutil.ReadDir(cleaned) + files, err := ioutil.ReadDir(p) if err != nil { return err } - // Create an error group that we can use to run processes in parallel while retaining - // the ability to cancel the entire process immediately should any of it fail. - g, ctx := errgroup.WithContext(ctx) - + // Loop over all of the files and directories in the given directory and call the provided + // callback function. If we encounter a directory, push that directory onto the worker queue + // to be processed. for _, f := range files { - if f.IsDir() { - fi := f - p := filepath.Join(cleaned, f.Name()) - // Recursively call this function to continue digging through the directory tree within - // a seperate goroutine. If the context is canceled abort this process. - g.Go(func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // If the callback returns true, go ahead and keep walking deeper. This allows - // us to programatically continue deeper into directories, or stop digging - // if that pathway knows it needs nothing else. - if callback(fi, p) { - return fw.Walk(p, ctx, callback) - } + sp, err := w.Filesystem.SafeJoin(p, f) + if err != nil { + // Let the callback function handle what to do if there is a path resolution error because a + // dangerous path was resolved. If there is an error returned, return from this entire process + // otherwise just skip over this specific file. We don't care if its a file or a directory at + // this point since either way we're skipping it, however, still check for the SkipDir since that + // would be thrown otherwise. + if err = w.callback(sp, f, err); err != nil && err != filepath.SkipDir { + return err + } - return nil - } - }) - } else { - // If this isn't a directory, go ahead and pass the file information into the - // callback. We don't care about the response since we won't be stepping into - // anything from here. - callback(f, filepath.Join(cleaned, f.Name())) + continue + } + + i, err := os.Stat(sp) + // You might end up getting an error about a file or folder not existing if the given path + // if it is an invalid symlink. We can safely just skip over these files I believe. + if os.IsNotExist(err) { + continue + } + + // Call the user-provided callback for this file or directory. If an error is returned that is + // not a SkipDir call, abort the entire process and bubble that error up. + if err = w.callback(sp, i, err); err != nil && err != filepath.SkipDir { + return err + } + + // If this is a directory, and we didn't get a SkipDir error, continue through by pushing another + // job to the pool to handle it. If we requested a skip, don't do anything just continue on to the + // next item. + if i.IsDir() && err != filepath.SkipDir { + w.push(sp) + } else if !i.IsDir() && err == filepath.SkipDir { + // Per the spec for the callback, if we get a SkipDir error but it is returned for an item + // that is _not_ a directory, abort the remaining operations on the directory. + return nil } } - // Block until all of the routines finish and have returned a value. - return g.Wait() -} \ No newline at end of file + return nil +} + +// Push a new path into the worker pool and increment the waitgroup so that we do not return too +// early and cause panic's as internal directories attempt to submit to the pool. +func (w *PooledFileWalker) push(path string) { + w.wg.Add(1) + w.pool.Submit(func() { + defer w.wg.Done() + if err := w.process(path); err != nil { + w.errOnce.Do(func() { + w.err = err + if w.cancel != nil { + w.cancel() + } + }) + } + }) +} + +// Walks the given directory and executes the callback function for all of the files and directories +// that are encountered. +func (fs *Filesystem) Walk(dir string, callback filepath.WalkFunc) error { + w := newPooledWalker(fs) + w.callback = callback + + _, cancel := context.WithCancel(context.Background()) + w.cancel = cancel + + w.push(dir) + + w.wg.Wait() + w.pool.StopWait() + + if w.err != nil { + return w.err + } + + return nil +} diff --git a/server/install.go b/server/install.go index 6a7805e..54ebcfb 100644 --- a/server/install.go +++ b/server/install.go @@ -36,6 +36,9 @@ func (s *Server) Install(sync bool) error { } } + // Send the start event so the Panel can automatically update. + s.Events().Publish(InstallStartedEvent, "") + err := s.internalInstall() s.Log().Debug("notifying panel of server install state") @@ -52,6 +55,10 @@ func (s *Server) Install(sync bool) error { l.Warn("failed to notify panel of server install state") } + // Push an event to the websocket so we can auto-refresh the information in the panel once + // the install is completed. + s.Events().Publish(InstallCompletedEvent, "") + return err } @@ -70,7 +77,7 @@ func (s *Server) Reinstall() error { // Internal installation function used to simplify reporting back to the Panel. func (s *Server) internalInstall() error { - script, rerr, err := api.NewRequester().GetInstallationScript(s.Uuid) + script, rerr, err := api.NewRequester().GetInstallationScript(s.Id()) if err != nil || rerr != nil { if err != nil { return err @@ -170,7 +177,7 @@ func (s *Server) AbortInstallation() { // Removes the installer container for the server. func (ip *InstallationProcess) RemoveContainer() { - err := ip.client.ContainerRemove(ip.context, ip.Server.Uuid+"_installer", types.ContainerRemoveOptions{ + err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", types.ContainerRemoveOptions{ RemoveVolumes: true, Force: true, }) @@ -313,7 +320,7 @@ func (ip *InstallationProcess) BeforeExecute() (string, error) { Force: true, } - if err := ip.client.ContainerRemove(ip.context, ip.Server.Uuid+"_installer", opts); err != nil { + if err := ip.client.ContainerRemove(ip.context, ip.Server.Id()+"_installer", opts); err != nil { if !client.IsErrNotFound(err) { e = append(e, err) } @@ -333,7 +340,7 @@ func (ip *InstallationProcess) BeforeExecute() (string, error) { // Returns the log path for the installation process. func (ip *InstallationProcess) GetLogPath() string { - return filepath.Join(config.Get().System.GetInstallLogPath(), ip.Server.Uuid+".log") + return filepath.Join(config.Get().System.GetInstallLogPath(), ip.Server.Id()+".log") } // Cleans up after the execution of the installation process. This grabs the logs from the @@ -369,7 +376,7 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error { | | Details | ------------------------------ - Server UUID: {{.Server.Uuid}} + Server UUID: {{.Server.Id}} Container Image: {{.Script.ContainerImage}} Container Entrypoint: {{.Script.Entrypoint}} @@ -448,7 +455,7 @@ func (ip *InstallationProcess) Execute(installPath string) (string, error) { } ip.Server.Log().WithField("install_script", installPath+"/install.sh").Info("creating install container for server process") - r, err := ip.client.ContainerCreate(ip.context, conf, hostConf, nil, ip.Server.Uuid+"_installer") + r, err := ip.client.ContainerCreate(ip.context, conf, hostConf, nil, ip.Server.Id()+"_installer") if err != nil { return "", errors.WithStack(err) } @@ -516,7 +523,7 @@ func (ip *InstallationProcess) StreamOutput(id string) error { func (s *Server) SyncInstallState(successful bool) error { r := api.NewRequester() - rerr, err := r.SendInstallationStatus(s.Uuid, successful) + rerr, err := r.SendInstallationStatus(s.Id(), successful) if rerr != nil || err != nil { if err != nil { return errors.WithStack(err) diff --git a/server/listeners.go b/server/listeners.go index 73df884..bcb4bf7 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -27,9 +27,11 @@ func (s *Server) onConsoleOutput(data string) { // If the specific line of output is one that would mark the server as started, // set the server to that state. Only do this if the server is not currently stopped // or stopping. - if s.GetState() == ProcessStartingState && strings.Contains(data, s.processConfiguration.Startup.Done) { + match := s.ProcessConfiguration().Startup.Done + + if s.GetState() == ProcessStartingState && strings.Contains(data, match) { s.Log().WithFields(log.Fields{ - "match": s.processConfiguration.Startup.Done, + "match": match, "against": data, }).Debug("detected server in running state based on console line output") @@ -40,7 +42,8 @@ func (s *Server) onConsoleOutput(data string) { // set the server to be in a stopping state, otherwise crash detection will kick in and // cause the server to unexpectedly restart on the user. if s.IsRunning() { - if s.processConfiguration.Stop.Type == api.ProcessStopCommand && data == s.processConfiguration.Stop.Value { + stop := s.ProcessConfiguration().Stop + if stop.Type == api.ProcessStopCommand && data == stop.Value { s.SetState(ProcessStoppingState) } } diff --git a/server/loader.go b/server/loader.go new file mode 100644 index 0000000..70a7d30 --- /dev/null +++ b/server/loader.go @@ -0,0 +1,120 @@ +package server + +import ( + "github.com/apex/log" + "github.com/creasty/defaults" + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + "github.com/pterodactyl/wings/api" + "github.com/remeh/sizedwaitgroup" + "time" +) + +var servers = NewCollection(nil) + +func GetServers() *Collection { + return servers +} + +// Iterates over a given directory and loads all of the servers listed before returning +// them to the calling function. +func LoadDirectory() error { + if len(servers.items) != 0 { + return errors.New("cannot call LoadDirectory with a non-nil collection") + } + + // We could theoretically use a standard wait group here, however doing + // that introduces the potential to crash the program due to too many + // open files. This wouldn't happen on a small setup, but once the daemon is + // handling many servers you run that risk. + // + // For now just process 10 files at a time, that should be plenty fast to + // read and parse the YAML. We should probably make this configurable down + // the road to help big instances scale better. + wg := sizedwaitgroup.New(10) + + configs, rerr, err := api.NewRequester().GetAllServerConfigurations() + if err != nil || rerr != nil { + if err != nil { + return errors.WithStack(err) + } + + return errors.New(rerr.String()) + } + + log.Debug("retrieving cached server states from disk") + states, err := getServerStates() + if err != nil { + return errors.WithStack(err) + } + + log.WithField("total_configs", len(configs)).Debug("looping over received configurations from API") + for uuid, data := range configs { + wg.Add() + + go func(uuid string, data *api.ServerConfigurationResponse) { + defer wg.Done() + + log.WithField("uuid", uuid).Debug("creating server object from configuration") + s, err := FromConfiguration(data) + if err != nil { + log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...") + return + } + + if state, exists := states[s.Id()]; exists { + s.SetState(state) + s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file") + } + + servers.Add(s) + }(uuid, data) + } + + // Wait until we've processed all of the configuration files in the directory + // before continuing. + wg.Wait() + + return nil +} + +// Initializes a server using a data byte array. This will be marshaled into the +// given struct using a YAML marshaler. This will also configure the given environment +// for a server. +func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { + cfg := Configuration{} + if err := defaults.Set(&cfg); err != nil { + return nil, err + } + + s := new(Server) + s.cfg = cfg + + if err := s.UpdateDataStructure(data.Settings, false); err != nil { + return nil, err + } + + s.AddEventListeners() + + // Right now we only support a Docker based environment, so I'm going to hard code + // this logic in. When we're ready to support other environment we'll need to make + // some modifications here obviously. + if err := NewDockerEnvironment(s); err != nil { + return nil, err + } + + s.cache = cache.New(time.Minute*10, time.Minute*15) + s.Archiver = Archiver{ + Server: s, + } + s.Filesystem = Filesystem{ + Server: s, + } + + // Forces the configuration to be synced with the panel. + if err := s.SyncWithConfiguration(data); err != nil { + return nil, err + } + + return s, nil +} diff --git a/server/process.go b/server/process.go new file mode 100644 index 0000000..b8a489d --- /dev/null +++ b/server/process.go @@ -0,0 +1,10 @@ +package server + +import "github.com/pterodactyl/wings/api" + +func (s *Server) ProcessConfiguration() *api.ProcessConfiguration { + s.RLock() + defer s.RUnlock() + + return s.procConfig +} diff --git a/server/resources.go b/server/resources.go index 55f5cd1..f333cb8 100644 --- a/server/resources.go +++ b/server/resources.go @@ -3,27 +3,38 @@ package server import ( "github.com/docker/docker/api/types" "math" + "sync" + "sync/atomic" ) // Defines the current resource usage for a given server instance. If a server is offline you // should obviously expect memory and CPU usage to be 0. However, disk will always be returned // since that is not dependent on the server being running to collect that data. type ResourceUsage struct { + mu sync.RWMutex + + // The current server status. + State string `json:"state" default:"offline"` + // The total amount of memory, in bytes, that this server instance is consuming. This is // calculated slightly differently than just using the raw Memory field that the stats // return from the container, so please check the code setting this value for how that // is calculated. Memory uint64 `json:"memory_bytes"` + // The total amount of memory this container or resource can use. Inside Docker this is // going to be higher than you'd expect because we're automatically allocating overhead // abilities for the container, so its not going to be a perfect match. MemoryLimit uint64 `json:"memory_limit_bytes"` + // The absolute CPU usage is the amount of CPU used in relation to the entire system and // does not take into account any limits on the server process itself. CpuAbsolute float64 `json:"cpu_absolute"` + // The current disk space being used by the server. This is cached to prevent slow lookup // issues on frequent refreshes. Disk int64 `json:"disk_bytes"` + // Current network transmit in & out for a container. Network struct { RxBytes uint64 `json:"rx_bytes"` @@ -31,6 +42,66 @@ type ResourceUsage struct { } `json:"network"` } +// Returns the resource usage stats for the server instance. If the server is not running, only the +// disk space currently used will be returned. When the server is running all of the other stats will +// be returned. +// +// When a process is stopped all of the stats are zeroed out except for the disk. +func (s *Server) Proc() *ResourceUsage { + s.resources.mu.RLock() + defer s.resources.mu.RUnlock() + + return &s.resources +} + +// Returns the servers current state. +func (ru *ResourceUsage) getInternalState() string { + ru.mu.RLock() + defer ru.mu.RUnlock() + + return ru.State +} + +// Sets the new state for the server. +func (ru *ResourceUsage) setInternalState(state string) { + ru.mu.Lock() + ru.State = state + ru.mu.Unlock() +} + +// Resets the usages values to zero, used when a server is stopped to ensure we don't hold +// onto any values incorrectly. +func (ru *ResourceUsage) Empty() { + ru.mu.Lock() + defer ru.mu.Unlock() + + ru.Memory = 0 + ru.CpuAbsolute = 0 + ru.Network.TxBytes = 0 + ru.Network.RxBytes = 0 +} + +func (ru *ResourceUsage) SetDisk(i int64) { + ru.mu.Lock() + defer ru.mu.Unlock() + + ru.Disk = i +} + +func (ru *ResourceUsage) UpdateFromDocker(v *types.StatsJSON) { + ru.mu.Lock() + defer ru.mu.Unlock() + + ru.CpuAbsolute = ru.calculateDockerAbsoluteCpu(&v.PreCPUStats, &v.CPUStats) + ru.Memory = ru.calculateDockerMemory(v.MemoryStats) + ru.MemoryLimit = v.MemoryStats.Limit +} + +func (ru *ResourceUsage) UpdateNetworkBytes(nw *types.NetworkStats) { + atomic.AddUint64(&ru.Network.RxBytes, nw.RxBytes) + atomic.AddUint64(&ru.Network.TxBytes, nw.TxBytes) +} + // The "docker stats" CLI call does not return the same value as the types.MemoryStats.Usage // value which can be rather confusing to people trying to compare panel usage to // their stats output. @@ -40,7 +111,7 @@ type ResourceUsage struct { // correct memory value anyways. // // @see https://github.com/docker/cli/blob/96e1d1d6/cli/command/container/stats_helpers.go#L227-L249 -func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 { +func (ru *ResourceUsage) calculateDockerMemory(stats types.MemoryStats) uint64 { if v, ok := stats.Stats["total_inactive_file"]; ok && v < stats.Usage { return stats.Usage - v } @@ -56,7 +127,7 @@ func (ru *ResourceUsage) CalculateDockerMemory(stats types.MemoryStats) uint64 { // by the defined CPU limits on the container. // // @see https://github.com/docker/cli/blob/aa097cf1aa19099da70930460250797c8920b709/cli/command/container/stats_helpers.go#L166 -func (ru *ResourceUsage) CalculateAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 { +func (ru *ResourceUsage) calculateDockerAbsoluteCpu(pStats *types.CPUStats, stats *types.CPUStats) float64 { // Calculate the change in CPU usage between the current and previous reading. cpuDelta := float64(stats.CPUUsage.TotalUsage) - float64(pStats.CPUUsage.TotalUsage) diff --git a/server/server.go b/server/server.go index 7f48148..7530f36 100644 --- a/server/server.go +++ b/server/server.go @@ -4,99 +4,38 @@ import ( "context" "fmt" "github.com/apex/log" - "github.com/creasty/defaults" "github.com/patrickmn/go-cache" "github.com/pkg/errors" "github.com/pterodactyl/wings/api" - "github.com/pterodactyl/wings/config" - "github.com/remeh/sizedwaitgroup" "golang.org/x/sync/semaphore" - "math" "os" - "strconv" "strings" "sync" "time" ) -var servers *Collection - -func GetServers() *Collection { - return servers -} - -type EnvironmentVariables map[string]interface{} - -// Ugly hacky function to handle environment variables that get passed through as not-a-string -// from the Panel. Ideally we'd just say only pass strings, but that is a fragile idea and if a -// string wasn't passed through you'd cause a crash or the server to become unavailable. For now -// try to handle the most likely values from the JSON and hope for the best. -func (ev EnvironmentVariables) Get(key string) string { - val, ok := ev[key] - if !ok { - return "" - } - - switch val.(type) { - case int: - return strconv.Itoa(val.(int)) - case int32: - return strconv.FormatInt(val.(int64), 10) - case int64: - return strconv.FormatInt(val.(int64), 10) - case float32: - return fmt.Sprintf("%f", val.(float32)) - case float64: - return fmt.Sprintf("%f", val.(float64)) - case bool: - return strconv.FormatBool(val.(bool)) - } - - return val.(string) -} - // High level definition for a server instance being controlled by Wings. type Server struct { - // The unique identifier for the server that should be used when referencing - // it against the Panel API (and internally). This will be used when naming - // docker containers as well as in log output. - Uuid string `json:"uuid"` + // Internal mutex used to block actions that need to occur sequentially, such as + // writing the configuration to the disk. + sync.RWMutex + emitterLock sync.Mutex - // Whether or not the server is in a suspended state. Suspended servers cannot - // be started or modified except in certain scenarios by an admin user. - Suspended bool `json:"suspended"` + // Maintains the configuration for the server. This is the data that gets returned by the Panel + // such as build settings and container images. + cfg Configuration - // The power state of the server. - State string `default:"offline" json:"state"` - - // The command that should be used when booting up the server instance. - Invocation string `json:"invocation"` - - // An array of environment variables that should be passed along to the running - // server process. - EnvVars EnvironmentVariables `json:"environment"` - - Allocations Allocations `json:"allocations"` - Build BuildSettings `json:"build"` - CrashDetection CrashDetection `json:"crash_detection"` - Mounts []Mount `json:"mounts"` - Resources ResourceUsage `json:"resources"` + // The crash handler for this server instance. + crasher CrashHandler + resources ResourceUsage Archiver Archiver `json:"-"` Environment Environment `json:"-"` Filesystem Filesystem `json:"-"` - Container struct { - // Defines the Docker image that will be used for this server - Image string `json:"image,omitempty"` - // If set to true, OOM killer will be disabled on the server's Docker container. - // If not present (nil) we will default to disabling it. - OomDisabled bool `default:"true" json:"oom_disabled"` - } `json:"container,omitempty"` - // Server cache used to store frequently requested information in memory and make // certain long operations return faster. For example, FS disk space usage. - Cache *cache.Cache `json:"-"` + cache *cache.Cache // Events emitted by the server instance. emitter *EventBus @@ -104,17 +43,13 @@ type Server struct { // Defines the process configuration for the server instance. This is dynamically // fetched from the Pterodactyl Server instance each time the server process is // started, and then cached here. - processConfiguration *api.ProcessConfiguration + procConfig *api.ProcessConfiguration // Tracks the installation process for this server and prevents a server from running // two installer processes at the same time. This also allows us to cancel a running // installation process, for example when a server is deleted from the panel while the // installer process is still running. installer InstallerDetails - - // Internal mutex used to block actions that need to occur sequentially, such as - // writing the configuration to the disk. - sync.RWMutex } type InstallerDetails struct { @@ -127,183 +62,9 @@ type InstallerDetails struct { sem *semaphore.Weighted } -// The build settings for a given server that impact docker container creation and -// resource limits for a server instance. -type BuildSettings struct { - // The total amount of memory in megabytes that this server is allowed to - // use on the host system. - MemoryLimit int64 `json:"memory_limit"` - - // The amount of additional swap space to be provided to a container instance. - Swap int64 `json:"swap"` - - // The relative weight for IO operations in a container. This is relative to other - // containers on the system and should be a value between 10 and 1000. - IoWeight uint16 `json:"io_weight"` - - // The percentage of CPU that this instance is allowed to consume relative to - // the host. A value of 200% represents complete utilization of two cores. This - // should be a value between 1 and THREAD_COUNT * 100. - CpuLimit int64 `json:"cpu_limit"` - - // The amount of disk space in megabytes that a server is allowed to use. - DiskSpace int64 `json:"disk_space"` - - // Sets which CPU threads can be used by the docker instance. - Threads string `json:"threads"` -} - -// Converts the CPU limit for a server build into a number that can be better understood -// by the Docker environment. If there is no limit set, return -1 which will indicate to -// Docker that it has unlimited CPU quota. -func (b *BuildSettings) ConvertedCpuLimit() int64 { - if b.CpuLimit == 0 { - return -1 - } - - return b.CpuLimit * 1000 -} - -// Set the hard limit for memory usage to be 5% more than the amount of memory assigned to -// the server. If the memory limit for the server is < 4G, use 10%, if less than 2G use -// 15%. This avoids unexpected crashes from processes like Java which run over the limit. -func (b *BuildSettings) MemoryOverheadMultiplier() float64 { - if b.MemoryLimit <= 2048 { - return 1.15 - } else if b.MemoryLimit <= 4096 { - return 1.10 - } - - return 1.05 -} - -func (b *BuildSettings) BoundedMemoryLimit() int64 { - return int64(math.Round(float64(b.MemoryLimit) * b.MemoryOverheadMultiplier() * 1_000_000)) -} - -// Returns the amount of swap available as a total in bytes. This is returned as the amount -// of memory available to the server initially, PLUS the amount of additional swap to include -// which is the format used by Docker. -func (b *BuildSettings) ConvertedSwap() int64 { - if b.Swap < 0 { - return -1 - } - - return (b.Swap * 1_000_000) + b.BoundedMemoryLimit() -} - -// Defines the allocations available for a given server. When using the Docker environment -// driver these correspond to mappings for the container that allow external connections. -type Allocations struct { - // Defines the default allocation that should be used for this server. This is - // what will be used for {SERVER_IP} and {SERVER_PORT} when modifying configuration - // files or the startup arguments for a server. - DefaultMapping struct { - Ip string `json:"ip"` - Port int `json:"port"` - } `json:"default"` - - // Mappings contains all of the ports that should be assigned to a given server - // attached to the IP they correspond to. - Mappings map[string][]int `json:"mappings"` -} - -// Iterates over a given directory and loads all of the servers listed before returning -// them to the calling function. -func LoadDirectory() error { - // We could theoretically use a standard wait group here, however doing - // that introduces the potential to crash the program due to too many - // open files. This wouldn't happen on a small setup, but once the daemon is - // handling many servers you run that risk. - // - // For now just process 10 files at a time, that should be plenty fast to - // read and parse the YAML. We should probably make this configurable down - // the road to help big instances scale better. - wg := sizedwaitgroup.New(10) - - configs, rerr, err := api.NewRequester().GetAllServerConfigurations() - if err != nil || rerr != nil { - if err != nil { - return errors.WithStack(err) - } - - return errors.New(rerr.String()) - } - - states, err := getServerStates() - if err != nil { - return errors.WithStack(err) - } - - servers = NewCollection(nil) - - for uuid, data := range configs { - wg.Add() - - go func(uuid string, data *api.ServerConfigurationResponse) { - defer wg.Done() - - s, err := FromConfiguration(data) - if err != nil { - log.WithField("server", uuid).WithField("error", err).Error("failed to load server, skipping...") - return - } - - if state, exists := states[s.Uuid]; exists { - s.SetState(state) - s.Log().WithField("state", s.GetState()).Debug("loaded server state from cache file") - } - - servers.Add(s) - }(uuid, data) - } - - // Wait until we've processed all of the configuration files in the directory - // before continuing. - wg.Wait() - - return nil -} - -// Initializes a server using a data byte array. This will be marshaled into the -// given struct using a YAML marshaler. This will also configure the given environment -// for a server. -func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { - s := new(Server) - - if err := defaults.Set(s); err != nil { - return nil, err - } - - if err := s.UpdateDataStructure(data.Settings, false); err != nil { - return nil, err - } - - s.AddEventListeners() - - // Right now we only support a Docker based environment, so I'm going to hard code - // this logic in. When we're ready to support other environment we'll need to make - // some modifications here obviously. - if err := NewDockerEnvironment(s); err != nil { - return nil, err - } - - s.Cache = cache.New(time.Minute*10, time.Minute*15) - s.Archiver = Archiver{ - Server: s, - } - s.Filesystem = Filesystem{ - Configuration: &config.Get().System, - Server: s, - } - s.Resources = ResourceUsage{} - - // Forces the configuration to be synced with the panel. - if err := s.SyncWithConfiguration(data); err != nil { - return nil, err - } - - return s, nil +// Returns the UUID for the server instance. +func (s *Server) Id() string { + return s.Config().GetUuid() } // Returns all of the environment variables that should be assigned to a running @@ -313,28 +74,28 @@ func (s *Server) GetEnvironmentVariables() []string { var out = []string{ fmt.Sprintf("TZ=%s", zone), - fmt.Sprintf("STARTUP=%s", s.Invocation), - fmt.Sprintf("SERVER_MEMORY=%d", s.Build.MemoryLimit), - fmt.Sprintf("SERVER_IP=%s", s.Allocations.DefaultMapping.Ip), - fmt.Sprintf("SERVER_PORT=%d", s.Allocations.DefaultMapping.Port), + fmt.Sprintf("STARTUP=%s", s.Config().Invocation), + fmt.Sprintf("SERVER_MEMORY=%d", s.Build().MemoryLimit), + fmt.Sprintf("SERVER_IP=%s", s.Config().Allocations.DefaultMapping.Ip), + fmt.Sprintf("SERVER_PORT=%d", s.Config().Allocations.DefaultMapping.Port), } eloop: - for k := range s.EnvVars { + for k := range s.Config().EnvVars { for _, e := range out { if strings.HasPrefix(e, strings.ToUpper(k)) { continue eloop } } - out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.EnvVars.Get(k))) + out = append(out, fmt.Sprintf("%s=%s", strings.ToUpper(k), s.Config().EnvVars.Get(k))) } return out } func (s *Server) Log() *log.Entry { - return log.WithField("server", s.Uuid) + return log.WithField("server", s.Id()) } // Syncs the state of the server on the Panel with Wings. This ensures that we're always @@ -366,7 +127,10 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) err return errors.WithStack(err) } - s.processConfiguration = cfg.ProcessConfiguration + s.Lock() + s.procConfig = cfg.ProcessConfiguration + s.Unlock() + return nil } @@ -391,7 +155,7 @@ func (s *Server) CreateEnvironment() error { // Gets the process configuration data for the server. func (s *Server) GetProcessConfiguration() (*api.ServerConfigurationResponse, *api.RequestError, error) { - return api.NewRequester().GetServerConfiguration(s.Uuid) + return api.NewRequester().GetServerConfiguration(s.Id()) } // Helper function that can receieve a power action and then process the @@ -401,11 +165,7 @@ func (s *Server) HandlePowerAction(action PowerAction) error { case "start": return s.Environment.Start() case "restart": - if err := s.Environment.WaitForStop(60, false); err != nil { - return err - } - - return s.Environment.Start() + return s.Environment.Restart() case "stop": return s.Environment.Stop() case "kill": @@ -414,3 +174,8 @@ func (s *Server) HandlePowerAction(action PowerAction) error { return errors.New("an invalid power action was provided") } } + +// Checks if the server is marked as being suspended or not on the system. +func (s *Server) IsSuspended() bool { + return s.Config().Suspended +} diff --git a/server/state.go b/server/state.go index 033e267..c47c4e6 100644 --- a/server/state.go +++ b/server/state.go @@ -13,6 +13,13 @@ import ( var stateMutex sync.Mutex +const ( + ProcessOfflineState = "offline" + ProcessStartingState = "starting" + ProcessRunningState = "running" + ProcessStoppingState = "stopping" +) + // Returns the state of the servers. func getServerStates() (map[string]string, error) { // Request a lock after we check if the file exists. @@ -40,7 +47,7 @@ func saveServerStates() error { // Get the states of all servers on the daemon. states := map[string]string{} for _, s := range GetServers().All() { - states[s.Uuid] = s.GetState() + states[s.Id()] = s.GetState() } // Convert the map to a json object. @@ -60,13 +67,6 @@ func saveServerStates() error { return nil } -const ( - ProcessOfflineState = "offline" - ProcessStartingState = "starting" - ProcessRunningState = "running" - ProcessStoppingState = "stopping" -) - // Sets the state of the server internally. This function handles crash detection as // well as reporting to event listeners for the server. func (s *Server) SetState(state string) error { @@ -76,16 +76,14 @@ func (s *Server) SetState(state string) error { prevState := s.GetState() - // Obtain a mutex lock and update the current state of the server. - s.Lock() - s.State = state + // Update the currently tracked state for the server. + s.Proc().setInternalState(state) // Emit the event to any listeners that are currently registered. - s.Log().WithField("status", s.State).Debug("saw server status change event") - s.Events().Publish(StatusEvent, s.State) - - // Release the lock as it is no longer needed for the following actions. - s.Unlock() + if prevState != state { + s.Log().WithField("status", s.Proc().State).Debug("saw server status change event") + s.Events().Publish(StatusEvent, s.Proc().State) + } // Persist this change to the disk immediately so that should the Daemon be stopped or // crash we can immediately restore the server state. @@ -128,15 +126,14 @@ func (s *Server) SetState(state string) error { // Returns the current state of the server in a race-safe manner. func (s *Server) GetState() string { - s.RLock() - defer s.RUnlock() - - return s.State + return s.Proc().getInternalState() } // Determines if the server state is running or not. This is different than the // environment state, it is simply the tracked state from this daemon instance, and // not the response from Docker. func (s *Server) IsRunning() bool { - return s.GetState() == ProcessRunningState || s.GetState() == ProcessStartingState + st := s.GetState() + + return st == ProcessRunningState || st == ProcessStartingState } diff --git a/server/update.go b/server/update.go index 7e4a643..7ab63ba 100644 --- a/server/update.go +++ b/server/update.go @@ -15,7 +15,7 @@ import ( // it is up to the specific environment to determine what needs to happen when // that is the case. func (s *Server) UpdateDataStructure(data []byte, background bool) error { - src := new(Server) + src := new(Configuration) if err := json.Unmarshal(data, src); err != nil { return errors.WithStack(err) } @@ -23,13 +23,29 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { // Don't allow obviously corrupted data to pass through into this function. If the UUID // doesn't match something has gone wrong and the API is attempting to meld this server // instance into a totally different one, which would be bad. - if src.Uuid != "" && s.Uuid != "" && src.Uuid != s.Uuid { + if src.Uuid != "" && s.Id() != "" && src.Uuid != s.Id() { return errors.New("attempting to merge a data stack with an invalid UUID") } + // Grab a copy of the configuration to work on. + c := *s.Config() + + // Lock our copy of the configuration since the defered unlock will end up acting upon this + // new memory address rather than the old one. If we don't lock this, the defered unlock will + // cause a panic when it goes to run. However, since we only update s.cfg at the end, if there + // is an error before that point we'll still properly unlock the original configuration for the + // server. + c.mu.Lock() + + // Lock the server configuration while we're doing this merge to avoid anything + // trying to overwrite it or make modifications while we're sorting out what we + // need to do. + s.cfg.mu.Lock() + defer s.cfg.mu.Unlock() + // Merge the new data object that we have received with the existing server data object // and then save it to the disk so it is persistent. - if err := mergo.Merge(s, src, mergo.WithOverride); err != nil { + if err := mergo.Merge(&c, src, mergo.WithOverride); err != nil { return errors.WithStack(err) } @@ -39,9 +55,9 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { // backfiring at some point, but until then... // // We'll go ahead and do this with swap as well. - s.Build.CpuLimit = src.Build.CpuLimit - s.Build.Swap = src.Build.Swap - s.Build.DiskSpace = src.Build.DiskSpace + c.Build.CpuLimit = src.Build.CpuLimit + c.Build.Swap = src.Build.Swap + c.Build.DiskSpace = src.Build.DiskSpace // Mergo can't quite handle this boolean value correctly, so for now we'll just // handle this edge case manually since none of the other data passed through in this @@ -51,7 +67,7 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { return errors.WithStack(err) } } else { - s.Container.OomDisabled = v + c.Container.OomDisabled = v } // Mergo also cannot handle this boolean value. @@ -60,25 +76,28 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { return errors.WithStack(err) } } else { - s.Suspended = v + c.Suspended = v } // Environment and Mappings should be treated as a full update at all times, never a // true patch, otherwise we can't know what we're passing along. if src.EnvVars != nil && len(src.EnvVars) > 0 { - s.EnvVars = src.EnvVars + c.EnvVars = src.EnvVars } if src.Allocations.Mappings != nil && len(src.Allocations.Mappings) > 0 { - s.Allocations.Mappings = src.Allocations.Mappings + c.Allocations.Mappings = src.Allocations.Mappings } if src.Mounts != nil && len(src.Mounts) > 0 { - s.Mounts = src.Mounts + c.Mounts = src.Mounts } + // Update the configuration once we have a lock on the configuration object. + s.cfg = c + if background { - s.runBackgroundActions() + go s.runBackgroundActions() } return nil @@ -91,24 +110,22 @@ func (s *Server) UpdateDataStructure(data []byte, background bool) error { // These tasks run in independent threads where relevant to speed up any updates // that need to happen. func (s *Server) runBackgroundActions() { - // Update the environment in place, allowing memory and CPU usage to be adjusted - // on the fly without the user needing to reboot (theoretically). - go func(server *Server) { - server.Log().Info("performing server limit modification on-the-fly") - if err := server.Environment.InSituUpdate(); err != nil { - server.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment") - } - }(s) - - // Check if the server is now suspended, and if so and the process is not terminated + // Check if the s is now suspended, and if so and the process is not terminated // yet, do it immediately. - go func(server *Server) { - if server.Suspended && server.GetState() != ProcessOfflineState { - server.Log().Info("server suspended with running process state, terminating now") + if s.IsSuspended() && s.GetState() != ProcessOfflineState { + s.Log().Info("server suspended with running process state, terminating now") - if err := server.Environment.WaitForStop(10, true); err != nil { - server.Log().WithField("error", err).Warn("failed to terminate server environment after suspension") - } + if err := s.Environment.WaitForStop(10, true); err != nil { + s.Log().WithField("error", err).Warn("failed to terminate server environment after suspension") } - }(s) + } + + if !s.IsSuspended() { + // Update the environment in place, allowing memory and CPU usage to be adjusted + // on the fly without the user needing to reboot (theoretically). + s.Log().Info("performing server limit modification on-the-fly") + if err := s.Environment.InSituUpdate(); err != nil { + s.Log().WithField("error", err).Warn("failed to perform on-the-fly update of the server environment") + } + } } diff --git a/sftp/server.go b/sftp/server.go index 8a9f243..44617a8 100644 --- a/sftp/server.go +++ b/sftp/server.go @@ -49,7 +49,7 @@ func Initialize(config *config.Configuration) error { func validatePath(fs sftp_server.FileSystem, p string) (string, error) { s := server.GetServers().Find(func(server *server.Server) bool { - return server.Uuid == fs.UUID + return server.Id() == fs.UUID }) if s == nil { @@ -61,7 +61,7 @@ func validatePath(fs sftp_server.FileSystem, p string) (string, error) { func validateDiskSpace(fs sftp_server.FileSystem) bool { s := server.GetServers().Find(func(server *server.Server) bool { - return server.Uuid == fs.UUID + return server.Id() == fs.UUID }) if s == nil { @@ -105,7 +105,7 @@ func validateCredentials(c sftp_server.AuthenticationRequest) (*sftp_server.Auth } s := server.GetServers().Find(func(server *server.Server) bool { - return server.Uuid == resp.Server + return server.Id() == resp.Server }) if s == nil {