Merge branch 'develop' into v2

This commit is contained in:
Matthew Penner
2022-01-18 13:12:09 -07:00
50 changed files with 864 additions and 493 deletions

View File

@@ -2,6 +2,7 @@ package websocket
import (
"context"
"encoding/json"
"sync"
"time"
@@ -53,9 +54,9 @@ func (h *Handler) listenForExpiration(ctx context.Context) {
jwt := h.GetJwt()
if jwt != nil {
if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 0 {
_ = h.SendJson(&Message{Event: TokenExpiredEvent})
_ = h.SendJson(Message{Event: TokenExpiredEvent})
} else if jwt.ExpirationTime.Unix()-time.Now().Unix() <= 60 {
_ = h.SendJson(&Message{Event: TokenExpiringEvent})
_ = h.SendJson(Message{Event: TokenExpiringEvent})
}
}
}
@@ -86,44 +87,71 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c := make(chan events.Event)
eventChan := make(chan events.Event)
logOutput := make(chan []byte)
installOutput := make(chan []byte)
h.server.Events().On(eventChan, e...)
h.server.LogSink().On(logOutput)
h.server.InstallSink().On(installOutput)
callback := func(e events.Event) {
c <- e
}
// Subscribe to all of the events with the same callback that will push the
// data out over the websocket for the server.
for _, evt := range e {
h.server.Events().On(evt, &callback)
onError := func(evt string, err2 error) {
h.Logger().WithField("event", evt).WithField("error", err2).Error("failed to send event over server websocket")
// Avoid race conditions by only setting the error once and then canceling
// the context. This way if additional processing errors come through due
// to a massive flood of things you still only report and stop at the first.
o.Do(func() {
err = err2
})
cancel()
}
for {
select {
case <-ctx.Done():
break
case e := <-c:
sendErr := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}})
case e := <-logOutput:
sendErr := h.SendJson(Message{Event: server.ConsoleOutputEvent, Args: []string{string(e)}})
if sendErr == nil {
continue
}
onError(server.ConsoleOutputEvent, sendErr)
case e := <-installOutput:
sendErr := h.SendJson(Message{Event: server.InstallOutputEvent, Args: []string{string(e)}})
if sendErr == nil {
continue
}
onError(server.InstallOutputEvent, sendErr)
case e := <-eventChan:
var sendErr error
message := Message{Event: e.Topic}
if str, ok := e.Data.(string); ok {
message.Args = []string{str}
} else if b, ok := e.Data.([]byte); ok {
message.Args = []string{string(b)}
} else {
b, sendErr = json.Marshal(e.Data)
if sendErr == nil {
message.Args = []string{string(b)}
}
}
h.Logger().WithField("event", e.Topic).WithField("error", sendErr).Error("failed to send event over server websocket")
// Avoid race conditions by only setting the error once and then canceling
// the context. This way if additional processing errors come through due
// to a massive flood of things you still only report and stop at the first.
o.Do(func() {
err = sendErr
})
cancel()
if sendErr == nil {
sendErr = h.SendJson(message)
if sendErr == nil {
continue
}
}
onError(message.Event, sendErr)
}
break
}
for _, evt := range e {
h.server.Events().Off(evt, &callback)
}
close(c)
h.server.Events().Off(eventChan, e...)
h.server.InstallSink().Off(logOutput)
h.server.InstallSink().Off(installOutput)
close(eventChan)
close(logOutput)
close(installOutput)
// If the internal context is stopped it is either because the parent context
// got canceled or because we ran into an error. If the "err" variable is nil

View File

@@ -122,7 +122,7 @@ func (h *Handler) Logger() *log.Entry {
WithField("server", h.server.ID())
}
func (h *Handler) SendJson(v *Message) error {
func (h *Handler) SendJson(v Message) error {
// Do not send JSON down the line if the JWT on the connection is not valid!
if err := h.TokenValid(); err != nil {
_ = h.unsafeSendJson(Message{
@@ -314,7 +314,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
// On every authentication event, send the current server status back
// to the client. :)
state := h.server.Environment.State()
_ = h.SendJson(&Message{
_ = h.SendJson(Message{
Event: server.StatusEvent,
Args: []string{state},
})
@@ -326,7 +326,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
_ = h.server.Filesystem().HasSpaceAvailable(false)
b, _ := json.Marshal(h.server.Proc())
_ = h.SendJson(&Message{
_ = h.SendJson(Message{
Event: server.StatsEvent,
Args: []string{string(b)},
})
@@ -356,7 +356,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
if errors.Is(err, context.DeadlineExceeded) {
m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later")
_ = h.SendJson(&Message{
_ = h.SendJson(Message{
Event: ErrorEvent,
Args: []string{m},
})
@@ -380,7 +380,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
}
for _, line := range logs {
_ = h.SendJson(&Message{
_ = h.SendJson(Message{
Event: server.ConsoleOutputEvent,
Args: []string{line},
})
@@ -391,7 +391,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
case SendStatsEvent:
{
b, _ := json.Marshal(h.server.Proc())
_ = h.SendJson(&Message{
_ = h.SendJson(Message{
Event: server.StatsEvent,
Args: []string{string(b)},
})