Fix deadlocks in event listener system; closes pterodactyl/panel#2298
Fixes deadlocks that occurred when events were registered while other events were being unsubscribed and data was being flooded to these listeners. A complete mess, I hate this code, it is going to break again, but jesus I'm so tired.
This commit is contained in:
parent
45bcb9cd68
commit
b2eebcaf6d
|
@ -2,8 +2,8 @@ package events
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"github.com/sasha-s/go-deadlock"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
|
@ -12,7 +12,7 @@ type Event struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventBus struct {
|
type EventBus struct {
|
||||||
sync.RWMutex
|
deadlock.RWMutex
|
||||||
|
|
||||||
subscribers map[string]map[chan Event]struct{}
|
subscribers map[string]map[chan Event]struct{}
|
||||||
}
|
}
|
||||||
|
@ -47,8 +47,12 @@ func (e *EventBus) Publish(topic string, data string) {
|
||||||
defer e.RUnlock()
|
defer e.RUnlock()
|
||||||
|
|
||||||
if ch, ok := e.subscribers[t]; ok {
|
if ch, ok := e.subscribers[t]; ok {
|
||||||
|
e := Event{Data: data, Topic: topic}
|
||||||
|
|
||||||
for channel := range ch {
|
for channel := range ch {
|
||||||
channel <- Event{Data: data, Topic: topic}
|
go func(channel chan Event, e Event) {
|
||||||
|
channel <- e
|
||||||
|
}(channel, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -66,29 +70,33 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to an emitter topic using a channel.
|
// Subscribe to an emitter topic using a channel.
|
||||||
func (e *EventBus) Subscribe(topic string, ch chan Event) {
|
func (e *EventBus) Subscribe(topics []string, ch chan Event) {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
defer e.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
if _, exists := e.subscribers[topic]; !exists {
|
for _, topic := range topics {
|
||||||
e.subscribers[topic] = make(map[chan Event]struct{})
|
if _, exists := e.subscribers[topic]; !exists {
|
||||||
}
|
e.subscribers[topic] = make(map[chan Event]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
// Only set the channel if there is not currently a matching one for this topic. This
|
// Only set the channel if there is not currently a matching one for this topic. This
|
||||||
// avoids registering two identical listeners for the same topic and causing pain in
|
// avoids registering two identical listeners for the same topic and causing pain in
|
||||||
// the unsubscribe functionality as well.
|
// the unsubscribe functionality as well.
|
||||||
if _, exists := e.subscribers[topic][ch]; !exists {
|
if _, exists := e.subscribers[topic][ch]; !exists {
|
||||||
e.subscribers[topic][ch] = struct{}{}
|
e.subscribers[topic][ch] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe a channel from a given topic.
|
// Unsubscribe a channel from a given topic.
|
||||||
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
func (e *EventBus) Unsubscribe(topics []string, ch chan Event) {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
defer e.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
if _, exists := e.subscribers[topic][ch]; exists {
|
for _, topic := range topics {
|
||||||
delete(e.subscribers[topic], ch)
|
if _, exists := e.subscribers[topic][ch]; exists {
|
||||||
|
delete(e.subscribers[topic], ch)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -55,6 +55,7 @@ require (
|
||||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||||
github.com/opencontainers/image-spec v1.0.1 // indirect
|
github.com/opencontainers/image-spec v1.0.1 // indirect
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
|
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
|
||||||
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
|
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/pkg/profile v1.5.0
|
github.com/pkg/profile v1.5.0
|
||||||
|
@ -62,6 +63,7 @@ require (
|
||||||
github.com/prometheus/common v0.11.1 // indirect
|
github.com/prometheus/common v0.11.1 // indirect
|
||||||
github.com/remeh/sizedwaitgroup v1.0.0
|
github.com/remeh/sizedwaitgroup v1.0.0
|
||||||
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94
|
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94
|
||||||
|
github.com/sasha-s/go-deadlock v0.2.0
|
||||||
github.com/spf13/cobra v1.0.0
|
github.com/spf13/cobra v1.0.0
|
||||||
github.com/spf13/pflag v1.0.5 // indirect
|
github.com/spf13/pflag v1.0.5 // indirect
|
||||||
github.com/ulikunitz/xz v0.5.7 // indirect
|
github.com/ulikunitz/xz v0.5.7 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -412,6 +412,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTK
|
||||||
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
|
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
|
||||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||||
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
|
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
|
||||||
|
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
|
||||||
|
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
|
||||||
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
|
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
|
||||||
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
|
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
|
||||||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||||
|
@ -479,6 +481,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
|
||||||
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94 h1:G04eS0JkAIVZfaJLjla9dNxkJCPiKIGZlw9AfOhzOD0=
|
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94 h1:G04eS0JkAIVZfaJLjla9dNxkJCPiKIGZlw9AfOhzOD0=
|
||||||
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94/go.mod h1:b18R55ulyQ/h3RaWyloPyER7fWQVZvimKKhnI5OfrJQ=
|
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94/go.mod h1:b18R55ulyQ/h3RaWyloPyER7fWQVZvimKKhnI5OfrJQ=
|
||||||
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
|
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
|
||||||
|
github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
|
||||||
|
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
|
||||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||||
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||||
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
||||||
|
|
|
@ -50,24 +50,26 @@ var e = []string{
|
||||||
// Listens for different events happening on a server and sends them along
|
// Listens for different events happening on a server and sends them along
|
||||||
// to the connected websocket.
|
// to the connected websocket.
|
||||||
func (h *Handler) ListenForServerEvents(ctx context.Context) {
|
func (h *Handler) ListenForServerEvents(ctx context.Context) {
|
||||||
eventChannel := make(chan events.Event)
|
h.server.Log().Debug("listening for server events over websocket")
|
||||||
for _, event := range e {
|
|
||||||
h.server.Events().Subscribe(event, eventChannel)
|
|
||||||
}
|
|
||||||
|
|
||||||
for d := range eventChannel {
|
eventChannel := make(chan events.Event)
|
||||||
|
h.server.Events().Subscribe(e, eventChannel)
|
||||||
|
|
||||||
|
go func(ctx context.Context) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
for _, event := range e {
|
if h.jwt != nil {
|
||||||
h.server.Events().Unsubscribe(event, eventChannel)
|
h.server.Log().WithField("jwt_subject", h.jwt.Subject).Debug("unsubscribing server from event listeners")
|
||||||
}
|
}
|
||||||
|
h.server.Events().Unsubscribe(e, eventChannel)
|
||||||
|
|
||||||
close(eventChannel)
|
close(eventChannel)
|
||||||
default:
|
}
|
||||||
_ = h.SendJson(&Message{
|
}(ctx)
|
||||||
Event: d.Topic,
|
|
||||||
Args: []string{d.Data},
|
for d := range eventChannel {
|
||||||
})
|
if err := h.SendJson(&Message{Event: d.Topic, Args: []string{d.Data}}); err != nil {
|
||||||
|
h.server.Log().WithField("error", err).Warn("error while sending server data over websocket")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,6 +95,8 @@ func (h *Handler) SendJson(v *Message) error {
|
||||||
// Do not send JSON down the line if the JWT on the connection is not
|
// Do not send JSON down the line if the JWT on the connection is not
|
||||||
// valid!
|
// valid!
|
||||||
if err := h.TokenValid(); err != nil {
|
if err := h.TokenValid(); err != nil {
|
||||||
|
h.server.Log().WithField("error", err).Warn("invalid JWT detected for server websocket!")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,10 +17,6 @@ func (s *Server) StartEventListeners() {
|
||||||
state := make(chan events.Event)
|
state := make(chan events.Event)
|
||||||
stats := make(chan events.Event)
|
stats := make(chan events.Event)
|
||||||
|
|
||||||
s.Environment.Events().Subscribe(environment.ConsoleOutputEvent, console)
|
|
||||||
s.Environment.Events().Subscribe(environment.StateChangeEvent, state)
|
|
||||||
s.Environment.Events().Subscribe(environment.ResourceEvent, stats)
|
|
||||||
|
|
||||||
go func(console chan events.Event) {
|
go func(console chan events.Event) {
|
||||||
for data := range console {
|
for data := range console {
|
||||||
// Immediately emit this event back over the server event stream since it is
|
// Immediately emit this event back over the server event stream since it is
|
||||||
|
@ -31,12 +27,16 @@ func (s *Server) StartEventListeners() {
|
||||||
// Also pass the data along to the console output channel.
|
// Also pass the data along to the console output channel.
|
||||||
s.onConsoleOutput(data.Data)
|
s.onConsoleOutput(data.Data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.Log().Fatal("unexpected end-of-range for server console channel")
|
||||||
}(console)
|
}(console)
|
||||||
|
|
||||||
go func(state chan events.Event) {
|
go func(state chan events.Event) {
|
||||||
for data := range state {
|
for data := range state {
|
||||||
s.SetState(data.Data)
|
s.SetState(data.Data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.Log().Fatal("unexpected end-of-range for server state channel")
|
||||||
}(state)
|
}(state)
|
||||||
|
|
||||||
go func(stats chan events.Event) {
|
go func(stats chan events.Event) {
|
||||||
|
@ -56,7 +56,14 @@ func (s *Server) StartEventListeners() {
|
||||||
|
|
||||||
s.emitProcUsage()
|
s.emitProcUsage()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.Log().Fatal("unexpected end-of-range for server stats channel")
|
||||||
}(stats)
|
}(stats)
|
||||||
|
|
||||||
|
s.Log().Info("registering event listeners: console, state, resources...")
|
||||||
|
s.Environment.Events().Subscribe([]string{environment.ConsoleOutputEvent}, console)
|
||||||
|
s.Environment.Events().Subscribe([]string{environment.StateChangeEvent}, state)
|
||||||
|
s.Environment.Events().Subscribe([]string{environment.ResourceEvent}, stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
|
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")
|
||||||
|
|
|
@ -118,7 +118,7 @@ func FromConfiguration(data *api.ServerConfigurationResponse) (*Server, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else {
|
} else {
|
||||||
s.Environment = env
|
s.Environment = env
|
||||||
go s.StartEventListeners()
|
s.StartEventListeners()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forces the configuration to be synced with the panel.
|
// Forces the configuration to be synced with the panel.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user