Compare commits

...

4 Commits

Author SHA1 Message Date
Matthew Penner
99ed8dc9a9
websocket: remove channel buffers 2022-01-22 12:30:07 -07:00
Matthew Penner
86f41c8027
server: remove Push timeout from sinkPool 2022-01-22 12:29:00 -07:00
Matthew Penner
766692bfe6
websocket: add small buffer to listener channels 2022-01-22 10:16:56 -07:00
Matthew Penner
764aed89ae
events: remove waitgroup on Publish, add Destroy test 2022-01-22 10:14:54 -07:00
4 changed files with 63 additions and 15 deletions

View File

@ -104,17 +104,13 @@ func (b *Bus) Publish(topic string, data interface{}) {
return return
} }
var wg sync.WaitGroup
event := Event{Topic: topic, Data: data} event := Event{Topic: topic, Data: data}
for _, listener := range listeners { for _, listener := range listeners {
l := listener l := listener
wg.Add(1)
go func(l Listener, event Event) { go func(l Listener, event Event) {
defer wg.Done()
l <- event l <- event
}(l, event) }(l, event)
} }
wg.Wait()
} }
// Destroy destroys the Event Bus by unregistering and closing all listeners. // Destroy destroys the Event Bus by unregistering and closing all listeners.

View File

@ -168,3 +168,58 @@ func TestBus_Publish(t *testing.T) {
}) })
}) })
} }
func TestBus_Destroy(t *testing.T) {
g := Goblin(t)
g.Describe("Destroy", func() {
g.It("unsubscribes and closes all listeners", func() {
bus := NewBus()
listener := make(chan Event)
bus.On(listener, "test")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m).IsZero()
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Destroy()
<-done
g.Assert(bus.listeners).Equal(map[string][]Listener{})
})
// This is a check that ensures Destroy only closes each listener
// channel once, even if it is subscribed to multiple topics.
//
// Closing a channel multiple times will cause a runtime panic, which
// I'm pretty sure we don't want.
g.It("unsubscribes and closes channel only once", func() {
bus := NewBus()
listener := make(chan Event)
bus.On(listener, "test", "test2", "test3", "test4", "test5")
done := make(chan struct{}, 1)
go func() {
select {
case m := <-listener:
g.Assert(m).IsZero()
case <-time.After(1 * time.Second):
g.Fail("listener did not receive message in time")
}
done <- struct{}{}
}()
bus.Destroy()
<-done
g.Assert(bus.listeners).Equal(map[string][]Listener{})
})
})
}

View File

@ -267,11 +267,18 @@ func (h *Handler) setJwt(token *tokens.WebsocketPayload) {
h.Unlock() h.Unlock()
} }
var actions = map[server.PowerAction]string{
server.PowerActionStart: PermissionSendPowerStart,
server.PowerActionStop: PermissionSendPowerStop,
server.PowerActionRestart: PermissionSendPowerRestart,
server.PowerActionTerminate: PermissionSendPowerStop,
}
// HandleInbound handles an inbound socket request and route it to the proper action. // HandleInbound handles an inbound socket request and route it to the proper action.
func (h *Handler) HandleInbound(ctx context.Context, m Message) error { func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
if m.Event != AuthenticationEvent { if m.Event != AuthenticationEvent {
if err := h.TokenValid(); err != nil { if err := h.TokenValid(); err != nil {
h.unsafeSendJson(Message{ _ = h.unsafeSendJson(Message{
Event: JwtErrorEvent, Event: JwtErrorEvent,
Args: []string{err.Error()}, Args: []string{err.Error()},
}) })
@ -339,12 +346,6 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
{ {
action := server.PowerAction(strings.Join(m.Args, "")) action := server.PowerAction(strings.Join(m.Args, ""))
actions := make(map[server.PowerAction]string)
actions[server.PowerActionStart] = PermissionSendPowerStart
actions[server.PowerActionStop] = PermissionSendPowerStop
actions[server.PowerActionRestart] = PermissionSendPowerRestart
actions[server.PowerActionTerminate] = PermissionSendPowerStop
// Check that they have permission to perform this action if it is needed. // Check that they have permission to perform this action if it is needed.
if permission, exists := actions[action]; exists { if permission, exists := actions[action]; exists {
if !h.GetJwt().HasPermission(permission) { if !h.GetJwt().HasPermission(permission) {

View File

@ -2,7 +2,6 @@ package server
import ( import (
"sync" "sync"
"time"
) )
// sinkPool represents a pool with sinks. // sinkPool represents a pool with sinks.
@ -60,12 +59,9 @@ func (p *sinkPool) Destroy() {
func (p *sinkPool) Push(v []byte) { func (p *sinkPool) Push(v []byte) {
p.mx.RLock() p.mx.RLock()
for _, c := range p.sinks { for _, c := range p.sinks {
// TODO: should this be done in parallel?
select { select {
// Send the log output to the channel // Send the log output to the channel
case c <- v: case c <- v:
// Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled.
case <-time.After(100 * time.Millisecond):
} }
} }
p.mx.RUnlock() p.mx.RUnlock()