re-refactor code

Dane Everitt 2021-01-25 20:28:24 -08:00
parent ab86fb703a
commit f3a6ee7a45
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
18 changed files with 390 additions and 556 deletions

View File

@ -55,7 +55,7 @@ type RawServerData struct {
// be loaded. If so, those requests are spun-up in additional routines and the final resulting
// slice of all servers will be returned.
func (r *Request) GetServers() ([]RawServerData, error) {
resp, err := r.Get("/servers", Q{"per_page": strconv.Itoa(int(config.Get().RemoteQuery.BootServersPerPage))})
resp, err := r.Get("/servers", Q{"per_page": strconv.Itoa(config.Get().RemoteQuery.BootServersPerPage)})
if err != nil {
return nil, err
}
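For context, a minimal sketch of the query pattern used here, assuming Q is a plain query-parameter map as its usage above suggests (the helper function is hypothetical): with BootServersPerPage now typed as an int, strconv.Itoa no longer needs the intermediate uint cast.

package remote

import "strconv"

// Q is assumed here to be a plain query-parameter map, mirroring its use above.
type Q map[string]string

// perPageQuery is a hypothetical helper showing the simplified call site.
func perPageQuery(n int) Q {
	return Q{"per_page": strconv.Itoa(n)}
}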

View File

@ -137,17 +137,15 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
"gid": config.Get().System.User.Gid,
}).Info("configured system user successfully")
panelClient := remote.CreateClient(
pclient := remote.CreateClient(
config.Get().PanelLocation,
config.Get().AuthenticationTokenId,
config.Get().AuthenticationToken,
remote.WithTimeout(time.Second*time.Duration(config.Get().RemoteQuery.Timeout)),
)
_ = panelClient
serverManager := server.NewManager(panelClient)
if err := serverManager.Initialize(int(c.RemoteQuery.BootServersPerPage)); err != nil {
manager, err := server.NewManager(cmd.Context(), pclient)
if err != nil {
log.WithField("error", err).Fatal("failed to load server configurations")
}
@ -160,20 +158,38 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
}
// Just for some nice log output.
for _, s := range serverManager.GetAll() {
log.WithField("server", s.Id()).Info("loaded configuration for server")
for _, s := range manager.All() {
log.WithField("server", s.Id()).Info("finished loading configuration for server")
}
states, err := server.CachedServerStates()
states, err := manager.ReadStates()
if err != nil {
log.WithField("error", err).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
}
ticker := time.NewTicker(time.Minute)
// Every minute, write the current server states to the disk to allow for a more
// seamless hard-reboot process in which wings will re-sync server states based
// on its last tracked state.
go func() {
for {
select {
case <-ticker.C:
if err := manager.PersistStates(); err != nil {
log.WithField("error", err).Warn("failed to persist server states to disk")
}
case <-cmd.Context().Done():
ticker.Stop()
return
}
}
}()
// Create a new workerpool that limits us to 4 servers being bootstrapped at a time
// on Wings. This allows us to ensure the environment exists, write configurations,
// and reboot processes without causing a slow-down due to sequential booting.
pool := workerpool.New(4)
for _, serv := range serverManager.GetAll() {
for _, serv := range manager.All() {
s := serv
// For each server we encounter make sure the root data directory exists.
@ -184,7 +200,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
pool.Submit(func() {
s.Log().Info("configuring server environment and restoring to previous state")
var st string
if state, exists := states[s.Id()]; exists {
st = state
@ -236,14 +251,14 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
defer func() {
// Cancel the context on all of the running servers at this point, even though the
// program is just shutting down.
for _, s := range server.GetServers().All() {
for _, s := range manager.All() {
s.CtxCancel()
}
}()
go func() {
// Run the SFTP server.
if err := sftp.New().Run(); err != nil {
if err := sftp.New(manager).Run(); err != nil {
log.WithError(err).Fatal("failed to initialize the sftp server")
return
}
@ -278,7 +293,7 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
// and external clients.
s := &http.Server{
Addr: api.Host + ":" + strconv.Itoa(api.Port),
Handler: router.Configure(serverManager),
Handler: router.Configure(manager),
TLSConfig: config.DefaultTLSConfig,
}
@ -323,13 +338,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) {
}
}
// Cancel the context on all of the running servers at this point, even though the
// program is just shutting down.
for _, s := range serverManager.GetAll() {
s.CtxCancel()
}
}
// Reads the configuration from the disk and then sets up the global singleton
// with all of the configuration values.
func initConfig() {

View File

@ -99,7 +99,7 @@ type RemoteQueryConfiguration struct {
// are taking longer than 30 seconds to complete it is likely a performance issue that
// should be resolved on the Panel, and not something that should be resolved by upping this
// number.
Timeout uint `default:"30" yaml:"timeout"`
Timeout int `default:"30" yaml:"timeout"`
// The number of servers to load in a single request to the Panel API when booting the
// Wings instance. A single request is initially made to the Panel to get this number
@ -110,7 +110,7 @@ type RemoteQueryConfiguration struct {
// memory limits on your Panel instance. In the grand scheme of things 4 requests for
// 50 servers is likely just as quick as two for 100 or one for 400, and will certainly
// be less likely to cause performance issues on the Panel.
BootServersPerPage uint `default:"50" yaml:"boot_servers_per_page"`
BootServersPerPage int `default:"50" yaml:"boot_servers_per_page"`
}
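To illustrate the effect of the uint-to-int change above, a minimal sketch (the function itself is hypothetical): both common call sites want int-compatible values, so the casts such as strconv.Itoa(int(...)) can be dropped.

package main

import (
	"strconv"
	"time"

	"github.com/pterodactyl/wings/config"
)

// remoteQueryValues is a hypothetical illustration of the two call sites
// simplified by typing these fields as int rather than uint.
func remoteQueryValues() (string, time.Duration) {
	cfg := config.Get()
	// Previously: strconv.Itoa(int(cfg.RemoteQuery.BootServersPerPage))
	perPage := strconv.Itoa(cfg.RemoteQuery.BootServersPerPage)
	timeout := time.Second * time.Duration(cfg.RemoteQuery.Timeout)
	return perPage, timeout
}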
// SystemConfiguration defines basic system configuration settings.

View File

@ -1,138 +1,16 @@
package router
import (
"io"
"net/http"
"strings"
"emperror.dev/errors"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/server"
)
type Middleware struct {
serverManager server.Manager
}
// A custom handler function allowing for errors bubbled up by c.Error() to be returned in a
// standardized format with tracking UUIDs on them for easier log searching.
func (m *Middleware) ErrorHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
err := c.Errors.Last()
if err == nil || err.Err == nil {
return
}
tracked := NewTrackedError(err.Err)
// If there is a server in the context for this request pull it out so that we can
// track the error specifically for that server.
if s, ok := c.Get("server"); ok {
tracked = NewServerError(err.Err, s.(*server.Server))
}
// This error occurs if you submit invalid JSON data to an endpoint.
if err.Err.Error() == io.EOF.Error() {
c.JSON(c.Writer.Status(), gin.H{"error": "A JSON formatted body is required for this endpoint."})
return
}
tracked.Abort(c)
return
}
}
// Sets the access control headers on all of the requests.
func (m *Middleware) SetAccessControlHeaders() gin.HandlerFunc {
origins := config.Get().AllowedOrigins
location := config.Get().PanelLocation
return func(c *gin.Context) {
c.Header("Access-Control-Allow-Credentials", "true")
c.Header("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")
c.Header("Access-Control-Allow-Headers", "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token")
o := c.GetHeader("Origin")
if o != location {
for _, origin := range origins {
if origin != "*" && o != origin {
continue
}
c.Header("Access-Control-Allow-Origin", origin)
c.Next()
return
}
}
c.Header("Access-Control-Allow-Origin", location)
c.Next()
}
}
// Authenticates the request token against the given permission string, ensuring that
// if it is a server permission, the token has control over that server. If it is a global
// token, this will ensure that the request is using a properly signed global token.
func (m *Middleware) RequireAuthorization() gin.HandlerFunc {
token := config.Get().AuthenticationToken
return func(c *gin.Context) {
auth := strings.SplitN(c.GetHeader("Authorization"), " ", 2)
if len(auth) != 2 || auth[0] != "Bearer" {
c.Header("WWW-Authenticate", "Bearer")
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "The required authorization heads were not present in the request.",
})
return
}
// All requests to Wings must be authorized with the authentication token present in
// the Wings configuration file. Remember, all requests to Wings come from the Panel
// backend, or using a signed JWT for temporary authentication.
if auth[1] == token {
c.Next()
return
}
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": "You are not authorized to access this endpoint.",
})
}
}
func (m *Middleware) WithServerManager() gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("servermanager", m.serverManager)
}
}
func ExtractServerManager(c *gin.Context) server.Manager {
if s, ok := c.Get("servermanager"); ok {
if srvs, ok := s.(server.Manager); ok {
return srvs
}
}
return nil
}
// Ensure that the requested server exists in this setup. Returns a 404 if we cannot
// locate it.
func (m *Middleware) ServerExists() gin.HandlerFunc {
return func(c *gin.Context) {
u, err := uuid.Parse(c.Param("server"))
if err == nil {
if s := m.serverManager.Get(u.String()); s != nil {
c.Set("server", s)
c.Next()
return
}
}
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": "The resource you requested does not exist.",
})
}
}
// Returns the server instance from the gin context. If there is no server set in the
// context (e.g. calling from a controller not protected by ServerExists) this function
// will panic.
// ExtractServer returns the server instance from the gin context. If there is
// no server set in the context (e.g. calling from a controller not protected
// by ServerExists) this function will panic.
//
// This function is deprecated. Use middleware.ExtractServer.
func ExtractServer(c *gin.Context) *server.Server {
if s, ok := c.Get("server"); ok {
return s.(*server.Server)
}
panic(errors.New("cannot extract server, missing on gin context"))
return middleware.ExtractServer(c)
}
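A handler written against the new helper might look like the following minimal sketch (the handler name is hypothetical; it simply calls middleware.ExtractServer directly rather than going through the deprecated router.ExtractServer shim):

package router

import (
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/pterodactyl/wings/router/middleware"
)

// getServerDetails is a hypothetical handler using the middleware package's
// extractor directly instead of the deprecated shim above.
func getServerDetails(c *gin.Context) {
	s := middleware.ExtractServer(c)
	c.JSON(http.StatusOK, s)
}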

View File

@ -159,6 +159,15 @@ func AttachRequestID() gin.HandlerFunc {
}
}
// AttachServerManager attaches the server manager to the request context which
// allows routes to access the underlying server collection.
func AttachServerManager(m *server.Manager) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("manager", m)
c.Next()
}
}
// CaptureAndAbort aborts the request and attaches the provided error to the gin
// context so it can be reported properly. If the error is missing a stacktrace
// at the time it is called the stack will be attached.
@ -239,9 +248,13 @@ func SetAccessControlHeaders() gin.HandlerFunc {
// the server ID in the fields list.
func ServerExists() gin.HandlerFunc {
return func(c *gin.Context) {
s := server.GetServers().Find(func(s *server.Server) bool {
return c.Param("server") == s.Id()
})
var s *server.Server
if c.Param("server") != "" {
manager := ExtractManager(c)
s = manager.Find(func(s *server.Server) bool {
return c.Param("server") == s.Id()
})
}
if s == nil {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"error": "The requested resource does not exist on this instance."})
return
@ -313,3 +326,11 @@ func ExtractServer(c *gin.Context) *server.Server {
}
return v.(*server.Server)
}
// ExtractManager returns the server manager instance set on the request context.
func ExtractManager(c *gin.Context) *server.Manager {
if v, ok := c.Get("manager"); ok {
return v.(*server.Manager)
}
panic("middleware/middleware: cannot extract server manager: not present in context")
}
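Taken together, AttachServerManager and ExtractManager round-trip the manager through the gin context: attached once as middleware, then extracted in any downstream handler. A minimal sketch of that wiring (the route shown is hypothetical, mirroring getAllServers later in this commit):

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/pterodactyl/wings/router/middleware"
	"github.com/pterodactyl/wings/server"
)

// buildRouter attaches the manager once, then extracts it in a handler.
func buildRouter(m *server.Manager) *gin.Engine {
	r := gin.New()
	r.Use(middleware.AttachServerManager(m))
	r.GET("/servers", func(c *gin.Context) {
		c.JSON(http.StatusOK, middleware.ExtractManager(c).All())
	})
	return r
}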

View File

@ -4,21 +4,22 @@ import (
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/server"
)
// Configure configures the routing infrastructure for this daemon instance.
func Configure() *gin.Engine {
func Configure(m *server.Manager) *gin.Engine {
gin.SetMode("release")
router := gin.New()
router.Use(gin.Recovery())
router.Use(middleware.AttachRequestID(), middleware.CaptureErrors(), middleware.SetAccessControlHeaders())
router.Use(middleware.AttachServerManager(m))
// @todo log this into a different file so you can setup IP blocking for abusive requests and such.
// This should still dump requests in debug mode since it does help with understanding the request
// lifecycle and quickly seeing what was called leading to the logs. However, it isn't feasible to mix
// this output in production and still get meaningful logs from it since they'll likely just be a huge
// spamfest.
router.Use()
router.Use(gin.LoggerWithFormatter(func(params gin.LogFormatterParams) string {
log.WithFields(log.Fields{
"client_ip": params.ClientIP,

View File

@ -8,13 +8,14 @@ import (
"strconv"
"github.com/gin-gonic/gin"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/router/tokens"
"github.com/pterodactyl/wings/server/backup"
)
// Handle a download request for a server backup.
func getDownloadBackup(c *gin.Context) {
serverManager := ExtractServerManager(c)
manager := middleware.ExtractManager(c)
token := tokens.BackupPayload{}
if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil {
@ -22,8 +23,8 @@ func getDownloadBackup(c *gin.Context) {
return
}
s := serverManager.Get(token.ServerUuid)
if s == nil || !token.IsUniqueRequest() {
s, ok := manager.Get(token.ServerUuid)
if !ok || !token.IsUniqueRequest() {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": "The requested resource was not found on this server.",
})
@ -59,16 +60,15 @@ func getDownloadBackup(c *gin.Context) {
// Handles downloading a specific file for a server.
func getDownloadFile(c *gin.Context) {
serverManager := ExtractServerManager(c)
manager := middleware.ExtractManager(c)
token := tokens.FilePayload{}
if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil {
NewTrackedError(err).Abort(c)
return
}
s := serverManager.Get(token.ServerUuid)
if s == nil || !token.IsUniqueRequest() {
s, ok := manager.Get(token.ServerUuid)
if !ok || !token.IsUniqueRequest() {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": "The requested resource was not found on this server.",
})

View File

@ -11,6 +11,7 @@ import (
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/pterodactyl/wings/router/downloader"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/router/tokens"
"github.com/pterodactyl/wings/server"
)
@ -190,8 +191,7 @@ func postServerReinstall(c *gin.Context) {
// Deletes a server from the wings daemon and dissociates its objects.
func deleteServer(c *gin.Context) {
s := ExtractServer(c)
sm := ExtractServerManager(c)
s := middleware.ExtractServer(c)
// Immediately suspend the server to prevent a user from attempting
// to start it while this process is running.
@ -235,7 +235,9 @@ func deleteServer(c *gin.Context) {
}
}(s.Filesystem().Path())
sm.Remove(s)
middleware.ExtractManager(c).Remove(func(server *server.Server) bool {
return server.Id() == s.Id()
})
// Deallocate the reference to this server.
s = nil

View File

@ -487,7 +487,7 @@ func postServerChmodFile(c *gin.Context) {
}
func postServerUploadFiles(c *gin.Context) {
serverManager := ExtractServerManager(c)
manager := middleware.ExtractManager(c)
token := tokens.UploadPayload{}
if err := tokens.ParseToken([]byte(c.Query("token")), &token); err != nil {
@ -495,8 +495,8 @@ func postServerUploadFiles(c *gin.Context) {
return
}
s := serverManager.Get(token.ServerUuid)
if s == nil || !token.IsUniqueRequest() {
s, ok := manager.Get(token.ServerUuid)
if !ok || !token.IsUniqueRequest() {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": "The requested resource was not found on this server.",
})

View File

@ -7,13 +7,14 @@ import (
"github.com/gin-gonic/gin"
ws "github.com/gorilla/websocket"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/router/websocket"
)
// Upgrades a connection to a websocket and passes events along between.
func getServerWebsocket(c *gin.Context) {
serverManager := ExtractServerManager(c)
s := serverManager.Get(c.Param("server"))
manager := middleware.ExtractManager(c)
s, _ := manager.Get(c.Param("server"))
handler, err := websocket.GetHandler(s, c.Writer, c.Request)
if err != nil {
NewServerError(err, s).Abort(c)

View File

@ -9,6 +9,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/installer"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/system"
)
@ -27,8 +28,7 @@ func getSystemInformation(c *gin.Context) {
// Returns all of the servers that are registered and configured correctly on
// this wings instance.
func getAllServers(c *gin.Context) {
serverManager := ExtractServerManager(c)
c.JSON(http.StatusOK, serverManager.GetAll())
c.JSON(http.StatusOK, middleware.ExtractManager(c).All())
}
// Creates a new server on the wings daemon and begins the installation process
@ -52,8 +52,8 @@ func postCreateServer(c *gin.Context) {
// Plop that server instance onto the request so that it can be referenced in
// requests from here-on out.
serverManager := ExtractServerManager(c)
serverManager.Add(install.Server())
manager := middleware.ExtractManager(c)
manager.Add(install.Server())
// Begin the installation process in the background to not block the request
// cycle. If there are any errors they will be logged and communicated back

View File

@ -25,6 +25,7 @@ import (
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/installer"
"github.com/pterodactyl/wings/router/middleware"
"github.com/pterodactyl/wings/router/tokens"
"github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/system"
@ -323,19 +324,20 @@ func postTransfer(c *gin.Context) {
i.Server().Events().Publish(server.TransferLogsEvent, output)
}
serverManager := ExtractServerManager(c)
manager := middleware.ExtractManager(c)
// Mark the server as transferring to prevent problems later on during the process and
// then push the server into the global server collection for this instance.
i.Server().SetTransferring(true)
serverManager.Add(i.Server())
manager.Add(i.Server())
defer func(s *server.Server) {
// In the event that this transfer call fails, remove the server from the global
// server tracking so that we don't have a dangling instance.
if err := data.sendTransferStatus(!hasError); hasError || err != nil {
sendTransferLog("Server transfer failed, check Wings logs for additional information.")
s.Events().Publish(server.TransferStatusEvent, "failure")
serverManager.Remove(s)
manager.Remove(func(match *server.Server) bool {
return match.Id() == s.Id()
})
// If the transfer status was successful but the request failed, act like the transfer failed.
if !hasError && err != nil {

View File

@ -1,77 +0,0 @@
package server
import "sync"
type Collection struct {
items []*Server
sync.RWMutex
}
// Create a new collection from a slice of servers.
func NewCollection(servers []*Server) *Collection {
return &Collection{
items: servers,
}
}
// Return all of the items in the collection.
func (c *Collection) All() []*Server {
c.RLock()
defer c.RUnlock()
return c.items
}
// Adds an item to the collection store.
func (c *Collection) Add(s *Server) {
c.Lock()
c.items = append(c.items, s)
c.Unlock()
}
// Returns only those items matching the filter criteria.
func (c *Collection) Filter(filter func(*Server) bool) []*Server {
c.RLock()
defer c.RUnlock()
r := make([]*Server, 0)
for _, v := range c.items {
if filter(v) {
r = append(r, v)
}
}
return r
}
// Returns a single element from the collection matching the filter. If nothing is
// found a nil result is returned.
func (c *Collection) Find(filter func(*Server) bool) *Server {
c.RLock()
defer c.RUnlock()
for _, v := range c.items {
if filter(v) {
return v
}
}
return nil
}
// Removes all items from the collection that match the filter function.
//
// TODO: cancel the context?
func (c *Collection) Remove(filter func(*Server) bool) {
c.Lock()
defer c.Unlock()
r := make([]*Server, 0)
for _, v := range c.items {
if !filter(v) {
r = append(r, v)
}
}
c.items = r
}

View File

@ -1,128 +0,0 @@
package server
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"time"
"emperror.dev/errors"
"github.com/apex/log"
"github.com/gammazero/workerpool"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/environment/docker"
"github.com/pterodactyl/wings/remote"
"github.com/pterodactyl/wings/server/filesystem"
)
// Iterates over a given directory and loads all of the servers listed before returning
// them to the calling function.
func (m *manager) Initialize(serversPerPage int) error {
if len(m.servers.items) != 0 {
return errors.New("cannot call LoadDirectory with a non-nil collection")
}
log.Info("fetching list of servers from API")
assignedServers, err := m.panelClient.GetServers(context.TODO(), serversPerPage)
if err != nil {
if !remote.IsRequestError(err) {
return err
}
return errors.New(err.Error())
}
start := time.Now()
log.WithField("total_configs", len(assignedServers)).Info("processing servers returned by the API")
pool := workerpool.New(runtime.NumCPU())
log.Debugf("using %d workerpools to instantiate server instances", runtime.NumCPU())
for _, data := range assignedServers {
pool.Submit(func() {
// Parse the json.RawMessage into an expected struct value. We do this here so that a single broken
// server does not cause the entire boot process to hang, and allows us to show more useful error
// messaging in the output.
d := api.ServerConfigurationResponse{
Settings: data.Settings,
}
log.WithField("server", data.Uuid).Info("creating new server object from API response")
if err := json.Unmarshal(data.ProcessConfiguration, &d.ProcessConfiguration); err != nil {
log.WithField("server", data.Uuid).WithField("error", err).Error("failed to parse server configuration from API response, skipping...")
return
}
s, err := FromConfiguration(d)
if err != nil {
log.WithField("server", data.Uuid).WithField("error", err).Error("failed to load server, skipping...")
return
}
m.Add(s)
})
}
// Wait until we've processed all of the configuration files in the directory
// before continuing.
pool.StopWait()
diff := time.Now().Sub(start)
log.WithField("duration", fmt.Sprintf("%s", diff)).Info("finished processing server configurations")
return nil
}
// Initializes a server using a data byte array. This will be marshaled into the
// given struct using a YAML marshaler. This will also configure the given environment
// for a server.
func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) {
s, err := New()
if err != nil {
return nil, errors.WithMessage(err, "loader: failed to instantiate empty server struct")
}
if err := s.UpdateDataStructure(data.Settings); err != nil {
return nil, err
}
s.Archiver = Archiver{Server: s}
s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace(), s.Config().Egg.FileDenylist)
// Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously.
settings := environment.Settings{
Mounts: s.Mounts(),
Allocations: s.cfg.Allocations,
Limits: s.cfg.Build,
}
envCfg := environment.NewConfiguration(settings, s.GetEnvironmentVariables())
meta := docker.Metadata{
Image: s.Config().Container.Image,
}
if env, err := docker.New(s.Id(), &meta, envCfg); err != nil {
return nil, err
} else {
s.Environment = env
s.StartEventListeners()
s.Throttler().StartTimer(s.Context())
}
// Forces the configuration to be synced with the panel.
if err := s.SyncWithConfiguration(data); err != nil {
return nil, err
}
// If the server's data directory exists, force disk usage calculation.
if _, err := os.Stat(s.Filesystem().Path()); err == nil {
s.Filesystem().HasSpaceAvailable(true)
}
return s, nil
}

View File

@ -1,46 +1,203 @@
package server
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"sync"
"time"
"emperror.dev/errors"
"github.com/apex/log"
"github.com/gammazero/workerpool"
"github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/remote"
)
type Manager interface {
// Initialize fetches all servers assigned to this node from the API.
Initialize(serversPerPage int) error
GetAll() []*Server
Get(uuid string) *Server
Add(s *Server)
Remove(s *Server)
type Manager struct {
mu sync.RWMutex
servers []*Server
}
type manager struct {
servers Collection
panelClient remote.Client
// NewManager returns a new server manager instance. This will fetch all of the
// servers assigned to this node from the remote API and set them into the
// manager.
func NewManager(ctx context.Context, client remote.Client) (*Manager, error) {
c := NewEmptyManager()
if err := c.initializeFromRemoteSource(ctx, client); err != nil {
return nil, err
}
return c, nil
}
// NewManager creates a new server manager.
func NewManager(panelClient remote.Client) Manager {
return &manager{panelClient: panelClient}
// NewEmptyManager returns a new empty manager collection without loading any
// servers from the remote source. This allows the caller to set their own
// servers into the collection as needed.
func NewEmptyManager() *Manager {
return &Manager{}
}
func (m *manager) GetAll() []*Server {
return m.servers.items
// initializeFromRemoteSource fetches all of the servers assigned to this node
// from the remote API and initializes each one before adding it to the manager.
func (m *Manager) initializeFromRemoteSource(ctx context.Context, client remote.Client) error {
log.Info("fetching list of servers from API")
servers, err := client.GetServers(ctx, config.Get().RemoteQuery.BootServersPerPage)
if err != nil {
if !remote.IsRequestError(err) {
return errors.WithStackIf(err)
}
return errors.New(err.Error())
}
start := time.Now()
log.WithField("total_configs", len(servers)).Info("processing servers returned by the API")
pool := workerpool.New(runtime.NumCPU())
log.Debugf("using %d workerpools to instantiate server instances", runtime.NumCPU())
for _, data := range servers {
pool.Submit(func() {
// Parse the json.RawMessage into an expected struct value. We do this here so that a single broken
// server does not cause the entire boot process to hang, and allows us to show more useful error
// messaging in the output.
d := api.ServerConfigurationResponse{
Settings: data.Settings,
}
log.WithField("server", data.Uuid).Info("creating new server object from API response")
if err := json.Unmarshal(data.ProcessConfiguration, &d.ProcessConfiguration); err != nil {
log.WithField("server", data.Uuid).WithField("error", err).Error("failed to parse server configuration from API response, skipping...")
return
}
s, err := FromConfiguration(d)
if err != nil {
log.WithField("server", data.Uuid).WithField("error", err).Error("failed to load server, skipping...")
return
}
m.Add(s)
})
}
// Wait until we've processed all of the server configurations returned by the
// API before continuing.
pool.StopWait()
diff := time.Now().Sub(start)
log.WithField("duration", fmt.Sprintf("%s", diff)).Info("finished processing server configurations")
return nil
}
func (m *manager) Get(uuid string) *Server {
return m.servers.Find(func(s *Server) bool {
return s.Id() == uuid
// Put replaces all of the current values in the collection with the value that
// is passed through.
func (m *Manager) Put(s []*Server) {
m.mu.Lock()
m.servers = s
m.mu.Unlock()
}
// All returns all of the items in the collection.
func (m *Manager) All() []*Server {
m.mu.RLock()
defer m.mu.RUnlock()
return m.servers
}
// Add adds an item to the collection store.
func (m *Manager) Add(s *Server) {
m.mu.Lock()
m.servers = append(m.servers, s)
m.mu.Unlock()
}
// Get returns a single server instance and a boolean value indicating if it was
// found in the global collection or not.
func (m *Manager) Get(uuid string) (*Server, bool) {
match := m.Find(func(server *Server) bool {
return server.Id() == uuid
})
return match, match != nil
}
func (m *manager) Add(s *Server) {
s.manager = m
m.servers.Add(s)
// Filter returns only those items matching the filter criteria.
func (m *Manager) Filter(filter func(match *Server) bool) []*Server {
m.mu.RLock()
defer m.mu.RUnlock()
r := make([]*Server, 0)
for _, v := range m.servers {
if filter(v) {
r = append(r, v)
}
}
return r
}
func (m *manager) Remove(s *Server) {
m.servers.Remove(func(sf *Server) bool {
return sf.Id() == s.Id()
})
// Find returns a single element from the collection matching the filter. If
// nothing is found a nil result is returned.
func (m *Manager) Find(filter func(match *Server) bool) *Server {
m.mu.RLock()
defer m.mu.RUnlock()
for _, v := range m.servers {
if filter(v) {
return v
}
}
return nil
}
// Remove removes all items from the collection that match the filter function.
func (m *Manager) Remove(filter func(match *Server) bool) {
m.mu.Lock()
defer m.mu.Unlock()
r := make([]*Server, 0)
for _, v := range m.servers {
if !filter(v) {
r = append(r, v)
}
}
m.servers = r
}
// PersistStates writes the current environment states to the disk for each
// server. This is generally called at a specific interval defined in the root
// runner command to avoid hammering disk I/O when tons of servers switch states
// at once. It is fine if this file falls slightly out of sync; it is just here
// to make recovering from an unexpected system reboot a little easier.
func (m *Manager) PersistStates() error {
states := map[string]string{}
for _, s := range m.All() {
states[s.Id()] = s.Environment.State()
}
data, err := json.Marshal(states)
if err != nil {
return errors.WithStack(err)
}
if err := ioutil.WriteFile(config.Get().System.GetStatesPath(), data, 0644); err != nil {
return errors.WithStack(err)
}
return nil
}
// ReadStates returns the last persisted state for each server currently
// tracked by this manager.
func (m *Manager) ReadStates() (map[string]string, error) {
f, err := os.OpenFile(config.Get().System.GetStatesPath(), os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
return nil, errors.WithStack(err)
}
defer f.Close()
var states map[string]string
if err := json.NewDecoder(f).Decode(&states); err != nil && err != io.EOF {
return nil, errors.WithStack(err)
}
out := make(map[string]string, 0)
// Only return states for servers that we're currently tracking in the system.
for id, state := range states {
if _, ok := m.Get(id); ok {
out[id] = state
}
}
return out, nil
}
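For reference, a minimal sketch of the reworked Manager API as exposed by this commit (the client value and UUID below are illustrative placeholders):

package main

import (
	"context"
	"log"

	"github.com/pterodactyl/wings/remote"
	"github.com/pterodactyl/wings/server"
)

func example(client remote.Client) {
	// NewManager now takes a context and performs the remote boot fetch itself.
	m, err := server.NewManager(context.Background(), client)
	if err != nil {
		log.Fatal(err)
	}
	// Get returns an explicit "found" boolean rather than a nil pointer.
	if s, ok := m.Get("00000000-0000-0000-0000-000000000000"); ok {
		// Remove now takes a filter instead of a concrete server instance.
		m.Remove(func(match *server.Server) bool {
			return match.Id() == s.Id()
		})
	}
	// PersistStates writes a {"<uuid>": "<state>"} JSON map to the states path.
	if err := m.PersistStates(); err != nil {
		log.Println("failed to persist server states:", err)
	}
}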

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
@ -20,7 +21,8 @@ import (
"golang.org/x/sync/semaphore"
)
// High level definition for a server instance being controlled by Wings.
// Server is the high level definition for a server instance being controlled
// by Wings.
type Server struct {
// Internal mutex used to block actions that need to occur sequentially, such as
// writing the configuration to the disk.
@ -28,9 +30,6 @@ type Server struct {
ctx context.Context
ctxCancel *context.CancelFunc
// manager holds a reference to the manager responsible for the server
manager *manager
emitterLock sync.Mutex
powerLock *semaphore.Weighted
throttleOnce sync.Once
@ -249,3 +248,108 @@ func (s *Server) EnsureDataDirectoryExists() error {
}
return nil
}
// Sets the state of the server internally. This function handles crash detection as
// well as reporting to event listeners for the server.
func (s *Server) OnStateChange() {
prevState := s.resources.State.Load()
st := s.Environment.State()
// Update the currently tracked state for the server.
s.resources.State.Store(st)
// Emit the event to any listeners that are currently registered.
if prevState != s.Environment.State() {
s.Log().WithField("status", st).Debug("saw server status change event")
s.Events().Publish(StatusEvent, st)
}
// Reset the resource usage to 0 when the process fully stops so that all of the UI
// views in the Panel correctly display 0.
if st == environment.ProcessOfflineState {
s.resources.Reset()
s.emitProcUsage()
}
// If server was in an online state, and is now in an offline state we should handle
// that as a crash event. In that scenario, check the last crash time, and the crash
// counter.
//
// In the event that we have passed the thresholds, don't do anything, otherwise
// automatically attempt to start the process back up for the user. This is done in a
// separate thread as to not block any actions currently taking place in the flow
// that called this function.
if (prevState == environment.ProcessStartingState || prevState == environment.ProcessRunningState) && s.Environment.State() == environment.ProcessOfflineState {
s.Log().Info("detected server as entering a crashed state; running crash handler")
go func(server *Server) {
if err := server.handleServerCrash(); err != nil {
if IsTooFrequentCrashError(err) {
server.Log().Info("did not restart server after crash; occurred too soon after the last")
} else {
s.PublishConsoleOutputFromDaemon("Server crash was detected but an error occurred while handling it.")
server.Log().WithField("error", err).Error("failed to handle server crash")
}
}
}(s)
}
}
// Determines if the server state is running or not. This is different than the
// environment state, it is simply the tracked state from this daemon instance, and
// not the response from Docker.
func (s *Server) IsRunning() bool {
st := s.Environment.State()
return st == environment.ProcessRunningState || st == environment.ProcessStartingState
}
// FromConfiguration initializes a server from an API configuration response,
// updating the server's data structure and configuring the environment for it.
func FromConfiguration(data api.ServerConfigurationResponse) (*Server, error) {
s, err := New()
if err != nil {
return nil, errors.WithMessage(err, "loader: failed to instantiate empty server struct")
}
if err := s.UpdateDataStructure(data.Settings); err != nil {
return nil, err
}
s.Archiver = Archiver{Server: s}
s.fs = filesystem.New(filepath.Join(config.Get().System.Data, s.Id()), s.DiskSpace(), s.Config().Egg.FileDenylist)
// Right now we only support a Docker based environment, so I'm going to hard code
// this logic in. When we're ready to support other environment we'll need to make
// some modifications here obviously.
settings := environment.Settings{
Mounts: s.Mounts(),
Allocations: s.cfg.Allocations,
Limits: s.cfg.Build,
}
envCfg := environment.NewConfiguration(settings, s.GetEnvironmentVariables())
meta := docker.Metadata{
Image: s.Config().Container.Image,
}
if env, err := docker.New(s.Id(), &meta, envCfg); err != nil {
return nil, err
} else {
s.Environment = env
s.StartEventListeners()
s.Throttler().StartTimer(s.Context())
}
// Forces the configuration to be synced with the panel.
if err := s.SyncWithConfiguration(data); err != nil {
return nil, err
}
// If the server's data directory exists, force disk usage calculation.
if _, err := os.Stat(s.Filesystem().Path()); err == nil {
s.Filesystem().HasSpaceAvailable(true)
}
return s, nil
}

View File

@ -1,137 +0,0 @@
package server
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"sync"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
)
var stateMutex sync.Mutex
// Returns the state of the servers.
func CachedServerStates() (map[string]string, error) {
// Request a lock after we check if the file exists.
stateMutex.Lock()
defer stateMutex.Unlock()
// Open the states file.
f, err := os.OpenFile(config.Get().System.GetStatesPath(), os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
defer f.Close()
// Convert the json object to a map.
states := map[string]string{}
if err := json.NewDecoder(f).Decode(&states); err != nil && err != io.EOF {
return nil, err
}
return states, nil
}
// saveServerStates .
func (m *manager) saveServerStates() error {
// Get the states of all servers on the daemon.
states := map[string]string{}
for _, s := range m.GetAll() {
states[s.Id()] = s.Environment.State()
}
// Convert the map to a json object.
data, err := json.Marshal(states)
if err != nil {
return err
}
stateMutex.Lock()
defer stateMutex.Unlock()
// Write the data to the file
if err := ioutil.WriteFile(config.Get().System.GetStatesPath(), data, 0644); err != nil {
return err
}
return nil
}
// Sets the state of the server internally. This function handles crash detection as
// well as reporting to event listeners for the server.
func (s *Server) OnStateChange() {
prevState := s.resources.State.Load()
st := s.Environment.State()
// Update the currently tracked state for the server.
s.resources.State.Store(st)
// Emit the event to any listeners that are currently registered.
if prevState != s.Environment.State() {
s.Log().WithField("status", st).Debug("saw server status change event")
s.Events().Publish(StatusEvent, st)
}
// Persist this change to the disk immediately so that should the Daemon be stopped or
// crash we can immediately restore the server state.
//
// This really only makes a difference if all of the Docker containers are also stopped,
// but this was a highly requested feature and isn't hard to work with, so lets do it.
//
// We also get the benefit of server status changes always propagating corrected configurations
// to the disk should we forget to do it elsewhere.
go func() {
if err := s.manager.saveServerStates(); err != nil {
s.Log().WithField("error", err).Warn("failed to write server states to disk")
}
}()
// Reset the resource usage to 0 when the process fully stops so that all of the UI
// views in the Panel correctly display 0.
if st == environment.ProcessOfflineState {
s.resources.Reset()
s.emitProcUsage()
}
// If server was in an online state, and is now in an offline state we should handle
// that as a crash event. In that scenario, check the last crash time, and the crash
// counter.
//
// In the event that we have passed the thresholds, don't do anything, otherwise
// automatically attempt to start the process back up for the user. This is done in a
// separate thread as to not block any actions currently taking place in the flow
// that called this function.
if (prevState == environment.ProcessStartingState || prevState == environment.ProcessRunningState) && s.Environment.State() == environment.ProcessOfflineState {
s.Log().Info("detected server as entering a crashed state; running crash handler")
go func(server *Server) {
if err := server.handleServerCrash(); err != nil {
if IsTooFrequentCrashError(err) {
server.Log().Info("did not restart server after crash; occurred too soon after the last")
} else {
s.PublishConsoleOutputFromDaemon("Server crash was detected but an error occurred while handling it.")
server.Log().WithField("error", err).Error("failed to handle server crash")
}
}
}(s)
}
}
// Returns the current state of the server in a race-safe manner.
// Deprecated
// use Environment.State()
func (s *Server) GetState() string {
return s.Environment.State()
}
// Determines if the server state is running or not. This is different than the
// environment state, it is simply the tracked state from this daemon instance, and
// not the response from Docker.
func (s *Server) IsRunning() bool {
st := s.Environment.State()
return st == environment.ProcessRunningState || st == environment.ProcessStartingState
}

View File

@ -24,14 +24,16 @@ import (
//goland:noinspection GoNameStartsWithPackageName
type SFTPServer struct {
manager *server.Manager
BasePath string
ReadOnly bool
Listen string
}
func New() *SFTPServer {
func New(m *server.Manager) *SFTPServer {
cfg := config.Get().System
return &SFTPServer{
manager: m,
BasePath: cfg.Data,
ReadOnly: cfg.Sftp.ReadOnly,
Listen: cfg.Sftp.Address + ":" + strconv.Itoa(cfg.Sftp.Port),
@ -81,7 +83,7 @@ func (c *SFTPServer) Run() error {
// Handles an inbound connection to the instance and determines if we should serve the
// request or not.
func (c SFTPServer) AcceptInbound(conn net.Conn, config *ssh.ServerConfig) {
func (c *SFTPServer) AcceptInbound(conn net.Conn, config *ssh.ServerConfig) {
// Before beginning a handshake must be performed on the incoming net.Conn
sconn, chans, reqs, err := ssh.NewServerConn(conn, config)
if err != nil {
@ -119,7 +121,7 @@ func (c SFTPServer) AcceptInbound(conn net.Conn, config *ssh.ServerConfig) {
// This will also attempt to match a specific server out of the global server
// store and return nil if there is no match.
uuid := sconn.Permissions.Extensions["uuid"]
srv := server.GetServers().Find(func(s *server.Server) bool {
srv := c.manager.Find(func(s *server.Server) bool {
if uuid == "" {
return false
}