diff --git a/environment/docker/container.go b/environment/docker/container.go index 32d6df1..74ee7b7 100644 --- a/environment/docker/container.go +++ b/environment/docker/container.go @@ -342,10 +342,10 @@ func (e *Environment) followOutput() error { func (e *Environment) scanOutput(reader io.ReadCloser) { defer reader.Close() - events := e.Events() - - if err := system.ScanReader(reader, func(line string) { - events.Publish(environment.ConsoleOutputEvent, line) + if err := system.ScanReader(reader, func(v []byte) { + e.logCallbackMx.Lock() + defer e.logCallbackMx.Unlock() + e.logCallback(v) }); err != nil && err != io.EOF { log.WithField("error", err).WithField("container_id", e.Id).Warn("error processing scanner line in console output") return diff --git a/environment/docker/environment.go b/environment/docker/environment.go index eac9eb3..40bc571 100644 --- a/environment/docker/environment.go +++ b/environment/docker/environment.go @@ -49,7 +49,10 @@ type Environment struct { // Holds the stats stream used by the polling commands so that we can easily close it out. stats io.ReadCloser - emitter *events.EventBus + emitter *events.Bus + + logCallbackMx sync.Mutex + logCallback func([]byte) // Tracks the environment state. st *system.AtomicString @@ -100,9 +103,9 @@ func (e *Environment) IsAttached() bool { return e.stream != nil } -func (e *Environment) Events() *events.EventBus { +func (e *Environment) Events() *events.Bus { e.eventMu.Do(func() { - e.emitter = events.New() + e.emitter = events.NewBus() }) return e.emitter @@ -214,3 +217,10 @@ func (e *Environment) SetState(state string) { e.Events().Publish(environment.StateChangeEvent, state) } } + +func (e *Environment) SetLogCallback(f func([]byte)) { + e.logCallbackMx.Lock() + defer e.logCallbackMx.Unlock() + + e.logCallback = f +} diff --git a/environment/docker/stats.go b/environment/docker/stats.go index cab9479..3e14e32 100644 --- a/environment/docker/stats.go +++ b/environment/docker/stats.go @@ -90,11 +90,7 @@ func (e *Environment) pollResources(ctx context.Context) error { st.Network.TxBytes += nw.TxBytes } - if b, err := json.Marshal(st); err != nil { - e.log().WithField("error", err).Warn("error while marshaling stats object for environment") - } else { - e.Events().Publish(environment.ResourceEvent, string(b)) - } + e.Events().Publish(environment.ResourceEvent, st) } } } diff --git a/environment/environment.go b/environment/environment.go index ead4d25..da21269 100644 --- a/environment/environment.go +++ b/environment/environment.go @@ -8,7 +8,6 @@ import ( ) const ( - ConsoleOutputEvent = "console output" StateChangeEvent = "state change" ResourceEvent = "resources" DockerImagePullStarted = "docker image pull started" @@ -35,7 +34,7 @@ type ProcessEnvironment interface { // Returns an event emitter instance that can be hooked into to listen for different // events that are fired by the environment. This should not allow someone to publish // events, only subscribe to them. - Events() *events.EventBus + Events() *events.Bus // Determines if the server instance exists. For example, in a docker environment // this should confirm that the container is created and in a bootable state. In @@ -108,4 +107,7 @@ type ProcessEnvironment interface { // Uptime returns the current environment uptime in milliseconds. This is // the time that has passed since it was last started. Uptime(ctx context.Context) (int64, error) + + // SetLogCallback sets the callback that the container's log output will be passed to. + SetLogCallback(func([]byte)) } diff --git a/events/events.go b/events/events.go index f26fcb3..3e7f699 100644 --- a/events/events.go +++ b/events/events.go @@ -1,32 +1,79 @@ package events import ( - "encoding/json" "strings" "sync" - - "github.com/gammazero/workerpool" ) +type Listener chan Event + +// Event represents an Event sent over a Bus. type Event struct { - Data string Topic string + Data interface{} } -type EventBus struct { - mu sync.RWMutex - pools map[string]*CallbackPool +// Bus represents an Event Bus. +type Bus struct { + listenersMx sync.Mutex + listeners map[string][]Listener } -func New() *EventBus { - return &EventBus{ - pools: make(map[string]*CallbackPool), +// NewBus returns a new empty Event Bus. +func NewBus() *Bus { + return &Bus{ + listeners: make(map[string][]Listener), } } -// Publish data to a given topic. -func (e *EventBus) Publish(topic string, data string) { - t := topic +// Off unregisters a listener from the specified topics on the Bus. +func (b *Bus) Off(listener Listener, topics ...string) { + b.listenersMx.Lock() + defer b.listenersMx.Unlock() + + for _, topic := range topics { + b.off(topic, listener) + } +} + +func (b *Bus) off(topic string, listener Listener) bool { + listeners, ok := b.listeners[topic] + if !ok { + return false + } + for i, l := range listeners { + if l != listener { + continue + } + + listeners = append(listeners[:i], listeners[i+1:]...) + b.listeners[topic] = listeners + return true + } + return false +} + +// On registers a listener to the specified topics on the Bus. +func (b *Bus) On(listener Listener, topics ...string) { + b.listenersMx.Lock() + defer b.listenersMx.Unlock() + + for _, topic := range topics { + b.on(topic, listener) + } +} + +func (b *Bus) on(topic string, listener Listener) { + listeners, ok := b.listeners[topic] + if !ok { + b.listeners[topic] = []Listener{listener} + } else { + b.listeners[topic] = append(listeners, listener) + } +} + +// Publish publishes a message to the Bus. +func (b *Bus) Publish(topic string, data interface{}) { // Some of our topics for the socket support passing a more specific namespace, // such as "backup completed:1234" to indicate which specific backup was completed. // @@ -36,87 +83,44 @@ func (e *EventBus) Publish(topic string, data string) { parts := strings.SplitN(topic, ":", 2) if len(parts) == 2 { - t = parts[0] + topic = parts[0] } } - e.mu.RLock() - defer e.mu.RUnlock() + b.listenersMx.Lock() + defer b.listenersMx.Unlock() - // Acquire a read lock and loop over all the channels registered for the topic. This - // avoids a panic crash if the process tries to unregister the channel while this routine - // is running. - if cp, ok := e.pools[t]; ok { - for _, callback := range cp.callbacks { - c := *callback - evt := Event{Data: data, Topic: topic} - // Using the workerpool with one worker allows us to execute events in a FIFO manner. Running - // this using goroutines would cause things such as console output to just output in random order - // if more than one event is fired at the same time. - // - // However, the pool submission does not block the execution of this function itself, allowing - // us to call publish without blocking any of the other pathways. - // - // @see https://github.com/pterodactyl/panel/issues/2303 - cp.pool.Submit(func() { - c(evt) - }) - } + listeners, ok := b.listeners[topic] + if !ok { + return } -} - -// PublishJson publishes a JSON message to a given topic. -func (e *EventBus) PublishJson(topic string, data interface{}) error { - b, err := json.Marshal(data) - if err != nil { - return err + if len(listeners) < 1 { + return } - e.Publish(topic, string(b)) - - return nil + var wg sync.WaitGroup + event := Event{Topic: topic, Data: data} + for _, listener := range listeners { + l := listener + wg.Add(1) + go func(l Listener, event Event) { + defer wg.Done() + l <- event + }(l, event) + } + wg.Wait() } -// On adds a callback function that will be executed each time one of the events using the topic -// name is called. -func (e *EventBus) On(topic string, callback *func(Event)) { - e.mu.Lock() - defer e.mu.Unlock() +// Destroy destroys the Event Bus by unregistering and closing all listeners. +func (b *Bus) Destroy() { + b.listenersMx.Lock() + defer b.listenersMx.Unlock() - // Check if this topic has been registered at least once for the event listener, and if - // not create an empty struct for the topic. - if _, exists := e.pools[topic]; !exists { - e.pools[topic] = &CallbackPool{ - callbacks: make([]*func(Event), 0), - pool: workerpool.New(1), + for _, listeners := range b.listeners { + for _, listener := range listeners { + close(listener) } } - // If this callback is not already registered as an event listener, go ahead and append - // it to the array of callbacks for this topic. - e.pools[topic].Add(callback) -} - -// Off removes an event listener from the bus. -func (e *EventBus) Off(topic string, callback *func(Event)) { - e.mu.Lock() - defer e.mu.Unlock() - - if cp, ok := e.pools[topic]; ok { - cp.Remove(callback) - } -} - -// Destroy removes all the event listeners that have been registered for any topic. Also stops the worker -// pool to close that routine. -func (e *EventBus) Destroy() { - e.mu.Lock() - defer e.mu.Unlock() - - // Stop every pool that exists for a given callback topic. - for _, cp := range e.pools { - cp.pool.Stop() - } - - e.pools = make(map[string]*CallbackPool) + b.listeners = make(map[string][]Listener) } diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 0000000..91e6fea --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,180 @@ +package events + +import ( + "testing" + "time" + + . "github.com/franela/goblin" +) + +func TestNewBus(t *testing.T) { + g := Goblin(t) + bus := NewBus() + + g.Describe("NewBus", func() { + g.It("is not nil", func() { + g.Assert(bus).IsNotNil("Bus expected to not be nil") + g.Assert(bus.listeners).IsNotNil("Bus#listeners expected to not be nil") + }) + }) +} + +func TestBus_Off(t *testing.T) { + g := Goblin(t) + + const topic = "test" + + g.Describe("Off", func() { + g.It("unregisters listener", func() { + bus := NewBus() + + g.Assert(bus.listeners[topic]).IsNotNil() + g.Assert(len(bus.listeners[topic])).IsZero() + listener := make(chan Event) + bus.On(listener, topic) + g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered") + + bus.Off(listener, topic) + g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners") + + close(listener) + }) + + g.It("unregisters correct listener", func() { + bus := NewBus() + + listener := make(chan Event) + listener2 := make(chan Event) + listener3 := make(chan Event) + bus.On(listener, topic) + bus.On(listener2, topic) + bus.On(listener3, topic) + g.Assert(len(bus.listeners[topic])).Equal(3, "Listeners were not registered") + + bus.Off(listener, topic) + bus.Off(listener3, topic) + g.Assert(len(bus.listeners[topic])).Equal(1, "Expected 1 listener to remain") + + if bus.listeners[topic][0] != listener2 { + // A normal Assert does not properly compare channels. + g.Fail("wrong listener unregistered") + } + + // Cleanup + bus.Off(listener2, topic) + close(listener) + close(listener2) + close(listener3) + }) + }) +} + +func TestBus_On(t *testing.T) { + g := Goblin(t) + + const topic = "test" + + g.Describe("On", func() { + g.It("registers listener", func() { + bus := NewBus() + + g.Assert(bus.listeners[topic]).IsNotNil() + g.Assert(len(bus.listeners[topic])).IsZero() + listener := make(chan Event) + bus.On(listener, topic) + g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered") + + if bus.listeners[topic][0] != listener { + // A normal Assert does not properly compare channels. + g.Fail("wrong listener registered") + } + + // Cleanup + bus.Off(listener, topic) + close(listener) + }) + }) +} + +func TestBus_Publish(t *testing.T) { + g := Goblin(t) + + const topic = "test" + const message = "this is a test message!" + + g.Describe("Publish", func() { + g.It("publishes message", func() { + bus := NewBus() + + g.Assert(bus.listeners[topic]).IsNotNil() + g.Assert(len(bus.listeners[topic])).IsZero() + listener := make(chan Event) + bus.On(listener, topic) + g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered") + + done := make(chan struct{}, 1) + go func() { + select { + case m := <-listener: + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case <-time.After(1 * time.Second): + g.Fail("listener did not receive message in time") + } + done <- struct{}{} + }() + bus.Publish(topic, message) + <-done + + // Cleanup + close(listener) + bus.Off(listener, topic) + }) + + g.It("publishes message to all listeners", func() { + bus := NewBus() + + g.Assert(bus.listeners[topic]).IsNotNil() + g.Assert(len(bus.listeners[topic])).IsZero() + listener := make(chan Event) + listener2 := make(chan Event) + listener3 := make(chan Event) + bus.On(listener, topic) + bus.On(listener2, topic) + bus.On(listener3, topic) + g.Assert(len(bus.listeners[topic])).Equal(3, "Listener was not registered") + + done := make(chan struct{}, 1) + go func() { + for i := 0; i < 3; i++ { + select { + case m := <-listener: + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case m := <-listener2: + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case m := <-listener3: + g.Assert(m.Topic).Equal(topic) + g.Assert(m.Data).Equal(message) + case <-time.After(1 * time.Second): + g.Fail("all listeners did not receive the message in time") + i = 3 + } + } + + done <- struct{}{} + }() + bus.Publish(topic, message) + <-done + + // Cleanup + bus.Off(listener, topic) + bus.Off(listener2, topic) + bus.Off(listener3, topic) + close(listener) + close(listener2) + close(listener3) + }) + }) +} diff --git a/events/pool.go b/events/pool.go deleted file mode 100644 index fd291e6..0000000 --- a/events/pool.go +++ /dev/null @@ -1,50 +0,0 @@ -package events - -import ( - "reflect" - - "github.com/gammazero/workerpool" -) - -type CallbackPool struct { - callbacks []*func(Event) - pool *workerpool.WorkerPool -} - -// Pushes a new callback into the array of listeners for the pool. -func (cp *CallbackPool) Add(callback *func(Event)) { - if cp.index(reflect.ValueOf(callback)) < 0 { - cp.callbacks = append(cp.callbacks, callback) - } -} - -// Removes a callback from the array of registered callbacks if it exists. -func (cp *CallbackPool) Remove(callback *func(Event)) { - i := cp.index(reflect.ValueOf(callback)) - - // If i < 0 it means there was no index found for the given callback, meaning it was - // never registered or was already unregistered from the listeners. Also double check - // that we didn't somehow escape the length of the topic callback (not sure how that - // would happen, but lets avoid a panic condition). - if i < 0 || i >= len(cp.callbacks) { - return - } - - // We can assume that the topic still exists at this point since we acquire an exclusive - // lock on the process, and the "e.index" function cannot return a value >= 0 if there is - // no topic already existing. - cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...) -} - -// Finds the index of a given callback in the topic by comparing all of the registered callback -// pointers to the passed function. This function does not aquire a lock as it should only be called -// within the confines of a function that has already acquired a lock for the duration of the lookup. -func (cp *CallbackPool) index(v reflect.Value) int { - for i, handler := range cp.callbacks { - if reflect.ValueOf(handler).Pointer() == v.Pointer() { - return i - } - } - - return -1 -} diff --git a/router/router_server.go b/router/router_server.go index 31c93b7..c5b7b8d 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -188,6 +188,8 @@ func deleteServer(c *gin.Context) { // as well. s.CtxCancel() s.Events().Destroy() + s.LogSink().Destroy() + s.InstallSink().Destroy() s.Websockets().CancelAll() // Remove any pending remote file downloads for the server. diff --git a/router/websocket/listeners.go b/router/websocket/listeners.go index 656dbee..3fa90ef 100644 --- a/router/websocket/listeners.go +++ b/router/websocket/listeners.go @@ -2,6 +2,7 @@ package websocket import ( "context" + "encoding/json" "sync" "time" @@ -53,9 +54,9 @@ func (h *Handler) listenForExpiration(ctx context.Context) { jwt := h.GetJwt() if jwt != nil { if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 0 { - _ = h.SendJson(&Message{Event: TokenExpiredEvent}) + _ = h.SendJson(Message{Event: TokenExpiredEvent}) } else if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 60 { - _ = h.SendJson(&Message{Event: TokenExpiringEvent}) + _ = h.SendJson(Message{Event: TokenExpiringEvent}) } } } @@ -79,38 +80,79 @@ var e = []string{ // ListenForServerEvents will listen for different events happening on a server // and send them along to the connected websocket client. This function will // block until the context provided to it is canceled. -func (h *Handler) listenForServerEvents(pctx context.Context) error { +func (h *Handler) listenForServerEvents(ctx context.Context) error { var o sync.Once var err error - ctx, cancel := context.WithCancel(pctx) - callback := func(e events.Event) { - if sendErr := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); sendErr != nil { - h.Logger().WithField("event", e.Topic).WithField("error", sendErr).Error("failed to send event over server websocket") - // Avoid race conditions by only setting the error once and then canceling - // the context. This way if additional processing errors come through due - // to a massive flood of things you still only report and stop at the first. - o.Do(func() { - err = sendErr - cancel() - }) - } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + eventChan := make(chan events.Event) + logOutput := make(chan []byte) + installOutput := make(chan []byte) + h.server.Events().On(eventChan, e...) + h.server.LogSink().On(logOutput) + h.server.InstallSink().On(installOutput) + + onError := func(evt string, err2 error) { + h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket") + // Avoid race conditions by only setting the error once and then canceling + // the context. This way if additional processing errors come through due + // to a massive flood of things you still only report and stop at the first. + o.Do(func() { + err = err2 + }) + cancel() } - // Subscribe to all of the events with the same callback that will push the - // data out over the websocket for the server. - for _, evt := range e { - h.server.Events().On(evt, &callback) + for { + select { + case <-ctx.Done(): + break + case e := <-logOutput: + sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(e)}}) + if sendErr == nil { + continue + } + onError(server.ConsoleOutputEvent, sendErr) + case e := <-installOutput: + sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(e)}}) + if sendErr == nil { + continue + } + onError(server.InstallOutputEvent, sendErr) + case e := <-eventChan: + var sendErr error + message := Message{Event: e.Topic} + if str, ok := e.Data.(string); ok { + message.Args = []string{str} + } else if b, ok := e.Data.([]byte); ok { + message.Args = []string{string(b)} + } else { + b, sendErr = json.Marshal(e.Data) + if sendErr == nil { + message.Args = []string{string(b)} + } + } + + if sendErr == nil { + sendErr = h.SendJson(message) + if sendErr == nil { + continue + } + } + onError(message.Event, sendErr) + } + break } - // When this function returns de-register all of the event listeners. - defer func() { - for _, evt := range e { - h.server.Events().Off(evt, &callback) - } - }() + h.server.Events().Off(eventChan, e...) + h.server.InstallSink().Off(logOutput) + h.server.InstallSink().Off(installOutput) + close(eventChan) + close(logOutput) + close(installOutput) - <-ctx.Done() // If the internal context is stopped it is either because the parent context // got canceled or because we ran into an error. If the "err" variable is nil // we can assume the parent was canceled and need not perform any actions. diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index e9a4a63..92548ef 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -122,18 +122,17 @@ func (h *Handler) Logger() *log.Entry { WithField("server", h.server.ID()) } -func (h *Handler) SendJson(v *Message) error { +func (h *Handler) SendJson(v Message) error { // Do not send JSON down the line if the JWT on the connection is not valid! if err := h.TokenValid(); err != nil { - h.unsafeSendJson(Message{ + _ = h.unsafeSendJson(Message{ Event: JwtErrorEvent, Args: []string{err.Error()}, }) return nil } - j := h.GetJwt() - if j != nil { + if j := h.GetJwt(); j != nil { // If we're sending installation output but the user does not have the required // permissions to see the output, don't send it down the line. if v.Event == server.InstallOutputEvent { @@ -297,7 +296,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error { h.setJwt(token) // Tell the client they authenticated successfully. - h.unsafeSendJson(Message{Event: AuthenticationSuccessEvent}) + _ = h.unsafeSendJson(Message{Event: AuthenticationSuccessEvent}) // Check if the client was refreshing their authentication token // instead of authenticating for the first time. @@ -315,7 +314,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error { // On every authentication event, send the current server status back // to the client. :) state := h.server.Environment.State() - h.SendJson(&Message{ + _ = h.SendJson(Message{ Event: server.StatusEvent, Args: []string{state}, }) @@ -327,7 +326,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error { _ = h.server.Filesystem().HasSpaceAvailable(false) b, _ := json.Marshal(h.server.Proc()) - h.SendJson(&Message{ + _ = h.SendJson(Message{ Event: server.StatsEvent, Args: []string{string(b)}, }) @@ -357,7 +356,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error { if errors.Is(err, context.DeadlineExceeded) { m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later") - h.SendJson(&Message{ + _ = h.SendJson(Message{ Event: ErrorEvent, Args: []string{m}, }) @@ -381,7 +380,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error { } for _, line := range logs { - h.SendJson(&Message{ + _ = h.SendJson(Message{ Event: server.ConsoleOutputEvent, Args: []string{line}, }) @@ -392,7 +391,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error { case SendStatsEvent: { b, _ := json.Marshal(h.server.Proc()) - h.SendJson(&Message{ + _ = h.SendJson(Message{ Event: server.StatsEvent, Args: []string{string(b)}, }) diff --git a/server/backup.go b/server/backup.go index a35ccda..b7d5cf6 100644 --- a/server/backup.go +++ b/server/backup.go @@ -79,7 +79,7 @@ func (s *Server) Backup(b backup.BackupInterface) error { s.Log().WithField("backup", b.Identifier()).Info("notified panel of failed backup state") } - _ = s.Events().PublishJson(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{ + s.Events().Publish(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{ "uuid": b.Identifier(), "is_successful": false, "checksum": "", @@ -103,7 +103,7 @@ func (s *Server) Backup(b backup.BackupInterface) error { // Emit an event over the socket so we can update the backup in realtime on // the frontend for the server. - _ = s.Events().PublishJson(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{ + s.Events().Publish(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{ "uuid": b.Identifier(), "is_successful": true, "checksum": ad.Checksum, diff --git a/server/events.go b/server/events.go index 36a9de8..f83c17b 100644 --- a/server/events.go +++ b/server/events.go @@ -21,12 +21,12 @@ const ( ) // Returns the server's emitter instance. -func (s *Server) Events() *events.EventBus { +func (s *Server) Events() *events.Bus { s.emitterLock.Lock() defer s.emitterLock.Unlock() if s.emitter == nil { - s.emitter = events.New() + s.emitter = events.NewBus() } return s.emitter diff --git a/server/install.go b/server/install.go index b450609..42c3bcf 100644 --- a/server/install.go +++ b/server/install.go @@ -521,10 +521,7 @@ func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) erro } defer reader.Close() - evts := ip.Server.Events() - err = system.ScanReader(reader, func(line string) { - evts.Publish(InstallOutputEvent, line) - }) + err = system.ScanReader(reader, ip.Server.InstallSink().Push) if err != nil { ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines") } diff --git a/server/listeners.go b/server/listeners.go index 0847f3f..a8077a9 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -1,7 +1,6 @@ package server import ( - "encoding/json" "regexp" "strconv" "sync" @@ -51,98 +50,103 @@ func (dsl *diskSpaceLimiter) Trigger() { }) } +func (s *Server) processConsoleOutputEvent(v []byte) { + t := s.Throttler() + err := t.Increment(func() { + s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.") + }) + // An error is only returned if the server has breached the thresholds set. + if err != nil { + // If the process is already stopping, just let it continue with that action rather than attempting + // to terminate again. + if s.Environment.State() != environment.ProcessStoppingState { + s.Environment.SetState(environment.ProcessStoppingState) + + go func() { + s.Log().Warn("stopping server instance, violating throttle limits") + s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.") + + // Completely skip over server power actions and terminate the running instance. This gives the + // server 15 seconds to finish stopping gracefully before it is forcefully terminated. + if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil { + // If there is an error set the process back to running so that this throttler is called + // again and hopefully kills the server. + if s.Environment.State() != environment.ProcessOfflineState { + s.Environment.SetState(environment.ProcessRunningState) + } + + s.Log().WithField("error", err).Error("failed to terminate environment after triggering throttle") + } + }() + } + } + + // If we are not throttled, go ahead and output the data. + if !t.Throttled() { + s.LogSink().Push(v) + } + + // Also pass the data along to the console output channel. + s.onConsoleOutput(string(v)) +} + // StartEventListeners adds all the internal event listeners we want to use for a server. These listeners can only be // removed by deleting the server as they should last for the duration of the process' lifetime. func (s *Server) StartEventListeners() { - console := func(e events.Event) { - t := s.Throttler() - err := t.Increment(func() { - s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.") - }) - // An error is only returned if the server has breached the thresholds set. - if err != nil { - // If the process is already stopping, just let it continue with that action rather than attempting - // to terminate again. - if s.Environment.State() != environment.ProcessStoppingState { - s.Environment.SetState(environment.ProcessStoppingState) + state := make(chan events.Event) + stats := make(chan events.Event) + docker := make(chan events.Event) + go func() { + l := newDiskLimiter(s) + + for { + select { + case e := <-state: go func() { - s.Log().Warn("stopping server instance, violating throttle limits") - s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.") + // Reset the throttler when the process is started. + if e.Data == environment.ProcessStartingState { + l.Reset() + s.Throttler().Reset() + } - // Completely skip over server power actions and terminate the running instance. This gives the - // server 15 seconds to finish stopping gracefully before it is forcefully terminated. - if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil { - // If there is an error set the process back to running so that this throttler is called - // again and hopefully kills the server. - if s.Environment.State() != environment.ProcessOfflineState { - s.Environment.SetState(environment.ProcessRunningState) - } + s.OnStateChange() + }() + case e := <-stats: + go func() { + // Update the server resource tracking object with the resources we got here. + s.resources.mu.Lock() + s.resources.Stats = e.Data.(environment.Stats) + s.resources.mu.Unlock() - s.Log().WithField("error", err).Error("failed to terminate environment after triggering throttle") + // If there is no disk space available at this point, trigger the server disk limiter logic + // which will start to stop the running instance. + if !s.Filesystem().HasSpaceAvailable(true) { + l.Trigger() + } + + s.emitProcUsage() + }() + case e := <-docker: + go func() { + switch e.Topic { + case environment.DockerImagePullStatus: + s.Events().Publish(InstallOutputEvent, e.Data) + case environment.DockerImagePullStarted: + s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...") + default: + s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image") } }() } } - - // If we are not throttled, go ahead and output the data. - if !t.Throttled() { - s.Events().Publish(ConsoleOutputEvent, e.Data) - } - - // Also pass the data along to the console output channel. - s.onConsoleOutput(e.Data) - } - - l := newDiskLimiter(s) - state := func(e events.Event) { - // Reset the throttler when the process is started. - if e.Data == environment.ProcessStartingState { - l.Reset() - s.Throttler().Reset() - } - - s.OnStateChange() - } - - stats := func(e events.Event) { - var st environment.Stats - if err := json.Unmarshal([]byte(e.Data), &st); err != nil { - s.Log().WithField("error", err).Warn("failed to unmarshal server environment stats") - return - } - - // Update the server resource tracking object with the resources we got here. - s.resources.mu.Lock() - s.resources.Stats = st - s.resources.mu.Unlock() - - // If there is no disk space available at this point, trigger the server disk limiter logic - // which will start to stop the running instance. - if !s.Filesystem().HasSpaceAvailable(true) { - l.Trigger() - } - - s.emitProcUsage() - } - - docker := func(e events.Event) { - if e.Topic == environment.DockerImagePullStatus { - s.Events().Publish(InstallOutputEvent, e.Data) - } else if e.Topic == environment.DockerImagePullStarted { - s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...") - } else { - s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image") - } - } + }() s.Log().Debug("registering event listeners: console, state, resources...") - s.Environment.Events().On(environment.ConsoleOutputEvent, &console) - s.Environment.Events().On(environment.StateChangeEvent, &state) - s.Environment.Events().On(environment.ResourceEvent, &stats) - for _, evt := range dockerEvents { - s.Environment.Events().On(evt, &docker) - } + s.Environment.SetLogCallback(s.processConsoleOutputEvent) + s.Environment.Events().On(state, environment.StateChangeEvent) + s.Environment.Events().On(stats, environment.ResourceEvent) + s.Environment.Events().On(docker, dockerEvents...) } var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))") diff --git a/server/resources.go b/server/resources.go index c21723f..81c2c87 100644 --- a/server/resources.go +++ b/server/resources.go @@ -52,7 +52,5 @@ func (ru *ResourceUsage) Reset() { } func (s *Server) emitProcUsage() { - if err := s.Events().PublishJson(StatsEvent, s.Proc()); err != nil { - s.Log().WithField("error", err).Warn("error while emitting server resource usage to listeners") - } + s.Events().Publish(StatsEvent, s.Proc()) } diff --git a/server/server.go b/server/server.go index 76c82aa..3b50684 100644 --- a/server/server.go +++ b/server/server.go @@ -49,7 +49,7 @@ type Server struct { fs *filesystem.Filesystem // Events emitted by the server instance. - emitter *events.EventBus + emitter *events.Bus // Defines the process configuration for the server instance. This is dynamically // fetched from the Pterodactyl Server instance each time the server process is @@ -70,6 +70,9 @@ type Server struct { // Tracks open websocket connections for the server. wsBag *WebsocketBag wsBagLocker sync.Mutex + + logSink *sinkPool + installSink *sinkPool } // New returns a new server instance with a context and all of the default @@ -83,6 +86,9 @@ func New(client remote.Client) (*Server, error) { installing: system.NewAtomicBool(false), transferring: system.NewAtomicBool(false), restoring: system.NewAtomicBool(false), + + logSink: newSinkPool(), + installSink: newSinkPool(), } if err := defaults.Set(&s); err != nil { return nil, errors.Wrap(err, "server: could not set default values for struct") @@ -349,3 +355,11 @@ func (s *Server) ToAPIResponse() APIResponse { Configuration: *s.Config(), } } + +func (s *Server) LogSink() *sinkPool { + return s.logSink +} + +func (s *Server) InstallSink() *sinkPool { + return s.installSink +} diff --git a/server/sink.go b/server/sink.go new file mode 100644 index 0000000..c6f2b20 --- /dev/null +++ b/server/sink.go @@ -0,0 +1,71 @@ +package server + +import ( + "sync" + "time" +) + +// sinkPool represents a pool with sinks. +type sinkPool struct { + mx sync.RWMutex + sinks []chan []byte +} + +// newSinkPool returns a new empty sinkPool. +func newSinkPool() *sinkPool { + return &sinkPool{} +} + +// On adds a sink on the pool. +func (p *sinkPool) On(c chan []byte) { + p.mx.Lock() + defer p.mx.Unlock() + + p.sinks = append(p.sinks, c) +} + +// Off removes a sink from the pool. +func (p *sinkPool) Off(c chan []byte) { + p.mx.Lock() + defer p.mx.Unlock() + + sinks := p.sinks + + for i, sink := range sinks { + if c != sink { + continue + } + copy(sinks[i:], sinks[i+1:]) + sinks[len(sinks)-1] = nil + sinks = sinks[:len(sinks)-1] + p.sinks = sinks + return + } +} + +// Destroy destroys the pool by removing and closing all sinks. +func (p *sinkPool) Destroy() { + p.mx.Lock() + defer p.mx.Unlock() + + for _, c := range p.sinks { + close(c) + } + + p.sinks = nil +} + +// Push pushes a message to all registered sinks. +func (p *sinkPool) Push(v []byte) { + p.mx.RLock() + for _, c := range p.sinks { + // TODO: should this be done in parallel? + select { + // Send the log output to the channel + case c <- v: + // Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled. + case <-time.After(100 * time.Millisecond): + } + } + p.mx.RUnlock() +} diff --git a/system/utils.go b/system/utils.go index 785b505..999f038 100644 --- a/system/utils.go +++ b/system/utils.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "strconv" - "strings" "sync" "time" @@ -38,14 +37,14 @@ func MustInt(v string) int { return i } -func ScanReader(r io.Reader, callback func(line string)) error { +func ScanReader(r io.Reader, callback func(line []byte)) error { br := bufio.NewReader(r) // Avoid constantly re-allocating memory when we're flooding lines through this // function by using the same buffer for the duration of the call and just truncating // the value back to 0 every loop. - var str strings.Builder + buf := &bytes.Buffer{} for { - str.Reset() + buf.Reset() var err error var line []byte var isPrefix bool @@ -57,7 +56,7 @@ func ScanReader(r io.Reader, callback func(line string)) error { // in line with that it thinks is the terminal size. Those returns break a lot of output handling, // so we'll just replace them with proper new-lines and then split it later and send each line as // its own event in the response. - str.Write(bytes.Replace(line, cr, crr, -1)) + buf.Write(bytes.Replace(line, cr, crr, -1)) // Finish this loop and begin outputting the line if there is no prefix (the line fit into // the default buffer), or if we hit the end of the line. if !isPrefix || err == io.EOF { @@ -71,8 +70,9 @@ func ScanReader(r io.Reader, callback func(line string)) error { } // Publish the line for this loop. Break on new-line characters so every line is sent as a single // output event, otherwise you get funky handling in the browser console. - for _, line := range strings.Split(str.String(), "\r\n") { - callback(line) + s := bufio.NewScanner(buf) + for s.Scan() { + callback(s.Bytes()) } // If the error we got previously that lead to the line being output is an io.EOF we want to // exit the entire looping process.