Server Event Optimizations (#116)

This commit is contained in:
Matthew Penner 2022-01-17 20:23:29 -07:00 committed by GitHub
parent 521cc2aef2
commit 649dc9663e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 551 additions and 282 deletions

View File

@ -342,10 +342,10 @@ func (e *Environment) followOutput() error {
func (e *Environment) scanOutput(reader io.ReadCloser) { func (e *Environment) scanOutput(reader io.ReadCloser) {
defer reader.Close() defer reader.Close()
events := e.Events() if err := system.ScanReader(reader, func(v []byte) {
e.logCallbackMx.Lock()
if err := system.ScanReader(reader, func(line string) { defer e.logCallbackMx.Unlock()
events.Publish(environment.ConsoleOutputEvent, line) e.logCallback(v)
}); err != nil && err != io.EOF { }); err != nil && err != io.EOF {
log.WithField("error", err).WithField("container_id", e.Id).Warn("error processing scanner line in console output") log.WithField("error", err).WithField("container_id", e.Id).Warn("error processing scanner line in console output")
return return

View File

@ -49,7 +49,10 @@ type Environment struct {
// Holds the stats stream used by the polling commands so that we can easily close it out. // Holds the stats stream used by the polling commands so that we can easily close it out.
stats io.ReadCloser stats io.ReadCloser
emitter *events.EventBus emitter *events.Bus
logCallbackMx sync.Mutex
logCallback func([]byte)
// Tracks the environment state. // Tracks the environment state.
st *system.AtomicString st *system.AtomicString
@ -100,9 +103,9 @@ func (e *Environment) IsAttached() bool {
return e.stream != nil return e.stream != nil
} }
func (e *Environment) Events() *events.EventBus { func (e *Environment) Events() *events.Bus {
e.eventMu.Do(func() { e.eventMu.Do(func() {
e.emitter = events.New() e.emitter = events.NewBus()
}) })
return e.emitter return e.emitter
@ -214,3 +217,10 @@ func (e *Environment) SetState(state string) {
e.Events().Publish(environment.StateChangeEvent, state) e.Events().Publish(environment.StateChangeEvent, state)
} }
} }
func (e *Environment) SetLogCallback(f func([]byte)) {
e.logCallbackMx.Lock()
defer e.logCallbackMx.Unlock()
e.logCallback = f
}

View File

@ -90,11 +90,7 @@ func (e *Environment) pollResources(ctx context.Context) error {
st.Network.TxBytes += nw.TxBytes st.Network.TxBytes += nw.TxBytes
} }
if b, err := json.Marshal(st); err != nil { e.Events().Publish(environment.ResourceEvent, st)
e.log().WithField("error", err).Warn("error while marshaling stats object for environment")
} else {
e.Events().Publish(environment.ResourceEvent, string(b))
}
} }
} }
} }

View File

@ -8,7 +8,6 @@ import (
) )
const ( const (
ConsoleOutputEvent = "console output"
StateChangeEvent = "state change" StateChangeEvent = "state change"
ResourceEvent = "resources" ResourceEvent = "resources"
DockerImagePullStarted = "docker image pull started" DockerImagePullStarted = "docker image pull started"
@ -35,7 +34,7 @@ type ProcessEnvironment interface {
// Returns an event emitter instance that can be hooked into to listen for different // Returns an event emitter instance that can be hooked into to listen for different
// events that are fired by the environment. This should not allow someone to publish // events that are fired by the environment. This should not allow someone to publish
// events, only subscribe to them. // events, only subscribe to them.
Events() *events.EventBus Events() *events.Bus
// Determines if the server instance exists. For example, in a docker environment // Determines if the server instance exists. For example, in a docker environment
// this should confirm that the container is created and in a bootable state. In // this should confirm that the container is created and in a bootable state. In
@ -108,4 +107,7 @@ type ProcessEnvironment interface {
// Uptime returns the current environment uptime in milliseconds. This is // Uptime returns the current environment uptime in milliseconds. This is
// the time that has passed since it was last started. // the time that has passed since it was last started.
Uptime(ctx context.Context) (int64, error) Uptime(ctx context.Context) (int64, error)
// SetLogCallback sets the callback that the container's log output will be passed to.
SetLogCallback(func([]byte))
} }

View File

@ -1,32 +1,79 @@
package events package events
import ( import (
"encoding/json"
"strings" "strings"
"sync" "sync"
"github.com/gammazero/workerpool"
) )
type Listener chan Event
// Event represents an Event sent over a Bus.
type Event struct { type Event struct {
Data string
Topic string Topic string
Data interface{}
} }
type EventBus struct { // Bus represents an Event Bus.
mu sync.RWMutex type Bus struct {
pools map[string]*CallbackPool listenersMx sync.Mutex
listeners map[string][]Listener
} }
func New() *EventBus { // NewBus returns a new empty Event Bus.
return &EventBus{ func NewBus() *Bus {
pools: make(map[string]*CallbackPool), return &Bus{
listeners: make(map[string][]Listener),
} }
} }
// Publish data to a given topic. // Off unregisters a listener from the specified topics on the Bus.
func (e *EventBus) Publish(topic string, data string) { func (b *Bus) Off(listener Listener, topics ...string) {
t := topic b.listenersMx.Lock()
defer b.listenersMx.Unlock()
for _, topic := range topics {
b.off(topic, listener)
}
}
func (b *Bus) off(topic string, listener Listener) bool {
listeners, ok := b.listeners[topic]
if !ok {
return false
}
for i, l := range listeners {
if l != listener {
continue
}
listeners = append(listeners[:i], listeners[i+1:]...)
b.listeners[topic] = listeners
return true
}
return false
}
// On registers a listener to the specified topics on the Bus.
func (b *Bus) On(listener Listener, topics ...string) {
b.listenersMx.Lock()
defer b.listenersMx.Unlock()
for _, topic := range topics {
b.on(topic, listener)
}
}
func (b *Bus) on(topic string, listener Listener) {
listeners, ok := b.listeners[topic]
if !ok {
b.listeners[topic] = []Listener{listener}
} else {
b.listeners[topic] = append(listeners, listener)
}
}
// Publish publishes a message to the Bus.
func (b *Bus) Publish(topic string, data interface{}) {
// Some of our topics for the socket support passing a more specific namespace, // Some of our topics for the socket support passing a more specific namespace,
// such as "backup completed:1234" to indicate which specific backup was completed. // such as "backup completed:1234" to indicate which specific backup was completed.
// //
@ -36,87 +83,44 @@ func (e *EventBus) Publish(topic string, data string) {
parts := strings.SplitN(topic, ":", 2) parts := strings.SplitN(topic, ":", 2)
if len(parts) == 2 { if len(parts) == 2 {
t = parts[0] topic = parts[0]
} }
} }
e.mu.RLock() b.listenersMx.Lock()
defer e.mu.RUnlock() defer b.listenersMx.Unlock()
// Acquire a read lock and loop over all the channels registered for the topic. This listeners, ok := b.listeners[topic]
// avoids a panic crash if the process tries to unregister the channel while this routine if !ok {
// is running. return
if cp, ok := e.pools[t]; ok {
for _, callback := range cp.callbacks {
c := *callback
evt := Event{Data: data, Topic: topic}
// Using the workerpool with one worker allows us to execute events in a FIFO manner. Running
// this using goroutines would cause things such as console output to just output in random order
// if more than one event is fired at the same time.
//
// However, the pool submission does not block the execution of this function itself, allowing
// us to call publish without blocking any of the other pathways.
//
// @see https://github.com/pterodactyl/panel/issues/2303
cp.pool.Submit(func() {
c(evt)
})
}
} }
} if len(listeners) < 1 {
return
// PublishJson publishes a JSON message to a given topic.
func (e *EventBus) PublishJson(topic string, data interface{}) error {
b, err := json.Marshal(data)
if err != nil {
return err
} }
e.Publish(topic, string(b)) var wg sync.WaitGroup
event := Event{Topic: topic, Data: data}
return nil for _, listener := range listeners {
l := listener
wg.Add(1)
go func(l Listener, event Event) {
defer wg.Done()
l <- event
}(l, event)
}
wg.Wait()
} }
// On adds a callback function that will be executed each time one of the events using the topic // Destroy destroys the Event Bus by unregistering and closing all listeners.
// name is called. func (b *Bus) Destroy() {
func (e *EventBus) On(topic string, callback *func(Event)) { b.listenersMx.Lock()
e.mu.Lock() defer b.listenersMx.Unlock()
defer e.mu.Unlock()
// Check if this topic has been registered at least once for the event listener, and if for _, listeners := range b.listeners {
// not create an empty struct for the topic. for _, listener := range listeners {
if _, exists := e.pools[topic]; !exists { close(listener)
e.pools[topic] = &CallbackPool{
callbacks: make([]*func(Event), 0),
pool: workerpool.New(1),
} }
} }
// If this callback is not already registered as an event listener, go ahead and append b.listeners = make(map[string][]Listener)
// it to the array of callbacks for this topic.
e.pools[topic].Add(callback)
}
// Off removes an event listener from the bus.
func (e *EventBus) Off(topic string, callback *func(Event)) {
e.mu.Lock()
defer e.mu.Unlock()
if cp, ok := e.pools[topic]; ok {
cp.Remove(callback)
}
}
// Destroy removes all the event listeners that have been registered for any topic. Also stops the worker
// pool to close that routine.
func (e *EventBus) Destroy() {
e.mu.Lock()
defer e.mu.Unlock()
// Stop every pool that exists for a given callback topic.
for _, cp := range e.pools {
cp.pool.Stop()
}
e.pools = make(map[string]*CallbackPool)
} }

180
events/events_test.go Normal file
View File

@ -0,0 +1,180 @@
package events
import (
"testing"
"time"
. "github.com/franela/goblin"
)
func TestNewBus(t *testing.T) {
g := Goblin(t)
bus := NewBus()
g.Describe("NewBus", func() {
g.It("is not nil", func() {
g.Assert(bus).IsNotNil("Bus expected to not be nil")
g.Assert(bus.listeners).IsNotNil("Bus#listeners expected to not be nil")
})
})
}
func TestBus_Off(t *testing.T) {
g := Goblin(t)
const topic = "test"
g.Describe("Off", func() {
g.It("unregisters listener", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
bus.Off(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners")
close(listener)
})
g.It("unregisters correct listener", func() {
bus := NewBus()
listener := make(chan Event)
listener2 := make(chan Event)
listener3 := make(chan Event)
bus.On(listener, topic)
bus.On(listener2, topic)
bus.On(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(3, "Listeners were not registered")
bus.Off(listener, topic)
bus.Off(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Expected 1 listener to remain")
if bus.listeners[topic][0] != listener2 {
// A normal Assert does not properly compare channels.
g.Fail("wrong listener unregistered")
}
// Cleanup
bus.Off(listener2, topic)
close(listener)
close(listener2)
close(listener3)
})
})
}
func TestBus_On(t *testing.T) {
g := Goblin(t)
const topic = "test"
g.Describe("On", func() {
g.It("registers listener", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
if bus.listeners[topic][0] != listener {
// A normal Assert does not properly compare channels.
g.Fail("wrong listener registered")
}
// Cleanup
bus.Off(listener, topic)
close(listener)
})
})
}
func TestBus_Publish(t *testing.T) {
g := Goblin(t)
const topic = "test"
const message = "this is a test message!"
g.Describe("Publish", func() {
g.It("publishes message", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
bus.On(listener, topic)
g.Assert(len(bus.listeners[topic])).Equal(1, "Listener was not registered")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
// Cleanup
close(listener)
bus.Off(listener, topic)
})
g.It("publishes message to all listeners", func() {
bus := NewBus()
g.Assert(bus.listeners[topic]).IsNotNil()
g.Assert(len(bus.listeners[topic])).IsZero()
listener := make(chan Event)
listener2 := make(chan Event)
listener3 := make(chan Event)
bus.On(listener, topic)
bus.On(listener2, topic)
bus.On(listener3, topic)
g.Assert(len(bus.listeners[topic])).Equal(3, "Listener was not registered")
done := make(chan struct{}, 1)
go func() {
for i := 0; i < 3; i++ {
select {
case m := <-listener:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case m := <-listener2:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case m := <-listener3:
g.Assert(m.Topic).Equal(topic)
g.Assert(m.Data).Equal(message)
case <-time.After(1 * time.Second):
g.Fail("all listeners did not receive the message in time")
i = 3
}
}
done <- struct{}{}
}()
bus.Publish(topic, message)
<-done
// Cleanup
bus.Off(listener, topic)
bus.Off(listener2, topic)
bus.Off(listener3, topic)
close(listener)
close(listener2)
close(listener3)
})
})
}

View File

@ -1,50 +0,0 @@
package events
import (
"reflect"
"github.com/gammazero/workerpool"
)
type CallbackPool struct {
callbacks []*func(Event)
pool *workerpool.WorkerPool
}
// Pushes a new callback into the array of listeners for the pool.
func (cp *CallbackPool) Add(callback *func(Event)) {
if cp.index(reflect.ValueOf(callback)) < 0 {
cp.callbacks = append(cp.callbacks, callback)
}
}
// Removes a callback from the array of registered callbacks if it exists.
func (cp *CallbackPool) Remove(callback *func(Event)) {
i := cp.index(reflect.ValueOf(callback))
// If i < 0 it means there was no index found for the given callback, meaning it was
// never registered or was already unregistered from the listeners. Also double check
// that we didn't somehow escape the length of the topic callback (not sure how that
// would happen, but lets avoid a panic condition).
if i < 0 || i >= len(cp.callbacks) {
return
}
// We can assume that the topic still exists at this point since we acquire an exclusive
// lock on the process, and the "e.index" function cannot return a value >= 0 if there is
// no topic already existing.
cp.callbacks = append(cp.callbacks[:i], cp.callbacks[i+1:]...)
}
// Finds the index of a given callback in the topic by comparing all of the registered callback
// pointers to the passed function. This function does not aquire a lock as it should only be called
// within the confines of a function that has already acquired a lock for the duration of the lookup.
func (cp *CallbackPool) index(v reflect.Value) int {
for i, handler := range cp.callbacks {
if reflect.ValueOf(handler).Pointer() == v.Pointer() {
return i
}
}
return -1
}

View File

@ -188,6 +188,8 @@ func deleteServer(c *gin.Context) {
// as well. // as well.
s.CtxCancel() s.CtxCancel()
s.Events().Destroy() s.Events().Destroy()
s.LogSink().Destroy()
s.InstallSink().Destroy()
s.Websockets().CancelAll() s.Websockets().CancelAll()
// Remove any pending remote file downloads for the server. // Remove any pending remote file downloads for the server.

View File

@ -2,6 +2,7 @@ package websocket
import ( import (
"context" "context"
"encoding/json"
"sync" "sync"
"time" "time"
@ -53,9 +54,9 @@ func (h *Handler) listenForExpiration(ctx context.Context) {
jwt := h.GetJwt() jwt := h.GetJwt()
if jwt != nil { if jwt != nil {
if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 0 { if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 0 {
_ = h.SendJson(&Message{Event: TokenExpiredEvent}) _ = h.SendJson(Message{Event: TokenExpiredEvent})
} else if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 60 { } else if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 60 {
_ = h.SendJson(&Message{Event: TokenExpiringEvent}) _ = h.SendJson(Message{Event: TokenExpiringEvent})
} }
} }
} }
@ -79,38 +80,79 @@ var e = []string{
// ListenForServerEvents will listen for different events happening on a server // ListenForServerEvents will listen for different events happening on a server
// and send them along to the connected websocket client. This function will // and send them along to the connected websocket client. This function will
// block until the context provided to it is canceled. // block until the context provided to it is canceled.
func (h *Handler) listenForServerEvents(pctx context.Context) error { func (h *Handler) listenForServerEvents(ctx context.Context) error {
var o sync.Once var o sync.Once
var err error var err error
ctx, cancel := context.WithCancel(pctx)
callback := func(e events.Event) { ctx, cancel := context.WithCancel(ctx)
if sendErr := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); sendErr != nil { defer cancel()
h.Logger().WithField("event", e.Topic).WithField("error", sendErr).Error("failed to send event over server websocket")
// Avoid race conditions by only setting the error once and then canceling eventChan := make(chan events.Event)
// the context. This way if additional processing errors come through due logOutput := make(chan []byte)
// to a massive flood of things you still only report and stop at the first. installOutput := make(chan []byte)
o.Do(func() { h.server.Events().On(eventChan, e...)
err = sendErr h.server.LogSink().On(logOutput)
cancel() h.server.InstallSink().On(installOutput)
})
} onError := func(evt string, err2 error) {
h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket")
// Avoid race conditions by only setting the error once and then canceling
// the context. This way if additional processing errors come through due
// to a massive flood of things you still only report and stop at the first.
o.Do(func() {
err = err2
})
cancel()
} }
// Subscribe to all of the events with the same callback that will push the for {
// data out over the websocket for the server. select {
for _, evt := range e { case <-ctx.Done():
h.server.Events().On(evt, &callback) break
case e := <-logOutput:
sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(e)}})
if sendErr == nil {
continue
}
onError(server.ConsoleOutputEvent, sendErr)
case e := <-installOutput:
sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(e)}})
if sendErr == nil {
continue
}
onError(server.InstallOutputEvent, sendErr)
case e := <-eventChan:
var sendErr error
message := Message{Event: e.Topic}
if str, ok := e.Data.(string); ok {
message.Args = []string{str}
} else if b, ok := e.Data.([]byte); ok {
message.Args = []string{string(b)}
} else {
b, sendErr = json.Marshal(e.Data)
if sendErr == nil {
message.Args = []string{string(b)}
}
}
if sendErr == nil {
sendErr = h.SendJson(message)
if sendErr == nil {
continue
}
}
onError(message.Event, sendErr)
}
break
} }
// When this function returns de-register all of the event listeners. h.server.Events().Off(eventChan, e...)
defer func() { h.server.InstallSink().Off(logOutput)
for _, evt := range e { h.server.InstallSink().Off(installOutput)
h.server.Events().Off(evt, &callback) close(eventChan)
} close(logOutput)
}() close(installOutput)
<-ctx.Done()
// If the internal context is stopped it is either because the parent context // If the internal context is stopped it is either because the parent context
// got canceled or because we ran into an error. If the "err" variable is nil // got canceled or because we ran into an error. If the "err" variable is nil
// we can assume the parent was canceled and need not perform any actions. // we can assume the parent was canceled and need not perform any actions.

View File

@ -122,18 +122,17 @@ func (h *Handler) Logger() *log.Entry {
WithField("server", h.server.ID()) WithField("server", h.server.ID())
} }
func (h *Handler) SendJson(v *Message) error { func (h *Handler) SendJson(v Message) error {
// Do not send JSON down the line if the JWT on the connection is not valid! // Do not send JSON down the line if the JWT on the connection is not valid!
if err := h.TokenValid(); err != nil { if err := h.TokenValid(); err != nil {
h.unsafeSendJson(Message{ _ = h.unsafeSendJson(Message{
Event: JwtErrorEvent, Event: JwtErrorEvent,
Args: []string{err.Error()}, Args: []string{err.Error()},
}) })
return nil return nil
} }
j := h.GetJwt() if j := h.GetJwt(); j != nil {
if j != nil {
// If we're sending installation output but the user does not have the required // If we're sending installation output but the user does not have the required
// permissions to see the output, don't send it down the line. // permissions to see the output, don't send it down the line.
if v.Event == server.InstallOutputEvent { if v.Event == server.InstallOutputEvent {
@ -297,7 +296,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
h.setJwt(token) h.setJwt(token)
// Tell the client they authenticated successfully. // Tell the client they authenticated successfully.
h.unsafeSendJson(Message{Event: AuthenticationSuccessEvent}) _ = h.unsafeSendJson(Message{Event: AuthenticationSuccessEvent})
// Check if the client was refreshing their authentication token // Check if the client was refreshing their authentication token
// instead of authenticating for the first time. // instead of authenticating for the first time.
@ -315,7 +314,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
// On every authentication event, send the current server status back // On every authentication event, send the current server status back
// to the client. :) // to the client. :)
state := h.server.Environment.State() state := h.server.Environment.State()
h.SendJson(&Message{ _ = h.SendJson(Message{
Event: server.StatusEvent, Event: server.StatusEvent,
Args: []string{state}, Args: []string{state},
}) })
@ -327,7 +326,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
_ = h.server.Filesystem().HasSpaceAvailable(false) _ = h.server.Filesystem().HasSpaceAvailable(false)
b, _ := json.Marshal(h.server.Proc()) b, _ := json.Marshal(h.server.Proc())
h.SendJson(&Message{ _ = h.SendJson(Message{
Event: server.StatsEvent, Event: server.StatsEvent,
Args: []string{string(b)}, Args: []string{string(b)},
}) })
@ -357,7 +356,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {
m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later") m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later")
h.SendJson(&Message{ _ = h.SendJson(Message{
Event: ErrorEvent, Event: ErrorEvent,
Args: []string{m}, Args: []string{m},
}) })
@ -381,7 +380,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
} }
for _, line := range logs { for _, line := range logs {
h.SendJson(&Message{ _ = h.SendJson(Message{
Event: server.ConsoleOutputEvent, Event: server.ConsoleOutputEvent,
Args: []string{line}, Args: []string{line},
}) })
@ -392,7 +391,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
case SendStatsEvent: case SendStatsEvent:
{ {
b, _ := json.Marshal(h.server.Proc()) b, _ := json.Marshal(h.server.Proc())
h.SendJson(&Message{ _ = h.SendJson(Message{
Event: server.StatsEvent, Event: server.StatsEvent,
Args: []string{string(b)}, Args: []string{string(b)},
}) })

View File

@ -79,7 +79,7 @@ func (s *Server) Backup(b backup.BackupInterface) error {
s.Log().WithField("backup", b.Identifier()).Info("notified panel of failed backup state") s.Log().WithField("backup", b.Identifier()).Info("notified panel of failed backup state")
} }
_ = s.Events().PublishJson(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{ s.Events().Publish(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{
"uuid": b.Identifier(), "uuid": b.Identifier(),
"is_successful": false, "is_successful": false,
"checksum": "", "checksum": "",
@ -103,7 +103,7 @@ func (s *Server) Backup(b backup.BackupInterface) error {
// Emit an event over the socket so we can update the backup in realtime on // Emit an event over the socket so we can update the backup in realtime on
// the frontend for the server. // the frontend for the server.
_ = s.Events().PublishJson(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{ s.Events().Publish(BackupCompletedEvent+":"+b.Identifier(), map[string]interface{}{
"uuid": b.Identifier(), "uuid": b.Identifier(),
"is_successful": true, "is_successful": true,
"checksum": ad.Checksum, "checksum": ad.Checksum,

View File

@ -21,12 +21,12 @@ const (
) )
// Returns the server's emitter instance. // Returns the server's emitter instance.
func (s *Server) Events() *events.EventBus { func (s *Server) Events() *events.Bus {
s.emitterLock.Lock() s.emitterLock.Lock()
defer s.emitterLock.Unlock() defer s.emitterLock.Unlock()
if s.emitter == nil { if s.emitter == nil {
s.emitter = events.New() s.emitter = events.NewBus()
} }
return s.emitter return s.emitter

View File

@ -521,10 +521,7 @@ func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) erro
} }
defer reader.Close() defer reader.Close()
evts := ip.Server.Events() err = system.ScanReader(reader, ip.Server.InstallSink().Push)
err = system.ScanReader(reader, func(line string) {
evts.Publish(InstallOutputEvent, line)
})
if err != nil { if err != nil {
ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines") ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines")
} }

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"encoding/json"
"regexp" "regexp"
"strconv" "strconv"
"sync" "sync"
@ -51,98 +50,103 @@ func (dsl *diskSpaceLimiter) Trigger() {
}) })
} }
func (s *Server) processConsoleOutputEvent(v []byte) {
t := s.Throttler()
err := t.Increment(func() {
s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.")
})
// An error is only returned if the server has breached the thresholds set.
if err != nil {
// If the process is already stopping, just let it continue with that action rather than attempting
// to terminate again.
if s.Environment.State() != environment.ProcessStoppingState {
s.Environment.SetState(environment.ProcessStoppingState)
go func() {
s.Log().Warn("stopping server instance, violating throttle limits")
s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.")
// Completely skip over server power actions and terminate the running instance. This gives the
// server 15 seconds to finish stopping gracefully before it is forcefully terminated.
if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil {
// If there is an error set the process back to running so that this throttler is called
// again and hopefully kills the server.
if s.Environment.State() != environment.ProcessOfflineState {
s.Environment.SetState(environment.ProcessRunningState)
}
s.Log().WithField("error", err).Error("failed to terminate environment after triggering throttle")
}
}()
}
}
// If we are not throttled, go ahead and output the data.
if !t.Throttled() {
s.LogSink().Push(v)
}
// Also pass the data along to the console output channel.
s.onConsoleOutput(string(v))
}
// StartEventListeners adds all the internal event listeners we want to use for a server. These listeners can only be // StartEventListeners adds all the internal event listeners we want to use for a server. These listeners can only be
// removed by deleting the server as they should last for the duration of the process' lifetime. // removed by deleting the server as they should last for the duration of the process' lifetime.
func (s *Server) StartEventListeners() { func (s *Server) StartEventListeners() {
console := func(e events.Event) { state := make(chan events.Event)
t := s.Throttler() stats := make(chan events.Event)
err := t.Increment(func() { docker := make(chan events.Event)
s.PublishConsoleOutputFromDaemon("Your server is outputting too much data and is being throttled.")
})
// An error is only returned if the server has breached the thresholds set.
if err != nil {
// If the process is already stopping, just let it continue with that action rather than attempting
// to terminate again.
if s.Environment.State() != environment.ProcessStoppingState {
s.Environment.SetState(environment.ProcessStoppingState)
go func() {
l := newDiskLimiter(s)
for {
select {
case e := <-state:
go func() { go func() {
s.Log().Warn("stopping server instance, violating throttle limits") // Reset the throttler when the process is started.
s.PublishConsoleOutputFromDaemon("Your server is being stopped for outputting too much data in a short period of time.") if e.Data == environment.ProcessStartingState {
l.Reset()
s.Throttler().Reset()
}
// Completely skip over server power actions and terminate the running instance. This gives the s.OnStateChange()
// server 15 seconds to finish stopping gracefully before it is forcefully terminated. }()
if err := s.Environment.WaitForStop(config.Get().Throttles.StopGracePeriod, true); err != nil { case e := <-stats:
// If there is an error set the process back to running so that this throttler is called go func() {
// again and hopefully kills the server. // Update the server resource tracking object with the resources we got here.
if s.Environment.State() != environment.ProcessOfflineState { s.resources.mu.Lock()
s.Environment.SetState(environment.ProcessRunningState) s.resources.Stats = e.Data.(environment.Stats)
} s.resources.mu.Unlock()
s.Log().WithField("error", err).Error("failed to terminate environment after triggering throttle") // If there is no disk space available at this point, trigger the server disk limiter logic
// which will start to stop the running instance.
if !s.Filesystem().HasSpaceAvailable(true) {
l.Trigger()
}
s.emitProcUsage()
}()
case e := <-docker:
go func() {
switch e.Topic {
case environment.DockerImagePullStatus:
s.Events().Publish(InstallOutputEvent, e.Data)
case environment.DockerImagePullStarted:
s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...")
default:
s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
} }
}() }()
} }
} }
}()
// If we are not throttled, go ahead and output the data.
if !t.Throttled() {
s.Events().Publish(ConsoleOutputEvent, e.Data)
}
// Also pass the data along to the console output channel.
s.onConsoleOutput(e.Data)
}
l := newDiskLimiter(s)
state := func(e events.Event) {
// Reset the throttler when the process is started.
if e.Data == environment.ProcessStartingState {
l.Reset()
s.Throttler().Reset()
}
s.OnStateChange()
}
stats := func(e events.Event) {
var st environment.Stats
if err := json.Unmarshal([]byte(e.Data), &st); err != nil {
s.Log().WithField("error", err).Warn("failed to unmarshal server environment stats")
return
}
// Update the server resource tracking object with the resources we got here.
s.resources.mu.Lock()
s.resources.Stats = st
s.resources.mu.Unlock()
// If there is no disk space available at this point, trigger the server disk limiter logic
// which will start to stop the running instance.
if !s.Filesystem().HasSpaceAvailable(true) {
l.Trigger()
}
s.emitProcUsage()
}
docker := func(e events.Event) {
if e.Topic == environment.DockerImagePullStatus {
s.Events().Publish(InstallOutputEvent, e.Data)
} else if e.Topic == environment.DockerImagePullStarted {
s.PublishConsoleOutputFromDaemon("Pulling Docker container image, this could take a few minutes to complete...")
} else {
s.PublishConsoleOutputFromDaemon("Finished pulling Docker container image")
}
}
s.Log().Debug("registering event listeners: console, state, resources...") s.Log().Debug("registering event listeners: console, state, resources...")
s.Environment.Events().On(environment.ConsoleOutputEvent, &console) s.Environment.SetLogCallback(s.processConsoleOutputEvent)
s.Environment.Events().On(environment.StateChangeEvent, &state) s.Environment.Events().On(state, environment.StateChangeEvent)
s.Environment.Events().On(environment.ResourceEvent, &stats) s.Environment.Events().On(stats, environment.ResourceEvent)
for _, evt := range dockerEvents { s.Environment.Events().On(docker, dockerEvents...)
s.Environment.Events().On(evt, &docker)
}
} }
var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))") var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))")

View File

@ -52,7 +52,5 @@ func (ru *ResourceUsage) Reset() {
} }
func (s *Server) emitProcUsage() { func (s *Server) emitProcUsage() {
if err := s.Events().PublishJson(StatsEvent, s.Proc()); err != nil { s.Events().Publish(StatsEvent, s.Proc())
s.Log().WithField("error", err).Warn("error while emitting server resource usage to listeners")
}
} }

View File

@ -49,7 +49,7 @@ type Server struct {
fs *filesystem.Filesystem fs *filesystem.Filesystem
// Events emitted by the server instance. // Events emitted by the server instance.
emitter *events.EventBus emitter *events.Bus
// Defines the process configuration for the server instance. This is dynamically // Defines the process configuration for the server instance. This is dynamically
// fetched from the Pterodactyl Server instance each time the server process is // fetched from the Pterodactyl Server instance each time the server process is
@ -70,6 +70,9 @@ type Server struct {
// Tracks open websocket connections for the server. // Tracks open websocket connections for the server.
wsBag *WebsocketBag wsBag *WebsocketBag
wsBagLocker sync.Mutex wsBagLocker sync.Mutex
logSink *sinkPool
installSink *sinkPool
} }
// New returns a new server instance with a context and all of the default // New returns a new server instance with a context and all of the default
@ -83,6 +86,9 @@ func New(client remote.Client) (*Server, error) {
installing: system.NewAtomicBool(false), installing: system.NewAtomicBool(false),
transferring: system.NewAtomicBool(false), transferring: system.NewAtomicBool(false),
restoring: system.NewAtomicBool(false), restoring: system.NewAtomicBool(false),
logSink: newSinkPool(),
installSink: newSinkPool(),
} }
if err := defaults.Set(&s); err != nil { if err := defaults.Set(&s); err != nil {
return nil, errors.Wrap(err, "server: could not set default values for struct") return nil, errors.Wrap(err, "server: could not set default values for struct")
@ -349,3 +355,11 @@ func (s *Server) ToAPIResponse() APIResponse {
Configuration: *s.Config(), Configuration: *s.Config(),
} }
} }
func (s *Server) LogSink() *sinkPool {
return s.logSink
}
func (s *Server) InstallSink() *sinkPool {
return s.installSink
}

71
server/sink.go Normal file
View File

@ -0,0 +1,71 @@
package server
import (
"sync"
"time"
)
// sinkPool represents a pool with sinks.
type sinkPool struct {
mx sync.RWMutex
sinks []chan []byte
}
// newSinkPool returns a new empty sinkPool.
func newSinkPool() *sinkPool {
return &sinkPool{}
}
// On adds a sink on the pool.
func (p *sinkPool) On(c chan []byte) {
p.mx.Lock()
defer p.mx.Unlock()
p.sinks = append(p.sinks, c)
}
// Off removes a sink from the pool.
func (p *sinkPool) Off(c chan []byte) {
p.mx.Lock()
defer p.mx.Unlock()
sinks := p.sinks
for i, sink := range sinks {
if c != sink {
continue
}
copy(sinks[i:], sinks[i+1:])
sinks[len(sinks)-1] = nil
sinks = sinks[:len(sinks)-1]
p.sinks = sinks
return
}
}
// Destroy destroys the pool by removing and closing all sinks.
func (p *sinkPool) Destroy() {
p.mx.Lock()
defer p.mx.Unlock()
for _, c := range p.sinks {
close(c)
}
p.sinks = nil
}
// Push pushes a message to all registered sinks.
func (p *sinkPool) Push(v []byte) {
p.mx.RLock()
for _, c := range p.sinks {
// TODO: should this be done in parallel?
select {
// Send the log output to the channel
case c <- v:
// Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled.
case <-time.After(100 * time.Millisecond):
}
}
p.mx.RUnlock()
}

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"io" "io"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -38,14 +37,14 @@ func MustInt(v string) int {
return i return i
} }
func ScanReader(r io.Reader, callback func(line string)) error { func ScanReader(r io.Reader, callback func(line []byte)) error {
br := bufio.NewReader(r) br := bufio.NewReader(r)
// Avoid constantly re-allocating memory when we're flooding lines through this // Avoid constantly re-allocating memory when we're flooding lines through this
// function by using the same buffer for the duration of the call and just truncating // function by using the same buffer for the duration of the call and just truncating
// the value back to 0 every loop. // the value back to 0 every loop.
var str strings.Builder buf := &bytes.Buffer{}
for { for {
str.Reset() buf.Reset()
var err error var err error
var line []byte var line []byte
var isPrefix bool var isPrefix bool
@ -57,7 +56,7 @@ func ScanReader(r io.Reader, callback func(line string)) error {
// in line with that it thinks is the terminal size. Those returns break a lot of output handling, // in line with that it thinks is the terminal size. Those returns break a lot of output handling,
// so we'll just replace them with proper new-lines and then split it later and send each line as // so we'll just replace them with proper new-lines and then split it later and send each line as
// its own event in the response. // its own event in the response.
str.Write(bytes.Replace(line, cr, crr, -1)) buf.Write(bytes.Replace(line, cr, crr, -1))
// Finish this loop and begin outputting the line if there is no prefix (the line fit into // Finish this loop and begin outputting the line if there is no prefix (the line fit into
// the default buffer), or if we hit the end of the line. // the default buffer), or if we hit the end of the line.
if !isPrefix || err == io.EOF { if !isPrefix || err == io.EOF {
@ -71,8 +70,9 @@ func ScanReader(r io.Reader, callback func(line string)) error {
} }
// Publish the line for this loop. Break on new-line characters so every line is sent as a single // Publish the line for this loop. Break on new-line characters so every line is sent as a single
// output event, otherwise you get funky handling in the browser console. // output event, otherwise you get funky handling in the browser console.
for _, line := range strings.Split(str.String(), "\r\n") { s := bufio.NewScanner(buf)
callback(line) for s.Scan() {
callback(s.Bytes())
} }
// If the error we got previously that lead to the line being output is an io.EOF we want to // If the error we got previously that lead to the line being output is an io.EOF we want to
// exit the entire looping process. // exit the entire looping process.