package server import ( "encoding/json" "strings" "sync" ) // Defines all of the possible output events for a server. // noinspection GoNameStartsWithPackageName const ( DaemonMessageEvent = "daemon message" InstallOutputEvent = "install output" InstallStartedEvent = "install started" InstallCompletedEvent = "install completed" ConsoleOutputEvent = "console output" StatusEvent = "status" StatsEvent = "stats" BackupCompletedEvent = "backup completed" ) type Event struct { Data string Topic string } type EventBus struct { sync.RWMutex subscribers map[string]map[chan Event]struct{} } // Returns the server's emitter instance. func (s *Server) Events() *EventBus { s.emitterLock.Lock() defer s.emitterLock.Unlock() if s.emitter == nil { s.emitter = &EventBus{ subscribers: make(map[string]map[chan Event]struct{}), } } return s.emitter } // Publish data to a given topic. func (e *EventBus) Publish(topic string, data string) { t := topic // Some of our topics for the socket support passing a more specific namespace, // such as "backup completed:1234" to indicate which specific backup was completed. // // In these cases, we still need to the send the event using the standard listener // name of "backup completed". if strings.Contains(topic, ":") { parts := strings.SplitN(topic, ":", 2) if len(parts) == 2 { t = parts[0] } } // Acquire a read lock and loop over all of the channels registered for the topic. This // avoids a panic crash if the process tries to unregister the channel while this routine // is running. go func() { e.RLock() defer e.RUnlock() if ch, ok := e.subscribers[t]; ok { for channel := range ch { channel <- Event{Data: data, Topic: topic} } } }() } func (e *EventBus) PublishJson(topic string, data interface{}) error { b, err := json.Marshal(data) if err != nil { return err } e.Publish(topic, string(b)) return nil } // Subscribe to an emitter topic using a channel. func (e *EventBus) Subscribe(topic string, ch chan Event) { e.Lock() defer e.Unlock() if _, exists := e.subscribers[topic]; !exists { e.subscribers[topic] = make(map[chan Event]struct{}) } // Only set the channel if there is not currently a matching one for this topic. This // avoids registering two identical listeners for the same topic and causing pain in // the unsubscribe functionality as well. if _, exists := e.subscribers[topic][ch]; !exists { e.subscribers[topic][ch] = struct{}{} } } // Unsubscribe a channel from a given topic. func (e *EventBus) Unsubscribe(topic string, ch chan Event) { e.Lock() defer e.Unlock() if _, exists := e.subscribers[topic][ch]; exists { delete(e.subscribers[topic], ch) } } // Removes all of the event listeners for the server. This is used when a server // is being deleted to avoid a bunch of de-reference errors cropping up. Obviously // should also check elsewhere and handle a server reference going nil, but this // won't hurt. func (e *EventBus) UnsubscribeAll() { e.Lock() defer e.Unlock() // Reset the entire struct into an empty map. e.subscribers = make(map[string]map[chan Event]struct{}) }