Compare commits

...

32 Commits

Author SHA1 Message Date
Charles Morgan
033e8e7573 Add GoReportcard badge (#57)
Adds GoReportcard Badge
2020-09-17 20:48:09 -07:00
Michael (Parker) Parker
aa78071543 update docker configs (#50)
* update docker configs

dockerfile with an updated start command

docker-compose file adds custom network name so it can be used in firewalld commands.

* update compose file

mount changes
remove /srv/daemon-data
remove /etc/timezone

variable changes
add TZ

* add note about old data folder.

* update to go 1.15

Update base image to go version 1.15
2020-09-17 20:48:01 -07:00
Dane Everitt
48aeeff818 Merge branch 'develop' of https://github.com/pterodactyl/wings into develop 2020-09-17 20:45:19 -07:00
Dane Everitt
864c37f17c Use 2k lines as the per loop limit 2020-09-17 20:45:13 -07:00
Matthew Penner
c7405aebe5 Update release.yml to use go1.15.2 2020-09-17 21:39:48 -06:00
Matthew Penner
9ff2d53466 Update build-test.yml to use go1.15.2 2020-09-17 21:39:20 -06:00
Dane Everitt
6ba49df485 Protect against zip bombs; closes pterodactyl/panel#883 2020-09-17 20:37:34 -07:00
Dane Everitt
6b25ac3665 Fix websocket error spam; only send known JWT issues along to the socket itself, not to wings console; closes pterodactyl/panel#2354 2020-09-17 20:30:51 -07:00
Dane Everitt
783832fc71 Set the docker image correctly when a server is updated; closes pterodactyl/panel#2356 2020-09-17 20:20:39 -07:00
Dane Everitt
815539b3da Fix log rotation error due to missing templates dir in compiled build 2020-09-17 20:16:27 -07:00
Dane Everitt
6ba1b75696 Add console throttling; closes pterodactyl/panel#2214 (#60) 2020-09-17 20:13:04 -07:00
Dane Everitt
ce76b9339e better error handling for busy files; closes pterodactyl/panel#2332 2020-09-15 19:53:00 -07:00
Dane Everitt
6ba15e9884 Better error handling from responses 2020-09-13 13:55:40 -07:00
Dane Everitt
f2a6d6b3c5 Websocket cleanup 2020-09-12 22:12:23 -07:00
Dane Everitt
0295603943 Speed up wings boot when restoring from cleared docker environment
Doesn't pull images for any server that is not supposed to be running at boot time
2020-09-12 22:08:50 -07:00
Dane Everitt
ce2659fdd7 Simplify environment creation for server 2020-09-12 21:48:04 -07:00
Dane Everitt
be49e08f4f Show note in console when image is being pulled, show pull status to admins 2020-09-12 21:37:48 -07:00
Dane Everitt
3ee76ea2bc Cleanup 2020-09-12 20:26:02 -07:00
Dane Everitt
d7fbf29cc1 Remove debug lines 2020-09-12 20:17:36 -07:00
Dane Everitt
d02e37620d Use workerpools to enforce FIFO without blocking other topics 2020-09-12 20:13:59 -07:00
Dane Everitt
53bd0d57ad Replace logic with PublishJson 2020-09-12 20:13:48 -07:00
Dane Everitt
b779c98717 Fix files unarchiving to the root; closes pterodactyl/panel#2333 2020-09-12 19:25:29 -07:00
Dane Everitt
4ac19bd29d Refactor confusing & fragile event bus logic to use callbacks and not channels; ref pterodactyl/panel#2298 2020-09-12 09:26:17 -07:00
Dane Everitt
8407ea21da Fix state retrevial race condition 2020-09-12 09:12:30 -07:00
Dane Everitt
fa6f56caa8 Remove pointless debug log 2020-09-11 23:18:51 -07:00
Dane Everitt
5a62f83ec8 Don't run pre-boot actions if the server is already running 2020-09-11 23:11:57 -07:00
Dane Everitt
8bcb3d7c62 Remove deadlock specific code 2020-09-11 23:03:35 -07:00
Dane Everitt
b2eebcaf6d Fix deadlocks in event listener system; closes pterodactyl/panel#2298
Fixes deadlocks that occurred when events were registered while other events were being unsubscribed and data was being flooded to these listeners. A complete mess, I hate this code, it is going to break again, but jesus I'm so tired.
2020-09-11 23:01:54 -07:00
Matthew Penner
45bcb9cd68 Lets not attempt to pull 16384 log lines 2020-09-11 22:52:07 -06:00
Dane Everitt
e1ff4db330 Also fix builds for non-releases 2020-09-11 21:00:37 -07:00
Dane Everitt
606143b3ad Fix flags for workflow, strips 8MB off final binary size 2020-09-11 20:59:39 -07:00
Dane Everitt
57221bdd30 Make disk checking timeout configurable 2020-09-11 20:24:23 -07:00
36 changed files with 609 additions and 303 deletions

View File

@@ -14,10 +14,10 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-go@v2 - uses: actions/setup-go@v2
with: with:
go-version: '^1.15' go-version: '1.15.2'
- name: Build - name: Build
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go
- name: Test - name: Test
run: go test ./... run: go test ./...

View File

@@ -12,12 +12,12 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-go@v2 - uses: actions/setup-go@v2
with: with:
go-version: '^1.15' go-version: '1.15.2'
- name: Build - name: Build
env: env:
REF: ${{ github.ref }} REF: ${{ github.ref }}
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go
- name: Test - name: Test
run: go test ./... run: go test ./...

View File

@@ -2,7 +2,7 @@
# Pterodactyl Panel Dockerfile # Pterodactyl Panel Dockerfile
# ---------------------------------- # ----------------------------------
FROM golang:1.14-alpine FROM golang:1.15-alpine
COPY . /go/wings/ COPY . /go/wings/
WORKDIR /go/wings/ WORKDIR /go/wings/
RUN apk add --no-cache upx \ RUN apk add --no-cache upx \
@@ -11,4 +11,4 @@ RUN apk add --no-cache upx \
FROM alpine:latest FROM alpine:latest
COPY --from=0 /go/wings/wings /usr/bin/ COPY --from=0 /go/wings/wings /usr/bin/
CMD ["wings","--config", "/var/lib/pterodactyl/config.yml"] CMD ["wings","--config", "/etc/pterodactyl/config.yml"]

View File

@@ -1,6 +1,7 @@
[![Logo Image](https://cdn.pterodactyl.io/logos/new/pterodactyl_logo.png)](https://pterodactyl.io) [![Logo Image](https://cdn.pterodactyl.io/logos/new/pterodactyl_logo.png)](https://pterodactyl.io)
[![Discord](https://img.shields.io/discord/122900397965705216.svg?style=flat-square&label=Discord)](https://pterodactyl.io/discord) [![Discord](https://img.shields.io/discord/122900397965705216.svg?style=flat-square&label=Discord)](https://pterodactyl.io/discord)
[![Go Report Card](https://goreportcard.com/badge/github.com/pterodactyl/wings)](https://goreportcard.com/report/github.com/pterodactyl/wings)
# Pterodactyl Wings # Pterodactyl Wings
Wings is Pterodactyl's server control plane, built for the rapidly changing gaming industry and designed to be Wings is Pterodactyl's server control plane, built for the rapidly changing gaming industry and designed to be

View File

@@ -137,6 +137,7 @@ func IsRequestError(err error) bool {
} }
type RequestError struct { type RequestError struct {
response *http.Response
Code string `json:"code"` Code string `json:"code"`
Status string `json:"status"` Status string `json:"status"`
Detail string `json:"detail"` Detail string `json:"detail"`
@@ -144,7 +145,7 @@ type RequestError struct {
// Returns the error response in a string form that can be more easily consumed. // Returns the error response in a string form that can be more easily consumed.
func (re *RequestError) Error() string { func (re *RequestError) Error() string {
return fmt.Sprintf("Error response from Panel: %s: %s (HTTP/%s)", re.Code, re.Detail, re.Status) return fmt.Sprintf("Error response from Panel: %s: %s (HTTP/%d)", re.Code, re.Detail, re.response.StatusCode)
} }
func (re *RequestError) String() string { func (re *RequestError) String() string {
@@ -165,9 +166,12 @@ func (r *PanelRequest) Error() *RequestError {
bag := RequestErrorBag{} bag := RequestErrorBag{}
json.Unmarshal(body, &bag) json.Unmarshal(body, &bag)
if len(bag.Errors) == 0 { e := new(RequestError)
return new(RequestError) if len(bag.Errors) > 0 {
e = &bag.Errors[0]
} }
return &bag.Errors[0] e.response = r.Response
return e
} }

View File

@@ -68,6 +68,12 @@ func (r *PanelRequest) ValidateSftpCredentials(request SftpAuthRequest) (*SftpAu
if r.HasError() { if r.HasError() {
if r.HttpResponseCode() >= 400 && r.HttpResponseCode() < 500 { if r.HttpResponseCode() >= 400 && r.HttpResponseCode() < 500 {
log.WithFields(log.Fields{
"subsystem": "sftp",
"username": request.User,
"ip": request.IP,
}).Warn(r.Error().String())
return nil, new(sftpInvalidCredentialsError) return nil, new(sftpInvalidCredentialsError)
} }

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/NYTimes/logrotate" "github.com/NYTimes/logrotate"
"github.com/apex/log/handlers/multi" "github.com/apex/log/handlers/multi"
"github.com/docker/docker/client"
"github.com/gammazero/workerpool" "github.com/gammazero/workerpool"
"golang.org/x/crypto/acme" "golang.org/x/crypto/acme"
"net/http" "net/http"
@@ -172,6 +173,11 @@ func rootCmdRun(*cobra.Command, []string) {
log.WithField("server", s.Id()).Info("loaded configuration for server") log.WithField("server", s.Id()).Info("loaded configuration for server")
} }
states, err := server.CachedServerStates()
if err != nil {
log.WithField("error", errors.WithStack(err)).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
}
// Create a new workerpool that limits us to 4 servers being bootstrapped at a time // Create a new workerpool that limits us to 4 servers being bootstrapped at a time
// on Wings. This allows us to ensure the environment exists, write configurations, // on Wings. This allows us to ensure the environment exists, write configurations,
// and reboot processes without causing a slow-down due to sequential booting. // and reboot processes without causing a slow-down due to sequential booting.
@@ -181,25 +187,39 @@ func rootCmdRun(*cobra.Command, []string) {
s := serv s := serv
pool.Submit(func() { pool.Submit(func() {
s.Log().Info("ensuring server environment exists") s.Log().Info("configuring server environment and restoring to previous state")
// Create a server environment if none exists currently. This allows us to recover from Docker
// being reinstalled on the host system for example. var st string
if err := s.Environment.Create(); err != nil { if state, exists := states[s.Id()]; exists {
s.Log().WithField("error", err).Error("failed to process environment") st = state
} }
r, err := s.Environment.IsRunning() r, err := s.Environment.IsRunning()
if err != nil { // We ignore missing containers because we don't want to actually block booting of wings at this
// point. If we didn't do this and you pruned all of the images and then started wings you could
// end up waiting a long period of time for all of the images to be re-pulled on Wings boot rather
// than when the server itself is started.
if err != nil && !client.IsErrNotFound(err) {
s.Log().WithField("error", err).Error("error checking server environment status") s.Log().WithField("error", err).Error("error checking server environment status")
} }
// Check if the server was previously running. If so, attempt to start the server now so that Wings
// can pick up where it left off. If the environment does not exist at all, just create it and then allow
// the normal flow to execute.
//
// This does mean that booting wings after a catastrophic machine crash and wiping out the Docker images
// as a result will result in a slow boot.
if !r && (st == environment.ProcessRunningState || st == environment.ProcessStartingState) {
if err := s.HandlePowerAction(server.PowerActionStart); err != nil {
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to return server to running state")
}
} else if r || (!r && s.IsRunning()) {
// If the server is currently running on Docker, mark the process as being in that state. // If the server is currently running on Docker, mark the process as being in that state.
// We never want to stop an instance that is currently running external from Wings since // We never want to stop an instance that is currently running external from Wings since
// that is a good way of keeping things running even if Wings gets in a very corrupted state. // that is a good way of keeping things running even if Wings gets in a very corrupted state.
// //
// This will also validate that a server process is running if the last tracked state we have // This will also validate that a server process is running if the last tracked state we have
// is that it was running, but we see that the container process is not currently running. // is that it was running, but we see that the container process is not currently running.
if r || (!r && s.IsRunning()) {
s.Log().Info("detected server is running, re-attaching to process...") s.Log().Info("detected server is running, re-attaching to process...")
s.SetState(environment.ProcessRunningState) s.SetState(environment.ProcessRunningState)

View File

@@ -36,6 +36,12 @@ type SystemConfiguration struct {
Gid int Gid int
} }
// The amount of time in seconds that can elapse before a server's disk space calculation is
// considered stale and a re-check should occur. DANGER: setting this value too low can seriously
// impact system performance and cause massive I/O bottlenecks and high CPU usage for the Wings
// process.
DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
// Determines if Wings should detect a server that stops with a normal exit code of // Determines if Wings should detect a server that stops with a normal exit code of
// "0" as being crashed if the process stopped without any Wings interaction. E.g. // "0" as being crashed if the process stopped without any Wings interaction. E.g.
// the user did not press the stop button, but the process stopped cleanly. // the user did not press the stop button, but the process stopped cleanly.
@@ -129,7 +135,21 @@ func (sc *SystemConfiguration) EnableLogRotation() error {
} }
defer f.Close() defer f.Close()
t, err := template.ParseFiles("templates/logrotate.tpl") t, err := template.New("logrotate").Parse(`
{{.LogDirectory}}/wings.log {
size 10M
compress
delaycompress
dateext
maxage 7
missingok
notifempty
create 0640 {{.User.Uid}} {{.User.Gid}}
postrotate
killall -SIGHUP wings
endscript
}`)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }

View File

@@ -4,20 +4,24 @@ type ConsoleThrottles struct {
// Whether or not the throttler is enabled for this instance. // Whether or not the throttler is enabled for this instance.
Enabled bool `json:"enabled" yaml:"enabled" default:"true"` Enabled bool `json:"enabled" yaml:"enabled" default:"true"`
// The total number of throttle activations that must accumulate before a server is // The total number of lines that can be output in a given LineResetInterval period before
// forcibly stopped for violating these limits.
KillAtCount uint64 `json:"kill_at_count" yaml:"kill_at_count" default:"5"`
// The amount of time in milliseconds that a server process must go through without
// triggering an output warning before the throttle activation count begins decreasing.
// This time is measured in milliseconds.
Decay uint64 `json:"decay" yaml:"decay" default:"10000"`
// The total number of lines that can be output in a given CheckInterval period before
// a warning is triggered and counted against the server. // a warning is triggered and counted against the server.
Lines uint64 `json:"lines" yaml:"lines" default:"1000"` Lines uint64 `json:"lines" yaml:"lines" default:"2000"`
// The amount of time that must pass between intervals before the count is reset. This // The total number of throttle activations that can accumulate before a server is considered
// value is in milliseconds. // to be breaching and will be stopped. This value is decremented by one every DecayInterval.
CheckInterval uint64 `json:"check_interval" yaml:"check_interval" default:"100"` MaximumTriggerCount uint64 `json:"maximum_trigger_count" yaml:"maximum_trigger_count" default:"5"`
// The amount of time after which the number of lines processed is reset to 0. This runs in
// a constant loop and is not affected by the current console output volumes. By default, this
// will reset the processed line count back to 0 every 100ms.
LineResetInterval uint64 `json:"line_reset_interval" yaml:"line_reset_interval" default:"100"`
// The amount of time in milliseconds that must pass without an output warning being triggered
// before a throttle activation is decremented.
DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"`
// The amount of time that a server is allowed to be stopping for before it is terminated
// forfully if it triggers output throttles.
StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"`
} }

View File

@@ -1,26 +1,35 @@
version: '3' version: '3.5'
services: services:
daemon: daemon:
build: . build: .
restart: always restart: always
hostname: daemon networks:
- daemon0
ports: ports:
- "8080:8080" - "8080:8080"
- "2022:2022" - "2022:2022"
tty: true tty: true
environment: environment:
- "DEBUG=false" - "DEBUG=false"
- "TZ=UTC" # change to the three letter timezone of your choosing
volumes: volumes:
- "/var/run/docker.sock:/var/run/docker.sock" - "/var/run/docker.sock:/var/run/docker.sock"
- "/var/lib/docker/containers/:/var/lib/docker/containers/" - "/var/lib/docker/containers/:/var/lib/docker/containers/"
- "/etc/pterodactyl/:/etc/pterodactyl/"
- "/var/lib/pterodactyl/:/var/lib/pterodactyl/" - "/var/lib/pterodactyl/:/var/lib/pterodactyl/"
- "/srv/daemon-data/:/srv/daemon-data/" - "/var/log/pterodactyl/:/var/log/pterodactyl/"
- "/tmp/pterodactyl/:/tmp/pterodactyl/" - "/tmp/pterodactyl/:/tmp/pterodactyl/"
- "/etc/timezone:/etc/timezone:ro" ## you may need /srv/daemon-data if you are upgrading from an old daemon
## - "/srv/daemon-data/:/srv/daemon-data/"
## Required for ssl if you user let's encrypt. uncomment to use. ## Required for ssl if you user let's encrypt. uncomment to use.
## - "/etc/letsencrypt/:/etc/letsencrypt/" ## - "/etc/letsencrypt/:/etc/letsencrypt/"
networks: networks:
default: daemon0:
name: daemon0
driver: bridge
ipam: ipam:
config: config:
- subnet: 172.21.0.0/16 - subnet: "172.21.0.0/16"
driver_opts:
com.docker.network.bridge.name: daemon0

View File

@@ -3,6 +3,7 @@ package docker
import ( import (
"bufio" "bufio"
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/apex/log" "github.com/apex/log"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
@@ -19,6 +20,11 @@ import (
"time" "time"
) )
type imagePullStatus struct {
Status string `json:"status"`
Progress string `json:"progress"`
}
// Attaches to the docker container itself and ensures that we can pipe data in and out // Attaches to the docker container itself and ensures that we can pipe data in and out
// of the process stream. This should not be used for reading console data as you *will* // of the process stream. This should not be used for reading console data as you *will*
// miss important output at the beginning because of the time delay with attaching to the // miss important output at the beginning because of the time delay with attaching to the
@@ -307,6 +313,9 @@ func (e *Environment) followOutput() error {
// //
// TODO: local images // TODO: local images
func (e *Environment) ensureImageExists(image string) error { func (e *Environment) ensureImageExists(image string) error {
e.Events().Publish(environment.DockerImagePullStarted, "")
defer e.Events().Publish(environment.DockerImagePullCompleted, "")
// Give it up to 15 minutes to pull the image. I think this should cover 99.8% of cases where an // Give it up to 15 minutes to pull the image. I think this should cover 99.8% of cases where an
// image pull might fail. I can't imagine it will ever take more than 15 minutes to fully pull // image pull might fail. I can't imagine it will ever take more than 15 minutes to fully pull
// an image. Let me know when I am inevitably wrong here... // an image. Let me know when I am inevitably wrong here...
@@ -374,12 +383,18 @@ func (e *Environment) ensureImageExists(image string) error {
// is done being pulled, which is what we need. // is done being pulled, which is what we need.
scanner := bufio.NewScanner(out) scanner := bufio.NewScanner(out)
for scanner.Scan() { for scanner.Scan() {
continue s := imagePullStatus{}
fmt.Println(scanner.Text())
if err := json.Unmarshal(scanner.Bytes(), &s); err == nil {
e.Events().Publish(environment.DockerImagePullStatus, s.Status+" "+s.Progress)
}
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
return err return err
} }
log.WithField("image", image).Debug("completed docker image pull")
return nil return nil
} }

View File

@@ -176,3 +176,9 @@ func (e *Environment) SetStopConfiguration(c *api.ProcessStopConfiguration) {
e.meta.Stop = c e.meta.Stop = c
e.mu.Unlock() e.mu.Unlock()
} }
func (e *Environment) SetImage(i string) {
e.mu.Lock()
e.meta.Image = i
e.mu.Unlock()
}

View File

@@ -183,13 +183,21 @@ func (e *Environment) WaitForStop(seconds uint, terminate bool) error {
case <-ctx.Done(): case <-ctx.Done():
if ctxErr := ctx.Err(); ctxErr != nil { if ctxErr := ctx.Err(); ctxErr != nil {
if terminate { if terminate {
return e.Terminate(os.Kill) log.WithField("container_id", e.Id).Debug("server did not stop in time, executing process termination")
return errors.WithStack(e.Terminate(os.Kill))
} }
return errors.WithStack(ctxErr) return errors.WithStack(ctxErr)
} }
case err := <-errChan: case err := <-errChan:
if err != nil { if err != nil {
if terminate {
log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while waiting for container stop, attempting process termination")
return errors.WithStack(e.Terminate(os.Kill))
}
return errors.WithStack(err) return errors.WithStack(err)
} }
case <-ok: case <-ok:

View File

@@ -9,6 +9,9 @@ const (
ConsoleOutputEvent = "console output" ConsoleOutputEvent = "console output"
StateChangeEvent = "state change" StateChangeEvent = "state change"
ResourceEvent = "resources" ResourceEvent = "resources"
DockerImagePullStarted = "docker image pull started"
DockerImagePullStatus = "docker image pull status"
DockerImagePullCompleted = "docker image pull completed"
) )
const ( const (

View File

@@ -2,6 +2,8 @@ package events
import ( import (
"encoding/json" "encoding/json"
"github.com/gammazero/workerpool"
"github.com/pkg/errors"
"strings" "strings"
"sync" "sync"
) )
@@ -12,14 +14,13 @@ type Event struct {
} }
type EventBus struct { type EventBus struct {
sync.RWMutex mu sync.RWMutex
pools map[string]*CallbackPool
subscribers map[string]map[chan Event]struct{}
} }
func New() *EventBus { func New() *EventBus {
return &EventBus{ return &EventBus{
subscribers: make(map[string]map[chan Event]struct{}), pools: make(map[string]*CallbackPool),
} }
} }
@@ -39,25 +40,36 @@ func (e *EventBus) Publish(topic string, data string) {
} }
} }
e.mu.RLock()
defer e.mu.RUnlock()
// Acquire a read lock and loop over all of the channels registered for the topic. This // Acquire a read lock and loop over all of the channels registered for the topic. This
// avoids a panic crash if the process tries to unregister the channel while this routine // avoids a panic crash if the process tries to unregister the channel while this routine
// is running. // is running.
go func() { if cp, ok := e.pools[t]; ok {
e.RLock() for _, callback := range cp.callbacks {
defer e.RUnlock() c := *callback
evt := Event{Data: data, Topic: topic}
if ch, ok := e.subscribers[t]; ok { // Using the workerpool with one worker allows us to execute events in a FIFO manner. Running
for channel := range ch { // this using goroutines would cause things such as console output to just output in random order
channel <- Event{Data: data, Topic: topic} // if more than one event is fired at the same time.
//
// However, the pool submission does not block the execution of this function itself, allowing
// us to call publish without blocking any of the other pathways.
//
// @see https://github.com/pterodactyl/panel/issues/2303
cp.pool.Submit(func() {
c(evt)
})
} }
} }
}()
} }
// Publishes a JSON message to a given topic.
func (e *EventBus) PublishJson(topic string, data interface{}) error { func (e *EventBus) PublishJson(topic string, data interface{}) error {
b, err := json.Marshal(data) b, err := json.Marshal(data)
if err != nil { if err != nil {
return err return errors.WithStack(err)
} }
e.Publish(topic, string(b)) e.Publish(topic, string(b))
@@ -65,41 +77,46 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
return nil return nil
} }
// Subscribe to an emitter topic using a channel. // Register a callback function that will be executed each time one of the events using the topic
func (e *EventBus) Subscribe(topic string, ch chan Event) { // name is called.
e.Lock() func (e *EventBus) On(topic string, callback *func(Event)) {
defer e.Unlock() e.mu.Lock()
defer e.mu.Unlock()
if _, exists := e.subscribers[topic]; !exists { // Check if this topic has been registered at least once for the event listener, and if
e.subscribers[topic] = make(map[chan Event]struct{}) // not create an empty struct for the topic.
} if _, exists := e.pools[topic]; !exists {
e.pools[topic] = &CallbackPool{
// Only set the channel if there is not currently a matching one for this topic. This callbacks: make([]*func(Event), 0),
// avoids registering two identical listeners for the same topic and causing pain in pool: workerpool.New(1),
// the unsubscribe functionality as well.
if _, exists := e.subscribers[topic][ch]; !exists {
e.subscribers[topic][ch] = struct{}{}
} }
} }
// Unsubscribe a channel from a given topic. // If this callback is not already registered as an event listener, go ahead and append
func (e *EventBus) Unsubscribe(topic string, ch chan Event) { // it to the array of callbacks for this topic.
e.Lock() e.pools[topic].Add(callback)
defer e.Unlock() }
if _, exists := e.subscribers[topic][ch]; exists { // Removes an event listener from the bus.
delete(e.subscribers[topic], ch) func (e *EventBus) Off(topic string, callback *func(Event)) {
e.mu.Lock()
defer e.mu.Unlock()
if cp, ok := e.pools[topic]; ok {
cp.Remove(callback)
} }
} }
// Removes all of the event listeners for the server. This is used when a server // Removes all of the event listeners that have been registered for any topic. Also stops the worker
// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously // pool to close that routine.
// should also check elsewhere and handle a server reference going nil, but this func (e *EventBus) Destroy() {
// won't hurt. e.mu.Lock()
func (e *EventBus) UnsubscribeAll() { defer e.mu.Unlock()
e.Lock()
defer e.Unlock()
// Reset the entire struct into an empty map. // Stop every pool that exists for a given callback topic.
e.subscribers = make(map[string]map[chan Event]struct{}) for _, cp := range e.pools {
cp.pool.Stop()
}
e.pools = make(map[string]*CallbackPool)
} }

49
events/pool.go Normal file
View File

@@ -0,0 +1,49 @@
package events
import (
"github.com/gammazero/workerpool"
"reflect"
)
type CallbackPool struct {
callbacks []*func(Event)
pool *workerpool.WorkerPool
}
// Pushes a new callback into the array of listeners for the pool.
func (cp *CallbackPool) Add(callback *func(Event)) {
if cp.index(reflect.ValueOf(callback)) < 0 {
cp.callbacks = append(cp.callbacks, callback)
}
}
// Removes a callback from the array of registered callbacks if it exists.
func (cp *CallbackPool) Remove(callback *func(Event)) {
i := cp.index(reflect.ValueOf(callback))
// If i < 0 it means there was no index found for the given callback, meaning it was
// never registered or was already unregistered from the listeners. Also double check
// that we didn't somehow escape the length of the topic callback (not sure how that
// would happen, but lets avoid a panic condition).
if i < 0 || i >= len(cp.callbacks) {
return
}
// We can assume that the topic still exists at this point since we acquire an exclusive
// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
// no topic already existing.
cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...)
}
// Finds the index of a given callback in the topic by comparing all of the registered callback
// pointers to the passed function. This function does not aquire a lock as it should only be called
// within the confines of a function that has already acquired a lock for the duration of the lookup.
func (cp *CallbackPool) index(v reflect.Value) int {
for i, handler := range cp.callbacks {
if reflect.ValueOf(handler).Pointer() == v.Pointer() {
return i
}
}
return -1
}

View File

@@ -2,16 +2,12 @@ package installer
import ( import (
"encoding/json" "encoding/json"
"github.com/apex/log"
"github.com/asaskevich/govalidator" "github.com/asaskevich/govalidator"
"github.com/buger/jsonparser" "github.com/buger/jsonparser"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/server" "github.com/pterodactyl/wings/server"
"os"
"path"
) )
type Installer struct { type Installer struct {
@@ -95,33 +91,6 @@ func (i *Installer) Server() *server.Server {
return i.server return i.server
} }
// Executes the installer process, creating the server and running through the
// associated installation process based on the parameters passed through for
// the server instance.
func (i *Installer) Execute() {
p := path.Join(config.Get().System.Data, i.Uuid())
l := log.WithFields(log.Fields{"server": i.Uuid(), "process": "installer"})
l.WithField("path", p).Debug("creating required server data directory")
if err := os.MkdirAll(p, 0755); err != nil {
l.WithFields(log.Fields{"path": p, "error": errors.WithStack(err)}).Error("failed to create server data directory")
return
}
if err := os.Chown(p, config.Get().System.User.Uid, config.Get().System.User.Gid); err != nil {
l.WithField("error", errors.WithStack(err)).Error("failed to chown server data directory")
return
}
l.Debug("creating required environment for server instance")
if err := i.server.Environment.Create(); err != nil {
l.WithField("error", err).Error("failed to create environment for server")
return
}
l.Info("successfully created environment for server during install process")
}
// Returns a string value from the JSON data provided. // Returns a string value from the JSON data provided.
func getString(data []byte, key ...string) string { func getString(data []byte, key ...string) string {
value, _ := jsonparser.GetString(data, key...) value, _ := jsonparser.GetString(data, key...)

View File

@@ -198,7 +198,8 @@ func deleteServer(c *gin.Context) {
} }
// Unsubscribe all of the event listeners. // Unsubscribe all of the event listeners.
s.Events().UnsubscribeAll() s.Events().Destroy()
s.Throttler().StopTimer()
// Destroy the environment; in Docker this will handle a running container and // Destroy the environment; in Docker this will handle a running container and
// forcibly terminate it before removing the container, so we do not need to handle // forcibly terminate it before removing the container, so we do not need to handle

View File

@@ -339,10 +339,22 @@ func postServerDecompressFiles(c *gin.Context) {
} }
if err := s.Filesystem.DecompressFile(data.RootPath, data.File); err != nil { if err := s.Filesystem.DecompressFile(data.RootPath, data.File); err != nil {
// Check if the file does not exist.
// NOTE: os.IsNotExist() does not work if the error is wrapped.
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {
c.Status(http.StatusNotFound) c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
"error": "The requested archive was not found.",
})
return
}
// If the file is busy for some reason just return a nicer error to the user since there is not
// much we specifically can do. They'll need to stop the running server process in order to overwrite
// a file like this.
if strings.Contains(err.Error(), "text file busy") {
s.Log().WithField("error", err).Warn("failed to decompress file due to busy text file")
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "One or more files this archive is attempting to overwrite are currently in use by another process. Please try again.",
})
return return
} }

View File

@@ -57,7 +57,11 @@ func postCreateServer(c *gin.Context) {
// cycle. If there are any errors they will be logged and communicated back // cycle. If there are any errors they will be logged and communicated back
// to the Panel where a reinstall may take place. // to the Panel where a reinstall may take place.
go func(i *installer.Installer) { go func(i *installer.Installer) {
i.Execute() err := i.Server().CreateEnvironment()
if err != nil {
i.Server().Log().WithField("error", err).Error("failed to create server environment during install process")
return
}
if err := i.Server().Install(false); err != nil { if err := i.Server().Install(false); err != nil {
log.WithFields(log.Fields{"server": i.Uuid(), "error": err}).Error("failed to run install process for server") log.WithFields(log.Fields{"server": i.Uuid(), "error": err}).Error("failed to run install process for server")

View File

@@ -277,7 +277,10 @@ func postTransfer(c *gin.Context) {
server.GetServers().Add(i.Server()) server.GetServers().Add(i.Server())
// Create the server's environment (note this does not execute the install script) // Create the server's environment (note this does not execute the install script)
i.Execute() if err := i.Server().CreateEnvironment(); err != nil {
l.WithField("error", err).Error("failed to create server environment")
return
}
// Un-archive the archive. That sounds weird.. // Un-archive the archive. That sounds weird..
if err := archiver.NewTarGz().Unarchive(archivePath, i.Server().Filesystem.Path()); err != nil { if err := archiver.NewTarGz().Unarchive(archivePath, i.Server().Filesystem.Path()); err != nil {

View File

@@ -50,24 +50,26 @@ var e = []string{
// Listens for different events happening on a server and sends them along // Listens for different events happening on a server and sends them along
// to the connected websocket. // to the connected websocket.
func (h *Handler) ListenForServerEvents(ctx context.Context) { func (h *Handler) ListenForServerEvents(ctx context.Context) {
eventChannel := make(chan events.Event) h.server.Log().Debug("listening for server events over websocket")
for _, event := range e { callback := func(e events.Event) {
h.server.Events().Subscribe(event, eventChannel) if err := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); err != nil {
h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
}
} }
for d := range eventChannel { // Subscribe to all of the events with the same callback that will push the data out over the
// websocket for the server.
for _, evt := range e {
h.server.Events().On(evt, &callback)
}
go func(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
for _, event := range e { // Once this context is stopped, de-register all of the listeners that have been registered.
h.server.Events().Unsubscribe(event, eventChannel) for _, evt := range e {
} h.server.Events().Off(evt, &callback)
close(eventChannel)
default:
_ = h.SendJson(&Message{
Event: d.Topic,
Args: []string{d.Data},
})
} }
} }
}(ctx)
} }

View File

@@ -12,12 +12,7 @@ const (
) )
type Message struct { type Message struct {
// The event to perform. Should be one of the following that are supported: // The event to perform.
//
// - status : Returns the server's power state.
// - logs : Returns the server log data at the time of the request.
// - power : Performs a power action against the server based the data.
// - command : Performs a command on a server using the data field.
Event string `json:"event"` Event string `json:"event"`
// The data to pass along, only used by power/command currently. Other requests // The data to pass along, only used by power/command currently. Other requests

View File

@@ -37,6 +37,19 @@ type Handler struct {
server *server.Server server *server.Server
} }
var (
ErrJwtNotPresent = errors.New("jwt: no jwt present")
ErrJwtNoConnectPerm = errors.New("jwt: missing connect permission")
ErrJwtUuidMismatch = errors.New("jwt: server uuid mismatch")
)
func IsJwtError(err error) bool {
return errors.Is(err, ErrJwtNotPresent) ||
errors.Is(err, ErrJwtNoConnectPerm) ||
errors.Is(err, ErrJwtUuidMismatch) ||
errors.Is(err, jwt.ErrExpValidation)
}
// Parses a JWT into a websocket token payload. // Parses a JWT into a websocket token payload.
func NewTokenPayload(token []byte) (*tokens.WebsocketPayload, error) { func NewTokenPayload(token []byte) (*tokens.WebsocketPayload, error) {
payload := tokens.WebsocketPayload{} payload := tokens.WebsocketPayload{}
@@ -92,9 +105,13 @@ func GetHandler(s *server.Server, w http.ResponseWriter, r *http.Request) (*Hand
} }
func (h *Handler) SendJson(v *Message) error { func (h *Handler) SendJson(v *Message) error {
// Do not send JSON down the line if the JWT on the connection is not // Do not send JSON down the line if the JWT on the connection is not valid!
// valid!
if err := h.TokenValid(); err != nil { if err := h.TokenValid(); err != nil {
h.unsafeSendJson(Message{
Event: ErrorEvent,
Args: []string{"could not authenticate client: " + err.Error()},
})
return nil return nil
} }
@@ -134,7 +151,7 @@ func (h *Handler) unsafeSendJson(v interface{}) error {
func (h *Handler) TokenValid() error { func (h *Handler) TokenValid() error {
j := h.GetJwt() j := h.GetJwt()
if j == nil { if j == nil {
return errors.New("no jwt present") return ErrJwtNotPresent
} }
if err := jwt.ExpirationTimeValidator(time.Now())(&j.Payload); err != nil { if err := jwt.ExpirationTimeValidator(time.Now())(&j.Payload); err != nil {
@@ -142,11 +159,11 @@ func (h *Handler) TokenValid() error {
} }
if !j.HasPermission(PermissionConnect) { if !j.HasPermission(PermissionConnect) {
return errors.New("jwt does not have connect permission") return ErrJwtNoConnectPerm
} }
if h.server.Id() != j.GetServerUuid() { if h.server.Id() != j.GetServerUuid() {
return errors.New("jwt server uuid mismatch") return ErrJwtUuidMismatch
} }
return nil return nil
@@ -157,9 +174,10 @@ func (h *Handler) TokenValid() error {
// error message, otherwise we just send back a standard error message. // error message, otherwise we just send back a standard error message.
func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error { func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error {
j := h.GetJwt() j := h.GetJwt()
expected := errors.Is(err, server.ErrSuspended) || errors.Is(err, server.ErrIsRunning)
message := "an unexpected error was encountered while handling this request" message := "an unexpected error was encountered while handling this request"
if server.IsSuspendedError(err) || (j != nil && j.HasPermission(PermissionReceiveErrors)) { if expected || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
message = err.Error() message = err.Error()
} }
@@ -169,7 +187,7 @@ func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error
wsm.Args = []string{m} wsm.Args = []string{m}
if len(shouldLog) == 0 || (len(shouldLog) == 1 && shouldLog[0] == true) { if len(shouldLog) == 0 || (len(shouldLog) == 1 && shouldLog[0] == true) {
if !server.IsSuspendedError(err) { if !expected && !IsJwtError(err) {
h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}). h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}).
Error("failed to handle websocket process; an error was encountered processing an event") Error("failed to handle websocket process; an error was encountered processing an event")
} }
@@ -206,8 +224,6 @@ func (h *Handler) GetJwt() *tokens.WebsocketPayload {
func (h *Handler) HandleInbound(m Message) error { func (h *Handler) HandleInbound(m Message) error {
if m.Event != AuthenticationEvent { if m.Event != AuthenticationEvent {
if err := h.TokenValid(); err != nil { if err := h.TokenValid(); err != nil {
log.WithField("message", err.Error()).Debug("jwt for server websocket is no longer valid")
h.unsafeSendJson(Message{ h.unsafeSendJson(Message{
Event: ErrorEvent, Event: ErrorEvent,
Args: []string{"could not authenticate client: " + err.Error()}, Args: []string{"could not authenticate client: " + err.Error()},
@@ -313,7 +329,7 @@ func (h *Handler) HandleInbound(m Message) error {
return nil return nil
} }
logs, err := h.server.Environment.Readlog(1024 * 16) logs, err := h.server.Environment.Readlog(100)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -1,60 +1,111 @@
package server package server
import ( import (
"context"
"fmt" "fmt"
"github.com/mitchellh/colorstring" "github.com/mitchellh/colorstring"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/system"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
) )
var ErrTooMuchConsoleData = errors.New("console is outputting too much data")
type ConsoleThrottler struct { type ConsoleThrottler struct {
sync.RWMutex mu sync.Mutex
config.ConsoleThrottles config.ConsoleThrottles
// The total number of activations that have occurred thus far. // The total number of activations that have occurred thus far.
activations uint64 activations uint64
// The total number of lines that have been sent since the last reset timer period.
count uint64
// Wether or not the console output is being throttled. It is up to calling code to
// determine what to do if it is.
isThrottled system.AtomicBool
// The total number of lines processed so far during the given time period. // The total number of lines processed so far during the given time period.
lines uint64 timerCancel *context.CancelFunc
lastIntervalTime *time.Time
lastDecayTime *time.Time
} }
// Increments the number of activations for a server. // Resets the state of the throttler.
func (ct *ConsoleThrottler) AddActivation() uint64 { func (ct *ConsoleThrottler) Reset() {
ct.Lock() atomic.StoreUint64(&ct.count, 0)
defer ct.Unlock() atomic.StoreUint64(&ct.activations, 0)
ct.isThrottled.Set(false)
ct.activations += 1
return ct.activations
} }
// Decrements the number of activations for a server. // Triggers an activation for a server. You can also decrement the number of activations
func (ct *ConsoleThrottler) RemoveActivation() uint64 { // by passing a negative number.
ct.Lock() func (ct *ConsoleThrottler) markActivation(increment bool) uint64 {
defer ct.Unlock() if !increment {
if atomic.LoadUint64(&ct.activations) == 0 {
if ct.activations == 0 {
return 0 return 0
} }
ct.activations -= 1 // This weird dohicky subtracts 1 from the activation count.
return atomic.AddUint64(&ct.activations, ^uint64(0))
return ct.activations
} }
// Increment the total count of lines that we have processed so far. return atomic.AddUint64(&ct.activations, 1)
func (ct *ConsoleThrottler) IncrementLineCount() uint64 {
return atomic.AddUint64(&ct.lines, 1)
} }
// Reset the line count to zero. // Determines if the console is currently being throttled. Calls to this function can be used to
func (ct *ConsoleThrottler) ResetLineCount() { // determine if output should be funneled along to the websocket processes.
atomic.SwapUint64(&ct.lines, 0) func (ct *ConsoleThrottler) Throttled() bool {
return ct.isThrottled.Get()
}
// Starts a timer that runs in a seperate thread and will continually decrement the lines processed
// and number of activations, regardless of the current console message volume.
func (ct *ConsoleThrottler) StartTimer() {
ctx, cancel := context.WithCancel(context.Background())
reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond)
decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond)
go func() {
for {
select {
case <-ctx.Done():
reset.Stop()
return
case <-reset.C:
ct.isThrottled.Set(false)
atomic.StoreUint64(&ct.count, 0)
}
}
}()
go func() {
for {
select {
case <-ctx.Done():
decay.Stop()
return
case <-decay.C:
ct.markActivation(false)
}
}
}()
ct.timerCancel = &cancel
}
// Stops a running timer processes if one exists. This is only called when the server is deleted since
// we want this to always be running. If there is no process currently running nothing will really happen.
func (ct *ConsoleThrottler) StopTimer() {
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.timerCancel != nil {
c := *ct.timerCancel
c()
ct.timerCancel = nil
}
} }
// Handles output from a server's console. This code ensures that a server is not outputting // Handles output from a server's console. This code ensures that a server is not outputting
@@ -70,30 +121,41 @@ func (ct *ConsoleThrottler) ResetLineCount() {
// data all at once. These values are all configurable via the wings configuration file, however the // data all at once. These values are all configurable via the wings configuration file, however the
// defaults have been in the wild for almost two years at the time of this writing, so I feel quite // defaults have been in the wild for almost two years at the time of this writing, so I feel quite
// confident in them. // confident in them.
func (ct *ConsoleThrottler) Handle() { //
// This function returns an error if the server should be stopped due to violating throttle constraints
// and a boolean value indicating if a throttle is being violated when it is checked.
func (ct *ConsoleThrottler) Increment(onTrigger func()) error {
if !ct.Enabled {
return nil
}
// Increment the line count and if we have now output more lines than are allowed, trigger a throttle
// activation. Once the throttle is triggered and has passed the kill at value we will trigger a server
// stop automatically.
if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() {
ct.isThrottled.Set(true)
if ct.markActivation(true) >= ct.MaximumTriggerCount {
return ErrTooMuchConsoleData
}
onTrigger()
}
return nil
} }
// Returns the throttler instance for the server or creates a new one. // Returns the throttler instance for the server or creates a new one.
func (s *Server) Throttler() *ConsoleThrottler { func (s *Server) Throttler() *ConsoleThrottler {
s.throttleLock.RLock() s.throttleLock.Lock()
defer s.throttleLock.Unlock()
if s.throttler == nil { if s.throttler == nil {
// Release the read lock so that we can acquire a normal lock on the process and
// make modifications to the throttler.
s.throttleLock.RUnlock()
s.throttleLock.Lock()
s.throttler = &ConsoleThrottler{ s.throttler = &ConsoleThrottler{
ConsoleThrottles: config.Get().Throttles, ConsoleThrottles: config.Get().Throttles,
} }
s.throttleLock.Unlock() }
return s.throttler return s.throttler
} else {
defer s.throttleLock.RUnlock()
return s.throttler
}
} }
// Sends output to the server console formatted to appear correctly as being sent // Sends output to the server console formatted to appear correctly as being sent

View File

@@ -1,17 +1,9 @@
package server package server
type suspendedError struct { import "github.com/pkg/errors"
}
func (e *suspendedError) Error() string { var ErrIsRunning = errors.New("server is running")
return "server is currently in a suspended state" var ErrSuspended = errors.New("server is currently in a suspended state")
}
func IsSuspendedError(err error) bool {
_, ok := err.(*suspendedError)
return ok
}
type crashTooFrequent struct { type crashTooFrequent struct {
} }

View File

@@ -29,6 +29,8 @@ import (
// Error returned when there is a bad path provided to one of the FS calls. // Error returned when there is a bad path provided to one of the FS calls.
type PathResolutionError struct{} type PathResolutionError struct{}
var ErrNotEnoughDiskSpace = errors.New("not enough disk space is available to perform this operation")
// Returns the error response in a string form that can be more easily consumed. // Returns the error response in a string form that can be more easily consumed.
func (pre PathResolutionError) Error() string { func (pre PathResolutionError) Error() string {
return "invalid path resolution" return "invalid path resolution"
@@ -257,7 +259,7 @@ func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) { func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
// Check if cache is expired. // Check if cache is expired.
fs.lookupTimeMu.RLock() fs.lookupTimeMu.RLock()
isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * -10)) isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * time.Duration(-1*config.Get().System.DiskCheckInterval)))
fs.lookupTimeMu.RUnlock() fs.lookupTimeMu.RUnlock()
if !isValidInCache { if !isValidInCache {
@@ -400,9 +402,10 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
currentSize = stat.Size() currentSize = stat.Size()
} }
o := &fileOpener{}
// This will either create the file if it does not already exist, or open and // This will either create the file if it does not already exist, or open and
// truncate the existing file. // truncate the existing file.
file, err := os.OpenFile(cleaned, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) file, err := o.open(cleaned, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@@ -782,6 +785,10 @@ func (fs *Filesystem) EnsureDataDirectory() error {
if err := os.MkdirAll(fs.Path(), 0700); err != nil { if err := os.MkdirAll(fs.Path(), 0700); err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
if err := fs.Chown("/"); err != nil {
fs.Server.Log().WithField("error", err).Warn("failed to chown server data directory")
}
} }
return nil return nil
@@ -933,3 +940,29 @@ func (fs *Filesystem) handleWalkerError(err error, f os.FileInfo) error {
return nil return nil
} }
type fileOpener struct {
busy uint
}
// Attempts to open a given file up to "attempts" number of times, using a backoff. If the file
// cannot be opened because of a "text file busy" error, we will attempt until the number of attempts
// has been exhaused, at which point we will abort with an error.
func (fo *fileOpener) open(path string, flags int, perm os.FileMode) (*os.File, error) {
for {
f, err := os.OpenFile(path, flags, perm)
// If there is an error because the text file is busy, go ahead and sleep for a few
// hundred milliseconds and then try again up to three times before just returning the
// error back to the caller.
//
// Based on code from: https://github.com/golang/go/issues/22220#issuecomment-336458122
if err != nil && fo.busy < 3 && strings.Contains(err.Error(), "text file busy") {
time.Sleep(100 * time.Millisecond << fo.busy)
fo.busy++
continue
}
return f, err
}
}

View File

@@ -32,14 +32,17 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
dirSize, err := fs.DiskUsage(false) dirSize, err := fs.DiskUsage(false)
var size int64 var size int64
var max = fs.Server.DiskSpace() * 1000.0 * 1000.0
// Walk over the archive and figure out just how large the final output would be from unarchiving it. // Walk over the archive and figure out just how large the final output would be from unarchiving it.
archiver.Walk(source, func(f archiver.File) error { err = archiver.Walk(source, func(f archiver.File) error {
atomic.AddInt64(&size, f.Size()) if atomic.AddInt64(&size, f.Size()) + dirSize > max {
return errors.WithStack(ErrNotEnoughDiskSpace)
}
return nil return nil
}) })
return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), errors.WithStack(err) return err == nil, errors.WithStack(err)
} }
// Decompress a file in a given directory by using the archiver tool to infer the file // Decompress a file in a given directory by using the archiver tool to infer the file
@@ -80,6 +83,11 @@ func (fs *Filesystem) DecompressFile(dir string, file string) error {
return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String())) return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String()))
} }
return errors.Wrap(fs.Writefile(name, f), "could not extract file from archive") p, err := fs.SafePath(filepath.Join(dir, name))
if err != nil {
return errors.Wrap(err, "failed to generate a safe path to server file")
}
return errors.Wrap(fs.Writefile(p, f), "could not extract file from archive")
}) })
} }

View File

@@ -5,46 +5,75 @@ import (
"github.com/apex/log" "github.com/apex/log"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/api" "github.com/pterodactyl/wings/api"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/events" "github.com/pterodactyl/wings/events"
"regexp" "regexp"
"strconv" "strconv"
) )
// Adds all of the internal event listeners we want to use for a server. var dockerEvents = []string{
environment.DockerImagePullStatus,
environment.DockerImagePullStarted,
environment.DockerImagePullCompleted,
}
// Adds all of the internal event listeners we want to use for a server. These listeners can only be
// removed by deleting the server as they should last for the duration of the process' lifetime.
func (s *Server) StartEventListeners() { func (s *Server) StartEventListeners() {
console := make(chan events.Event) console := func(e events.Event) {
state := make(chan events.Event) t := s.Throttler()
stats := make(chan events.Event) err := t.Increment(func () {
s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.")
})
s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console) // An error is only returned if the server has breached the thresholds set.
s.Environment.Events().Subscribe(environment.StateChangeEvent, state) if err != nil {
s.Environment.Events().Subscribe(environment.ResourceEvent, stats) // If the process is already stopping, just let it continue with that action rather than attempting
// to terminate again.
if s.GetState() != environment.ProcessStoppingState {
s.SetState(environment.ProcessStoppingState)
go func() {
s.Log().Warn("stopping server instance, violating throttle limits")
s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.")
// Completely skip over server power actions and terminate the running instance. This gives the
// server 15 seconds to finish stopping gracefully before it is forcefully terminated.
if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil {
// If there is an error set the process back to running so that this throttler is called
// again and hopefully kills the server.
if s.GetState() != environment.ProcessOfflineState {
s.SetState(environment.ProcessRunningState)
}
go func(console chan events.Event) { s.Log().WithField("error", errors.WithStack(err)).Error("failed to terminate environment after triggering throttle")
for data := range console { }
// Immediately emit this event back over the server event stream since it is }()
// being called from the environment event stream and things probably aren't }
// listening to that event. }
s.Events().Publish(ConsoleOutputEvent, data.Data)
// If we are not throttled, go ahead and output the data.
if !t.Throttled() {
s.Events().Publish(ConsoleOutputEvent, e.Data)
}
// Also pass the data along to the console output channel. // Also pass the data along to the console output channel.
s.onConsoleOutput(data.Data) s.onConsoleOutput(e.Data)
} }
}(console)
go func(state chan events.Event) { state := func(e events.Event) {
for data := range state { // Reset the throttler when the process is started.
s.SetState(data.Data) if e.Data == environment.ProcessStartingState {
s.Throttler().Reset()
} }
}(state)
go func(stats chan events.Event) { s.SetState(e.Data)
for data := range stats { }
stats := func(e events.Event) {
st := new(environment.Stats) st := new(environment.Stats)
if err := json.Unmarshal([]byte(data.Data), st); err != nil { if err := json.Unmarshal([]byte(e.Data), st); err != nil {
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats") s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
continue return
} }
// Update the server resource tracking object with the resources we got here. // Update the server resource tracking object with the resources we got here.
@@ -56,7 +85,24 @@ func (s *Server) StartEventListeners() {
s.emitProcUsage() s.emitProcUsage()
} }
}(stats)
docker := func(e events.Event) {
if e.Topic == environment.DockerImagePullStatus {
s.Events().Publish(InstallOutputEvent, e.Data)
} else if e.Topic == environment.DockerImagePullStarted {
s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...")
} else {
s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
}
}
s.Log().Info("registering event listeners: console, state, resources...")
s.Environment.Events().On(environment.ConsoleOutputEvent, &console)
s.Environment.Events().On(environment.StateChangeEvent, &state)
s.Environment.Events().On(environment.ResourceEvent, &stats)
for _, evt := range dockerEvents {
s.Environment.Events().On(evt, &docker)
}
} }
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))") var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")

View File

@@ -37,12 +37,6 @@ func LoadDirectory() error {
return errors.New(rerr.String()) return errors.New(rerr.String())
} }
log.Debug("retrieving cached server states from disk")
states, err := getServerStates()
if err != nil {
log.WithField("error", errors.WithStack(err)).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
}
start := time.Now() start := time.Now()
log.WithField("total_configs", len(configs)).Info("processing servers returned by the API") log.WithField("total_configs", len(configs)).Info("processing servers returned by the API")
@@ -59,11 +53,6 @@ func LoadDirectory() error {
return return
} }
if state, exists := states[s.Id()]; exists {
s.Log().WithField("state", state).Debug("found existing server state in cache file; re-instantiating server state")
s.SetState(state)
}
servers.Add(s) servers.Add(s)
}) })
} }
@@ -97,6 +86,9 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
return nil, err return nil, err
} }
s.resources = ResourceUsage{}
defaults.Set(&s.resources)
s.Archiver = Archiver{Server: s} s.Archiver = Archiver{Server: s}
s.Filesystem = Filesystem{Server: s} s.Filesystem = Filesystem{Server: s}
@@ -118,7 +110,8 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
return nil, err return nil, err
} else { } else {
s.Environment = env s.Environment = env
go s.StartEventListeners() s.StartEventListeners()
s.Throttler().StartTimer()
} }
// Forces the configuration to be synced with the panel. // Forces the configuration to be synced with the panel.

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"os" "os"
"time" "time"
@@ -87,6 +88,10 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
switch action { switch action {
case PowerActionStart: case PowerActionStart:
if s.GetState() != environment.ProcessOfflineState {
return ErrIsRunning
}
// Run the pre-boot logic for the server before processing the environment start. // Run the pre-boot logic for the server before processing the environment start.
if err := s.onBeforeStart(); err != nil { if err := s.onBeforeStart(); err != nil {
return err return err
@@ -134,7 +139,7 @@ func (s *Server) onBeforeStart() error {
// Disallow start & restart if the server is suspended. Do this check after performing a sync // Disallow start & restart if the server is suspended. Do this check after performing a sync
// action with the Panel to ensure that we have the most up-to-date information for that server. // action with the Panel to ensure that we have the most up-to-date information for that server.
if s.IsSuspended() { if s.IsSuspended() {
return new(suspendedError) return ErrSuspended
} }
// Ensure we sync the server information with the environment so that any new environment variables // Ensure we sync the server information with the environment so that any new environment variables

View File

@@ -1,7 +1,6 @@
package server package server
import ( import (
"encoding/json"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"sync" "sync"
) )
@@ -37,14 +36,10 @@ func (s *Server) Proc() *ResourceUsage {
func (s *Server) emitProcUsage() { func (s *Server) emitProcUsage() {
s.resources.mu.RLock() s.resources.mu.RLock()
defer s.resources.mu.RUnlock() if err := s.Events().PublishJson(StatsEvent, s.resources); err != nil {
s.Log().WithField("error", err).Warn("error while emitting server resource usage to listeners")
b, err := json.Marshal(s.resources)
if err == nil {
s.Events().Publish(StatsEvent, string(b))
} }
s.resources.mu.RUnlock()
// TODO: This might be a good place to add a debug log if stats are not sending.
} }
// Returns the servers current state. // Returns the servers current state.

View File

@@ -22,7 +22,7 @@ type Server struct {
sync.RWMutex sync.RWMutex
emitterLock sync.Mutex emitterLock sync.Mutex
powerLock *semaphore.Weighted powerLock *semaphore.Weighted
throttleLock sync.RWMutex throttleLock sync.Mutex
// Maintains the configuration for the server. This is the data that gets returned by the Panel // Maintains the configuration for the server. This is the data that gets returned by the Panel
// such as build settings and container images. // such as build settings and container images.
@@ -137,6 +137,7 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) err
// the process isn't just terminated when a user requests it be stopped. // the process isn't just terminated when a user requests it be stopped.
if e, ok := s.Environment.(*docker.Environment); ok { if e, ok := s.Environment.(*docker.Environment); ok {
s.Log().Debug("syncing stop configuration with configured docker environment") s.Log().Debug("syncing stop configuration with configured docker environment")
e.SetImage(s.Config().Container.Image)
e.SetStopConfiguration(&cfg.ProcessConfiguration.Stop) e.SetStopConfiguration(&cfg.ProcessConfiguration.Stop)
} }

View File

@@ -15,7 +15,7 @@ import (
var stateMutex sync.Mutex var stateMutex sync.Mutex
// Returns the state of the servers. // Returns the state of the servers.
func getServerStates() (map[string]string, error) { func CachedServerStates() (map[string]string, error) {
// Request a lock after we check if the file exists. // Request a lock after we check if the file exists.
stateMutex.Lock() stateMutex.Lock()
defer stateMutex.Unlock() defer stateMutex.Unlock()
@@ -78,8 +78,8 @@ func (s *Server) SetState(state string) error {
// Emit the event to any listeners that are currently registered. // Emit the event to any listeners that are currently registered.
if prevState != state { if prevState != state {
s.Log().WithField("status", s.Proc().State).Debug("saw server status change event") s.Log().WithField("status", s.Proc().getInternalState()).Debug("saw server status change event")
s.Events().Publish(StatusEvent, s.Proc().State) s.Events().Publish(StatusEvent, s.Proc().getInternalState())
} }
// Persist this change to the disk immediately so that should the Daemon be stopped or // Persist this change to the disk immediately so that should the Daemon be stopped or

20
system/bool.go Normal file
View File

@@ -0,0 +1,20 @@
package system
import "sync/atomic"
type AtomicBool struct {
flag uint32
}
func (ab *AtomicBool) Set(v bool) {
i := 0
if v {
i = 1
}
atomic.StoreUint32(&ab.flag, uint32(i))
}
func (ab *AtomicBool) Get() bool {
return atomic.LoadUint32(&ab.flag) == 1
}

View File

@@ -1,13 +0,0 @@
{{.LogDirectory}}/wings.log {
size 10M
compress
delaycompress
dateext
maxage 7
missingok
notifempty
create 0640 {{.User.Uid}} {{.User.Gid}}
postrotate
killall -SIGHUP wings
endscript
}