Compare commits: v1.0.0-rc. ... v1.0.0-rc. (32 commits)
| SHA1 |
|---|
| 033e8e7573 |
| aa78071543 |
| 48aeeff818 |
| 864c37f17c |
| c7405aebe5 |
| 9ff2d53466 |
| 6ba49df485 |
| 6b25ac3665 |
| 783832fc71 |
| 815539b3da |
| 6ba1b75696 |
| ce76b9339e |
| 6ba15e9884 |
| f2a6d6b3c5 |
| 0295603943 |
| ce2659fdd7 |
| be49e08f4f |
| 3ee76ea2bc |
| d7fbf29cc1 |
| d02e37620d |
| 53bd0d57ad |
| b779c98717 |
| 4ac19bd29d |
| 8407ea21da |
| fa6f56caa8 |
| 5a62f83ec8 |
| 8bcb3d7c62 |
| b2eebcaf6d |
| 45bcb9cd68 |
| e1ff4db330 |
| 606143b3ad |
| 57221bdd30 |
**.github/workflows/build-test.yml** (vendored, 4 changes)

```diff
@@ -14,10 +14,10 @@ jobs:
       - uses: actions/checkout@v2
       - uses: actions/setup-go@v2
         with:
-          go-version: '^1.15'
+          go-version: '1.15.2'

       - name: Build
-        run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go
+        run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go

       - name: Test
         run: go test ./...
```
**.github/workflows/release.yml** (vendored, 4 changes)

```diff
@@ -12,12 +12,12 @@ jobs:
       - uses: actions/checkout@v2
       - uses: actions/setup-go@v2
         with:
-          go-version: '^1.15'
+          go-version: '1.15.2'

       - name: Build
         env:
           REF: ${{ github.ref }}
-        run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go
+        run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go

       - name: Test
         run: go test ./...
```
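Both workflow changes fix the same pitfall: when `-ldflags` is passed twice, `go build` only honors the last value, so the `-s -w` strip flags were being silently dropped. Folding everything into a single `-ldflags` string keeps the binary stripped while still stamping the version. A minimal sketch of the variable being stamped; the exact contents of the `system` package file are an assumption here:

```go
// Package system holds build-time metadata for Wings (sketch; file contents assumed).
package system

// Version is overwritten by the linker at build time, e.g.:
//
//	go build -ldflags "-s -w -X github.com/pterodactyl/wings/system.Version=1.0.0"
//
// The default value is only seen in ad-hoc local builds.
var Version = "develop"
```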
**Dockerfile**

```diff
@@ -2,7 +2,7 @@
 # Pterodactyl Panel Dockerfile
 # ----------------------------------

-FROM golang:1.14-alpine
+FROM golang:1.15-alpine
 COPY . /go/wings/
 WORKDIR /go/wings/
 RUN apk add --no-cache upx \
@@ -11,4 +11,4 @@ RUN apk add --no-cache upx \

 FROM alpine:latest
 COPY --from=0 /go/wings/wings /usr/bin/
-CMD ["wings","--config", "/var/lib/pterodactyl/config.yml"]
+CMD ["wings","--config", "/etc/pterodactyl/config.yml"]
```
**README.md**

```diff
@@ -1,6 +1,7 @@
 [](https://pterodactyl.io)

 [](https://pterodactyl.io/discord)
+[](https://goreportcard.com/report/github.com/pterodactyl/wings)

 # Pterodactyl Wings
 Wings is Pterodactyl's server control plane, built for the rapidly changing gaming industry and designed to be
```
**api/api.go** (18 changes)

```diff
@@ -137,14 +137,15 @@ func IsRequestError(err error) bool {
 }

 type RequestError struct {
+	response *http.Response
 	Code   string `json:"code"`
 	Status string `json:"status"`
 	Detail string `json:"detail"`
 }

 // Returns the error response in a string form that can be more easily consumed.
 func (re *RequestError) Error() string {
-	return fmt.Sprintf("Error response from Panel: %s: %s (HTTP/%s)", re.Code, re.Detail, re.Status)
+	return fmt.Sprintf("Error response from Panel: %s: %s (HTTP/%d)", re.Code, re.Detail, re.response.StatusCode)
 }

 func (re *RequestError) String() string {
@@ -165,9 +166,12 @@ func (r *PanelRequest) Error() *RequestError {
 	bag := RequestErrorBag{}
 	json.Unmarshal(body, &bag)

-	if len(bag.Errors) == 0 {
-		return new(RequestError)
+	e := new(RequestError)
+	if len(bag.Errors) > 0 {
+		e = &bag.Errors[0]
 	}

-	return &bag.Errors[0]
+	e.response = r.Response
+
+	return e
 }
```

SFTP credential validation:

```diff
@@ -68,6 +68,12 @@ func (r *PanelRequest) ValidateSftpCredentials(request SftpAuthRequest) (*SftpAu
 	if r.HasError() {
 		if r.HttpResponseCode() >= 400 && r.HttpResponseCode() < 500 {
+			log.WithFields(log.Fields{
+				"subsystem": "sftp",
+				"username":  request.User,
+				"ip":        request.IP,
+			}).Warn(r.Error().String())

 			return nil, new(sftpInvalidCredentialsError)
 		}
```
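Attaching the `*http.Response` means the formatted message now carries the numeric status code even when the Panel returned no error bag at all. A hypothetical caller-side sketch; `doPanelRequest` and the example message are illustrative, not part of this diff:

```go
// Hypothetical usage: classify a Panel failure and log the enriched message.
if err := doPanelRequest(); err != nil && api.IsRequestError(err) {
	// e.g. "Error response from Panel: ValidationException: ... (HTTP/422)"
	log.Println(err.Error())
}
```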
**cmd/root.go** (44 changes)

```diff
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"github.com/NYTimes/logrotate"
 	"github.com/apex/log/handlers/multi"
+	"github.com/docker/docker/client"
 	"github.com/gammazero/workerpool"
 	"golang.org/x/crypto/acme"
 	"net/http"
@@ -172,6 +173,11 @@ func rootCmdRun(*cobra.Command, []string) {
 		log.WithField("server", s.Id()).Info("loaded configuration for server")
 	}

+	states, err := server.CachedServerStates()
+	if err != nil {
+		log.WithField("error", errors.WithStack(err)).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
+	}
+
 	// Create a new workerpool that limits us to 4 servers being bootstrapped at a time
 	// on Wings. This allows us to ensure the environment exists, write configurations,
 	// and reboot processes without causing a slow-down due to sequential booting.
@@ -181,25 +187,39 @@ func rootCmdRun(*cobra.Command, []string) {
 		s := serv

 		pool.Submit(func() {
-			s.Log().Info("ensuring server environment exists")
-			// Create a server environment if none exists currently. This allows us to recover from Docker
-			// being reinstalled on the host system for example.
-			if err := s.Environment.Create(); err != nil {
-				s.Log().WithField("error", err).Error("failed to process environment")
+			s.Log().Info("configuring server environment and restoring to previous state")
+
+			var st string
+			if state, exists := states[s.Id()]; exists {
+				st = state
 			}

 			r, err := s.Environment.IsRunning()
-			if err != nil {
+			// We ignore missing containers because we don't want to actually block booting of wings at this
+			// point. If we didn't do this and you pruned all of the images and then started wings you could
+			// end up waiting a long period of time for all of the images to be re-pulled on Wings boot rather
+			// than when the server itself is started.
+			if err != nil && !client.IsErrNotFound(err) {
 				s.Log().WithField("error", err).Error("error checking server environment status")
 			}

-			// If the server is currently running on Docker, mark the process as being in that state.
-			// We never want to stop an instance that is currently running external from Wings since
-			// that is a good way of keeping things running even if Wings gets in a very corrupted state.
-			//
-			// This will also validate that a server process is running if the last tracked state we have
-			// is that it was running, but we see that the container process is not currently running.
-			if r || (!r && s.IsRunning()) {
+			// Check if the server was previously running. If so, attempt to start the server now so that Wings
+			// can pick up where it left off. If the environment does not exist at all, just create it and then allow
+			// the normal flow to execute.
+			//
+			// This does mean that booting wings after a catastrophic machine crash and wiping out the Docker images
+			// as a result will result in a slow boot.
+			if !r && (st == environment.ProcessRunningState || st == environment.ProcessStartingState) {
+				if err := s.HandlePowerAction(server.PowerActionStart); err != nil {
+					s.Log().WithField("error", errors.WithStack(err)).Warn("failed to return server to running state")
+				}
+			} else if r || (!r && s.IsRunning()) {
+				// If the server is currently running on Docker, mark the process as being in that state.
+				// We never want to stop an instance that is currently running external from Wings since
+				// that is a good way of keeping things running even if Wings gets in a very corrupted state.
+				//
+				// This will also validate that a server process is running if the last tracked state we have
+				// is that it was running, but we see that the container process is not currently running.
 				s.Log().Info("detected server is running, re-attaching to process...")

 				s.SetState(environment.ProcessRunningState)
```
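The boot path leans on `gammazero/workerpool` to cap concurrency at four servers. A minimal sketch of that pattern in isolation; `GetServers().All()` and the task body are assumptions for illustration:

```go
// Four workers drain the boot queue concurrently; Submit never blocks the loop.
pool := workerpool.New(4)

for _, serv := range server.GetServers().All() {
	s := serv // capture the loop variable so each closure sees its own server

	pool.Submit(func() {
		// ensure the environment exists, restore cached state, etc.
		s.Log().Info("bootstrapping server")
	})
}

// Block until every queued boot task has finished.
pool.StopWait()
```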
```diff
@@ -36,6 +36,12 @@ type SystemConfiguration struct {
 		Gid int
 	}

+	// The amount of time in seconds that can elapse before a server's disk space calculation is
+	// considered stale and a re-check should occur. DANGER: setting this value too low can seriously
+	// impact system performance and cause massive I/O bottlenecks and high CPU usage for the Wings
+	// process.
+	DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
+
 	// Determines if Wings should detect a server that stops with a normal exit code of
 	// "0" as being crashed if the process stopped without any Wings interaction. E.g.
 	// the user did not press the stop button, but the process stopped cleanly.
@@ -129,7 +135,21 @@ func (sc *SystemConfiguration) EnableLogRotation() error {
 	}
 	defer f.Close()

-	t, err := template.ParseFiles("templates/logrotate.tpl")
+	t, err := template.New("logrotate").Parse(`
+{{.LogDirectory}}/wings.log {
+    size 10M
+    compress
+    delaycompress
+    dateext
+    maxage 7
+    missingok
+    notifempty
+    create 0640 {{.User.Uid}} {{.User.Gid}}
+    postrotate
+        killall -SIGHUP wings
+    endscript
+}`)
+
 	if err != nil {
 		return errors.WithStack(err)
 	}
```
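Inlining the template removes the runtime dependency on a `templates/logrotate.tpl` file shipped next to the binary. The hunk ends before the template is rendered; a minimal sketch of the presumed continuation, with the `Execute` call assumed rather than shown in this diff:

```go
// Assumed continuation of EnableLogRotation: render the inlined template into the
// opened logrotate configuration file using the system configuration, so
// {{.LogDirectory}}, {{.User.Uid}} and {{.User.Gid}} resolve.
if err := t.Execute(f, sc); err != nil {
	return errors.WithStack(err)
}

return nil
```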
```diff
@@ -4,20 +4,24 @@ type ConsoleThrottles struct {
 	// Whether or not the throttler is enabled for this instance.
 	Enabled bool `json:"enabled" yaml:"enabled" default:"true"`

-	// The total number of throttle activations that must accumulate before a server is
-	// forcibly stopped for violating these limits.
-	KillAtCount uint64 `json:"kill_at_count" yaml:"kill_at_count" default:"5"`
-
-	// The amount of time in milliseconds that a server process must go through without
-	// triggering an output warning before the throttle activation count begins decreasing.
-	// This time is measured in milliseconds.
-	Decay uint64 `json:"decay" yaml:"decay" default:"10000"`
-
-	// The total number of lines that can be output in a given CheckInterval period before
+	// The total number of lines that can be output in a given LineResetInterval period before
 	// a warning is triggered and counted against the server.
-	Lines uint64 `json:"lines" yaml:"lines" default:"1000"`
+	Lines uint64 `json:"lines" yaml:"lines" default:"2000"`

-	// The amount of time that must pass between intervals before the count is reset. This
-	// value is in milliseconds.
-	CheckInterval uint64 `json:"check_interval" yaml:"check_interval" default:"100"`
+	// The total number of throttle activations that can accumulate before a server is considered
+	// to be breaching and will be stopped. This value is decremented by one every DecayInterval.
+	MaximumTriggerCount uint64 `json:"maximum_trigger_count" yaml:"maximum_trigger_count" default:"5"`
+
+	// The amount of time after which the number of lines processed is reset to 0. This runs in
+	// a constant loop and is not affected by the current console output volumes. By default, this
+	// will reset the processed line count back to 0 every 100ms.
+	LineResetInterval uint64 `json:"line_reset_interval" yaml:"line_reset_interval" default:"100"`
+
+	// The amount of time in milliseconds that must pass without an output warning being triggered
+	// before a throttle activation is decremented.
+	DecayInterval uint64 `json:"decay_interval" yaml:"decay_interval" default:"10000"`
+
+	// The amount of time that a server is allowed to be stopping for before it is terminated
+	// forcefully if it triggers output throttles.
+	StopGracePeriod uint `json:"stop_grace_period" yaml:"stop_grace_period" default:"15"`
 }
```
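Worked through with the defaults above, the new model reads like this (semantics inferred from the field comments):

```go
// With the defaults shown above: up to 2000 lines may be printed per 100ms window
// (~20,000 lines/second) before a warning triggers. Each breaching window records
// one activation, activations decay by one every 10 seconds, and accumulating 5
// activations stops the server; a stopping server then has 15 seconds of grace
// before it is terminated.
const (
	linesPerWindow = 2000                                  // Lines
	windowMillis   = 100                                   // LineResetInterval
	linesPerSecond = linesPerWindow * 1000 / windowMillis  // 20000
	maxActivations = 5                                     // MaximumTriggerCount
	decayMillis    = 10000                                 // DecayInterval
	graceSeconds   = 15                                    // StopGracePeriod
)
```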
**docker-compose.yml**

```diff
@@ -1,26 +1,35 @@
-version: '3'
+version: '3.5'
 services:
   daemon:
     build: .
     restart: always
-    hostname: daemon
+    networks:
+      - daemon0
     ports:
       - "8080:8080"
       - "2022:2022"
     tty: true
     environment:
       - "DEBUG=false"
+      - "TZ=UTC" # change to the three letter timezone of your choosing
     volumes:
       - "/var/run/docker.sock:/var/run/docker.sock"
       - "/var/lib/docker/containers/:/var/lib/docker/containers/"
+      - "/etc/pterodactyl/:/etc/pterodactyl/"
       - "/var/lib/pterodactyl/:/var/lib/pterodactyl/"
-      - "/srv/daemon-data/:/srv/daemon-data/"
+      - "/var/log/pterodactyl/:/var/log/pterodactyl/"
       - "/tmp/pterodactyl/:/tmp/pterodactyl/"
-      - "/etc/timezone:/etc/timezone:ro"
+      ## you may need /srv/daemon-data if you are upgrading from an old daemon
+      ## - "/srv/daemon-data/:/srv/daemon-data/"
       ## Required for ssl if you use let's encrypt. uncomment to use.
       ## - "/etc/letsencrypt/:/etc/letsencrypt/"

 networks:
-  default:
+  daemon0:
+    name: daemon0
+    driver: bridge
     ipam:
       config:
-        - subnet: 172.21.0.0/16
+        - subnet: "172.21.0.0/16"
+    driver_opts:
+      com.docker.network.bridge.name: daemon0
```
```diff
@@ -3,6 +3,7 @@ package docker
 import (
 	"bufio"
 	"context"
+	"encoding/json"
 	"fmt"
 	"github.com/apex/log"
 	"github.com/docker/docker/api/types"
@@ -19,6 +20,11 @@ import (
 	"time"
 )

+type imagePullStatus struct {
+	Status   string `json:"status"`
+	Progress string `json:"progress"`
+}
+
 // Attaches to the docker container itself and ensures that we can pipe data in and out
 // of the process stream. This should not be used for reading console data as you *will*
 // miss important output at the beginning because of the time delay with attaching to the
@@ -148,7 +154,7 @@ func (e *Environment) Create() error {
 	// Convert 127.0.0.1 to the pterodactyl0 network interface if the environment is Docker
 	// so that the server operates as expected.
 	if v == "SERVER_IP=127.0.0.1" {
-		evs[i] = "SERVER_IP="+config.Get().Docker.Network.Interface
+		evs[i] = "SERVER_IP=" + config.Get().Docker.Network.Interface
 	}
 }

@@ -307,6 +313,9 @@ func (e *Environment) followOutput() error {
 //
 // TODO: local images
 func (e *Environment) ensureImageExists(image string) error {
+	e.Events().Publish(environment.DockerImagePullStarted, "")
+	defer e.Events().Publish(environment.DockerImagePullCompleted, "")
+
 	// Give it up to 15 minutes to pull the image. I think this should cover 99.8% of cases where an
 	// image pull might fail. I can't imagine it will ever take more than 15 minutes to fully pull
 	// an image. Let me know when I am inevitably wrong here...
@@ -374,12 +383,18 @@ func (e *Environment) ensureImageExists(image string) error {
 	// is done being pulled, which is what we need.
 	scanner := bufio.NewScanner(out)
 	for scanner.Scan() {
-		continue
+		s := imagePullStatus{}
+		fmt.Println(scanner.Text())
+		if err := json.Unmarshal(scanner.Bytes(), &s); err == nil {
+			e.Events().Publish(environment.DockerImagePullStatus, s.Status+" "+s.Progress)
+		}
 	}

 	if err := scanner.Err(); err != nil {
 		return err
 	}

+	log.WithField("image", image).Debug("completed docker image pull")
+
 	return nil
 }
```
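The scanner loop above decodes the stream of JSON progress lines that Docker emits while pulling an image. A self-contained sketch of that decoding; the sample line's values are illustrative, though `status` and `progress` are the fields the pull stream carries:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the imagePullStatus struct from the diff above.
type imagePullStatus struct {
	Status   string `json:"status"`
	Progress string `json:"progress"`
}

func main() {
	// One JSON line as emitted by the Docker image pull stream (values illustrative).
	line := []byte(`{"status":"Downloading","progress":"[=>        ]  1.2MB/45MB"}`)

	var s imagePullStatus
	if err := json.Unmarshal(line, &s); err == nil {
		fmt.Println(s.Status + " " + s.Progress)
	}
}
```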
```diff
@@ -176,3 +176,9 @@ func (e *Environment) SetStopConfiguration(c *api.ProcessStopConfiguration) {
 	e.meta.Stop = c
 	e.mu.Unlock()
 }
+
+func (e *Environment) SetImage(i string) {
+	e.mu.Lock()
+	e.meta.Image = i
+	e.mu.Unlock()
+}
```
```diff
@@ -183,13 +183,21 @@ func (e *Environment) WaitForStop(seconds uint, terminate bool) error {
 	case <-ctx.Done():
 		if ctxErr := ctx.Err(); ctxErr != nil {
 			if terminate {
-				return e.Terminate(os.Kill)
+				log.WithField("container_id", e.Id).Debug("server did not stop in time, executing process termination")
+
+				return errors.WithStack(e.Terminate(os.Kill))
 			}

 			return errors.WithStack(ctxErr)
 		}
 	case err := <-errChan:
 		if err != nil {
+			if terminate {
+				log.WithField("container_id", e.Id).WithField("error", errors.WithStack(err)).Warn("error while waiting for container stop, attempting process termination")
+
+				return errors.WithStack(e.Terminate(os.Kill))
+			}
+
 			return errors.WithStack(err)
 		}
 	case <-ok:
```
```diff
@@ -6,9 +6,12 @@ import (
 )

 const (
 	ConsoleOutputEvent = "console output"
 	StateChangeEvent   = "state change"
 	ResourceEvent      = "resources"
+	DockerImagePullStarted   = "docker image pull started"
+	DockerImagePullStatus    = "docker image pull status"
+	DockerImagePullCompleted = "docker image pull completed"
 )

 const (
```
**events/events.go** (101 changes)

```diff
@@ -2,6 +2,8 @@ package events

 import (
 	"encoding/json"
+	"github.com/gammazero/workerpool"
+	"github.com/pkg/errors"
 	"strings"
 	"sync"
 )
@@ -12,14 +14,13 @@ type Event struct {
 }

 type EventBus struct {
-	sync.RWMutex
-
-	subscribers map[string]map[chan Event]struct{}
+	mu    sync.RWMutex
+	pools map[string]*CallbackPool
 }

 func New() *EventBus {
 	return &EventBus{
-		subscribers: make(map[string]map[chan Event]struct{}),
+		pools: make(map[string]*CallbackPool),
 	}
 }
@@ -39,25 +40,36 @@ func (e *EventBus) Publish(topic string, data string) {
 		}
 	}

+	e.mu.RLock()
+	defer e.mu.RUnlock()
+
 	// Acquire a read lock and loop over all of the channels registered for the topic. This
 	// avoids a panic crash if the process tries to unregister the channel while this routine
 	// is running.
-	go func() {
-		e.RLock()
-		defer e.RUnlock()
-
-		if ch, ok := e.subscribers[t]; ok {
-			for channel := range ch {
-				channel <- Event{Data: data, Topic: topic}
-			}
+	if cp, ok := e.pools[t]; ok {
+		for _, callback := range cp.callbacks {
+			c := *callback
+			evt := Event{Data: data, Topic: topic}
+			// Using the workerpool with one worker allows us to execute events in a FIFO manner. Running
+			// this using goroutines would cause things such as console output to just output in random order
+			// if more than one event is fired at the same time.
+			//
+			// However, the pool submission does not block the execution of this function itself, allowing
+			// us to call publish without blocking any of the other pathways.
+			//
+			// @see https://github.com/pterodactyl/panel/issues/2303
+			cp.pool.Submit(func() {
+				c(evt)
+			})
 		}
-	}()
+	}
 }

+// Publishes a JSON message to a given topic.
 func (e *EventBus) PublishJson(topic string, data interface{}) error {
 	b, err := json.Marshal(data)
 	if err != nil {
-		return err
+		return errors.WithStack(err)
 	}

 	e.Publish(topic, string(b))
@@ -65,41 +77,46 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
 	return nil
 }

-// Subscribe to an emitter topic using a channel.
-func (e *EventBus) Subscribe(topic string, ch chan Event) {
-	e.Lock()
-	defer e.Unlock()
+// Register a callback function that will be executed each time one of the events using the topic
+// name is called.
+func (e *EventBus) On(topic string, callback *func(Event)) {
+	e.mu.Lock()
+	defer e.mu.Unlock()

-	if _, exists := e.subscribers[topic]; !exists {
-		e.subscribers[topic] = make(map[chan Event]struct{})
+	// Check if this topic has been registered at least once for the event listener, and if
+	// not create an empty struct for the topic.
+	if _, exists := e.pools[topic]; !exists {
+		e.pools[topic] = &CallbackPool{
+			callbacks: make([]*func(Event), 0),
+			pool:      workerpool.New(1),
+		}
 	}

-	// Only set the channel if there is not currently a matching one for this topic. This
-	// avoids registering two identical listeners for the same topic and causing pain in
-	// the unsubscribe functionality as well.
-	if _, exists := e.subscribers[topic][ch]; !exists {
-		e.subscribers[topic][ch] = struct{}{}
-	}
+	// If this callback is not already registered as an event listener, go ahead and append
+	// it to the array of callbacks for this topic.
+	e.pools[topic].Add(callback)
 }

-// Unsubscribe a channel from a given topic.
-func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
-	e.Lock()
-	defer e.Unlock()
+// Removes an event listener from the bus.
+func (e *EventBus) Off(topic string, callback *func(Event)) {
+	e.mu.Lock()
+	defer e.mu.Unlock()

-	if _, exists := e.subscribers[topic][ch]; exists {
-		delete(e.subscribers[topic], ch)
+	if cp, ok := e.pools[topic]; ok {
+		cp.Remove(callback)
 	}
 }

-// Removes all of the event listeners for the server. This is used when a server
-// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously
-// should also check elsewhere and handle a server reference going nil, but this
-// won't hurt.
-func (e *EventBus) UnsubscribeAll() {
-	e.Lock()
-	defer e.Unlock()
+// Removes all of the event listeners that have been registered for any topic. Also stops the worker
+// pool to close that routine.
+func (e *EventBus) Destroy() {
+	e.mu.Lock()
+	defer e.mu.Unlock()

-	// Reset the entire struct into an empty map.
-	e.subscribers = make(map[string]map[chan Event]struct{})
+	// Stop every pool that exists for a given callback topic.
+	for _, cp := range e.pools {
+		cp.pool.Stop()
+	}
+
+	e.pools = make(map[string]*CallbackPool)
 }
```
**events/pool.go** (new file, 49 lines)

```diff
@@ -0,0 +1,49 @@
+package events
+
+import (
+	"github.com/gammazero/workerpool"
+	"reflect"
+)
+
+type CallbackPool struct {
+	callbacks []*func(Event)
+	pool      *workerpool.WorkerPool
+}
+
+// Pushes a new callback into the array of listeners for the pool.
+func (cp *CallbackPool) Add(callback *func(Event)) {
+	if cp.index(reflect.ValueOf(callback)) < 0 {
+		cp.callbacks = append(cp.callbacks, callback)
+	}
+}
+
+// Removes a callback from the array of registered callbacks if it exists.
+func (cp *CallbackPool) Remove(callback *func(Event)) {
+	i := cp.index(reflect.ValueOf(callback))
+
+	// If i < 0 it means there was no index found for the given callback, meaning it was
+	// never registered or was already unregistered from the listeners. Also double check
+	// that we didn't somehow escape the length of the topic callback (not sure how that
+	// would happen, but lets avoid a panic condition).
+	if i < 0 || i >= len(cp.callbacks) {
+		return
+	}
+
+	// We can assume that the topic still exists at this point since we acquire an exclusive
+	// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
+	// no topic already existing.
+	cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...)
+}
+
+// Finds the index of a given callback in the topic by comparing all of the registered callback
+// pointers to the passed function. This function does not acquire a lock as it should only be called
+// within the confines of a function that has already acquired a lock for the duration of the lookup.
+func (cp *CallbackPool) index(v reflect.Value) int {
+	for i, handler := range cp.callbacks {
+		if reflect.ValueOf(handler).Pointer() == v.Pointer() {
+			return i
+		}
+	}
+
+	return -1
+}
```
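Putting the two files together, the rewritten bus is used roughly like this (a minimal sketch; the topic string and printing are illustrative):

```go
bus := events.New()

// Callbacks are registered by pointer, so the same pointer must be used to detach.
callback := func(e events.Event) {
	fmt.Printf("%s: %s\n", e.Topic, e.Data)
}

bus.On("console output", &callback)

// Delivered in FIFO order through the topic's single-worker pool; Publish itself
// does not block while the callback runs.
bus.Publish("console output", "Hello from the event bus")

bus.Off("console output", &callback)

// On teardown (e.g. server deletion) Destroy stops every worker pool.
bus.Destroy()
```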
```diff
@@ -2,16 +2,12 @@ package installer

 import (
 	"encoding/json"
-	"github.com/apex/log"
 	"github.com/asaskevich/govalidator"
 	"github.com/buger/jsonparser"
 	"github.com/pkg/errors"
 	"github.com/pterodactyl/wings/api"
-	"github.com/pterodactyl/wings/config"
 	"github.com/pterodactyl/wings/environment"
 	"github.com/pterodactyl/wings/server"
-	"os"
-	"path"
 )

 type Installer struct {
@@ -95,33 +91,6 @@ func (i *Installer) Server() *server.Server {
 	return i.server
 }

-// Executes the installer process, creating the server and running through the
-// associated installation process based on the parameters passed through for
-// the server instance.
-func (i *Installer) Execute() {
-	p := path.Join(config.Get().System.Data, i.Uuid())
-	l := log.WithFields(log.Fields{"server": i.Uuid(), "process": "installer"})
-
-	l.WithField("path", p).Debug("creating required server data directory")
-	if err := os.MkdirAll(p, 0755); err != nil {
-		l.WithFields(log.Fields{"path": p, "error": errors.WithStack(err)}).Error("failed to create server data directory")
-		return
-	}
-
-	if err := os.Chown(p, config.Get().System.User.Uid, config.Get().System.User.Gid); err != nil {
-		l.WithField("error", errors.WithStack(err)).Error("failed to chown server data directory")
-		return
-	}
-
-	l.Debug("creating required environment for server instance")
-	if err := i.server.Environment.Create(); err != nil {
-		l.WithField("error", err).Error("failed to create environment for server")
-		return
-	}
-
-	l.Info("successfully created environment for server during install process")
-}
-
 // Returns a string value from the JSON data provided.
 func getString(data []byte, key ...string) string {
 	value, _ := jsonparser.GetString(data, key...)
```
```diff
@@ -198,7 +198,8 @@ func deleteServer(c *gin.Context) {
 	}

 	// Unsubscribe all of the event listeners.
-	s.Events().UnsubscribeAll()
+	s.Events().Destroy()
+	s.Throttler().StopTimer()

 	// Destroy the environment; in Docker this will handle a running container and
 	// forcibly terminate it before removing the container, so we do not need to handle
@@ -339,10 +339,22 @@ func postServerDecompressFiles(c *gin.Context) {
 	}

 	if err := s.Filesystem.DecompressFile(data.RootPath, data.File); err != nil {
-		// Check if the file does not exist.
-		// NOTE: os.IsNotExist() does not work if the error is wrapped.
 		if errors.Is(err, os.ErrNotExist) {
-			c.Status(http.StatusNotFound)
+			c.AbortWithStatusJSON(http.StatusNotFound, gin.H{
+				"error": "The requested archive was not found.",
+			})
+			return
+		}
+
+		// If the file is busy for some reason just return a nicer error to the user since there is not
+		// much we specifically can do. They'll need to stop the running server process in order to overwrite
+		// a file like this.
+		if strings.Contains(err.Error(), "text file busy") {
+			s.Log().WithField("error", err).Warn("failed to decompress file due to busy text file")
+
+			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
+				"error": "One or more files this archive is attempting to overwrite are currently in use by another process. Please try again.",
+			})
 			return
 		}
```
```diff
@@ -57,7 +57,11 @@ func postCreateServer(c *gin.Context) {
 	// cycle. If there are any errors they will be logged and communicated back
 	// to the Panel where a reinstall may take place.
 	go func(i *installer.Installer) {
-		i.Execute()
+		err := i.Server().CreateEnvironment()
+		if err != nil {
+			i.Server().Log().WithField("error", err).Error("failed to create server environment during install process")
+			return
+		}

 		if err := i.Server().Install(false); err != nil {
 			log.WithFields(log.Fields{"server": i.Uuid(), "error": err}).Error("failed to run install process for server")
```
```diff
@@ -277,7 +277,10 @@ func postTransfer(c *gin.Context) {
 	server.GetServers().Add(i.Server())

 	// Create the server's environment (note this does not execute the install script)
-	i.Execute()
+	if err := i.Server().CreateEnvironment(); err != nil {
+		l.WithField("error", err).Error("failed to create server environment")
+		return
+	}

 	// Un-archive the archive. That sounds weird..
 	if err := archiver.NewTarGz().Unarchive(archivePath, i.Server().Filesystem.Path()); err != nil {
```
```diff
@@ -50,24 +50,26 @@ var e = []string{
 // Listens for different events happening on a server and sends them along
 // to the connected websocket.
 func (h *Handler) ListenForServerEvents(ctx context.Context) {
-	eventChannel := make(chan events.Event)
-	for _, event := range e {
-		h.server.Events().Subscribe(event, eventChannel)
-	}
-
-	for d := range eventChannel {
-		select {
-		case <-ctx.Done():
-			for _, event := range e {
-				h.server.Events().Unsubscribe(event, eventChannel)
-			}
-
-			close(eventChannel)
-		default:
-			_ = h.SendJson(&Message{
-				Event: d.Topic,
-				Args:  []string{d.Data},
-			})
+	h.server.Log().Debug("listening for server events over websocket")
+	callback := func(e events.Event) {
+		if err := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); err != nil {
+			h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
 		}
 	}
+
+	// Subscribe to all of the events with the same callback that will push the data out over the
+	// websocket for the server.
+	for _, evt := range e {
+		h.server.Events().On(evt, &callback)
+	}
+
+	go func(ctx context.Context) {
+		select {
+		case <-ctx.Done():
+			// Once this context is stopped, de-register all of the listeners that have been registered.
+			for _, evt := range e {
+				h.server.Events().Off(evt, &callback)
+			}
+		}
+	}(ctx)
 }
```
```diff
@@ -12,12 +12,7 @@ const (
 )

 type Message struct {
-	// The event to perform. Should be one of the following that are supported:
-	//
-	// - status : Returns the server's power state.
-	// - logs : Returns the server log data at the time of the request.
-	// - power : Performs a power action against the server based the data.
-	// - command : Performs a command on a server using the data field.
+	// The event to perform.
 	Event string `json:"event"`

 	// The data to pass along, only used by power/command currently. Other requests
```
```diff
@@ -37,6 +37,19 @@ type Handler struct {
 	server *server.Server
 }

+var (
+	ErrJwtNotPresent    = errors.New("jwt: no jwt present")
+	ErrJwtNoConnectPerm = errors.New("jwt: missing connect permission")
+	ErrJwtUuidMismatch  = errors.New("jwt: server uuid mismatch")
+)
+
+func IsJwtError(err error) bool {
+	return errors.Is(err, ErrJwtNotPresent) ||
+		errors.Is(err, ErrJwtNoConnectPerm) ||
+		errors.Is(err, ErrJwtUuidMismatch) ||
+		errors.Is(err, jwt.ErrExpValidation)
+}
+
 // Parses a JWT into a websocket token payload.
 func NewTokenPayload(token []byte) (*tokens.WebsocketPayload, error) {
 	payload := tokens.WebsocketPayload{}
@@ -92,9 +105,13 @@ func GetHandler(s *server.Server, w http.ResponseWriter, r *http.Request) (*Hand
 }

 func (h *Handler) SendJson(v *Message) error {
-	// Do not send JSON down the line if the JWT on the connection is not
-	// valid!
+	// Do not send JSON down the line if the JWT on the connection is not valid!
 	if err := h.TokenValid(); err != nil {
+		h.unsafeSendJson(Message{
+			Event: ErrorEvent,
+			Args:  []string{"could not authenticate client: " + err.Error()},
+		})
+
 		return nil
 	}
@@ -134,7 +151,7 @@ func (h *Handler) unsafeSendJson(v interface{}) error {
 func (h *Handler) TokenValid() error {
 	j := h.GetJwt()
 	if j == nil {
-		return errors.New("no jwt present")
+		return ErrJwtNotPresent
 	}

 	if err := jwt.ExpirationTimeValidator(time.Now())(&j.Payload); err != nil {
@@ -142,11 +159,11 @@ func (h *Handler) TokenValid() error {
 	}

 	if !j.HasPermission(PermissionConnect) {
-		return errors.New("jwt does not have connect permission")
+		return ErrJwtNoConnectPerm
 	}

 	if h.server.Id() != j.GetServerUuid() {
-		return errors.New("jwt server uuid mismatch")
+		return ErrJwtUuidMismatch
 	}

 	return nil
@@ -157,9 +174,10 @@ func (h *Handler) TokenValid() error {
 // error message, otherwise we just send back a standard error message.
 func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error {
 	j := h.GetJwt()
+	expected := errors.Is(err, server.ErrSuspended) || errors.Is(err, server.ErrIsRunning)

 	message := "an unexpected error was encountered while handling this request"
-	if server.IsSuspendedError(err) || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
+	if expected || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
 		message = err.Error()
 	}

@@ -169,7 +187,7 @@ func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error
 	wsm.Args = []string{m}

 	if len(shouldLog) == 0 || (len(shouldLog) == 1 && shouldLog[0] == true) {
-		if !server.IsSuspendedError(err) {
+		if !expected && !IsJwtError(err) {
 			h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}).
 				Error("failed to handle websocket process; an error was encountered processing an event")
 		}
@@ -206,8 +224,6 @@ func (h *Handler) GetJwt() *tokens.WebsocketPayload {
 func (h *Handler) HandleInbound(m Message) error {
 	if m.Event != AuthenticationEvent {
 		if err := h.TokenValid(); err != nil {
-			log.WithField("message", err.Error()).Debug("jwt for server websocket is no longer valid")
-
 			h.unsafeSendJson(Message{
 				Event: ErrorEvent,
 				Args:  []string{"could not authenticate client: " + err.Error()},
@@ -313,7 +329,7 @@ func (h *Handler) HandleInbound(m Message) error {
 		return nil
 	}

-	logs, err := h.server.Environment.Readlog(1024 * 16)
+	logs, err := h.server.Environment.Readlog(100)
 	if err != nil {
 		return err
 	}
```
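Replacing ad-hoc `errors.New` calls with package-level sentinels is what makes `IsJwtError` possible: callers can classify failures with `errors.Is` instead of matching strings. A hedged sketch of caller-side branching; the handling comments are illustrative:

```go
// Sketch: branch on the sentinel values rather than on error text.
if err := h.TokenValid(); err != nil {
	switch {
	case errors.Is(err, ErrJwtNotPresent), errors.Is(err, ErrJwtUuidMismatch):
		// Treat as an authentication problem; ask the client to re-authenticate.
	case IsJwtError(err):
		// Any other JWT failure (expired token, missing connect permission).
	}
}
```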
```diff
@@ -1,60 +1,111 @@
 package server

 import (
+	"context"
 	"fmt"
 	"github.com/mitchellh/colorstring"
+	"github.com/pkg/errors"
 	"github.com/pterodactyl/wings/config"
+	"github.com/pterodactyl/wings/system"
 	"sync"
 	"sync/atomic"
 	"time"
 )

+var ErrTooMuchConsoleData = errors.New("console is outputting too much data")
+
 type ConsoleThrottler struct {
-	sync.RWMutex
+	mu sync.Mutex
 	config.ConsoleThrottles

 	// The total number of activations that have occurred thus far.
 	activations uint64

+	// The total number of lines that have been sent since the last reset timer period.
+	count uint64
+
+	// Whether or not the console output is being throttled. It is up to calling code to
+	// determine what to do if it is.
+	isThrottled system.AtomicBool
+
 	// The total number of lines processed so far during the given time period.
-	lines uint64
+	timerCancel *context.CancelFunc

-	lastIntervalTime *time.Time
-	lastDecayTime    *time.Time
 }

-// Increments the number of activations for a server.
-func (ct *ConsoleThrottler) AddActivation() uint64 {
-	ct.Lock()
-	defer ct.Unlock()
-
-	ct.activations += 1
-
-	return ct.activations
+// Resets the state of the throttler.
+func (ct *ConsoleThrottler) Reset() {
+	atomic.StoreUint64(&ct.count, 0)
+	atomic.StoreUint64(&ct.activations, 0)
+	ct.isThrottled.Set(false)
 }

-// Decrements the number of activations for a server.
-func (ct *ConsoleThrottler) RemoveActivation() uint64 {
-	ct.Lock()
-	defer ct.Unlock()
-
-	if ct.activations == 0 {
-		return 0
+// Triggers an activation for a server. You can also decrement the number of activations
+// by passing a negative number.
+func (ct *ConsoleThrottler) markActivation(increment bool) uint64 {
+	if !increment {
+		if atomic.LoadUint64(&ct.activations) == 0 {
+			return 0
+		}
+
+		// This weird doohickey subtracts 1 from the activation count.
+		return atomic.AddUint64(&ct.activations, ^uint64(0))
 	}

-	ct.activations -= 1
-
-	return ct.activations
+	return atomic.AddUint64(&ct.activations, 1)
 }

-// Increment the total count of lines that we have processed so far.
-func (ct *ConsoleThrottler) IncrementLineCount() uint64 {
-	return atomic.AddUint64(&ct.lines, 1)
+// Determines if the console is currently being throttled. Calls to this function can be used to
+// determine if output should be funneled along to the websocket processes.
+func (ct *ConsoleThrottler) Throttled() bool {
+	return ct.isThrottled.Get()
 }

-// Reset the line count to zero.
-func (ct *ConsoleThrottler) ResetLineCount() {
-	atomic.SwapUint64(&ct.lines, 0)
+// Starts a timer that runs in a separate thread and will continually decrement the lines processed
+// and number of activations, regardless of the current console message volume.
+func (ct *ConsoleThrottler) StartTimer() {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	reset := time.NewTicker(time.Duration(int64(ct.LineResetInterval)) * time.Millisecond)
+	decay := time.NewTicker(time.Duration(int64(ct.DecayInterval)) * time.Millisecond)
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				reset.Stop()
+				return
+			case <-reset.C:
+				ct.isThrottled.Set(false)
+				atomic.StoreUint64(&ct.count, 0)
+			}
+		}
+	}()
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				decay.Stop()
+				return
+			case <-decay.C:
+				ct.markActivation(false)
+			}
+		}
+	}()
+
+	ct.timerCancel = &cancel
+}
+
+// Stops a running timer process if one exists. This is only called when the server is deleted since
+// we want this to always be running. If there is no process currently running nothing will really happen.
+func (ct *ConsoleThrottler) StopTimer() {
+	ct.mu.Lock()
+	defer ct.mu.Unlock()
+	if ct.timerCancel != nil {
+		c := *ct.timerCancel
+		c()
+		ct.timerCancel = nil
+	}
 }

 // Handles output from a server's console. This code ensures that a server is not outputting
```
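The "weird doohickey" is standard Go: `sync/atomic` offers no signed delta for `uint64`, and the atomic package documents adding `^uint64(0)` as the way to subtract one, relying on unsigned wrap-around. A runnable demonstration:

```go
package main

import "fmt"

func main() {
	// ^uint64(0) is all 64 bits set, i.e. 2^64-1; adding it wraps modulo 2^64,
	// which is exactly a decrement by one. This mirrors the call above:
	// atomic.AddUint64(&ct.activations, ^uint64(0))
	n := uint64(5)
	n += ^uint64(0)
	fmt.Println(n) // prints 4
}
```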
```diff
@@ -70,30 +121,41 @@ func (ct *ConsoleThrottler) ResetLineCount() {
 // data all at once. These values are all configurable via the wings configuration file, however the
 // defaults have been in the wild for almost two years at the time of this writing, so I feel quite
 // confident in them.
-func (ct *ConsoleThrottler) Handle() {
+//
+// This function returns an error if the server should be stopped due to violating throttle constraints.
+func (ct *ConsoleThrottler) Increment(onTrigger func()) error {
+	if !ct.Enabled {
+		return nil
+	}
+
+	// Increment the line count and if we have now output more lines than are allowed, trigger a throttle
+	// activation. Once the throttle is triggered and has passed the kill at value we will trigger a server
+	// stop automatically.
+	if atomic.AddUint64(&ct.count, 1) >= ct.Lines && !ct.Throttled() {
+		ct.isThrottled.Set(true)
+		if ct.markActivation(true) >= ct.MaximumTriggerCount {
+			return ErrTooMuchConsoleData
+		}
+
+		onTrigger()
+	}
+
+	return nil
 }

 // Returns the throttler instance for the server or creates a new one.
 func (s *Server) Throttler() *ConsoleThrottler {
-	s.throttleLock.RLock()
+	s.throttleLock.Lock()
+	defer s.throttleLock.Unlock()
+
 	if s.throttler == nil {
-		// Release the read lock so that we can acquire a normal lock on the process and
-		// make modifications to the throttler.
-		s.throttleLock.RUnlock()
-
-		s.throttleLock.Lock()
 		s.throttler = &ConsoleThrottler{
 			ConsoleThrottles: config.Get().Throttles,
 		}
-		s.throttleLock.Unlock()
-
-		return s.throttler
-	} else {
-		defer s.throttleLock.RUnlock()
-		return s.throttler
 	}

+	return s.throttler
 }

 // Sends output to the server console formatted to appear correctly as being sent
```
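The new `Increment` signature pushes policy out to the call site: the caller supplies the per-activation hook and decides what happens when the hard limit is hit. A hypothetical call site in the console output pathway; the warning text, the stop handling, and `PowerActionStop` (an assumed counterpart of the `PowerActionStart` used in cmd/root.go above) are illustrative:

```go
// Sketch of a console output call site inside the server package.
err := s.Throttler().Increment(func() {
	// Fired once per throttle activation.
	s.Log().Warn("server is outputting console data too quickly")
})

if errors.Is(err, ErrTooMuchConsoleData) {
	// Too many activations accumulated; stop the server.
	_ = s.HandlePowerAction(PowerActionStop)
}
```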
```diff
@@ -1,17 +1,9 @@
 package server

-type suspendedError struct {
-}
-
-func (e *suspendedError) Error() string {
-	return "server is currently in a suspended state"
-}
-
-func IsSuspendedError(err error) bool {
-	_, ok := err.(*suspendedError)
-
-	return ok
-}
+import "github.com/pkg/errors"
+
+var ErrIsRunning = errors.New("server is running")
+var ErrSuspended = errors.New("server is currently in a suspended state")

 type crashTooFrequent struct {
 }
```
@@ -29,6 +29,8 @@ import (
|
|||||||
// Error returned when there is a bad path provided to one of the FS calls.
|
// Error returned when there is a bad path provided to one of the FS calls.
|
||||||
type PathResolutionError struct{}
|
type PathResolutionError struct{}
|
||||||
|
|
||||||
|
var ErrNotEnoughDiskSpace = errors.New("not enough disk space is available to perform this operation")
|
||||||
|
|
||||||
// Returns the error response in a string form that can be more easily consumed.
|
// Returns the error response in a string form that can be more easily consumed.
|
||||||
func (pre PathResolutionError) Error() string {
|
func (pre PathResolutionError) Error() string {
|
||||||
return "invalid path resolution"
|
return "invalid path resolution"
|
||||||
@@ -257,7 +259,7 @@ func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
 func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
 	// Check if cache is expired.
 	fs.lookupTimeMu.RLock()
-	isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * -10))
+	isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * time.Duration(-1*config.Get().System.DiskCheckInterval)))
 	fs.lookupTimeMu.RUnlock()

 	if !isValidInCache {
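The DiskUsage cache window changes from a hard-coded ten seconds to the configurable System.DiskCheckInterval. The negative-duration arithmetic reads backwards at a glance; this small sketch shows the equivalent freshness test (the 150-second value is an assumption standing in for the configured interval):

package main

import (
	"fmt"
	"time"
)

// diskCheckInterval stands in for config.Get().System.DiskCheckInterval (seconds).
const diskCheckInterval = 150

func isFresh(lastLookup time.Time) bool {
	// Equivalent to lastLookup.After(time.Now().Add(-interval)): the cached
	// value is still valid if it was computed within the last interval.
	return time.Since(lastLookup) < diskCheckInterval*time.Second
}

func main() {
	fmt.Println(isFresh(time.Now().Add(-10 * time.Second)))  // true
	fmt.Println(isFresh(time.Now().Add(-200 * time.Second))) // false
}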
@@ -400,9 +402,10 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error {
 		currentSize = stat.Size()
 	}

+	o := &fileOpener{}
 	// This will either create the file if it does not already exist, or open and
 	// truncate the existing file.
-	file, err := os.OpenFile(cleaned, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+	file, err := o.open(cleaned, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
 	if err != nil {
 		return errors.WithStack(err)
 	}
@@ -782,6 +785,10 @@ func (fs *Filesystem) EnsureDataDirectory() error {
 		if err := os.MkdirAll(fs.Path(), 0700); err != nil {
 			return errors.WithStack(err)
 		}
+
+		if err := fs.Chown("/"); err != nil {
+			fs.Server.Log().WithField("error", err).Warn("failed to chown server data directory")
+		}
 	}

 	return nil
@@ -933,3 +940,29 @@ func (fs *Filesystem) handleWalkerError(err error, f os.FileInfo) error {

 	return nil
 }
+
+type fileOpener struct {
+	busy uint
+}
+
+// Attempts to open a given file up to "attempts" number of times, using a backoff. If the file
+// cannot be opened because of a "text file busy" error, we will attempt until the number of attempts
+// has been exhausted, at which point we will abort with an error.
+func (fo *fileOpener) open(path string, flags int, perm os.FileMode) (*os.File, error) {
+	for {
+		f, err := os.OpenFile(path, flags, perm)
+
+		// If there is an error because the text file is busy, go ahead and sleep for a few
+		// hundred milliseconds and then try again up to three times before just returning the
+		// error back to the caller.
+		//
+		// Based on code from: https://github.com/golang/go/issues/22220#issuecomment-336458122
+		if err != nil && fo.busy < 3 && strings.Contains(err.Error(), "text file busy") {
+			time.Sleep(100 * time.Millisecond << fo.busy)
+			fo.busy++
+			continue
+		}
+
+		return f, err
+	}
+}
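The bit shift in the new fileOpener doubles the sleep on each retry, so a file that stays busy is retried after 100ms, 200ms, and 400ms before the error is surfaced:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The sleep in fileOpener grows per attempt via a bit shift:
	// 100ms << 0 = 100ms, << 1 = 200ms, << 2 = 400ms, then the error is returned.
	for busy := uint(0); busy < 3; busy++ {
		fmt.Println(100 * time.Millisecond << busy)
	}
}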
@@ -32,14 +32,17 @@ func (fs *Filesystem) SpaceAvailableForDecompression(dir string, file string) (b
 	dirSize, err := fs.DiskUsage(false)

 	var size int64
+	var max = fs.Server.DiskSpace() * 1000.0 * 1000.0
 	// Walk over the archive and figure out just how large the final output would be from unarchiving it.
-	archiver.Walk(source, func(f archiver.File) error {
-		atomic.AddInt64(&size, f.Size())
+	err = archiver.Walk(source, func(f archiver.File) error {
+		if atomic.AddInt64(&size, f.Size()) + dirSize > max {
+			return errors.WithStack(ErrNotEnoughDiskSpace)
+		}

 		return nil
 	})

-	return ((dirSize + size) / 1000.0 / 1000.0) <= fs.Server.DiskSpace(), errors.WithStack(err)
+	return err == nil, errors.WithStack(err)
 }

 // Decompress a file in a given directory by using the archiver tool to infer the file
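Two fixes land in SpaceAvailableForDecompression: the walk's error is no longer discarded, and the size check now happens per entry, so an oversized archive aborts the walk early instead of being fully scanned first. The same short-circuit pattern, shown with the standard library's filepath.Walk as an illustration (not the archiver API):

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

var errTooBig = errors.New("size budget exceeded")

// sumWithBudget walks a directory tree but aborts as soon as the running
// total exceeds max, rather than scanning everything first. Returning a
// non-nil error from the callback stops filepath.Walk immediately.
func sumWithBudget(root string, max int64) (int64, error) {
	var size int64
	err := filepath.Walk(root, func(_ string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			if size += info.Size(); size > max {
				return errTooBig
			}
		}
		return nil
	})
	return size, err
}

func main() {
	size, err := sumWithBudget(".", 1<<20)
	fmt.Println(size, err)
}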
@@ -80,6 +83,11 @@ func (fs *Filesystem) DecompressFile(dir string, file string) error {
 			return errors.New(fmt.Sprintf("could not parse underlying data source with type %s", reflect.TypeOf(s).String()))
 		}

-		return errors.Wrap(fs.Writefile(name, f), "could not extract file from archive")
+		p, err := fs.SafePath(filepath.Join(dir, name))
+		if err != nil {
+			return errors.Wrap(err, "failed to generate a safe path to server file")
+		}
+
+		return errors.Wrap(fs.Writefile(p, f), "could not extract file from archive")
 	})
 }
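Routing each archive entry name through fs.SafePath before writing closes a zip-slip style hole, where an entry named ../../outside could escape the server's data directory. A simplified stand-in for that check (the real SafePath in wings also resolves symlinks, which this sketch does not):

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// safeJoin joins an untrusted archive entry name onto root and rejects any
// result that escapes root. Hypothetical helper, not wings' implementation.
func safeJoin(root, name string) (string, error) {
	p := filepath.Join(root, name)
	if p != root && !strings.HasPrefix(p, root+string(filepath.Separator)) {
		return "", fmt.Errorf("unsafe path: %q", name)
	}
	return p, nil
}

func main() {
	fmt.Println(safeJoin("/srv/data", "plugins/config.yml")) // stays inside root
	fmt.Println(safeJoin("/srv/data", "../../etc/passwd"))   // rejected
}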
@@ -5,58 +5,104 @@ import (
 	"github.com/apex/log"
 	"github.com/pkg/errors"
 	"github.com/pterodactyl/wings/api"
+	"github.com/pterodactyl/wings/config"
 	"github.com/pterodactyl/wings/environment"
 	"github.com/pterodactyl/wings/events"
 	"regexp"
 	"strconv"
 )

-// Adds all of the internal event listeners we want to use for a server.
+var dockerEvents = []string{
+	environment.DockerImagePullStatus,
+	environment.DockerImagePullStarted,
+	environment.DockerImagePullCompleted,
+}
+
+// Adds all of the internal event listeners we want to use for a server. These listeners can only be
+// removed by deleting the server as they should last for the duration of the process' lifetime.
 func (s *Server) StartEventListeners() {
-	console := make(chan events.Event)
-	state := make(chan events.Event)
-	stats := make(chan events.Event)
-
-	s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console)
-	s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
-	s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
-
-	go func(console chan events.Event) {
-		for data := range console {
-			// Immediately emit this event back over the server event stream since it is
-			// being called from the environment event stream and things probably aren't
-			// listening to that event.
-			s.Events().Publish(ConsoleOutputEvent, data.Data)
-
-			// Also pass the data along to the console output channel.
-			s.onConsoleOutput(data.Data)
-		}
-	}(console)
-
-	go func(state chan events.Event) {
-		for data := range state {
-			s.SetState(data.Data)
-		}
-	}(state)
-
-	go func(stats chan events.Event) {
-		for data := range stats {
-			st := new(environment.Stats)
-			if err := json.Unmarshal([]byte(data.Data), st); err != nil {
-				s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
-				continue
-			}
-
-			// Update the server resource tracking object with the resources we got here.
-			s.resources.mu.Lock()
-			s.resources.Stats = *st
-			s.resources.mu.Unlock()
-
-			s.Filesystem.HasSpaceAvailable(true)
-
-			s.emitProcUsage()
-		}
-	}(stats)
+	console := func(e events.Event) {
+		t := s.Throttler()
+		err := t.Increment(func () {
+			s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.")
+		})
+
+		// An error is only returned if the server has breached the thresholds set.
+		if err != nil {
+			// If the process is already stopping, just let it continue with that action rather than attempting
+			// to terminate again.
+			if s.GetState() != environment.ProcessStoppingState {
+				s.SetState(environment.ProcessStoppingState)
+				go func() {
+					s.Log().Warn("stopping server instance, violating throttle limits")
+					s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.")
+					// Completely skip over server power actions and terminate the running instance. This gives the
+					// server 15 seconds to finish stopping gracefully before it is forcefully terminated.
+					if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil {
+						// If there is an error set the process back to running so that this throttler is called
+						// again and hopefully kills the server.
+						if s.GetState() != environment.ProcessOfflineState {
+							s.SetState(environment.ProcessRunningState)
+						}
+
+						s.Log().WithField("error", errors.WithStack(err)).Error("failed to terminate environment after triggering throttle")
+					}
+				}()
+			}
+		}
+
+		// If we are not throttled, go ahead and output the data.
+		if !t.Throttled() {
+			s.Events().Publish(ConsoleOutputEvent, e.Data)
+		}
+
+		// Also pass the data along to the console output channel.
+		s.onConsoleOutput(e.Data)
+	}
+
+	state := func(e events.Event) {
+		// Reset the throttler when the process is started.
+		if e.Data == environment.ProcessStartingState {
+			s.Throttler().Reset()
+		}
+
+		s.SetState(e.Data)
+	}
+
+	stats := func(e events.Event) {
+		st := new(environment.Stats)
+		if err := json.Unmarshal([]byte(e.Data), st); err != nil {
+			s.Log().WithField("error", errors.WithStack(err)).Warn("failed to unmarshal server environment stats")
+			return
+		}
+
+		// Update the server resource tracking object with the resources we got here.
+		s.resources.mu.Lock()
+		s.resources.Stats = *st
+		s.resources.mu.Unlock()
+
+		s.Filesystem.HasSpaceAvailable(true)
+
+		s.emitProcUsage()
+	}
+
+	docker := func(e events.Event) {
+		if e.Topic == environment.DockerImagePullStatus {
+			s.Events().Publish(InstallOutputEvent, e.Data)
+		} else if e.Topic == environment.DockerImagePullStarted {
+			s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...")
+		} else {
+			s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
+		}
+	}
+
+	s.Log().Info("registering event listeners: console, state, resources...")
+	s.Environment.Events().On(environment.ConsoleOutputEvent, &console)
+	s.Environment.Events().On(environment.StateChangeEvent, &state)
+	s.Environment.Events().On(environment.ResourceEvent, &stats)
+	for _, evt := range dockerEvents {
+		s.Environment.Events().On(evt, &docker)
+	}
 }

 var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
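The listener rewrite above drops the subscribe-a-channel-and-range-over-it goroutines in favor of callback functions registered with Events().On(...). Callbacks are passed by pointer, which gives each registration a stable identity a matching removal call can use later. A minimal bus in that style, as a simplified sketch rather than the actual wings events package:

package main

import (
	"fmt"
	"sync"
)

type Event struct {
	Topic string
	Data  string
}

// Bus is a minimal callback-based event bus: callbacks are keyed by pointer
// so a later Off-style call could remove the exact function that was added.
type Bus struct {
	mu        sync.RWMutex
	listeners map[string][]*func(Event)
}

func (b *Bus) On(topic string, fn *func(Event)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.listeners == nil {
		b.listeners = make(map[string][]*func(Event))
	}
	b.listeners[topic] = append(b.listeners[topic], fn)
}

func (b *Bus) Publish(topic string, data string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, fn := range b.listeners[topic] {
		(*fn)(Event{Topic: topic, Data: data})
	}
}

func main() {
	var b Bus
	console := func(e Event) { fmt.Println("console:", e.Data) }
	b.On("console output", &console)
	b.Publish("console output", "Hello world")
}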
@@ -37,12 +37,6 @@ func LoadDirectory() error {
 		return errors.New(rerr.String())
 	}

-	log.Debug("retrieving cached server states from disk")
-	states, err := getServerStates()
-	if err != nil {
-		log.WithField("error", errors.WithStack(err)).Error("failed to retrieve locally cached server states from disk, assuming all servers in offline state")
-	}
-
 	start := time.Now()
 	log.WithField("total_configs", len(configs)).Info("processing servers returned by the API")

@@ -59,11 +53,6 @@ func LoadDirectory() error {
 			return
 		}

-		if state, exists := states[s.Id()]; exists {
-			s.Log().WithField("state", state).Debug("found existing server state in cache file; re-instantiating server state")
-			s.SetState(state)
-		}
-
 		servers.Add(s)
 	})
 }
@@ -97,6 +86,9 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
 		return nil, err
 	}

+	s.resources = ResourceUsage{}
+	defaults.Set(&s.resources)
+
 	s.Archiver = Archiver{Server: s}
 	s.Filesystem = Filesystem{Server: s}

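FromConfiguration now zeroes the resource tracking struct and runs defaults.Set over it, replacing the state previously re-hydrated from the on-disk cache. Assuming this is github.com/creasty/defaults, the call fills fields from their default struct tags; the struct below is illustrative, not wings' actual ResourceUsage:

package main

import (
	"fmt"

	"github.com/creasty/defaults"
)

// Illustrative struct; wings presumably tags its resource fields similarly so
// that a freshly-loaded server starts from sane zero-state values.
type usage struct {
	State  string `default:"offline"`
	Memory uint64 `default:"0"`
}

func main() {
	u := usage{}
	if err := defaults.Set(&u); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", u) // {State:offline Memory:0}
}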
@@ -118,7 +110,8 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
 		return nil, err
 	} else {
 		s.Environment = env
-		go s.StartEventListeners()
+		s.StartEventListeners()
+		s.Throttler().StartTimer()
 	}

 // Forces the configuration to be synced with the panel.
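Listener registration is now synchronous (the go is gone), so handlers are guaranteed to be attached before the environment can emit anything, and a throttle timer is started alongside. StartTimer's body is not part of this diff; a plausible sketch of what such a timer does -- periodically reset the per-window output counter -- offered purely as an assumption:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// startTimer resets the per-period line counter on a fixed interval so output
// is rate-limited per window rather than over the process lifetime. Hedged
// sketch; not the actual wings StartTimer implementation.
func startTimer(count *uint64, period time.Duration, done <-chan struct{}) {
	go func() {
		t := time.NewTicker(period)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				atomic.StoreUint64(count, 0)
			case <-done:
				return
			}
		}
	}()
}

func main() {
	var count uint64
	done := make(chan struct{})
	startTimer(&count, 100*time.Millisecond, done)
	atomic.AddUint64(&count, 5)
	time.Sleep(150 * time.Millisecond)
	fmt.Println(atomic.LoadUint64(&count)) // 0 after the reset tick
	close(done)
}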
@@ -4,6 +4,7 @@ import (
 	"context"
 	"github.com/pkg/errors"
 	"github.com/pterodactyl/wings/config"
+	"github.com/pterodactyl/wings/environment"
 	"golang.org/x/sync/semaphore"
 	"os"
 	"time"
@@ -87,6 +88,10 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error

 	switch action {
 	case PowerActionStart:
+		if s.GetState() != environment.ProcessOfflineState {
+			return ErrIsRunning
+		}
+
 		// Run the pre-boot logic for the server before processing the environment start.
 		if err := s.onBeforeStart(); err != nil {
 			return err
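The new guard rejects a start request unless the process is offline, before any pre-boot work (panel sync, image pull) is attempted. Reduced to its essentials:

package main

import (
	"errors"
	"fmt"
)

var ErrIsRunning = errors.New("server is running")

// Sketch of the guard in HandlePowerAction: a start request against anything
// but an offline process is rejected up front, before pre-boot logic runs.
func handleStart(state string) error {
	if state != "offline" {
		return ErrIsRunning
	}
	// ... sync with panel, ensure image, start environment ...
	return nil
}

func main() {
	fmt.Println(handleStart("running")) // server is running
	fmt.Println(handleStart("offline")) // <nil>
}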
@@ -134,7 +139,7 @@ func (s *Server) onBeforeStart() error {
 	// Disallow start & restart if the server is suspended. Do this check after performing a sync
 	// action with the Panel to ensure that we have the most up-to-date information for that server.
 	if s.IsSuspended() {
-		return new(suspendedError)
+		return ErrSuspended
 	}

 	// Ensure we sync the server information with the environment so that any new environment variables
@@ -1,7 +1,6 @@
 package server

 import (
-	"encoding/json"
 	"github.com/pterodactyl/wings/environment"
 	"sync"
 )
@@ -37,14 +36,10 @@ func (s *Server) Proc() *ResourceUsage {

 func (s *Server) emitProcUsage() {
 	s.resources.mu.RLock()
-	defer s.resources.mu.RUnlock()
-
-	b, err := json.Marshal(s.resources)
-	if err == nil {
-		s.Events().Publish(StatsEvent, string(b))
+	if err := s.Events().PublishJson(StatsEvent, s.resources); err != nil {
+		s.Log().WithField("error", err).Warn("error while emitting server resource usage to listeners")
 	}
-
-	// TODO: This might be a good place to add a debug log if stats are not sending.
+	s.resources.mu.RUnlock()
 }

 // Returns the servers current state.
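emitProcUsage collapses the manual json.Marshal-then-Publish sequence into a single PublishJson call, and finally logs marshal failures instead of silently dropping them. PublishJson's implementation is not part of this diff; presumably it is a thin wrapper along these lines:

package main

import (
	"encoding/json"
	"fmt"
)

type bus struct{}

func (b *bus) Publish(topic string, data string) {
	fmt.Println(topic, data)
}

// PublishJson is assumed to marshal the value and publish the resulting
// string, surfacing the marshal error to the caller rather than eating it
// the way the old code path did.
func (b *bus) PublishJson(topic string, v interface{}) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	b.Publish(topic, string(data))
	return nil
}

func main() {
	var b bus
	_ = b.PublishJson("stats", map[string]int{"memory_bytes": 1024})
}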
@@ -22,7 +22,7 @@ type Server struct {
 	sync.RWMutex
 	emitterLock  sync.Mutex
 	powerLock    *semaphore.Weighted
-	throttleLock sync.RWMutex
+	throttleLock sync.Mutex

 	// Maintains the configuration for the server. This is the data that gets returned by the Panel
 	// such as build settings and container images.
@@ -137,6 +137,7 @@ func (s *Server) SyncWithConfiguration(cfg *api.ServerConfigurationResponse) err
 	// the process isn't just terminated when a user requests it be stopped.
 	if e, ok := s.Environment.(*docker.Environment); ok {
 		s.Log().Debug("syncing stop configuration with configured docker environment")
+		e.SetImage(s.Config().Container.Image)
 		e.SetStopConfiguration(&cfg.ProcessConfiguration.Stop)
 	}

@@ -15,7 +15,7 @@ import (
 var stateMutex sync.Mutex

 // Returns the state of the servers.
-func getServerStates() (map[string]string, error) {
+func CachedServerStates() (map[string]string, error) {
 	// Request a lock after we check if the file exists.
 	stateMutex.Lock()
 	defer stateMutex.Unlock()
@@ -78,8 +78,8 @@ func (s *Server) SetState(state string) error {

 	// Emit the event to any listeners that are currently registered.
 	if prevState != state {
-		s.Log().WithField("status", s.Proc().State).Debug("saw server status change event")
-		s.Events().Publish(StatusEvent, s.Proc().State)
+		s.Log().WithField("status", s.Proc().getInternalState()).Debug("saw server status change event")
+		s.Events().Publish(StatusEvent, s.Proc().getInternalState())
 	}

 	// Persist this change to the disk immediately so that should the Daemon be stopped or
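SetState stops reading s.Proc().State directly and goes through a getInternalState() accessor. The accessor is not shown here, but the point of such a method is to take the resource lock around the read so a concurrent stats update cannot race it; a sketch under that assumption:

package main

import (
	"fmt"
	"sync"
)

type ResourceUsage struct {
	mu    sync.RWMutex
	State string
}

// getInternalState reads the state under the read lock, so a concurrent
// stats writer holding the write lock cannot race this access. Hedged
// sketch of what the accessor presumably does.
func (ru *ResourceUsage) getInternalState() string {
	ru.mu.RLock()
	defer ru.mu.RUnlock()
	return ru.State
}

func main() {
	ru := &ResourceUsage{State: "running"}
	fmt.Println(ru.getInternalState())
}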
20
system/bool.go
Normal file
@@ -0,0 +1,20 @@
+package system
+
+import "sync/atomic"
+
+type AtomicBool struct {
+	flag uint32
+}
+
+func (ab *AtomicBool) Set(v bool) {
+	i := 0
+	if v {
+		i = 1
+	}
+
+	atomic.StoreUint32(&ab.flag, uint32(i))
+}
+
+func (ab *AtomicBool) Get() bool {
+	return atomic.LoadUint32(&ab.flag) == 1
+}
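The new system.AtomicBool gives the rest of the codebase a mutex-free boolean flag; its zero value is ready to use and reads as false. A quick usage sketch:

package main

import (
	"fmt"
	"sync"

	"github.com/pterodactyl/wings/system"
)

func main() {
	// Many goroutines can flip and read the flag concurrently without a
	// mutex; AtomicBool delegates to sync/atomic underneath.
	var throttled system.AtomicBool
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			throttled.Set(true)
		}()
	}
	wg.Wait()
	fmt.Println(throttled.Get()) // true
}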
@@ -1,13 +0,0 @@
-{{.LogDirectory}}/wings.log {
-    size 10M
-    compress
-    delaycompress
-    dateext
-    maxage 7
-    missingok
-    notifempty
-    create 0640 {{.User.Uid}} {{.User.Gid}}
-    postrotate
-        killall -SIGHUP wings
-    endscript
-}