Unsubscribe any open event listeners when deleting a server
This commit is contained in:
parent
fab489d264
commit
2e055cf630
|
@ -178,6 +178,9 @@ func deleteServer(c *gin.Context) {
|
||||||
zap.S().Warnw("failed to delete server archive during deletion process", zap.String("server", s.Uuid), zap.Error(err))
|
zap.S().Warnw("failed to delete server archive during deletion process", zap.String("server", s.Uuid), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unsubscribe all of the event listeners.
|
||||||
|
s.Events().UnsubscribeAll()
|
||||||
|
|
||||||
// Destroy the environment; in Docker this will handle a running container and
|
// Destroy the environment; in Docker this will handle a running container and
|
||||||
// forcibly terminate it before removing the container, so we do not need to handle
|
// forcibly terminate it before removing the container, so we do not need to handle
|
||||||
// that here.
|
// that here.
|
||||||
|
|
|
@ -23,8 +23,9 @@ type Event struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventBus struct {
|
type EventBus struct {
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
subscribers map[string][]chan Event
|
subscribers map[string][]chan Event
|
||||||
mu sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the server's emitter instance.
|
// Returns the server's emitter instance.
|
||||||
|
@ -40,8 +41,8 @@ func (s *Server) Events() *EventBus {
|
||||||
|
|
||||||
// Publish data to a given topic.
|
// Publish data to a given topic.
|
||||||
func (e *EventBus) Publish(topic string, data string) {
|
func (e *EventBus) Publish(topic string, data string) {
|
||||||
e.mu.Lock()
|
e.RLock()
|
||||||
defer e.mu.Unlock()
|
defer e.RUnlock()
|
||||||
|
|
||||||
t := topic
|
t := topic
|
||||||
// Some of our topics for the socket support passing a more specific namespace,
|
// Some of our topics for the socket support passing a more specific namespace,
|
||||||
|
@ -79,8 +80,8 @@ func (e *EventBus) PublishJson(topic string, data interface{}) error {
|
||||||
|
|
||||||
// Subscribe to an emitter topic using a channel.
|
// Subscribe to an emitter topic using a channel.
|
||||||
func (e *EventBus) Subscribe(topic string, ch chan Event) {
|
func (e *EventBus) Subscribe(topic string, ch chan Event) {
|
||||||
e.mu.Lock()
|
e.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
if p, ok := e.subscribers[topic]; ok {
|
if p, ok := e.subscribers[topic]; ok {
|
||||||
e.subscribers[topic] = append(p, ch)
|
e.subscribers[topic] = append(p, ch)
|
||||||
|
@ -91,8 +92,8 @@ func (e *EventBus) Subscribe(topic string, ch chan Event) {
|
||||||
|
|
||||||
// Unsubscribe a channel from a topic.
|
// Unsubscribe a channel from a topic.
|
||||||
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
||||||
e.mu.Lock()
|
e.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.Unlock()
|
||||||
|
|
||||||
if _, ok := e.subscribers[topic]; ok {
|
if _, ok := e.subscribers[topic]; ok {
|
||||||
for i := range e.subscribers[topic] {
|
for i := range e.subscribers[topic] {
|
||||||
|
@ -102,3 +103,18 @@ func (e *EventBus) Unsubscribe(topic string, ch chan Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Removes all of the event listeners for the server. This is used when a server
|
||||||
|
// is being deleted to avoid a bunch of de-reference errors cropping up. Obviously
|
||||||
|
// should also check elsewhere and handle a server reference going nil, but this
|
||||||
|
// won't hurt.
|
||||||
|
func (e *EventBus) UnsubscribeAll() {
|
||||||
|
e.Lock()
|
||||||
|
defer e.Unlock()
|
||||||
|
|
||||||
|
// Loop over all of the subscribers and just remove all of the events
|
||||||
|
// for them.
|
||||||
|
for t := range e.subscribers {
|
||||||
|
e.subscribers[t] = make([]chan Event, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user