From b2eebcaf6deb51c098889727608d69fd7482336a Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Fri, 11 Sep 2020 23:01:54 -0700 Subject: [PATCH] Fix deadlocks in event listener system; closes pterodactyl/panel#2298 Fixes deadlocks that occurred when events were registered while other events were being unsubscribed and data was being flooded to these listeners. A complete mess, I hate this code, it is going to break again, but jesus I'm so tired. --- events/events.go | 38 +++++++++++++++++++++-------------- go.mod | 2 ++ go.sum | 4 ++++ router/websocket/listeners.go | 26 +++++++++++++----------- router/websocket/websocket.go | 2 ++ server/listeners.go | 15 ++++++++++---- server/loader.go | 2 +- 7 files changed, 57 insertions(+), 32 deletions(-) diff --git a/events/events.go b/events/events.go index bd935ad..198e40d 100644 --- a/events/events.go +++ b/events/events.go @@ -2,8 +2,8 @@ package events import ( "encoding/json" + "github.com/sasha-s/go-deadlock" "strings" - "sync" ) type Event struct { @@ -12,7 +12,7 @@ type Event struct { } type EventBus struct { - sync.RWMutex + deadlock.RWMutex subscribers map[string]map[chan Event]struct{} } @@ -47,8 +47,12 @@ func (e *EventBus) Publish(topic string, data string) { defer e.RUnlock() if ch, ok := e.subscribers[t]; ok { + e := Event{Data: data, Topic: topic} + for channel := range ch { - channel <- Event{Data: data, Topic: topic} + go func(channel chan Event, e Event) { + channel <- e + }(channel, e) } } }() @@ -66,29 +70,33 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error { } // Subscribe to an emitter topic using a channel. -func (e *EventBus) Subscribe(topic string, ch chan Event) { +func (e *EventBus) Subscribe(topics []string, ch chan Event) { e.Lock() defer e.Unlock() - if _, exists := e.subscribers[topic]; !exists { - e.subscribers[topic] = make(map[chan Event]struct{}) - } + for _, topic := range topics { + if _, exists := e.subscribers[topic]; !exists { + e.subscribers[topic] = make(map[chan Event]struct{}) + } - // Only set the channel if there is not currently a matching one for this topic. This - // avoids registering two identical listeners for the same topic and causing pain in - // the unsubscribe functionality as well. - if _, exists := e.subscribers[topic][ch]; !exists { - e.subscribers[topic][ch] = struct{}{} + // Only set the channel if there is not currently a matching one for this topic. This + // avoids registering two identical listeners for the same topic and causing pain in + // the unsubscribe functionality as well. + if _, exists := e.subscribers[topic][ch]; !exists { + e.subscribers[topic][ch] = struct{}{} + } } } // Unsubscribe a channel from a given topic. -func (e *EventBus) Unsubscribe(topic string, ch chan Event) { +func (e *EventBus) Unsubscribe(topics []string, ch chan Event) { e.Lock() defer e.Unlock() - if _, exists := e.subscribers[topic][ch]; exists { - delete(e.subscribers[topic], ch) + for _, topic := range topics { + if _, exists := e.subscribers[topic][ch]; exists { + delete(e.subscribers[topic], ch) + } } } diff --git a/go.mod b/go.mod index f174131..f29bc2f 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.5.0 @@ -62,6 +63,7 @@ require ( github.com/prometheus/common v0.11.1 // indirect github.com/remeh/sizedwaitgroup v1.0.0 github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94 + github.com/sasha-s/go-deadlock v0.2.0 github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 // indirect github.com/ulikunitz/xz v0.5.7 // indirect diff --git a/go.sum b/go.sum index 5733b1f..88c0926 100644 --- a/go.sum +++ b/go.sum @@ -412,6 +412,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTK github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -479,6 +481,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94 h1:G04eS0JkAIVZfaJLjla9dNxkJCPiKIGZlw9AfOhzOD0= github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94/go.mod h1:b18R55ulyQ/h3RaWyloPyER7fWQVZvimKKhnI5OfrJQ= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y= +github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index f35e0e6..8632f45 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -50,24 +50,26 @@ var e = []string{ // Listens for different events happening on a server and sends them along // to the connected websocket. func (h *Handler) ListenForServerEvents(ctx context.Context) { - eventChannel := make(chan events.Event) - for _, event := range e { - h.server.Events().Subscribe(event, eventChannel) - } + h.server.Log().Debug("listening for server events over websocket") - for d := range eventChannel { + eventChannel := make(chan events.Event) + h.server.Events().Subscribe(e, eventChannel) + + go func(ctx context.Context) { select { case <-ctx.Done(): - for _, event := range e { - h.server.Events().Unsubscribe(event, eventChannel) + if h.jwt != nil { + h.server.Log().WithField("jwt_subject", h.jwt.Subject).Debug("unsubscribing server from event listeners") } + h.server.Events().Unsubscribe(e, eventChannel) close(eventChannel) - default: - _ = h.SendJson(&Message{ - Event: d.Topic, - Args: []string{d.Data}, - }) + } + }(ctx) + + for d := range eventChannel { + if err := h.SendJson(&Message{Event: d.Topic, Args: []string{d.Data}}); err != nil { + h.server.Log().WithField("error", err).Warn("error while sending server data over websocket") } } } diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index c9e81be..5714657 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -95,6 +95,8 @@ func (h *Handler) SendJson(v *Message) error { // Do not send JSON down the line if the JWT on the connection is not // valid! if err := h.TokenValid(); err != nil { + h.server.Log().WithField("error", err).Warn("invalid JWT detected for server websocket!") + return nil } diff --git a/server/listeners.go b/server/listeners.go index 5f70027..bb0d278 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -17,10 +17,6 @@ func (s *Server) StartEventListeners() { state := make(chan events.Event) stats := make(chan events.Event) - s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console) - s.Environment.Events().Subscribe(environment.StateChangeEvent, state) - s.Environment.Events().Subscribe(environment.ResourceEvent, stats) - go func(console chan events.Event) { for data := range console { // Immediately emit this event back over the server event stream since it is @@ -31,12 +27,16 @@ func (s *Server) StartEventListeners() { // Also pass the data along to the console output channel. s.onConsoleOutput(data.Data) } + + s.Log().Fatal("unexpected end-of-range for server console channel") }(console) go func(state chan events.Event) { for data := range state { s.SetState(data.Data) } + + s.Log().Fatal("unexpected end-of-range for server state channel") }(state) go func(stats chan events.Event) { @@ -56,7 +56,14 @@ func (s *Server) StartEventListeners() { s.emitProcUsage() } + + s.Log().Fatal("unexpected end-of-range for server stats channel") }(stats) + + s.Log().Info("registering event listeners: console, state, resources...") + s.Environment.Events().Subscribe([]string{environment.ConsoleOutputEvent}, console) + s.Environment.Events().Subscribe([]string{environment.StateChangeEvent}, state) + s.Environment.Events().Subscribe([]string{environment.ResourceEvent}, stats) } var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))") diff --git a/server/loader.go b/server/loader.go index cd2bc78..45de2ab 100644 --- a/server/loader.go +++ b/server/loader.go @@ -118,7 +118,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) { return nil, err } else { s.Environment = env - go s.StartEventListeners() + s.StartEventListeners() } // Forces the configuration to be synced with the panel.