tweaks to websocket event handling

This commit is contained in:
Matthew Penner 2021-11-15 10:13:31 -07:00
parent 54b6033392
commit d874af85db
No known key found for this signature in database
GPG Key ID: BAB67850901908A8
2 changed files with 42 additions and 31 deletions

View File

@ -35,7 +35,6 @@ func (h *Handler) registerListenerEvents(ctx context.Context) {
go h.listenForExpiration(ctx) go h.listenForExpiration(ctx)
} }
// ListenForExpiration checks the time to expiration on the JWT every 30 seconds // ListenForExpiration checks the time to expiration on the JWT every 30 seconds
// until the token has expired. If we are within 3 minutes of the token expiring, // until the token has expired. If we are within 3 minutes of the token expiring,
// send a notice over the socket that it is expiring soon. If it has expired, // send a notice over the socket that it is expiring soon. If it has expired,
@ -80,22 +79,17 @@ var e = []string{
// ListenForServerEvents will listen for different events happening on a server // ListenForServerEvents will listen for different events happening on a server
// and send them along to the connected websocket client. This function will // and send them along to the connected websocket client. This function will
// block until the context provided to it is canceled. // block until the context provided to it is canceled.
func (h *Handler) listenForServerEvents(pctx context.Context) error { func (h *Handler) listenForServerEvents(ctx context.Context) error {
var o sync.Once var o sync.Once
var err error var err error
ctx, cancel := context.WithCancel(pctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c := make(chan events.Event)
callback := func(e events.Event) { callback := func(e events.Event) {
if sendErr := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}}); sendErr != nil { c <- e
h.Logger().WithField("event", e.Topic).WithField("error", sendErr).Error("failed to send event over server websocket")
// Avoid race conditions by only setting the error once and then canceling
// the context. This way if additional processing errors come through due
// to a massive flood of things you still only report and stop at the first.
o.Do(func() {
err = sendErr
cancel()
})
}
} }
// Subscribe to all of the events with the same callback that will push the // Subscribe to all of the events with the same callback that will push the
@ -104,20 +98,38 @@ func (h *Handler) listenForServerEvents(pctx context.Context) error {
h.server.Events().On(evt, &callback) h.server.Events().On(evt, &callback)
} }
// When this function returns de-register all of the event listeners. for {
defer func() { select {
for _, evt := range e { case <-ctx.Done():
h.server.Events().Off(evt, &callback) break
} case e := <-c:
}() sendErr := h.SendJson(&Message{Event: e.Topic, Args: []string{e.Data}})
if sendErr == nil {
continue
}
h.Logger().WithField("event", e.Topic).WithField("error", sendErr).Error("failed to send event over server websocket")
// Avoid race conditions by only setting the error once and then canceling
// the context. This way if additional processing errors come through due
// to a massive flood of things you still only report and stop at the first.
o.Do(func() {
err = sendErr
})
cancel()
}
break
}
for _, evt := range e {
h.server.Events().Off(evt, &callback)
}
close(c)
<-ctx.Done()
// If the internal context is stopped it is either because the parent context // If the internal context is stopped it is either because the parent context
// got canceled or because we ran into an error. If the "err" variable is nil // got canceled or because we ran into an error. If the "err" variable is nil
// we can assume the parent was canceled and need not perform any actions. // we can assume the parent was canceled and need not perform any actions.
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
return nil return nil
} }

View File

@ -125,15 +125,14 @@ func (h *Handler) Logger() *log.Entry {
func (h *Handler) SendJson(v *Message) error { func (h *Handler) SendJson(v *Message) error {
// Do not send JSON down the line if the JWT on the connection is not valid! // Do not send JSON down the line if the JWT on the connection is not valid!
if err := h.TokenValid(); err != nil { if err := h.TokenValid(); err != nil {
h.unsafeSendJson(Message{ _ = h.unsafeSendJson(Message{
Event: JwtErrorEvent, Event: JwtErrorEvent,
Args: []string{err.Error()}, Args: []string{err.Error()},
}) })
return nil return nil
} }
j := h.GetJwt() if j := h.GetJwt(); j != nil {
if j != nil {
// If we're sending installation output but the user does not have the required // If we're sending installation output but the user does not have the required
// permissions to see the output, don't send it down the line. // permissions to see the output, don't send it down the line.
if v.Event == server.InstallOutputEvent { if v.Event == server.InstallOutputEvent {
@ -297,7 +296,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
h.setJwt(token) h.setJwt(token)
// Tell the client they authenticated successfully. // Tell the client they authenticated successfully.
h.unsafeSendJson(Message{Event: AuthenticationSuccessEvent}) _ = h.unsafeSendJson(Message{Event: AuthenticationSuccessEvent})
// Check if the client was refreshing their authentication token // Check if the client was refreshing their authentication token
// instead of authenticating for the first time. // instead of authenticating for the first time.
@ -315,7 +314,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
// On every authentication event, send the current server status back // On every authentication event, send the current server status back
// to the client. :) // to the client. :)
state := h.server.Environment.State() state := h.server.Environment.State()
h.SendJson(&Message{ _ = h.SendJson(&Message{
Event: server.StatusEvent, Event: server.StatusEvent,
Args: []string{state}, Args: []string{state},
}) })
@ -327,7 +326,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
_ = h.server.Filesystem().HasSpaceAvailable(false) _ = h.server.Filesystem().HasSpaceAvailable(false)
b, _ := json.Marshal(h.server.Proc()) b, _ := json.Marshal(h.server.Proc())
h.SendJson(&Message{ _ = h.SendJson(&Message{
Event: server.StatsEvent, Event: server.StatsEvent,
Args: []string{string(b)}, Args: []string{string(b)},
}) })
@ -357,7 +356,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {
m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later") m, _ := h.GetErrorMessage("another power action is currently being processed for this server, please try again later")
h.SendJson(&Message{ _ = h.SendJson(&Message{
Event: ErrorEvent, Event: ErrorEvent,
Args: []string{m}, Args: []string{m},
}) })
@ -369,7 +368,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
} }
case SendServerLogsEvent: case SendServerLogsEvent:
{ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()
if running, _ := h.server.Environment.IsRunning(ctx); !running { if running, _ := h.server.Environment.IsRunning(ctx); !running {
return nil return nil
@ -381,7 +380,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
} }
for _, line := range logs { for _, line := range logs {
h.SendJson(&Message{ _ = h.SendJson(&Message{
Event: server.ConsoleOutputEvent, Event: server.ConsoleOutputEvent,
Args: []string{line}, Args: []string{line},
}) })
@ -392,7 +391,7 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
case SendStatsEvent: case SendStatsEvent:
{ {
b, _ := json.Marshal(h.server.Proc()) b, _ := json.Marshal(h.server.Proc())
h.SendJson(&Message{ _ = h.SendJson(&Message{
Event: server.StatsEvent, Event: server.StatsEvent,
Args: []string{string(b)}, Args: []string{string(b)},
}) })