From f3a6ee7a454f43d71ff3e01a0ddab8e9554b70cf Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Mon, 25 Jan 2021 20:28:24 -0800 Subject: [PATCH] re-refactor code --- api/server_endpoints.go | 2 +- cmd/root.go | 48 +++++--- config/config.go | 4 +- router/middleware.go | 136 ++------------------- router/middleware/middleware.go | 27 ++++- router/router.go | 5 +- router/router_download.go | 14 +-- router/router_server.go | 8 +- router/router_server_files.go | 6 +- router/router_server_ws.go | 5 +- router/router_system.go | 8 +- router/router_transfer.go | 10 +- server/collection.go | 77 ------------ server/loader.go | 128 ------------------- server/manager.go | 209 ++++++++++++++++++++++++++++---- server/server.go | 114 ++++++++++++++++- server/state.go | 137 --------------------- sftp/server.go | 8 +- 18 files changed, 390 insertions(+), 556 deletions(-) delete mode 100644 server/collection.go delete mode 100644 server/loader.go delete mode 100644 server/state.go diff --git a/api/server_endpoints.go b/api/server_endpoints.go index acab7ff..946b949 100644 --- a/api/server_endpoints.go +++ b/api/server_endpoints.go @@ -55,7 +55,7 @@ type RawServerData struct { // be loaded. If so, those requests are spun-up in additional routines and the final resulting // slice of all servers will be returned. func (r *Request) GetServers() ([]RawServerData, error) { - resp, err := r.Get("/servers", Q{"per_page": strconv.Itoa(int(config.Get().RemoteQuery.BootServersPerPage))}) + resp, err := r.Get("/servers", Q{"per_page": strconv.Itoa(config.Get().RemoteQuery.BootServersPerPage)}) if err != nil { return nil, err } diff --git a/cmd/root.go b/cmd/root.go index c2c5ec5..a0558d0 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -137,17 +137,15 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { "gid": config.Get().System.User.Gid, }).Info("configured system user successfully") - panelClient := remote.CreateClient( + pclient := remote.CreateClient( config.Get().PanelLocation, config.Get().AuthenticationTokenId, config.Get().AuthenticationToken, remote.WithTimeout(time.Second*time.Duration(config.Get().RemoteQuery.Timeout)), ) - _ = panelClient - serverManager := server.NewManager(panelClient) - - if err := serverManager.Initialize(int(c.RemoteQuery.BootServersPerPage)); err != nil { + manager, err := server.NewManager(cmd.Context(), pclient) + if err != nil { log.WithField("error", err).Fatal("failed to load server configurations") } @@ -160,20 +158,38 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { } // Just for some nice log output. - for _, s := range serverManager.GetAll() { - log.WithField("server", s.Id()).Info("loaded configuration for server") + for _, s := range manager.All() { + log.WithField("server", s.Id()).Info("finished loading configuration for server") } - states, err := server.CachedServerStates() + states, err := manager.ReadStates() if err != nil { log.WithField("error", err).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state") } + ticker := time.NewTicker(time.Minute) + // Every minute, write the current server states to the disk to allow for a more + // seamless hard-reboot process in which wings will re-sync server states based + // on it's last tracked state. + go func() { + for { + select { + case <-ticker.C: + if err := manager.PersistStates(); err != nil { + log.WithField("error", err).Warn("failed to persist server states to disk") + } + case <-cmd.Context().Done(): + ticker.Stop() + return + } + } + }() + // Create a new workerpool that limits us to 4 servers being bootstrapped at a time // on Wings. This allows us to ensure the environment exists, write configurations, // and reboot processes without causing a slow-down due to sequential booting. pool := workerpool.New(4) - for _, serv := range serverManager.GetAll() { + for _, serv := range manager.All() { s := serv // For each server we encounter make sure the root data directory exists. @@ -184,7 +200,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { pool.Submit(func() { s.Log().Info("configuring server environment and restoring to previous state") - var st string if state, exists := states[s.Id()]; exists { st = state @@ -236,14 +251,14 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { defer func() { // Cancel the context on all of the running servers at this point, even though the // program is just shutting down. - for _, s := range server.GetServers().All() { + for _, s := range manager.All() { s.CtxCancel() } }() go func() { // Run the SFTP server. - if err := sftp.New().Run(); err != nil { + if err := sftp.New(manager).Run(); err != nil { log.WithError(err).Fatal("failed to initialize the sftp server") return } @@ -278,7 +293,7 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { // and external clients. s := &http.Server{ Addr: api.Host + ":" + strconv.Itoa(api.Port), - Handler: router.Configure(serverManager), + Handler: router.Configure(manager), TLSConfig: config.DefaultTLSConfig, } @@ -323,13 +338,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { } } - // Cancel the context on all of the running servers at this point, even though the - // program is just shutting down. - for _, s := range serverManager.GetAll() { - s.CtxCancel() - } -} - // Reads the configuration from the disk and then sets up the global singleton // with all of the configuration values. func initConfig() { diff --git a/config/config.go b/config/config.go index 0a7ec8a..d79dd83 100644 --- a/config/config.go +++ b/config/config.go @@ -99,7 +99,7 @@ type RemoteQueryConfiguration struct { // are taking longer than 30 seconds to complete it is likely a performance issue that // should be resolved on the Panel, and not something that should be resolved by upping this // number. - Timeout uint `default:"30" yaml:"timeout"` + Timeout int `default:"30" yaml:"timeout"` // The number of servers to load in a single request to the Panel API when booting the // Wings instance. A single request is initially made to the Panel to get this number @@ -110,7 +110,7 @@ type RemoteQueryConfiguration struct { // memory limits on your Panel instance. In the grand scheme of things 4 requests for // 50 servers is likely just as quick as two for 100 or one for 400, and will certainly // be less likely to cause performance issues on the Panel. - BootServersPerPage uint `default:"50" yaml:"boot_servers_per_page"` + BootServersPerPage int `default:"50" yaml:"boot_servers_per_page"` } // SystemConfiguration defines basic system configuration settings. diff --git a/router/middleware.go b/router/middleware.go index f2aff64..ee8f18e 100644 --- a/router/middleware.go +++ b/router/middleware.go @@ -1,138 +1,16 @@ package router import ( - "io" - "net/http" - "strings" - - "emperror.dev/errors" "github.com/gin-gonic/gin" - "github.com/google/uuid" - "github.com/pterodactyl/wings/config" + "github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/server" ) -type Middleware struct { - serverManager server.Manager -} - -// A custom handler function allowing for errors bubbled up by c.Error() to be returned in a -// standardized format with tracking UUIDs on them for easier log searching. -func (m *Middleware) ErrorHandler() gin.HandlerFunc { - return func(c *gin.Context) { - c.Next() - err := c.Errors.Last() - if err == nil || err.Err == nil { - return - } - tracked := NewTrackedError(err.Err) - // If there is a server in the context for this request pull it out so that we can - // track the error specifically for that server. - if s, ok := c.Get("server"); ok { - tracked = NewServerError(err.Err, s.(*server.Server)) - } - // This error occurs if you submit invalid JSON data to an endpoint. - if err.Err.Error() == io.EOF.Error() { - c.JSON(c.Writer.Status(), gin.H{"error": "A JSON formatted body is required for this endpoint."}) - return - } - tracked.Abort(c) - return - } -} - -// Set the access request control headers on all of the requests. -func (m *Middleware) SetAccessControlHeaders() gin.HandlerFunc { - origins := config.Get().AllowedOrigins - location := config.Get().PanelLocation - return func(c *gin.Context) { - c.Header("Access-Control-Allow-Credentials", "true") - c.Header("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS") - c.Header("Access-Control-Allow-Headers", "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token") - - o := c.GetHeader("Origin") - if o != location { - for _, origin := range origins { - if origin != "*" && o != origin { - continue - } - c.Header("Access-Control-Allow-Origin", origin) - c.Next() - return - } - } - c.Header("Access-Control-Allow-Origin", location) - c.Next() - } -} - -// Authenticates the request token against the given permission string, ensuring that -// if it is a server permission, the token has control over that server. If it is a global -// token, this will ensure that the request is using a properly signed global token. -func (m *Middleware) RequireAuthorization() gin.HandlerFunc { - token := config.Get().AuthenticationToken - return func(c *gin.Context) { - auth := strings.SplitN(c.GetHeader("Authorization"), " ", 2) - if len(auth) != 2 || auth[0] != "Bearer" { - c.Header("WWW-Authenticate", "Bearer") - c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ - "error": "The required authorization heads were not present in the request.", - }) - return - } - - // All requests to Wings must be authorized with the authentication token present in - // the Wings configuration file. Remeber, all requests to Wings come from the Panel - // backend, or using a signed JWT for temporary authentication. - if auth[1] == token { - c.Next() - return - } - c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ - "error": "You are not authorized to access this endpoint.", - }) - } -} - -func (m *Middleware) WithServerManager() gin.HandlerFunc { - return func(c *gin.Context) { - c.Set("servermanager", m.serverManager) - } -} - -func ExtractServerManager(c *gin.Context) server.Manager { - if s, ok := c.Get("servermanager"); ok { - if srvs, ok := s.(server.Manager); ok { - return srvs - } - } - return nil -} - -// Ensure that the requested server exists in this setup. Returns a 404 if we cannot -// locate it. -func (m *Middleware) ServerExists() gin.HandlerFunc { - return func(c *gin.Context) { - u, err := uuid.Parse(c.Param("server")) - if err == nil { - if s := m.serverManager.Get(u.String()); s != nil { - c.Set("server", s) - c.Next() - return - } - } - c.AbortWithStatusJSON(http.StatusNotFound, gin.H{ - "error": "The resource you requested does not exist.", - }) - } -} - -// Returns the server instance from the gin context. If there is no server set in the -// context (e.g. calling from a controller not protected by ServerExists) this function -// will panic. +// ExtractServer returns the server instance from the gin context. If there is +// no server set in the context (e.g. calling from a controller not protected +// by ServerExists) this function will panic. +// +// This function is deprecated. Use middleware.ExtractServer. func ExtractServer(c *gin.Context) *server.Server { - if s, ok := c.Get("server"); ok { - return s.(*server.Server) - } - panic(errors.New("cannot extract server, missing on gin context")) + return middleware.ExtractServer(c) } diff --git a/router/middleware/middleware.go b/router/middleware/middleware.go index ff61300..745f28e 100644 --- a/router/middleware/middleware.go +++ b/router/middleware/middleware.go @@ -159,6 +159,15 @@ func AttachRequestID() gin.HandlerFunc { } } +// AttachServerManager attaches the server manager to the request context which +// allows routes to access the underlying server collection. +func AttachServerManager(m *server.Manager) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("manager", m) + c.Next() + } +} + // CaptureAndAbort aborts the request and attaches the provided error to the gin // context so it can be reported properly. If the error is missing a stacktrace // at the time it is called the stack will be attached. @@ -239,9 +248,13 @@ func SetAccessControlHeaders() gin.HandlerFunc { // the server ID in the fields list. func ServerExists() gin.HandlerFunc { return func(c *gin.Context) { - s := server.GetServers().Find(func(s *server.Server) bool { - return c.Param("server") == s.Id() - }) + var s *server.Server + if c.Param("server") != "" { + manager := ExtractManager(c) + s = manager.Find(func(s *server.Server) bool { + return c.Param("server") == s.Id() + }) + } if s == nil { c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "The requested resource does not exist on this instance."}) return @@ -313,3 +326,11 @@ func ExtractServer(c *gin.Context) *server.Server { } return v.(*server.Server) } + +// ExtractManager returns the server manager instance set on the request context. +func ExtractManager(c *gin.Context) *server.Manager { + if v, ok := c.Get("manager"); ok { + return v.(*server.Manager) + } + panic("middleware/middleware: cannot extract server manager: not present in context") +} diff --git a/router/router.go b/router/router.go index 00eea02..27c5985 100644 --- a/router/router.go +++ b/router/router.go @@ -4,21 +4,22 @@ import ( "github.com/apex/log" "github.com/gin-gonic/gin" "github.com/pterodactyl/wings/router/middleware" + "github.com/pterodactyl/wings/server" ) // Configure configures the routing infrastructure for this daemon instance. -func Configure() *gin.Engine { +func Configure(m *server.Manager) *gin.Engine { gin.SetMode("release") router := gin.New() router.Use(gin.Recovery()) router.Use(middleware.AttachRequestID(), middleware.CaptureErrors(), middleware.SetAccessControlHeaders()) + router.Use(middleware.AttachServerManager(m)) // @todo log this into a different file so you can setup IP blocking for abusive requests and such. // This should still dump requests in debug mode since it does help with understanding the request // lifecycle and quickly seeing what was called leading to the logs. However, it isn't feasible to mix // this output in production and still get meaningful logs from it since they'll likely just be a huge // spamfest. - router.Use() router.Use(gin.LoggerWithFormatter(func(params gin.LogFormatterParams) string { log.WithFields(log.Fields{ "client_ip": params.ClientIP, diff --git a/router/router_download.go b/router/router_download.go index 325ce6e..d72cc48 100644 --- a/router/router_download.go +++ b/router/router_download.go @@ -8,13 +8,14 @@ import ( "strconv" "github.com/gin-gonic/gin" + "github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/server/backup" ) // Handle a download request for a server backup. func getDownloadBackup(c *gin.Context) { - serverManager := ExtractServerManager(c) + manager := middleware.ExtractManager(c) token := tokens.BackupPayload{} if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil { @@ -22,8 +23,8 @@ func getDownloadBackup(c *gin.Context) { return } - s := serverManager.Get(token.ServerUuid) - if s == nil || !token.IsUniqueRequest() { + s, ok := manager.Get(token.ServerUuid) + if !ok || !token.IsUniqueRequest() { c.AbortWithStatusJSON(http.StatusNotFound, gin.H{ "error": "The requested resource was not found on this server.", }) @@ -59,16 +60,15 @@ func getDownloadBackup(c *gin.Context) { // Handles downloading a specific file for a server. func getDownloadFile(c *gin.Context) { - serverManager := ExtractServerManager(c) - + manager := middleware.ExtractManager(c) token := tokens.FilePayload{} if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil { NewTrackedError(err).Abort(c) return } - s := serverManager.Get(token.ServerUuid) - if s == nil || !token.IsUniqueRequest() { + s, ok := manager.Get(token.ServerUuid) + if !ok || !token.IsUniqueRequest() { c.AbortWithStatusJSON(http.StatusNotFound, gin.H{ "error": "The requested resource was not found on this server.", }) diff --git a/router/router_server.go b/router/router_server.go index 8b5ff82..265d642 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -11,6 +11,7 @@ import ( "github.com/apex/log" "github.com/gin-gonic/gin" "github.com/pterodactyl/wings/router/downloader" + "github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/server" ) @@ -190,8 +191,7 @@ func postServerReinstall(c *gin.Context) { // Deletes a server from the wings daemon and dissociate it's objects. func deleteServer(c *gin.Context) { - s := ExtractServer(c) - sm := ExtractServerManager(c) + s := middleware.ExtractServer(c) // Immediately suspend the server to prevent a user from attempting // to start it while this process is running. @@ -235,7 +235,9 @@ func deleteServer(c *gin.Context) { } }(s.Filesystem().Path()) - sm.Remove(s) + middleware.ExtractManager(c).Remove(func(server *server.Server) bool { + return server.Id() == s.Id() + }) // Deallocate the reference to this server. s = nil diff --git a/router/router_server_files.go b/router/router_server_files.go index 8e372b0..cddcb96 100644 --- a/router/router_server_files.go +++ b/router/router_server_files.go @@ -487,7 +487,7 @@ func postServerChmodFile(c *gin.Context) { } func postServerUploadFiles(c *gin.Context) { - serverManager := ExtractServerManager(c) + manager := middleware.ExtractManager(c) token := tokens.UploadPayload{} if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil { @@ -495,8 +495,8 @@ func postServerUploadFiles(c *gin.Context) { return } - s := serverManager.Get(token.ServerUuid) - if s == nil || !token.IsUniqueRequest() { + s, ok := manager.Get(token.ServerUuid) + if !ok || !token.IsUniqueRequest() { c.AbortWithStatusJSON(http.StatusNotFound, gin.H{ "error": "The requested resource was not found on this server.", }) diff --git a/router/router_server_ws.go b/router/router_server_ws.go index c65a366..eab309d 100644 --- a/router/router_server_ws.go +++ b/router/router_server_ws.go @@ -7,13 +7,14 @@ import ( "github.com/gin-gonic/gin" ws "github.com/gorilla/websocket" + "github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/router/websocket" ) // Upgrades a connection to a websocket and passes events along between. func getServerWebsocket(c *gin.Context) { - serverManager := ExtractServerManager(c) - s := serverManager.Get(c.Param("server")) + manager := middleware.ExtractManager(c) + s, _ := manager.Get(c.Param("server")) handler, err := websocket.GetHandler(s, c.Writer, c.Request) if err != nil { NewServerError(err, s).Abort(c) diff --git a/router/router_system.go b/router/router_system.go index 279a3a8..166e8b6 100644 --- a/router/router_system.go +++ b/router/router_system.go @@ -9,6 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/installer" + "github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/system" ) @@ -27,8 +28,7 @@ func getSystemInformation(c *gin.Context) { // Returns all of the servers that are registered and configured correctly on // this wings instance. func getAllServers(c *gin.Context) { - serverManager := ExtractServerManager(c) - c.JSON(http.StatusOK, serverManager.GetAll()) + c.JSON(http.StatusOK, middleware.ExtractManager(c).All()) } // Creates a new server on the wings daemon and begins the installation process @@ -52,8 +52,8 @@ func postCreateServer(c *gin.Context) { // Plop that server instance onto the request so that it can be referenced in // requests from here-on out. - serverManager := ExtractServerManager(c) - serverManager.Add(install.Server()) + manager := middleware.ExtractManager(c) + manager.Add(install.Server()) // Begin the installation process in the background to not block the request // cycle. If there are any errors they will be logged and communicated back diff --git a/router/router_transfer.go b/router/router_transfer.go index e413de6..1a4d987 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -25,6 +25,7 @@ import ( "github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/installer" + "github.com/pterodactyl/wings/router/middleware" "github.com/pterodactyl/wings/router/tokens" "github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/system" @@ -323,19 +324,20 @@ func postTransfer(c *gin.Context) { i.Server().Events().Publish(server.TransferLogsEvent, output) } - serverManager := ExtractServerManager(c) - + manager := middleware.ExtractManager(c) // Mark the server as transferring to prevent problems later on during the process and // then push the server into the global server collection for this instance. i.Server().SetTransferring(true) - serverManager.Add(i.Server()) + manager.Add(i.Server()) defer func(s *server.Server) { // In the event that this transfer call fails, remove the server from the global // server tracking so that we don't have a dangling instance. if err := data.sendTransferStatus(!hasError); hasError || err != nil { sendTransferLog("Server transfer failed, check Wings logs for additional information.") s.Events().Publish(server.TransferStatusEvent, "failure") - serverManager.Remove(s) + manager.Remove(func(match *server.Server) bool { + return match.Id() == s.Id() + }) // If the transfer status was successful but the request failed, act like the transfer failed. if !hasError && err != nil { diff --git a/server/collection.go b/server/collection.go deleted file mode 100644 index 5b7e46d..0000000 --- a/server/collection.go +++ /dev/null @@ -1,77 +0,0 @@ -package server - -import "sync" - -type Collection struct { - items []*Server - sync.RWMutex -} - -// Create a new collection from a slice of servers. -func NewCollection(servers []*Server) *Collection { - return &Collection{ - items: servers, - } -} - -// Return all of the items in the collection. -func (c *Collection) All() []*Server { - c.RLock() - defer c.RUnlock() - - return c.items -} - -// Adds an item to the collection store. -func (c *Collection) Add(s *Server) { - c.Lock() - c.items = append(c.items, s) - c.Unlock() -} - -// Returns only those items matching the filter criteria. -func (c *Collection) Filter(filter func(*Server) bool) []*Server { - c.RLock() - defer c.RUnlock() - - r := make([]*Server, 0) - for _, v := range c.items { - if filter(v) { - r = append(r, v) - } - } - - return r -} - -// Returns a single element from the collection matching the filter. If nothing is -// found a nil result is returned. -func (c *Collection) Find(filter func(*Server) bool) *Server { - c.RLock() - defer c.RUnlock() - - for _, v := range c.items { - if filter(v) { - return v - } - } - - return nil -} - -// Removes all items from the collection that match the filter function. -// -// TODO: cancel the context? -func (c *Collection) Remove(filter func(*Server) bool) { - c.Lock() - defer c.Unlock() - - r := make([]*Server, 0) - for _, v := range c.items { - if !filter(v) { - r = append(r, v) - } - } - - c.items = r -} diff --git a/server/loader.go b/server/loader.go deleted file mode 100644 index 1a74e39..0000000 --- a/server/loader.go +++ /dev/null @@ -1,128 +0,0 @@ -package server - -import ( - "context" - "encoding/json" - "fmt" - "os" - "path/filepath" - "runtime" - "time" - - "emperror.dev/errors" - "github.com/apex/log" - "github.com/gammazero/workerpool" - "github.com/pterodactyl/wings/api" - "github.com/pterodactyl/wings/config" - "github.com/pterodactyl/wings/environment" - "github.com/pterodactyl/wings/environment/docker" - "github.com/pterodactyl/wings/remote" - "github.com/pterodactyl/wings/server/filesystem" -) - -// Iterates over a given directory and loads all of the servers listed before returning -// them to the calling function. -func (m *manager) Initialize(serversPerPage int) error { - if len(m.servers.items) != 0 { - return errors.New("cannot call LoadDirectory with a non-nil collection") - } - - log.Info("fetching list of servers from API") - assignedServers, err := m.panelClient.GetServers(context.TODO(), serversPerPage) - if err != nil { - if !remote.IsRequestError(err) { - return err - } - - return errors.New(err.Error()) - } - - start := time.Now() - log.WithField("total_configs", len(assignedServers)).Info("processing servers returned by the API") - - pool := workerpool.New(runtime.NumCPU()) - log.Debugf("using %d workerpools to instantiate server instances", runtime.NumCPU()) - for _, data := range assignedServers { - pool.Submit(func() { - // Parse the json.RawMessage into an expected struct value. We do this here so that a single broken - // server does not cause the entire boot process to hang, and allows us to show more useful error - // messaging in the output. - d := api.ServerConfigurationResponse{ - Settings: data.Settings, - } - - log.WithField("server", data.Uuid).Info("creating new server object from API response") - if err := json.Unmarshal(data.ProcessConfiguration, &d.ProcessConfiguration); err != nil { - log.WithField("server", data.Uuid).WithField("error", err).Error("failed to parse server configuration from API response, skipping...") - return - } - - s, err := FromConfiguration(d) - if err != nil { - log.WithField("server", data.Uuid).WithField("error", err).Error("failed to load server, skipping...") - return - } - - m.Add(s) - }) - } - - // Wait until we've processed all of the configuration files in the directory - // before continuing. - pool.StopWait() - - diff := time.Now().Sub(start) - log.WithField("duration", fmt.Sprintf("%s", diff)).Info("finished processing server configurations") - - return nil -} - -// Initializes a server using a data byte array. This will be marshaled into the -// given struct using a YAML marshaler. This will also configure the given environment -// for a server. -func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) { - s, err := New() - if err != nil { - return nil, errors.WithMessage(err, "loader: failed to instantiate empty server struct") - } - if err := s.UpdateDataStructure(data.Settings); err != nil { - return nil, err - } - - s.Archiver = Archiver{Server: s} - s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace(), s.Config().Egg.FileDenylist) - - // Right now we only support a Docker based environment, so I'm going to hard code - // this logic in. When we're ready to support other environment we'll need to make - // some modifications here obviously. - settings := environment.Settings{ - Mounts: s.Mounts(), - Allocations: s.cfg.Allocations, - Limits: s.cfg.Build, - } - - envCfg := environment.NewConfiguration(settings, s.GetEnvironmentVariables()) - meta := docker.Metadata{ - Image: s.Config().Container.Image, - } - - if env, err := docker.New(s.Id(), &meta, envCfg); err != nil { - return nil, err - } else { - s.Environment = env - s.StartEventListeners() - s.Throttler().StartTimer(s.Context()) - } - - // Forces the configuration to be synced with the panel. - if err := s.SyncWithConfiguration(data); err != nil { - return nil, err - } - - // If the server's data directory exists, force disk usage calculation. - if _, err := os.Stat(s.Filesystem().Path()); err == nil { - s.Filesystem().HasSpaceAvailable(true) - } - - return s, nil -} diff --git a/server/manager.go b/server/manager.go index 2ab36cb..4f92762 100644 --- a/server/manager.go +++ b/server/manager.go @@ -1,46 +1,203 @@ package server import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "runtime" + "sync" + "time" + + "emperror.dev/errors" + "github.com/apex/log" + "github.com/gammazero/workerpool" + "github.com/pterodactyl/wings/api" + "github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/remote" ) -type Manager interface { - // Initialize fetches all servers assigned to this node from the API. - Initialize(serversPerPage int) error - GetAll() []*Server - Get(uuid string) *Server - Add(s *Server) - Remove(s *Server) +type Manager struct { + mu sync.RWMutex + servers []*Server } -type manager struct { - servers Collection - - panelClient remote.Client +// NewManager returns a new server manager instance. This will boot up all of +// the servers that are currently present on the filesystem and set them into +// the manager. +func NewManager(ctx context.Context, client remote.Client) (*Manager, error) { + c := NewEmptyManager() + if err := c.initializeFromRemoteSource(ctx, client); err != nil { + return nil, err + } + return c, nil } -// NewManager creates a new server manager. -func NewManager(panelClient remote.Client) Manager { - return &manager{panelClient: panelClient} +// NewEmptyManager returns a new empty manager collection without actually +// loading any of the servers from the disk. This allows the caller to set their +// own servers into the collection as needed. +func NewEmptyManager() *Manager { + return &Manager{} } -func (m *manager) GetAll() []*Server { - return m.servers.items +// initializeFromRemoteSource iterates over a given directory and loads all of +// the servers listed before returning them to the calling function. +func (m *Manager) initializeFromRemoteSource(ctx context.Context, client remote.Client) error { + log.Info("fetching list of servers from API") + servers, err := client.GetServers(ctx, config.Get().RemoteQuery.BootServersPerPage) + if err != nil { + if !remote.IsRequestError(err) { + return errors.WithStackIf(err) + } + return errors.New(err.Error()) + } + + start := time.Now() + log.WithField("total_configs", len(servers)).Info("processing servers returned by the API") + + pool := workerpool.New(runtime.NumCPU()) + log.Debugf("using %d workerpools to instantiate server instances", runtime.NumCPU()) + for _, data := range servers { + pool.Submit(func() { + // Parse the json.RawMessage into an expected struct value. We do this here so that a single broken + // server does not cause the entire boot process to hang, and allows us to show more useful error + // messaging in the output. + d := api.ServerConfigurationResponse{ + Settings: data.Settings, + } + log.WithField("server", data.Uuid).Info("creating new server object from API response") + if err := json.Unmarshal(data.ProcessConfiguration, &d.ProcessConfiguration); err != nil { + log.WithField("server", data.Uuid).WithField("error", err).Error("failed to parse server configuration from API response, skipping...") + return + } + s, err := FromConfiguration(d) + if err != nil { + log.WithField("server", data.Uuid).WithField("error", err).Error("failed to load server, skipping...") + return + } + m.Add(s) + }) + } + + // Wait until we've processed all of the configuration files in the directory + // before continuing. + pool.StopWait() + + diff := time.Now().Sub(start) + log.WithField("duration", fmt.Sprintf("%s", diff)).Info("finished processing server configurations") + + return nil } -func (m *manager) Get(uuid string) *Server { - return m.servers.Find(func(s *Server) bool { - return s.Id() == uuid +// Put replaces all of the current values in the collection with the value that +// is passed through. +func (m *Manager) Put(s []*Server) { + m.mu.Lock() + m.servers = s + m.mu.Unlock() +} + +// All returns all of the items in the collection. +func (m *Manager) All() []*Server { + m.mu.RLock() + defer m.mu.RUnlock() + return m.servers +} + +// Add adds an item to the collection store. +func (m *Manager) Add(s *Server) { + m.mu.Lock() + m.servers = append(m.servers, s) + m.mu.Unlock() +} + +// Get returns a single server instance and a boolean value indicating if it was +// found in the global collection or not. +func (m *Manager) Get(uuid string) (*Server, bool) { + match := m.Find(func(server *Server) bool { + return server.Id() == uuid }) + return match, match != nil } -func (m *manager) Add(s *Server) { - s.manager = m - m.servers.Add(s) +// Filter returns only those items matching the filter criteria. +func (m *Manager) Filter(filter func(match *Server) bool) []*Server { + m.mu.RLock() + defer m.mu.RUnlock() + r := make([]*Server, 0) + for _, v := range m.servers { + if filter(v) { + r = append(r, v) + } + } + return r } -func (m *manager) Remove(s *Server) { - m.servers.Remove(func(sf *Server) bool { - return sf.Id() == s.Id() - }) +// Find returns a single element from the collection matching the filter. If +// nothing is found a nil result is returned. +func (m *Manager) Find(filter func(match *Server) bool) *Server { + m.mu.RLock() + defer m.mu.RUnlock() + for _, v := range m.servers { + if filter(v) { + return v + } + } + return nil } + +// Remove removes all items from the collection that match the filter function. +func (m *Manager) Remove(filter func(match *Server) bool) { + m.mu.Lock() + defer m.mu.Unlock() + r := make([]*Server, 0) + for _, v := range m.servers { + if !filter(v) { + r = append(r, v) + } + } + m.servers = r +} + +// PersistStates writes the current environment states to the disk for each +// server. This is generally called at a specific interval defined in the root +// runner command to avoid hammering disk I/O when tons of server switch states +// at once. It is fine if this file falls slightly out of sync, it is just here +// to make recovering from an unexpected system reboot a little easier. +func (m *Manager) PersistStates() error { + states := map[string]string{} + for _, s := range m.All() { + states[s.Id()] = s.Environment.State() + } + data, err := json.Marshal(states) + if err != nil { + return errors.WithStack(err) + } + if err := ioutil.WriteFile(config.Get().System.GetStatesPath(), data, 0644); err != nil { + return errors.WithStack(err) + } + return nil +} + +// ReadStates returns the state of the servers. +func (m *Manager) ReadStates() (map[string]string, error) { + f, err := os.OpenFile(config.Get().System.GetStatesPath(), os.O_RDONLY|os.O_CREATE, 0644) + if err != nil { + return nil, errors.WithStack(err) + } + defer f.Close() + var states map[string]string + if err := json.NewDecoder(f).Decode(&states); err != nil && err != io.EOF { + return nil, errors.WithStack(err) + } + out := make(map[string]string, 0) + // Only return states for servers that we're currently tracking in the system. + for id, state := range states { + if _, ok := m.Get(id); ok { + out[id] = state + } + } + return out, nil +} \ No newline at end of file diff --git a/server/server.go b/server/server.go index 96773e3..6f2a607 100644 --- a/server/server.go +++ b/server/server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "strings" "sync" @@ -20,7 +21,8 @@ import ( "golang.org/x/sync/semaphore" ) -// High level definition for a server instance being controlled by Wings. +// Server is the high level definition for a server instance being controlled +// by Wings. type Server struct { // Internal mutex used to block actions that need to occur sequentially, such as // writing the configuration to the disk. @@ -28,9 +30,6 @@ type Server struct { ctx context.Context ctxCancel *context.CancelFunc - // manager holds a reference to the manager responsible for the server - manager *manager - emitterLock sync.Mutex powerLock *semaphore.Weighted throttleOnce sync.Once @@ -248,4 +247,109 @@ func (s *Server) EnsureDataDirectoryExists() error { } } return nil -} \ No newline at end of file +} + +// Sets the state of the server internally. This function handles crash detection as +// well as reporting to event listeners for the server. +func (s *Server) OnStateChange() { + prevState := s.resources.State.Load() + + st := s.Environment.State() + // Update the currently tracked state for the server. + s.resources.State.Store(st) + + // Emit the event to any listeners that are currently registered. + if prevState != s.Environment.State() { + s.Log().WithField("status", st).Debug("saw server status change event") + s.Events().Publish(StatusEvent, st) + } + + // Reset the resource usage to 0 when the process fully stops so that all of the UI + // views in the Panel correctly display 0. + if st == environment.ProcessOfflineState { + s.resources.Reset() + s.emitProcUsage() + } + + // If server was in an online state, and is now in an offline state we should handle + // that as a crash event. In that scenario, check the last crash time, and the crash + // counter. + // + // In the event that we have passed the thresholds, don't do anything, otherwise + // automatically attempt to start the process back up for the user. This is done in a + // separate thread as to not block any actions currently taking place in the flow + // that called this function. + if (prevState == environment.ProcessStartingState || prevState == environment.ProcessRunningState) && s.Environment.State() == environment.ProcessOfflineState { + s.Log().Info("detected server as entering a crashed state; running crash handler") + + go func(server *Server) { + if err := server.handleServerCrash(); err != nil { + if IsTooFrequentCrashError(err) { + server.Log().Info("did not restart server after crash; occurred too soon after the last") + } else { + s.PublishConsoleOutputFromDaemon("Server crash was detected but an error occurred while handling it.") + server.Log().WithField("error", err).Error("failed to handle server crash") + } + } + }(s) + } +} + +// Determines if the server state is running or not. This is different than the +// environment state, it is simply the tracked state from this daemon instance, and +// not the response from Docker. +func (s *Server) IsRunning() bool { + st := s.Environment.State() + + return st == environment.ProcessRunningState || st == environment.ProcessStartingState +} + +// FromConfiguration initializes a server using a data byte array. This will be +// marshaled into the given struct using a YAML marshaler. This will also +// configure the given environment for a server. +func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) { + s, err := New() + if err != nil { + return nil, errors.WithMessage(err, "loader: failed to instantiate empty server struct") + } + if err := s.UpdateDataStructure(data.Settings); err != nil { + return nil, err + } + + s.Archiver = Archiver{Server: s} + s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace(), s.Config().Egg.FileDenylist) + + // Right now we only support a Docker based environment, so I'm going to hard code + // this logic in. When we're ready to support other environment we'll need to make + // some modifications here obviously. + settings := environment.Settings{ + Mounts: s.Mounts(), + Allocations: s.cfg.Allocations, + Limits: s.cfg.Build, + } + + envCfg := environment.NewConfiguration(settings, s.GetEnvironmentVariables()) + meta := docker.Metadata{ + Image: s.Config().Container.Image, + } + + if env, err := docker.New(s.Id(), &meta, envCfg); err != nil { + return nil, err + } else { + s.Environment = env + s.StartEventListeners() + s.Throttler().StartTimer(s.Context()) + } + + // Forces the configuration to be synced with the panel. + if err := s.SyncWithConfiguration(data); err != nil { + return nil, err + } + + // If the server's data directory exists, force disk usage calculation. + if _, err := os.Stat(s.Filesystem().Path()); err == nil { + s.Filesystem().HasSpaceAvailable(true) + } + + return s, nil +} diff --git a/server/state.go b/server/state.go deleted file mode 100644 index 3e4378a..0000000 --- a/server/state.go +++ /dev/null @@ -1,137 +0,0 @@ -package server - -import ( - "encoding/json" - "io" - "io/ioutil" - "os" - "sync" - - "github.com/pterodactyl/wings/config" - "github.com/pterodactyl/wings/environment" -) - -var stateMutex sync.Mutex - -// Returns the state of the servers. -func CachedServerStates() (map[string]string, error) { - // Request a lock after we check if the file exists. - stateMutex.Lock() - defer stateMutex.Unlock() - - // Open the states file. - f, err := os.OpenFile(config.Get().System.GetStatesPath(), os.O_RDONLY|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - defer f.Close() - - // Convert the json object to a map. - states := map[string]string{} - if err := json.NewDecoder(f).Decode(&states); err != nil && err != io.EOF { - return nil, err - } - - return states, nil -} - -// saveServerStates . -func (m *manager) saveServerStates() error { - // Get the states of all servers on the daemon. - states := map[string]string{} - for _, s := range m.GetAll() { - states[s.Id()] = s.Environment.State() - } - - // Convert the map to a json object. - data, err := json.Marshal(states) - if err != nil { - return err - } - - stateMutex.Lock() - defer stateMutex.Unlock() - - // Write the data to the file - if err := ioutil.WriteFile(config.Get().System.GetStatesPath(), data, 0644); err != nil { - return err - } - - return nil -} - -// Sets the state of the server internally. This function handles crash detection as -// well as reporting to event listeners for the server. -func (s *Server) OnStateChange() { - prevState := s.resources.State.Load() - - st := s.Environment.State() - // Update the currently tracked state for the server. - s.resources.State.Store(st) - - // Emit the event to any listeners that are currently registered. - if prevState != s.Environment.State() { - s.Log().WithField("status", st).Debug("saw server status change event") - s.Events().Publish(StatusEvent, st) - } - - // Persist this change to the disk immediately so that should the Daemon be stopped or - // crash we can immediately restore the server state. - // - // This really only makes a difference if all of the Docker containers are also stopped, - // but this was a highly requested feature and isn't hard to work with, so lets do it. - // - // We also get the benefit of server status changes always propagating corrected configurations - // to the disk should we forget to do it elsewhere. - go func() { - if err := s.manager.saveServerStates(); err != nil { - s.Log().WithField("error", err).Warn("failed to write server states to disk") - } - }() - - // Reset the resource usage to 0 when the process fully stops so that all of the UI - // views in the Panel correctly display 0. - if st == environment.ProcessOfflineState { - s.resources.Reset() - s.emitProcUsage() - } - - // If server was in an online state, and is now in an offline state we should handle - // that as a crash event. In that scenario, check the last crash time, and the crash - // counter. - // - // In the event that we have passed the thresholds, don't do anything, otherwise - // automatically attempt to start the process back up for the user. This is done in a - // separate thread as to not block any actions currently taking place in the flow - // that called this function. - if (prevState == environment.ProcessStartingState || prevState == environment.ProcessRunningState) && s.Environment.State() == environment.ProcessOfflineState { - s.Log().Info("detected server as entering a crashed state; running crash handler") - - go func(server *Server) { - if err := server.handleServerCrash(); err != nil { - if IsTooFrequentCrashError(err) { - server.Log().Info("did not restart server after crash; occurred too soon after the last") - } else { - s.PublishConsoleOutputFromDaemon("Server crash was detected but an error occurred while handling it.") - server.Log().WithField("error", err).Error("failed to handle server crash") - } - } - }(s) - } -} - -// Returns the current state of the server in a race-safe manner. -// Deprecated -// use Environment.State() -func (s *Server) GetState() string { - return s.Environment.State() -} - -// Determines if the server state is running or not. This is different than the -// environment state, it is simply the tracked state from this daemon instance, and -// not the response from Docker. -func (s *Server) IsRunning() bool { - st := s.Environment.State() - - return st == environment.ProcessRunningState || st == environment.ProcessStartingState -} diff --git a/sftp/server.go b/sftp/server.go index b50650d..d2260f1 100644 --- a/sftp/server.go +++ b/sftp/server.go @@ -24,14 +24,16 @@ import ( //goland:noinspection GoNameStartsWithPackageName type SFTPServer struct { + manager *server.Manager BasePath string ReadOnly bool Listen string } -func New() *SFTPServer { +func New(m *server.Manager) *SFTPServer { cfg := config.Get().System return &SFTPServer{ + manager: m, BasePath: cfg.Data, ReadOnly: cfg.Sftp.ReadOnly, Listen: cfg.Sftp.Address + ":" + strconv.Itoa(cfg.Sftp.Port), @@ -81,7 +83,7 @@ func (c *SFTPServer) Run() error { // Handles an inbound connection to the instance and determines if we should serve the // request or not. -func (c SFTPServer) AcceptInbound(conn net.Conn, config *ssh.ServerConfig) { +func (c *SFTPServer) AcceptInbound(conn net.Conn, config *ssh.ServerConfig) { // Before beginning a handshake must be performed on the incoming net.Conn sconn, chans, reqs, err := ssh.NewServerConn(conn, config) if err != nil { @@ -119,7 +121,7 @@ func (c SFTPServer) AcceptInbound(conn net.Conn, config *ssh.ServerConfig) { // This will also attempt to match a specific server out of the global server // store and return nil if there is no match. uuid := sconn.Permissions.Extensions["uuid"] - srv := server.GetServers().Find(func(s *server.Server) bool { + srv := c.manager.Find(func(s *server.Server) bool { if uuid == "" { return false }