Compare commits
32 Commits
v1.0.0-rc.
...
v1.0.0-rc.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
033e8e7573 | ||
|
|
aa78071543 | ||
|
|
48aeeff818 | ||
|
|
864c37f17c | ||
|
|
c7405aebe5 | ||
|
|
9ff2d53466 | ||
|
|
6ba49df485 | ||
|
|
6b25ac3665 | ||
|
|
783832fc71 | ||
|
|
815539b3da | ||
|
|
6ba1b75696 | ||
|
|
ce76b9339e | ||
|
|
6ba15e9884 | ||
|
|
f2a6d6b3c5 | ||
|
|
0295603943 | ||
|
|
ce2659fdd7 | ||
|
|
be49e08f4f | ||
|
|
3ee76ea2bc | ||
|
|
d7fbf29cc1 | ||
|
|
d02e37620d | ||
|
|
53bd0d57ad | ||
|
|
b779c98717 | ||
|
|
4ac19bd29d | ||
|
|
8407ea21da | ||
|
|
fa6f56caa8 | ||
|
|
5a62f83ec8 | ||
|
|
8bcb3d7c62 | ||
|
|
b2eebcaf6d | ||
|
|
45bcb9cd68 | ||
|
|
e1ff4db330 | ||
|
|
606143b3ad | ||
|
|
57221bdd30 |
4
.github/workflows/build-test.yml
vendored
4
.github/workflows/build-test.yml
vendored
@@ -14,10 +14,10 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.15'
|
||||
go-version: '1.15.2'
|
||||
|
||||
- name: Build
|
||||
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go
|
||||
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go
|
||||
|
||||
- name: Test
|
||||
run: go test ./...
|
||||
|
||||
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@@ -12,12 +12,12 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.15'
|
||||
go-version: '1.15.2'
|
||||
|
||||
- name: Build
|
||||
env:
|
||||
REF: ${{ github.ref }}
|
||||
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go
|
||||
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go
|
||||
|
||||
- name: Test
|
||||
run: go test ./...
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# Pterodactyl Panel Dockerfile
|
||||
# ----------------------------------
|
||||
|
||||
FROM golang:1.14-alpine
|
||||
FROM golang:1.15-alpine
|
||||
COPY . /go/wings/
|
||||
WORKDIR /go/wings/
|
||||
RUN apk add --no-cache upx \
|
||||
@@ -11,4 +11,4 @@ RUN apk add --no-cache upx \
|
||||
|
||||
FROM alpine:latest
|
||||
COPY --from=0 /go/wings/wings /usr/bin/
|
||||
CMD ["wings","--config", "/var/lib/pterodactyl/config.yml"]
|
||||
CMD ["wings","--config", "/etc/pterodactyl/config.yml"]
|
||||
@@ -1,6 +1,7 @@
|
||||
[](https://pterodactyl.io)
|
||||
|
||||
[](https://pterodactyl.io/discord)
|
||||
[](https://goreportcard.com/report/github.com/pterodactyl/wings)
|
||||
|
||||
# Pterodactyl Wings
|
||||
Wings is Pterodactyl's server control plane, built for the rapidly changing gaming industry and designed to be
|
||||
@@ -32,4 +33,4 @@ I would like to extend my sincere thanks to the following sponsors for helping f
|
||||
## Reporting Issues
|
||||
Please use the [pterodactyl/panel](https://github.com/pterodactyl/panel) repository to report any issues or make
|
||||
feature requests for Wings. In addition, the [security policy](https://github.com/pterodactyl/panel/security/policy) listed
|
||||
within that repository also applies to Wings.
|
||||
within that repository also applies to Wings.
|
||||
|
||||
18
api/api.go
18
api/api.go
@@ -137,14 +137,15 @@ func IsRequestError(err error) bool {
|
||||
}
|
||||
|
||||
type RequestError struct {
|
||||
Code string `json:"code"`
|
||||
Status string `json:"status"`
|
||||
Detail string `json:"detail"`
|
||||
response *http.Response
|
||||
Code string `json:"code"`
|
||||
Status string `json:"status"`
|
||||
Detail string `json:"detail"`
|
||||
}
|
||||
|
||||
// Returns the error response in a string form that can be more easily consumed.
|
||||
func (re *RequestError) Error() string {
|
||||
return fmt.Sprintf("Error response from Panel: %s: %s (HTTP/%s)", re.Code, re.Detail, re.Status)
|
||||
return fmt.Sprintf("Error response from Panel: %s: %s (HTTP/%d)", re.Code, re.Detail, re.response.StatusCode)
|
||||
}
|
||||
|
||||
func (re *RequestError) String() string {
|
||||
@@ -165,9 +166,12 @@ func (r *PanelRequest) Error() *RequestError {
|
||||
bag := RequestErrorBag{}
|
||||
json.Unmarshal(body, &bag)
|
||||
|
||||
if len(bag.Errors) == 0 {
|
||||
return new(RequestError)
|
||||
e := new(RequestError)
|
||||
if len(bag.Errors) > 0 {
|
||||
e = &bag.Errors[0]
|
||||
}
|
||||
|
||||
return &bag.Errors[0]
|
||||
e.response = r.Response
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
@@ -68,6 +68,12 @@ func (r *PanelRequest) ValidateSftpCredentials(request SftpAuthRequest) (*SftpAu
|
||||
|
||||
if r.HasError() {
|
||||
if r.HttpResponseCode() >= 400 && r.HttpResponseCode() < 500 {
|
||||
log.WithFields(log.Fields{
|
||||
"subsystem": "sftp",
|
||||
"username": request.User,
|
||||
"ip": request.IP,
|
||||
}).Warn(r.Error().String())
|
||||
|
||||
return nil, new(sftpInvalidCredentialsError)
|
||||
}
|
||||
|
||||
|
||||
44
cmd/root.go
44
cmd/root.go
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/NYTimes/logrotate"
|
||||
"github.com/apex/log/handlers/multi"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/gammazero/workerpool"
|
||||
"golang.org/x/crypto/acme"
|
||||
"net/http"
|
||||
@@ -172,6 +173,11 @@ func rootCmdRun(*cobra.Command, []string) {
|
||||
log.WithField("server", s.Id()).Info("loaded configuration for server")
|
||||
}
|
||||
|
||||
states, err := server.CachedServerStates()
|
||||
if err != nil {
|
||||
log.WithField("error", errors.WithStack(err)).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
|
||||
}
|
||||
|
||||
// Create a new workerpool that limits us to 4 servers being bootstrapped at a time
|
||||
// on Wings. This allows us to ensure the environment exists, write configurations,
|
||||
// and reboot processes without causing a slow-down due to sequential booting.
|
||||
@@ -181,25 +187,39 @@ func rootCmdRun(*cobra.Command, []string) {
|
||||
s := serv
|
||||
|
||||
pool.Submit(func() {
|
||||
s.Log().Info("ensuring server environment exists")
|
||||
// Create a server environment if none exists currently. This allows us to recover from Docker
|
||||
// being reinstalled on the host system for example.
|
||||
if err := s.Environment.Create(); err != nil {
|
||||
s.Log().WithField("error", err).Error("failed to process environment")
|
||||
s.Log().Info("configuring server environment and restoring to previous state")
|
||||
|
||||
var st string
|
||||
if state, exists := states[s.Id()]; exists {
|
||||
st = state
|
||||
}
|
||||
|
||||
r, err := s.Environment.IsRunning()
|
||||
if err != nil {
|
||||
// We ignore missing containers because we don't want to actually block booting of wings at this
|
||||
// point. If we didn't do this and you pruned all of the images and then started wings you could
|
||||
// end up waiting a long period of time for all of the images to be re-pulled on Wings boot rather
|
||||
// than when the server itself is started.
|
||||
if err != nil && !client.IsErrNotFound(err) {
|
||||
s.Log().WithField("error", err).Error("error checking server environment status")
|
||||
}
|
||||
|
||||
// If the server is currently running on Docker, mark the process as being in that state.
|
||||
// We never want to stop an instance that is currently running external from Wings since
|
||||
// that is a good way of keeping things running even if Wings gets in a very corrupted state.
|
||||
// Check if the server was previously running. If so, attempt to start the server now so that Wings
|
||||
// can pick up where it left off. If the environment does not exist at all, just create it and then allow
|
||||
// the normal flow to execute.
|
||||
//
|
||||
// This will also validate that a server process is running if the last tracked state we have
|
||||
// is that it was running, but we see that the container process is not currently running.
|
||||
if r || (!r && s.IsRunning()) {
|
||||
// This does mean that booting wings after a catastrophic machine crash and wiping out the Docker images
|
||||
// as a result will result in a slow boot.
|
||||
if !r && (st == environment.ProcessRunningState || st == environment.ProcessStartingState) {
|
||||
if err := s.HandlePowerAction(server.PowerActionStart); err != nil {
|
||||
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to return server to running state")
|
||||
}
|
||||
} else if r || (!r && s.IsRunning()) {
|
||||
// If the server is currently running on Docker, mark the process as being in that state.
|
||||
// We never want to stop an instance that is currently running external from Wings since
|
||||
// that is a good way of keeping things running even if Wings gets in a very corrupted state.
|
||||
//
|
||||
// This will also validate that a server process is running if the last tracked state we have
|
||||
// is that it was running, but we see that the container process is not currently running.
|
||||
s.Log().Info("detected server is running, re-attaching to process...")
|
||||
|
||||
s.SetState(environment.ProcessRunningState)
|
||||
|
||||
@@ -36,6 +36,12 @@ type SystemConfiguration struct {
|
||||
Gid int
|
||||
}
|
||||
|
||||
// The amount of time in seconds that can elapse before a server's disk space calculation is
|
||||
// considered stale and a re-check should occur. DANGER: setting this value too low can seriously
|
||||
// impact system performance and cause massive I/O bottlenecks and high CPU usage for the Wings
|
||||
// process.
|
||||
DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
|
||||
|
||||
// Determines if Wings should detect a server that stops with a normal exit code of
|
||||
// "0" as being crashed if the process stopped without any Wings interaction. E.g.
|
||||
// the user did not press the stop button, but the process stopped cleanly.
|
||||
@@ -129,7 +135,21 @@ func (sc *SystemConfiguration) EnableLogRotation() error {
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
t, err := template.ParseFiles("templates/logrotate.tpl")
|
||||
t, err := template.New("logrotate").Parse(`
|
||||
{{.LogDirectory}}/wings.log {
|
||||
size 10M
|
||||
compress
|
||||
delaycompress
|
||||
dateext
|
||||
maxage 7
|
||||
missingok
|
||||
notifempty
|
||||
create 0640 {{.User.Uid}} {{.User.Gid}}
|
||||
postrotate
|
||||
killall -SIGHUP wings
|
||||
endscript
|
||||
}`)
|
||||
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@@ -4,20 +4,24 @@ type ConsoleThrottles struct {
|
||||
// Whether or not the throttler is enabled for this instance.
|
||||
Enabled bool `json:"enabled" yaml:"enabled" default:"true"`
|
||||
|
||||
// The total number of throttle activations that must accumulate before a server is
|
||||
// forcibly stopped for violating these limits.
|
||||
KillAtCount uint64 `json:"kill_at_count" yaml:"kill_at_count" default:"5"`
|
||||
|
||||
// The amount of time in milliseconds that a server process must go through without
|
||||
// triggering an output warning before the throttle activation count begins decreasing.
|
||||
// This time is measured in milliseconds.
|
||||
Decay uint64 `json:"decay" yaml:"decay" default:"10000"`
|
||||
|
||||
// The total number of lines that can be output in a given CheckInterval period before
|
||||
// The total number of lines that can be output in a given LineResetInterval period before
|
||||
// a warning is triggered and counted against the server.
|
||||
Lines uint64 `json:"lines" yaml:"lines" default:"1000"`
|
||||
Lines uint64 `json:"lines" yaml:"lines" default:"2000"`
|
||||
|
||||
// The amount of time that must pass between intervals before the count is reset. This
|
||||
// value is in milliseconds.
|
||||
CheckInterval uint64 `json:"check_interval" yaml:"check_interval" default:"100"`
|
||||
// The total number of throttle activations that can accumulate before a server is considered
|
||||
// to be breaching and will be stopped. This value is decremented by one every DecayInterval.
|
||||
MaximumTriggerCount uint64 `json:"maximum_trigger_count" yaml:"maximum_trigger_count" default:"5"`
|
||||
|
||||
// The amount of time after which the number of lines processed is reset to 0. This runs in
|
||||
// a constant loop and is not affected by the current console output volumes. By default, this
|
||||
// will reset the processed line count back to 0 every 100ms.
|
||||
LineResetInterval uint64 `json:"line_reset_interval" yaml:"line_reset_interval" default:"100"`
|
||||
|
||||
// The amount of time in milliseconds that must pass without an output warning being triggered
|
||||
// before a throttle activation is decremented.
|
||||
DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"`
|
||||
|
||||
// The amount of time that a server is allowed to be stopping for before it is terminated
|
||||
// forfully if it triggers output throttles.
|
||||
StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"`
|
||||
}
|
||||
|
||||
@@ -1,26 +1,35 @@
|
||||
version: '3'
|
||||
version: '3.5'
|
||||
services:
|
||||
daemon:
|
||||
build: .
|
||||
restart: always
|
||||
hostname: daemon
|
||||
networks:
|
||||
- daemon0
|
||||
ports:
|
||||
- "8080:8080"
|
||||
- "2022:2022"
|
||||
tty: true
|
||||
environment:
|
||||
- "DEBUG=false"
|
||||
- "TZ=UTC" # change to the three letter timezone of your choosing
|
||||
volumes:
|
||||
- "/var/run/docker.sock:/var/run/docker.sock"
|
||||
- "/var/lib/docker/containers/:/var/lib/docker/containers/"
|
||||
- "/etc/pterodactyl/:/etc/pterodactyl/"
|
||||
- "/var/lib/pterodactyl/:/var/lib/pterodactyl/"
|
||||
- "/srv/daemon-data/:/srv/daemon-data/"
|
||||
- "/var/log/pterodactyl/:/var/log/pterodactyl/"
|
||||
- "/tmp/pterodactyl/:/tmp/pterodactyl/"
|
||||
- "/etc/timezone:/etc/timezone:ro"
|
||||
## you may need /srv/daemon-data if you are upgrading from an old daemon
|
||||
## - "/srv/daemon-data/:/srv/daemon-data/"
|
||||
## Required for ssl if you user let's encrypt. uncomment to use.
|
||||
## - "/etc/letsencrypt/:/etc/letsencrypt/"
|
||||
|
||||
networks:
|
||||
default:
|
||||
daemon0:
|
||||
name: daemon0
|
||||
driver: bridge
|
||||
ipam:
|
||||
config:
|
||||
- subnet: 172.21.0.0/16
|
||||
- subnet: "172.21.0.0/16"
|
||||
driver_opts:
|
||||
com.docker.network.bridge.name: daemon0
|
||||
@@ -3,6 +3,7 @@ package docker
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/docker/docker/api/types"
|
||||
@@ -19,6 +20,11 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type imagePullStatus struct {
|
||||
Status string `json:"status"`
|
||||
Progress string `json:"progress"`
|
||||
}
|
||||
|
||||
// Attaches to the docker container itself and ensures that we can pipe data in and out
|
||||
// of the process stream. This should not be used for reading console data as you *will*
|
||||
// miss important output at the beginning because of the time delay with attaching to the
|
||||
@@ -148,7 +154,7 @@ func (e *Environment) Create() error {
|
||||
// Convert 127.0.0.1 to the pterodactyl0 network interface if the environment is Docker
|
||||
// so that the server operates as expected.
|
||||
if v == "SERVER_IP=127.0.0.1" {
|
||||
evs[i] = "SERVER_IP="+config.Get().Docker.Network.Interface
|
||||
evs[i] = "SERVER_IP=" + config.Get().Docker.Network.Interface
|
||||
}
|
||||
}
|
||||
|
||||
@@ -307,6 +313,9 @@ func (e *Environment) followOutput() error {
|
||||
//
|
||||
// TODO: local images
|
||||
func (e *Environment) ensureImageExists(image string) error {
|
||||
e.Events().Publish(environment.DockerImagePullStarted, "")
|
||||
defer e.Events().Publish(environment.DockerImagePullCompleted, "")
|
||||
|
||||
// Give it up to 15 minutes to pull the image. I think this should cover 99.8% of cases where an
|
||||
// image pull might fail. I can't imagine it will ever take more than 15 minutes to fully pull
|
||||
// an image. Let me know when I am inevitably wrong here...
|
||||
@@ -374,12 +383,18 @@ func (e *Environment) ensureImageExists(image string) error {
|
||||
// is done being pulled, which is what we need.
|
||||
scanner := bufio.NewScanner(out)
|
||||
for scanner.Scan() {
|
||||
continue
|
||||
s := imagePullStatus{}
|
||||
fmt.Println(scanner.Text())
|
||||
if err := json.Unmarshal(scanner.Bytes(), &s); err == nil {
|
||||
e.Events().Publish(environment.DockerImagePullStatus, s.Status+" "+s.Progress)
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithField("image", image).Debug("completed docker image pull")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -176,3 +176,9 @@ func (e *Environment) SetStopConfiguration(c *api.ProcessStopConfiguration) {
|
||||
e.meta.Stop = c
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
func (e *Environment) SetImage(i string) {
|
||||
e.mu.Lock()
|
||||
e.meta.Image = i
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -183,13 +183,21 @@ func (e *Environment) WaitForStop(seconds uint, terminate bool) error {
|
||||
case <-ctx.Done():
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
if terminate {
|
||||
return e.Terminate(os.Kill)
|
||||
log.WithField("container_id", e.Id).Debug("server did not stop in time, executing process termination")
|
||||
|
||||
return errors.WithStack(e.Terminate(os.Kill))
|
||||
}
|
||||
|
||||
return errors.WithStack(ctxErr)
|
||||
}
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
if terminate {
|
||||
log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while waiting for container stop, attempting process termination")
|
||||
|
||||
return errors.WithStack(e.Terminate(os.Kill))
|
||||
}
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
case <-ok:
|
||||
|
||||
@@ -6,9 +6,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ConsoleOutputEvent = "console output"
|
||||
StateChangeEvent = "state change"
|
||||
ResourceEvent = "resources"
|
||||
ConsoleOutputEvent = "console output"
|
||||
StateChangeEvent = "state change"
|
||||
ResourceEvent = "resources"
|
||||
DockerImagePullStarted = "docker image pull started"
|
||||
DockerImagePullStatus = "docker image pull status"
|
||||
DockerImagePullCompleted = "docker image pull completed"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
101
events/events.go
101
events/events.go
@@ -2,6 +2,8 @@ package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/gammazero/workerpool"
|
||||
"github.com/pkg/errors"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
@@ -12,14 +14,13 @@ type Event struct {
|
||||
}
|
||||
|
||||
type EventBus struct {
|
||||
sync.RWMutex
|
||||
|
||||
subscribers map[string]map[chan Event]struct{}
|
||||
mu sync.RWMutex
|
||||
pools map[string]*CallbackPool
|
||||
}
|
||||
|
||||
func New() *EventBus {
|
||||
return &EventBus{
|
||||
subscribers: make(map[string]map[chan Event]struct{}),
|
||||
pools: make(map[string]*CallbackPool),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,25 +40,36 @@ func (e *EventBus) Publish(topic string, data string) {
|
||||
}
|
||||
}
|
||||
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
|
||||
// Acquire a read lock and loop over all of the channels registered for the topic. This
|
||||
// avoids a panic crash if the process tries to unregister the channel while this routine
|
||||
// is running.
|
||||
go func() {
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
|
||||
if ch, ok := e.subscribers[t]; ok {
|
||||
for channel := range ch {
|
||||
channel <- Event{Data: data, Topic: topic}
|
||||
}
|
||||
if cp, ok := e.pools[t]; ok {
|
||||
for _, callback := range cp.callbacks {
|
||||
c := *callback
|
||||
evt := Event{Data: data, Topic: topic}
|
||||
// Using the workerpool with one worker allows us to execute events in a FIFO manner. Running
|
||||
// this using goroutines would cause things such as console output to just output in random order
|
||||
// if more than one event is fired at the same time.
|
||||
//
|
||||
// However, the pool submission does not block the execution of this function itself, allowing
|
||||
// us to call publish without blocking any of the other pathways.
|
||||
//
|
||||
// @see https://github.com/pterodactyl/panel/issues/2303
|
||||
cp.pool.Submit(func() {
|
||||
c(evt)
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Publishes a JSON message to a given topic.
|
||||
func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
||||
b, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
e.Publish(topic, string(b))
|
||||
@@ -65,41 +77,46 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe to an emitter topic using a channel.
|
||||
func (e *EventBus) Subscribe(topic string, ch chan Event) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// Register a callback function that will be executed each time one of the events using the topic
|
||||
// name is called.
|
||||
func (e *EventBus) On(topic string, callback *func(Event)) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if _, exists := e.subscribers[topic]; !exists {
|
||||
e.subscribers[topic] = make(map[chan Event]struct{})
|
||||
// Check if this topic has been registered at least once for the event listener, and if
|
||||
// not create an empty struct for the topic.
|
||||
if _, exists := e.pools[topic]; !exists {
|
||||
e.pools[topic] = &CallbackPool{
|
||||
callbacks: make([]*func(Event), 0),
|
||||
pool: workerpool.New(1),
|
||||
}
|
||||
}
|
||||
|
||||
// Only set the channel if there is not currently a matching one for this topic. This
|
||||
// avoids registering two identical listeners for the same topic and causing pain in
|
||||
// the unsubscribe functionality as well.
|
||||
if _, exists := e.subscribers[topic][ch]; !exists {
|
||||
e.subscribers[topic][ch] = struct{}{}
|
||||
// If this callback is not already registered as an event listener, go ahead and append
|
||||
// it to the array of callbacks for this topic.
|
||||
e.pools[topic].Add(callback)
|
||||
}
|
||||
|
||||
// Removes an event listener from the bus.
|
||||
func (e *EventBus) Off(topic string, callback *func(Event)) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if cp, ok := e.pools[topic]; ok {
|
||||
cp.Remove(callback)
|
||||
}
|
||||
}
|
||||
|
||||
// Unsubscribe a channel from a given topic.
|
||||
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// Removes all of the event listeners that have been registered for any topic. Also stops the worker
|
||||
// pool to close that routine.
|
||||
func (e *EventBus) Destroy() {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if _, exists := e.subscribers[topic][ch]; exists {
|
||||
delete(e.subscribers[topic], ch)
|
||||
// Stop every pool that exists for a given callback topic.
|
||||
for _, cp := range e.pools {
|
||||
cp.pool.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Removes all of the event listeners for the server. This is used when a server
|
||||
// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously
|
||||
// should also check elsewhere and handle a server reference going nil, but this
|
||||
// won't hurt.
|
||||
func (e *EventBus) UnsubscribeAll() {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
// Reset the entire struct into an empty map.
|
||||
e.subscribers = make(map[string]map[chan Event]struct{})
|
||||
e.pools = make(map[string]*CallbackPool)
|
||||
}
|
||||
|
||||
49
events/pool.go
Normal file
49
events/pool.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"github.com/gammazero/workerpool"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type CallbackPool struct {
|
||||
callbacks []*func(Event)
|
||||
pool *workerpool.WorkerPool
|
||||
}
|
||||
|
||||
// Pushes a new callback into the array of listeners for the pool.
|
||||
func (cp *CallbackPool) Add(callback *func(Event)) {
|
||||
if cp.index(reflect.ValueOf(callback)) < 0 {
|
||||
cp.callbacks = append(cp.callbacks, callback)
|
||||
}
|
||||
}
|
||||
|
||||
// Removes a callback from the array of registered callbacks if it exists.
|
||||
func (cp *CallbackPool) Remove(callback *func(Event)) {
|
||||
i := cp.index(reflect.ValueOf(callback))
|
||||
|
||||
// If i < 0 it means there was no index found for the given callback, meaning it was
|
||||
// never registered or was already unregistered from the listeners. Also double check
|
||||
// that we didn't somehow escape the length of the topic callback (not sure how that
|
||||
// would happen, but lets avoid a panic condition).
|
||||
if i < 0 || i >= len(cp.callbacks) {
|
||||
return
|
||||
}
|
||||
|
||||
// We can assume that the topic still exists at this point since we acquire an exclusive
|
||||
// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
|
||||
// no topic already existing.
|
||||
cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...)
|
||||
}
|
||||
|
||||
// Finds the index of a given callback in the topic by comparing all of the registered callback
|
||||
// pointers to the passed function. This function does not aquire a lock as it should only be called
|
||||
// within the confines of a function that has already acquired a lock for the duration of the lookup.
|
||||
func (cp *CallbackPool) index(v reflect.Value) int {
|
||||
for i, handler := range cp.callbacks {
|
||||
if reflect.ValueOf(handler).Pointer() == v.Pointer() {
|
||||
return i
|
||||
}
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
@@ -2,16 +2,12 @@ package installer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/apex/log"
|
||||
"github.com/asaskevich/govalidator"
|
||||
"github.com/buger/jsonparser"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pterodactyl/wings/api"
|
||||
"github.com/pterodactyl/wings/config"
|
||||
"github.com/pterodactyl/wings/environment"
|
||||
"github.com/pterodactyl/wings/server"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
type Installer struct {
|
||||
@@ -95,33 +91,6 @@ func (i *Installer) Server() *server.Server {
|
||||
return i.server
|
||||
}
|
||||
|
||||
// Executes the installer process, creating the server and running through the
|
||||
// associated installation process based on the parameters passed through for
|
||||
// the server instance.
|
||||
func (i *Installer) Execute() {
|
||||
p := path.Join(config.Get().System.Data, i.Uuid())
|
||||
l := log.WithFields(log.Fields{"server": i.Uuid(), "process": "installer"})
|
||||
|
||||
l.WithField("path", p).Debug("creating required server data directory")
|
||||
if err := os.MkdirAll(p, 0755); err != nil {
|
||||
l.WithFields(log.Fields{"path": p, "error": errors.WithStack(err)}).Error("failed to create server data directory")
|
||||
return
|
||||
}
|
||||
|
||||
if err := os.Chown(p, config.Get().System.User.Uid, config.Get().System.User.Gid); err != nil {
|
||||
l.WithField("error", errors.WithStack(err)).Error("failed to chown server data directory")
|
||||
return
|
||||
}
|
||||
|
||||
l.Debug("creating required environment for server instance")
|
||||
if err := i.server.Environment.Create(); err != nil {
|
||||
l.WithField("error", err).Error("failed to create environment for server")
|
||||
return
|
||||
}
|
||||
|
||||
l.Info("successfully created environment for server during install process")
|
||||
}
|
||||
|
||||
// Returns a string value from the JSON data provided.
|
||||
func getString(data []byte, key ...string) string {
|
||||
value, _ := jsonparser.GetString(data, key...)
|
||||
|
||||
@@ -198,7 +198,8 @@ func deleteServer(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Unsubscribe all of the event listeners.
|
||||
s.Events().UnsubscribeAll()
|
||||
s.Events().Destroy()
|
||||
s.Throttler().StopTimer()
|
||||
|
||||
// Destroy the environment; in Docker this will handle a running container and
|
||||
// forcibly terminate it before removing the container, so we do not need to handle
|
||||
|
||||
@@ -339,10 +339,22 @@ func postServerDecompressFiles(c *gin.Context) {
|
||||
}
|
||||
|
||||
if err := s.Filesystem.DecompressFile(data.RootPath, data.File); err != nil {
|
||||
// Check if the file does not exist.
|
||||
// NOTE: os.IsNotExist() does not work if the error is wrapped.
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
c.Status(http.StatusNotFound)
|
||||
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
|
||||
"error": "The requested archive was not found.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// If the file is busy for some reason just return a nicer error to the user since there is not
|
||||
// much we specifically can do. They'll need to stop the running server process in order to overwrite
|
||||
// a file like this.
|
||||
if strings.Contains(err.Error(), "text file busy") {
|
||||
s.Log().WithField("error", err).Warn("failed to decompress file due to busy text file")
|
||||
|
||||
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
|
||||
"error": "One or more files this archive is attempting to overwrite are currently in use by another process. Please try again.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -57,7 +57,11 @@ func postCreateServer(c *gin.Context) {
|
||||
// cycle. If there are any errors they will be logged and communicated back
|
||||
// to the Panel where a reinstall may take place.
|
||||
go func(i *installer.Installer) {
|
||||
i.Execute()
|
||||
err := i.Server().CreateEnvironment()
|
||||
if err != nil {
|
||||
i.Server().Log().WithField("error", err).Error("failed to create server environment during install process")
|
||||
return
|
||||
}
|
||||
|
||||
if err := i.Server().Install(false); err != nil {
|
||||
log.WithFields(log.Fields{"server": i.Uuid(), "error": err}).Error("failed to run install process for server")
|
||||
|
||||
@@ -277,7 +277,10 @@ func postTransfer(c *gin.Context) {
|
||||
server.GetServers().Add(i.Server())
|
||||
|
||||
// Create the server's environment (note this does not execute the install script)
|
||||
i.Execute()
|
||||
if err := i.Server().CreateEnvironment(); err != nil {
|
||||
l.WithField("error", err).Error("failed to create server environment")
|
||||
return
|
||||
}
|
||||
|
||||
// Un-archive the archive. That sounds weird..
|
||||
if err := archiver.NewTarGz().Unarchive(archivePath, i.Server().Filesystem.Path()); err != nil {
|
||||
|
||||
@@ -50,24 +50,26 @@ var e = []string{
|
||||
// Listens for different events happening on a server and sends them along
|
||||
// to the connected websocket.
|
||||
func (h *Handler) ListenForServerEvents(ctx context.Context) {
|
||||
eventChannel := make(chan events.Event)
|
||||
for _, event := range e {
|
||||
h.server.Events().Subscribe(event, eventChannel)
|
||||
}
|
||||
|
||||
for d := range eventChannel {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
for _, event := range e {
|
||||
h.server.Events().Unsubscribe(event, eventChannel)
|
||||
}
|
||||
|
||||
close(eventChannel)
|
||||
default:
|
||||
_ = h.SendJson(&Message{
|
||||
Event: d.Topic,
|
||||
Args: []string{d.Data},
|
||||
})
|
||||
h.server.Log().Debug("listening for server events over websocket")
|
||||
callback := func(e events.Event) {
|
||||
if err := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); err != nil {
|
||||
h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to all of the events with the same callback that will push the data out over the
|
||||
// websocket for the server.
|
||||
for _, evt := range e {
|
||||
h.server.Events().On(evt, &callback)
|
||||
}
|
||||
|
||||
go func(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Once this context is stopped, de-register all of the listeners that have been registered.
|
||||
for _, evt := range e {
|
||||
h.server.Events().Off(evt, &callback)
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
@@ -12,12 +12,7 @@ const (
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
// The event to perform. Should be one of the following that are supported:
|
||||
//
|
||||
// - status : Returns the server's power state.
|
||||
// - logs : Returns the server log data at the time of the request.
|
||||
// - power : Performs a power action against the server based the data.
|
||||
// - command : Performs a command on a server using the data field.
|
||||
// The event to perform.
|
||||
Event string `json:"event"`
|
||||
|
||||
// The data to pass along, only used by power/command currently. Other requests
|
||||
|
||||
@@ -37,6 +37,19 @@ type Handler struct {
|
||||
server *server.Server
|
||||
}
|
||||
|
||||
var (
|
||||
ErrJwtNotPresent = errors.New("jwt: no jwt present")
|
||||
ErrJwtNoConnectPerm = errors.New("jwt: missing connect permission")
|
||||
ErrJwtUuidMismatch = errors.New("jwt: server uuid mismatch")
|
||||
)
|
||||
|
||||
func IsJwtError(err error) bool {
|
||||
return errors.Is(err, ErrJwtNotPresent) ||
|
||||
errors.Is(err, ErrJwtNoConnectPerm) ||
|
||||
errors.Is(err, ErrJwtUuidMismatch) ||
|
||||
errors.Is(err, jwt.ErrExpValidation)
|
||||
}
|
||||
|
||||
// Parses a JWT into a websocket token payload.
|
||||
func NewTokenPayload(token []byte) (*tokens.WebsocketPayload, error) {
|
||||
payload := tokens.WebsocketPayload{}
|
||||
@@ -92,9 +105,13 @@ func GetHandler(s *server.Server, w http.ResponseWriter, r *http.Request) (*Hand
|
||||
}
|
||||
|
||||
func (h *Handler) SendJson(v *Message) error {
|
||||
// Do not send JSON down the line if the JWT on the connection is not
|
||||
// valid!
|
||||
// Do not send JSON down the line if the JWT on the connection is not valid!
|
||||
if err := h.TokenValid(); err != nil {
|
||||
h.unsafeSendJson(Message{
|
||||
Event: ErrorEvent,
|
||||
Args: []string{"could not authenticate client: " + err.Error()},
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -134,7 +151,7 @@ func (h *Handler) unsafeSendJson(v interface{}) error {
|
||||
func (h *Handler) TokenValid() error {
|
||||
j := h.GetJwt()
|
||||
if j == nil {
|
||||
return errors.New("no jwt present")
|
||||
return ErrJwtNotPresent
|
||||
}
|
||||
|
||||
if err := jwt.ExpirationTimeValidator(time.Now())(&j.Payload); err != nil {
|
||||
@@ -142,11 +159,11 @@ func (h *Handler) TokenValid() error {
|
||||
}
|
||||
|
||||
if !j.HasPermission(PermissionConnect) {
|
||||
return errors.New("jwt does not have connect permission")
|
||||
return ErrJwtNoConnectPerm
|
||||
}
|
||||
|
||||
if h.server.Id() != j.GetServerUuid() {
|
||||
return errors.New("jwt server uuid mismatch")
|
||||
return ErrJwtUuidMismatch
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -157,9 +174,10 @@ func (h *Handler) TokenValid() error {
|
||||
// error message, otherwise we just send back a standard error message.
|
||||
func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error {
|
||||
j := h.GetJwt()
|
||||
expected := errors.Is(err, server.ErrSuspended) || errors.Is(err, server.ErrIsRunning)
|
||||
|
||||
message := "an unexpected error was encountered while handling this request"
|
||||
if server.IsSuspendedError(err) || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
|
||||
if expected || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
|
||||
message = err.Error()
|
||||
}
|
||||
|
||||
@@ -169,7 +187,7 @@ func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error
|
||||
wsm.Args = []string{m}
|
||||
|
||||
if len(shouldLog) == 0 || (len(shouldLog) == 1 && shouldLog[0] == true) {
|
||||
if !server.IsSuspendedError(err) {
|
||||
if !expected && !IsJwtError(err) {
|
||||
h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}).
|
||||
Error("failed to handle websocket process; an error was encountered processing an event")
|
||||
}
|
||||
@@ -206,8 +224,6 @@ func (h *Handler) GetJwt() *tokens.WebsocketPayload {
|
||||
func (h *Handler) HandleInbound(m Message) error {
|
||||
if m.Event != AuthenticationEvent {
|
||||
if err := h.TokenValid(); err != nil {
|
||||
log.WithField("message", err.Error()).Debug("jwt for server websocket is no longer valid")
|
||||
|
||||
h.unsafeSendJson(Message{
|
||||
Event: ErrorEvent,
|
||||
Args: []string{"could not authenticate client: " + err.Error()},
|
||||
@@ -313,7 +329,7 @@ func (h *Handler) HandleInbound(m Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
logs, err := h.server.Environment.Readlog(1024 * 16)
|
||||
logs, err := h.server.Environment.Readlog(100)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,60 +1,111 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/mitchellh/colorstring"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pterodactyl/wings/config"
|
||||
"github.com/pterodactyl/wings/system"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrTooMuchConsoleData = errors.New("console is outputting too much data")
|
||||
|
||||
type ConsoleThrottler struct {
|
||||
sync.RWMutex
|
||||
mu sync.Mutex
|
||||
config.ConsoleThrottles
|
||||
|
||||
// The total number of activations that have occurred thus far.
|
||||
activations uint64
|
||||
|
||||
// The total number of lines that have been sent since the last reset timer period.
|
||||
count uint64
|
||||
|
||||
// Wether or not the console output is being throttled. It is up to calling code to
|
||||
// determine what to do if it is.
|
||||
isThrottled system.AtomicBool
|
||||
|
||||
// The total number of lines processed so far during the given time period.
|
||||
lines uint64
|
||||
|
||||
lastIntervalTime *time.Time
|
||||
lastDecayTime *time.Time
|
||||
timerCancel *context.CancelFunc
|
||||
}
|
||||
|
||||
// Increments the number of activations for a server.
|
||||
func (ct *ConsoleThrottler) AddActivation() uint64 {
|
||||
ct.Lock()
|
||||
defer ct.Unlock()
|
||||
|
||||
ct.activations += 1
|
||||
|
||||
return ct.activations
|
||||
// Resets the state of the throttler.
|
||||
func (ct *ConsoleThrottler) Reset() {
|
||||
atomic.StoreUint64(&ct.count, 0)
|
||||
atomic.StoreUint64(&ct.activations, 0)
|
||||
ct.isThrottled.Set(false)
|
||||
}
|
||||
|
||||
// Decrements the number of activations for a server.
|
||||
func (ct *ConsoleThrottler) RemoveActivation() uint64 {
|
||||
ct.Lock()
|
||||
defer ct.Unlock()
|
||||
// Triggers an activation for a server. You can also decrement the number of activations
|
||||
// by passing a negative number.
|
||||
func (ct *ConsoleThrottler) markActivation(increment bool) uint64 {
|
||||
if !increment {
|
||||
if atomic.LoadUint64(&ct.activations) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
if ct.activations == 0 {
|
||||
return 0
|
||||
// This weird dohicky subtracts 1 from the activation count.
|
||||
return atomic.AddUint64(&ct.activations, ^uint64(0))
|
||||
}
|
||||
|
||||
ct.activations -= 1
|
||||
|
||||
return ct.activations
|
||||
return atomic.AddUint64(&ct.activations, 1)
|
||||
}
|
||||
|
||||
// Increment the total count of lines that we have processed so far.
|
||||
func (ct *ConsoleThrottler) IncrementLineCount() uint64 {
|
||||
return atomic.AddUint64(&ct.lines, 1)
|
||||
// Determines if the console is currently being throttled. Calls to this function can be used to
|
||||
// determine if output should be funneled along to the websocket processes.
|
||||
func (ct *ConsoleThrottler) Throttled() bool {
|
||||
return ct.isThrottled.Get()
|
||||
}
|
||||
|
||||
// Reset the line count to zero.
|
||||
func (ct *ConsoleThrottler) ResetLineCount() {
|
||||
atomic.SwapUint64(&ct.lines, 0)
|
||||
// Starts a timer that runs in a seperate thread and will continually decrement the lines processed
|
||||
// and number of activations, regardless of the current console message volume.
|
||||
func (ct *ConsoleThrottler) StartTimer() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond)
|
||||
decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
reset.Stop()
|
||||
return
|
||||
case <-reset.C:
|
||||
ct.isThrottled.Set(false)
|
||||
atomic.StoreUint64(&ct.count, 0)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
decay.Stop()
|
||||
return
|
||||
case <-decay.C:
|
||||
ct.markActivation(false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
ct.timerCancel = &cancel
|
||||
}
|
||||
|
||||
// Stops a running timer processes if one exists. This is only called when the server is deleted since
|
||||
// we want this to always be running. If there is no process currently running nothing will really happen.
|
||||
func (ct *ConsoleThrottler) StopTimer() {
|
||||
ct.mu.Lock()
|
||||
defer ct.mu.Unlock()
|
||||
if ct.timerCancel != nil {
|
||||
c := *ct.timerCancel
|
||||
c()
|
||||
ct.timerCancel = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Handles output from a server's console. This code ensures that a server is not outputting
|
||||
@@ -70,30 +121,41 @@ func (ct *ConsoleThrottler) ResetLineCount() {
|
||||
// data all at once. These values are all configurable via the wings configuration file, however the
|
||||
// defaults have been in the wild for almost two years at the time of this writing, so I feel quite
|
||||
// confident in them.
|
||||
func (ct *ConsoleThrottler) Handle() {
|
||||
//
|
||||
// This function returns an error if the server should be stopped due to violating throttle constraints
|
||||
// and a boolean value indicating if a throttle is being violated when it is checked.
|
||||
func (ct *ConsoleThrottler) Increment(onTrigger func()) error {
|
||||
if !ct.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Increment the line count and if we have now output more lines than are allowed, trigger a throttle
|
||||
// activation. Once the throttle is triggered and has passed the kill at value we will trigger a server
|
||||
// stop automatically.
|
||||
if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() {
|
||||
ct.isThrottled.Set(true)
|
||||
if ct.markActivation(true) >= ct.MaximumTriggerCount {
|
||||
return ErrTooMuchConsoleData
|
||||
}
|
||||
|
||||
onTrigger()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns the throttler instance for the server or creates a new one.
|
||||
func (s *Server) Throttler() *ConsoleThrottler {
|
||||
s.throttleLock.RLock()
|
||||
s.throttleLock.Lock()
|
||||
defer s.throttleLock.Unlock()
|
||||
|
||||
if s.throttler == nil {
|
||||
// Release the read lock so that we can acquire a normal lock on the process and
|
||||
// make modifications to the throttler.
|
||||
s.throttleLock.RUnlock()
|
||||
|
||||
s.throttleLock.Lock()
|
||||
s.throttler = &ConsoleThrottler{
|
||||
ConsoleThrottles: config.Get().Throttles,
|
||||
}
|
||||
s.throttleLock.Unlock()
|
||||
|
||||
return s.throttler
|
||||
} else {
|
||||
defer s.throttleLock.RUnlock()
|
||||
return s.throttler
|
||||
}
|
||||
|
||||
return s.throttler
|
||||
}
|
||||
|
||||
// Sends output to the server console formatted to appear correctly as being sent
|
||||
|
||||
@@ -1,17 +1,9 @@
|
||||
package server
|
||||
|
||||
type suspendedError struct {
|
||||
}
|
||||
import "github.com/pkg/errors"
|
||||
|
||||
func (e *suspendedError) Error() string {
|
||||
return "server is currently in a suspended state"
|
||||
}
|
||||
|
||||
func IsSuspendedError(err error) bool {
|
||||
_, ok := err.(*suspendedError)
|
||||
|
||||
return ok
|
||||
}
|
||||
var ErrIsRunning = errors.New("server is running")
|
||||
var ErrSuspended = errors.New("server is currently in a suspended state")
|
||||
|
||||
type crashTooFrequent struct {
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ import (
|
||||
// Error returned when there is a bad path provided to one of the FS calls.
|
||||
type PathResolutionError struct{}
|
||||
|
||||
var ErrNotEnoughDiskSpace = errors.New("not enough disk space is available to perform this operation")
|
||||
|
||||
// Returns the error response in a string form that can be more easily consumed.
|
||||
func (pre PathResolutionError) Error() string {
|
||||
return "invalid path resolution"
|
||||
@@ -257,7 +259,7 @@ func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
|
||||
func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
|
||||
// Check if cache is expired.
|
||||
fs.lookupTimeMu.RLock()
|
||||
isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * -10))
|
||||
isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * time.Duration(-1*config.Get().System.DiskCheckInterval)))
|
||||
fs.lookupTimeMu.RUnlock()
|
||||
|
||||
if !isValidInCache {
|
||||
@@ -400,9 +402,10 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
|
||||
currentSize = stat.Size()
|
||||
}
|
||||
|
||||
o := &fileOpener{}
|
||||
// This will either create the file if it does not already exist, or open and
|
||||
// truncate the existing file.
|
||||
file, err := os.OpenFile(cleaned, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
file, err := o.open(cleaned, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
@@ -782,6 +785,10 @@ func (fs *Filesystem) EnsureDataDirectory() error {
|
||||
if err := os.MkdirAll(fs.Path(), 0700); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := fs.Chown("/"); err != nil {
|
||||
fs.Server.Log().WithField("error", err).Warn("failed to chown server data directory")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -933,3 +940,29 @@ func (fs *Filesystem) handleWalkerError(err error, f os.FileInfo) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type fileOpener struct {
|
||||
busy uint
|
||||
}
|
||||
|
||||
// Attempts to open a given file up to "attempts" number of times, using a backoff. If the file
|
||||
// cannot be opened because of a "text file busy" error, we will attempt until the number of attempts
|
||||
// has been exhaused, at which point we will abort with an error.
|
||||
func (fo *fileOpener) open(path string, flags int, perm os.FileMode) (*os.File, error) {
|
||||
for {
|
||||
f, err := os.OpenFile(path, flags, perm)
|
||||
|
||||
// If there is an error because the text file is busy, go ahead and sleep for a few
|
||||
// hundred milliseconds and then try again up to three times before just returning the
|
||||
// error back to the caller.
|
||||
//
|
||||
// Based on code from: https://github.com/golang/go/issues/22220#issuecomment-336458122
|
||||
if err != nil && fo.busy < 3 && strings.Contains(err.Error(), "text file busy") {
|
||||
time.Sleep(100 * time.Millisecond << fo.busy)
|
||||
fo.busy++
|
||||
continue
|
||||
}
|
||||
|
||||
return f, err
|
||||
}
|
||||
}
|
||||
@@ -32,14 +32,17 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
|
||||
dirSize, err := fs.DiskUsage(false)
|
||||
|
||||
var size int64
|
||||
var max = fs.Server.DiskSpace() * 1000.0 * 1000.0
|
||||
// Walk over the archive and figure out just how large the final output would be from unarchiving it.
|
||||
archiver.Walk(source, func(f archiver.File) error {
|
||||
atomic.AddInt64(&size, f.Size())
|
||||
err = archiver.Walk(source, func(f archiver.File) error {
|
||||
if atomic.AddInt64(&size, f.Size()) + dirSize > max {
|
||||
return errors.WithStack(ErrNotEnoughDiskSpace)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), errors.WithStack(err)
|
||||
return err == nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Decompress a file in a given directory by using the archiver tool to infer the file
|
||||
@@ -80,6 +83,11 @@ func (fs *Filesystem) DecompressFile(dir string, file string) error {
|
||||
return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String()))
|
||||
}
|
||||
|
||||
return errors.Wrap(fs.Writefile(name, f), "could not extract file from archive")
|
||||
p, err := fs.SafePath(filepath.Join(dir, name))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to generate a safe path to server file")
|
||||
}
|
||||
|
||||
return errors.Wrap(fs.Writefile(p, f), "could not extract file from archive")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,58 +5,104 @@ import (
|
||||
"github.com/apex/log"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pterodactyl/wings/api"
|
||||
"github.com/pterodactyl/wings/config"
|
||||
"github.com/pterodactyl/wings/environment"
|
||||
"github.com/pterodactyl/wings/events"
|
||||
"regexp"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Adds all of the internal event listeners we want to use for a server.
|
||||
var dockerEvents = []string{
|
||||
environment.DockerImagePullStatus,
|
||||
environment.DockerImagePullStarted,
|
||||
environment.DockerImagePullCompleted,
|
||||
}
|
||||
|
||||
// Adds all of the internal event listeners we want to use for a server. These listeners can only be
|
||||
// removed by deleting the server as they should last for the duration of the process' lifetime.
|
||||
func (s *Server) StartEventListeners() {
|
||||
console := make(chan events.Event)
|
||||
state := make(chan events.Event)
|
||||
stats := make(chan events.Event)
|
||||
console := func(e events.Event) {
|
||||
t := s.Throttler()
|
||||
err := t.Increment(func () {
|
||||
s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.")
|
||||
})
|
||||
|
||||
s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console)
|
||||
s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
|
||||
s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
|
||||
// An error is only returned if the server has breached the thresholds set.
|
||||
if err != nil {
|
||||
// If the process is already stopping, just let it continue with that action rather than attempting
|
||||
// to terminate again.
|
||||
if s.GetState() != environment.ProcessStoppingState {
|
||||
s.SetState(environment.ProcessStoppingState)
|
||||
go func() {
|
||||
s.Log().Warn("stopping server instance, violating throttle limits")
|
||||
s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.")
|
||||
// Completely skip over server power actions and terminate the running instance. This gives the
|
||||
// server 15 seconds to finish stopping gracefully before it is forcefully terminated.
|
||||
if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil {
|
||||
// If there is an error set the process back to running so that this throttler is called
|
||||
// again and hopefully kills the server.
|
||||
if s.GetState() != environment.ProcessOfflineState {
|
||||
s.SetState(environment.ProcessRunningState)
|
||||
}
|
||||
|
||||
go func(console chan events.Event) {
|
||||
for data := range console {
|
||||
// Immediately emit this event back over the server event stream since it is
|
||||
// being called from the environment event stream and things probably aren't
|
||||
// listening to that event.
|
||||
s.Events().Publish(ConsoleOutputEvent, data.Data)
|
||||
|
||||
// Also pass the data along to the console output channel.
|
||||
s.onConsoleOutput(data.Data)
|
||||
}
|
||||
}(console)
|
||||
|
||||
go func(state chan events.Event) {
|
||||
for data := range state {
|
||||
s.SetState(data.Data)
|
||||
}
|
||||
}(state)
|
||||
|
||||
go func(stats chan events.Event) {
|
||||
for data := range stats {
|
||||
st := new(environment.Stats)
|
||||
if err := json.Unmarshal([]byte(data.Data), st); err != nil {
|
||||
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
|
||||
continue
|
||||
s.Log().WithField("error", errors.WithStack(err)).Error("failed to terminate environment after triggering throttle")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Update the server resource tracking object with the resources we got here.
|
||||
s.resources.mu.Lock()
|
||||
s.resources.Stats = *st
|
||||
s.resources.mu.Unlock()
|
||||
|
||||
s.Filesystem.HasSpaceAvailable(true)
|
||||
|
||||
s.emitProcUsage()
|
||||
}
|
||||
}(stats)
|
||||
|
||||
// If we are not throttled, go ahead and output the data.
|
||||
if !t.Throttled() {
|
||||
s.Events().Publish(ConsoleOutputEvent, e.Data)
|
||||
}
|
||||
|
||||
// Also pass the data along to the console output channel.
|
||||
s.onConsoleOutput(e.Data)
|
||||
}
|
||||
|
||||
state := func(e events.Event) {
|
||||
// Reset the throttler when the process is started.
|
||||
if e.Data == environment.ProcessStartingState {
|
||||
s.Throttler().Reset()
|
||||
}
|
||||
|
||||
s.SetState(e.Data)
|
||||
}
|
||||
|
||||
stats := func(e events.Event) {
|
||||
st := new(environment.Stats)
|
||||
if err := json.Unmarshal([]byte(e.Data), st); err != nil {
|
||||
s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
|
||||
return
|
||||
}
|
||||
|
||||
// Update the server resource tracking object with the resources we got here.
|
||||
s.resources.mu.Lock()
|
||||
s.resources.Stats = *st
|
||||
s.resources.mu.Unlock()
|
||||
|
||||
s.Filesystem.HasSpaceAvailable(true)
|
||||
|
||||
s.emitProcUsage()
|
||||
}
|
||||
|
||||
docker := func(e events.Event) {
|
||||
if e.Topic == environment.DockerImagePullStatus {
|
||||
s.Events().Publish(InstallOutputEvent, e.Data)
|
||||
} else if e.Topic == environment.DockerImagePullStarted {
|
||||
s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...")
|
||||
} else {
|
||||
s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
|
||||
}
|
||||
}
|
||||
|
||||
s.Log().Info("registering event listeners: console, state, resources...")
|
||||
s.Environment.Events().On(environment.ConsoleOutputEvent, &console)
|
||||
s.Environment.Events().On(environment.StateChangeEvent, &state)
|
||||
s.Environment.Events().On(environment.ResourceEvent, &stats)
|
||||
for _, evt := range dockerEvents {
|
||||
s.Environment.Events().On(evt, &docker)
|
||||
}
|
||||
}
|
||||
|
||||
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
|
||||
|
||||
@@ -37,12 +37,6 @@ func LoadDirectory() error {
|
||||
return errors.New(rerr.String())
|
||||
}
|
||||
|
||||
log.Debug("retrieving cached server states from disk")
|
||||
states, err := getServerStates()
|
||||
if err != nil {
|
||||
log.WithField("error", errors.WithStack(err)).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
log.WithField("total_configs", len(configs)).Info("processing servers returned by the API")
|
||||
|
||||
@@ -59,11 +53,6 @@ func LoadDirectory() error {
|
||||
return
|
||||
}
|
||||
|
||||
if state, exists := states[s.Id()]; exists {
|
||||
s.Log().WithField("state", state).Debug("found existing server state in cache file; re-instantiating server state")
|
||||
s.SetState(state)
|
||||
}
|
||||
|
||||
servers.Add(s)
|
||||
})
|
||||
}
|
||||
@@ -97,6 +86,9 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.resources = ResourceUsage{}
|
||||
defaults.Set(&s.resources)
|
||||
|
||||
s.Archiver = Archiver{Server: s}
|
||||
s.Filesystem = Filesystem{Server: s}
|
||||
|
||||
@@ -118,7 +110,8 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
|
||||
return nil, err
|
||||
} else {
|
||||
s.Environment = env
|
||||
go s.StartEventListeners()
|
||||
s.StartEventListeners()
|
||||
s.Throttler().StartTimer()
|
||||
}
|
||||
|
||||
// Forces the configuration to be synced with the panel.
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pterodactyl/wings/config"
|
||||
"github.com/pterodactyl/wings/environment"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"os"
|
||||
"time"
|
||||
@@ -87,6 +88,10 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
|
||||
|
||||
switch action {
|
||||
case PowerActionStart:
|
||||
if s.GetState() != environment.ProcessOfflineState {
|
||||
return ErrIsRunning
|
||||
}
|
||||
|
||||
// Run the pre-boot logic for the server before processing the environment start.
|
||||
if err := s.onBeforeStart(); err != nil {
|
||||
return err
|
||||
@@ -134,7 +139,7 @@ func (s *Server) onBeforeStart() error {
|
||||
// Disallow start & restart if the server is suspended. Do this check after performing a sync
|
||||
// action with the Panel to ensure that we have the most up-to-date information for that server.
|
||||
if s.IsSuspended() {
|
||||
return new(suspendedError)
|
||||
return ErrSuspended
|
||||
}
|
||||
|
||||
// Ensure we sync the server information with the environment so that any new environment variables
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/pterodactyl/wings/environment"
|
||||
"sync"
|
||||
)
|
||||
@@ -37,14 +36,10 @@ func (s *Server) Proc() *ResourceUsage {
|
||||
|
||||
func (s *Server) emitProcUsage() {
|
||||
s.resources.mu.RLock()
|
||||
defer s.resources.mu.RUnlock()
|
||||
|
||||
b, err := json.Marshal(s.resources)
|
||||
if err == nil {
|
||||
s.Events().Publish(StatsEvent, string(b))
|
||||
if err := s.Events().PublishJson(StatsEvent, s.resources); err != nil {
|
||||
s.Log().WithField("error", err).Warn("error while emitting server resource usage to listeners")
|
||||
}
|
||||
|
||||
// TODO: This might be a good place to add a debug log if stats are not sending.
|
||||
s.resources.mu.RUnlock()
|
||||
}
|
||||
|
||||
// Returns the servers current state.
|
||||
|
||||
@@ -22,7 +22,7 @@ type Server struct {
|
||||
sync.RWMutex
|
||||
emitterLock sync.Mutex
|
||||
powerLock *semaphore.Weighted
|
||||
throttleLock sync.RWMutex
|
||||
throttleLock sync.Mutex
|
||||
|
||||
// Maintains the configuration for the server. This is the data that gets returned by the Panel
|
||||
// such as build settings and container images.
|
||||
@@ -137,6 +137,7 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) err
|
||||
// the process isn't just terminated when a user requests it be stopped.
|
||||
if e, ok := s.Environment.(*docker.Environment); ok {
|
||||
s.Log().Debug("syncing stop configuration with configured docker environment")
|
||||
e.SetImage(s.Config().Container.Image)
|
||||
e.SetStopConfiguration(&cfg.ProcessConfiguration.Stop)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
var stateMutex sync.Mutex
|
||||
|
||||
// Returns the state of the servers.
|
||||
func getServerStates() (map[string]string, error) {
|
||||
func CachedServerStates() (map[string]string, error) {
|
||||
// Request a lock after we check if the file exists.
|
||||
stateMutex.Lock()
|
||||
defer stateMutex.Unlock()
|
||||
@@ -78,8 +78,8 @@ func (s *Server) SetState(state string) error {
|
||||
|
||||
// Emit the event to any listeners that are currently registered.
|
||||
if prevState != state {
|
||||
s.Log().WithField("status", s.Proc().State).Debug("saw server status change event")
|
||||
s.Events().Publish(StatusEvent, s.Proc().State)
|
||||
s.Log().WithField("status", s.Proc().getInternalState()).Debug("saw server status change event")
|
||||
s.Events().Publish(StatusEvent, s.Proc().getInternalState())
|
||||
}
|
||||
|
||||
// Persist this change to the disk immediately so that should the Daemon be stopped or
|
||||
|
||||
20
system/bool.go
Normal file
20
system/bool.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package system
|
||||
|
||||
import "sync/atomic"
|
||||
|
||||
type AtomicBool struct {
|
||||
flag uint32
|
||||
}
|
||||
|
||||
func (ab *AtomicBool) Set(v bool) {
|
||||
i := 0
|
||||
if v {
|
||||
i = 1
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&ab.flag, uint32(i))
|
||||
}
|
||||
|
||||
func (ab *AtomicBool) Get() bool {
|
||||
return atomic.LoadUint32(&ab.flag) == 1
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
{{.LogDirectory}}/wings.log {
|
||||
size 10M
|
||||
compress
|
||||
delaycompress
|
||||
dateext
|
||||
maxage 7
|
||||
missingok
|
||||
notifempty
|
||||
create 0640 {{.User.Uid}} {{.User.Gid}}
|
||||
postrotate
|
||||
killall -SIGHUP wings
|
||||
endscript
|
||||
}
|
||||
Reference in New Issue
Block a user