Compare commits

..

8 Commits

Author SHA1 Message Date
Dane Everitt
fa6f56caa8 Remove pointless debug log 2020-09-11 23:18:51 -07:00
Dane Everitt
5a62f83ec8 Don't run pre-boot actions if the server is already running 2020-09-11 23:11:57 -07:00
Dane Everitt
8bcb3d7c62 Remove deadlock specific code 2020-09-11 23:03:35 -07:00
Dane Everitt
b2eebcaf6d Fix deadlocks in event listener system; closes pterodactyl/panel#2298
Fixes deadlocks that occurred when events were registered while other events were being unsubscribed and data was being flooded to these listeners. A complete mess, I hate this code, it is going to break again, but jesus I'm so tired.
2020-09-11 23:01:54 -07:00
Matthew Penner
45bcb9cd68 Lets not attempt to pull 16384 log lines 2020-09-11 22:52:07 -06:00
Dane Everitt
e1ff4db330 Also fix builds for non-releases 2020-09-11 21:00:37 -07:00
Dane Everitt
606143b3ad Fix flags for workflow, strips 8MB off final binary size 2020-09-11 20:59:39 -07:00
Dane Everitt
57221bdd30 Make disk checking timeout configurable 2020-09-11 20:24:23 -07:00
11 changed files with 69 additions and 49 deletions

View File

@@ -17,7 +17,7 @@ jobs:
go-version: '^1.15'
- name: Build
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=dev-${GIT_COMMIT:0:7}" -o build/wings_linux_amd64 -v wings.go
- name: Test
run: go test ./...

View File

@@ -17,7 +17,7 @@ jobs:
- name: Build
env:
REF: ${{ github.ref }}
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -ldflags "-X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go
run: GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -X github.com/pterodactyl/wings/system.Version=${REF:11}" -o build/wings_linux_amd64 -v wings.go
- name: Test
run: go test ./...

View File

@@ -36,6 +36,12 @@ type SystemConfiguration struct {
Gid int
}
// The amount of time in seconds that can elapse before a server's disk space calculation is
// considered stale and a re-check should occur. DANGER: setting this value too low can seriously
// impact system performance and cause massive I/O bottlenecks and high CPU usage for the Wings
// process.
DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
// Determines if Wings should detect a server that stops with a normal exit code of
// "0" as being crashed if the process stopped without any Wings interaction. E.g.
// the user did not press the stop button, but the process stopped cleanly.

View File

@@ -47,8 +47,12 @@ func (e *EventBus) Publish(topic string, data string) {
defer e.RUnlock()
if ch, ok := e.subscribers[t]; ok {
e := Event{Data: data, Topic: topic}
for channel := range ch {
channel <- Event{Data: data, Topic: topic}
go func(channel chan Event, e Event) {
channel <- e
}(channel, e)
}
}
}()
@@ -66,29 +70,33 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
}
// Subscribe to an emitter topic using a channel.
func (e *EventBus) Subscribe(topic string, ch chan Event) {
func (e *EventBus) Subscribe(topics []string, ch chan Event) {
e.Lock()
defer e.Unlock()
if _, exists := e.subscribers[topic]; !exists {
e.subscribers[topic] = make(map[chan Event]struct{})
}
for _, topic := range topics {
if _, exists := e.subscribers[topic]; !exists {
e.subscribers[topic] = make(map[chan Event]struct{})
}
// Only set the channel if there is not currently a matching one for this topic. This
// avoids registering two identical listeners for the same topic and causing pain in
// the unsubscribe functionality as well.
if _, exists := e.subscribers[topic][ch]; !exists {
e.subscribers[topic][ch] = struct{}{}
// Only set the channel if there is not currently a matching one for this topic. This
// avoids registering two identical listeners for the same topic and causing pain in
// the unsubscribe functionality as well.
if _, exists := e.subscribers[topic][ch]; !exists {
e.subscribers[topic][ch] = struct{}{}
}
}
}
// Unsubscribe a channel from a given topic.
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
func (e *EventBus) Unsubscribe(topics []string, ch chan Event) {
e.Lock()
defer e.Unlock()
if _, exists := e.subscribers[topic][ch]; exists {
delete(e.subscribers[topic], ch)
for _, topic := range topics {
if _, exists := e.subscribers[topic][ch]; exists {
delete(e.subscribers[topic], ch)
}
}
}

View File

@@ -50,24 +50,23 @@ var e = []string{
// Listens for different events happening on a server and sends them along
// to the connected websocket.
func (h *Handler) ListenForServerEvents(ctx context.Context) {
eventChannel := make(chan events.Event)
for _, event := range e {
h.server.Events().Subscribe(event, eventChannel)
}
h.server.Log().Debug("listening for server events over websocket")
for d := range eventChannel {
eventChannel := make(chan events.Event)
h.server.Events().Subscribe(e, eventChannel)
go func(ctx context.Context) {
select {
case <-ctx.Done():
for _, event := range e {
h.server.Events().Unsubscribe(event, eventChannel)
}
h.server.Events().Unsubscribe(e, eventChannel)
close(eventChannel)
default:
_ = h.SendJson(&Message{
Event: d.Topic,
Args: []string{d.Data},
})
}
}(ctx)
for d := range eventChannel {
if err := h.SendJson(&Message{Event: d.Topic, Args: []string{d.Data}}); err != nil {
h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
}
}
}

View File

@@ -95,6 +95,8 @@ func (h *Handler) SendJson(v *Message) error {
// Do not send JSON down the line if the JWT on the connection is not
// valid!
if err := h.TokenValid(); err != nil {
h.server.Log().WithField("error", err).Warn("invalid JWT detected for server websocket!")
return nil
}
@@ -157,9 +159,10 @@ func (h *Handler) TokenValid() error {
// error message, otherwise we just send back a standard error message.
func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error {
j := h.GetJwt()
expected := errors.Is(err, server.ErrSuspended) || errors.Is(err, server.ErrIsRunning)
message := "an unexpected error was encountered while handling this request"
if server.IsSuspendedError(err) || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
if expected || (j != nil && j.HasPermission(PermissionReceiveErrors)) {
message = err.Error()
}
@@ -169,7 +172,7 @@ func (h *Handler) SendErrorJson(msg Message, err error, shouldLog ...bool) error
wsm.Args = []string{m}
if len(shouldLog) == 0 || (len(shouldLog) == 1 && shouldLog[0] == true) {
if !server.IsSuspendedError(err) {
if !expected {
h.server.Log().WithFields(log.Fields{"event": msg.Event, "error_identifier": u.String(), "error": err}).
Error("failed to handle websocket process; an error was encountered processing an event")
}
@@ -313,7 +316,7 @@ func (h *Handler) HandleInbound(m Message) error {
return nil
}
logs, err := h.server.Environment.Readlog(1024 * 16)
logs, err := h.server.Environment.Readlog(100)
if err != nil {
return err
}

View File

@@ -1,17 +1,9 @@
package server
type suspendedError struct {
}
import "github.com/pkg/errors"
func (e *suspendedError) Error() string {
return "server is currently in a suspended state"
}
func IsSuspendedError(err error) bool {
_, ok := err.(*suspendedError)
return ok
}
var ErrIsRunning = errors.New("server is running")
var ErrSuspended = errors.New("server is currently in a suspended state")
type crashTooFrequent struct {
}

View File

@@ -257,7 +257,7 @@ func (fs *Filesystem) HasSpaceAvailable(allowStaleValue bool) bool {
func (fs *Filesystem) DiskUsage(allowStaleValue bool) (int64, error) {
// Check if cache is expired.
fs.lookupTimeMu.RLock()
isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * -10))
isValidInCache := fs.lastLookupTime.After(time.Now().Add(time.Second * time.Duration(-1*config.Get().System.DiskCheckInterval)))
fs.lookupTimeMu.RUnlock()
if !isValidInCache {

View File

@@ -17,10 +17,6 @@ func (s *Server) StartEventListeners() {
state := make(chan events.Event)
stats := make(chan events.Event)
s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console)
s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
go func(console chan events.Event) {
for data := range console {
// Immediately emit this event back over the server event stream since it is
@@ -31,12 +27,16 @@ func (s *Server) StartEventListeners() {
// Also pass the data along to the console output channel.
s.onConsoleOutput(data.Data)
}
s.Log().Fatal("unexpected end-of-range for server console channel")
}(console)
go func(state chan events.Event) {
for data := range state {
s.SetState(data.Data)
}
s.Log().Fatal("unexpected end-of-range for server state channel")
}(state)
go func(stats chan events.Event) {
@@ -56,7 +56,14 @@ func (s *Server) StartEventListeners() {
s.emitProcUsage()
}
s.Log().Fatal("unexpected end-of-range for server stats channel")
}(stats)
s.Log().Info("registering event listeners: console, state, resources...")
s.Environment.Events().Subscribe([]string{environment.ConsoleOutputEvent}, console)
s.Environment.Events().Subscribe([]string{environment.StateChangeEvent}, state)
s.Environment.Events().Subscribe([]string{environment.ResourceEvent}, stats)
}
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")

View File

@@ -118,7 +118,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
return nil, err
} else {
s.Environment = env
go s.StartEventListeners()
s.StartEventListeners()
}
// Forces the configuration to be synced with the panel.

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/pkg/errors"
"github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment"
"golang.org/x/sync/semaphore"
"os"
"time"
@@ -87,6 +88,10 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error
switch action {
case PowerActionStart:
if s.GetState() != environment.ProcessOfflineState {
return ErrIsRunning
}
// Run the pre-boot logic for the server before processing the environment start.
if err := s.onBeforeStart(); err != nil {
return err
@@ -134,7 +139,7 @@ func (s *Server) onBeforeStart() error {
// Disallow start & restart if the server is suspended. Do this check after performing a sync
// action with the Panel to ensure that we have the most up-to-date information for that server.
if s.IsSuspended() {
return new(suspendedError)
return ErrSuspended
}
// Ensure we sync the server information with the environment so that any new environment variables