Compare commits
	
		
			4 Commits
		
	
	
		
			develop
			...
			matthewpi/
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					99ed8dc9a9 | ||
| 
						 | 
					86f41c8027 | ||
| 
						 | 
					766692bfe6 | ||
| 
						 | 
					764aed89ae | 
| 
						 | 
					@ -104,17 +104,13 @@ func (b *Bus) Publish(topic string, data interface{}) {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var wg sync.WaitGroup
 | 
					 | 
				
			||||||
	event := Event{Topic: topic, Data: data}
 | 
						event := Event{Topic: topic, Data: data}
 | 
				
			||||||
	for _, listener := range listeners {
 | 
						for _, listener := range listeners {
 | 
				
			||||||
		l := listener
 | 
							l := listener
 | 
				
			||||||
		wg.Add(1)
 | 
					 | 
				
			||||||
		go func(l Listener, event Event) {
 | 
							go func(l Listener, event Event) {
 | 
				
			||||||
			defer wg.Done()
 | 
					 | 
				
			||||||
			l <- event
 | 
								l <- event
 | 
				
			||||||
		}(l, event)
 | 
							}(l, event)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	wg.Wait()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Destroy destroys the Event Bus by unregistering and closing all listeners.
 | 
					// Destroy destroys the Event Bus by unregistering and closing all listeners.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -168,3 +168,58 @@ func TestBus_Publish(t *testing.T) {
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestBus_Destroy(t *testing.T) {
 | 
				
			||||||
 | 
						g := Goblin(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						g.Describe("Destroy", func() {
 | 
				
			||||||
 | 
							g.It("unsubscribes and closes all listeners", func() {
 | 
				
			||||||
 | 
								bus := NewBus()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								listener := make(chan Event)
 | 
				
			||||||
 | 
								bus.On(listener, "test")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								done := make(chan struct{}, 1)
 | 
				
			||||||
 | 
								go func() {
 | 
				
			||||||
 | 
									select {
 | 
				
			||||||
 | 
									case m := <-listener:
 | 
				
			||||||
 | 
										g.Assert(m).IsZero()
 | 
				
			||||||
 | 
									case <-time.After(1 * time.Second):
 | 
				
			||||||
 | 
										g.Fail("listener did not receive message in time")
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									done <- struct{}{}
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
								bus.Destroy()
 | 
				
			||||||
 | 
								<-done
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								g.Assert(bus.listeners).Equal(map[string][]Listener{})
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// This is a check that ensures Destroy only closes each listener
 | 
				
			||||||
 | 
							// channel once, even if it is subscribed to multiple topics.
 | 
				
			||||||
 | 
							//
 | 
				
			||||||
 | 
							// Closing a channel multiple times will cause a runtime panic, which
 | 
				
			||||||
 | 
							// I'm pretty sure we don't want.
 | 
				
			||||||
 | 
							g.It("unsubscribes and closes channel only once", func() {
 | 
				
			||||||
 | 
								bus := NewBus()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								listener := make(chan Event)
 | 
				
			||||||
 | 
								bus.On(listener, "test", "test2", "test3", "test4", "test5")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								done := make(chan struct{}, 1)
 | 
				
			||||||
 | 
								go func() {
 | 
				
			||||||
 | 
									select {
 | 
				
			||||||
 | 
									case m := <-listener:
 | 
				
			||||||
 | 
										g.Assert(m).IsZero()
 | 
				
			||||||
 | 
									case <-time.After(1 * time.Second):
 | 
				
			||||||
 | 
										g.Fail("listener did not receive message in time")
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									done <- struct{}{}
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
								bus.Destroy()
 | 
				
			||||||
 | 
								<-done
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								g.Assert(bus.listeners).Equal(map[string][]Listener{})
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -267,11 +267,18 @@ func (h *Handler) setJwt(token *tokens.WebsocketPayload) {
 | 
				
			||||||
	h.Unlock()
 | 
						h.Unlock()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var actions = map[server.PowerAction]string{
 | 
				
			||||||
 | 
						server.PowerActionStart:     PermissionSendPowerStart,
 | 
				
			||||||
 | 
						server.PowerActionStop:      PermissionSendPowerStop,
 | 
				
			||||||
 | 
						server.PowerActionRestart:   PermissionSendPowerRestart,
 | 
				
			||||||
 | 
						server.PowerActionTerminate: PermissionSendPowerStop,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// HandleInbound handles an inbound socket request and route it to the proper action.
 | 
					// HandleInbound handles an inbound socket request and route it to the proper action.
 | 
				
			||||||
func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
 | 
					func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
 | 
				
			||||||
	if m.Event != AuthenticationEvent {
 | 
						if m.Event != AuthenticationEvent {
 | 
				
			||||||
		if err := h.TokenValid(); err != nil {
 | 
							if err := h.TokenValid(); err != nil {
 | 
				
			||||||
			h.unsafeSendJson(Message{
 | 
								_ = h.unsafeSendJson(Message{
 | 
				
			||||||
				Event: JwtErrorEvent,
 | 
									Event: JwtErrorEvent,
 | 
				
			||||||
				Args:  []string{err.Error()},
 | 
									Args:  []string{err.Error()},
 | 
				
			||||||
			})
 | 
								})
 | 
				
			||||||
| 
						 | 
					@ -339,12 +346,6 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			action := server.PowerAction(strings.Join(m.Args, ""))
 | 
								action := server.PowerAction(strings.Join(m.Args, ""))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			actions := make(map[server.PowerAction]string)
 | 
					 | 
				
			||||||
			actions[server.PowerActionStart] = PermissionSendPowerStart
 | 
					 | 
				
			||||||
			actions[server.PowerActionStop] = PermissionSendPowerStop
 | 
					 | 
				
			||||||
			actions[server.PowerActionRestart] = PermissionSendPowerRestart
 | 
					 | 
				
			||||||
			actions[server.PowerActionTerminate] = PermissionSendPowerStop
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Check that they have permission to perform this action if it is needed.
 | 
								// Check that they have permission to perform this action if it is needed.
 | 
				
			||||||
			if permission, exists := actions[action]; exists {
 | 
								if permission, exists := actions[action]; exists {
 | 
				
			||||||
				if !h.GetJwt().HasPermission(permission) {
 | 
									if !h.GetJwt().HasPermission(permission) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2,7 +2,6 @@ package server
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// sinkPool represents a pool with sinks.
 | 
					// sinkPool represents a pool with sinks.
 | 
				
			||||||
| 
						 | 
					@ -60,12 +59,9 @@ func (p *sinkPool) Destroy() {
 | 
				
			||||||
func (p *sinkPool) Push(v []byte) {
 | 
					func (p *sinkPool) Push(v []byte) {
 | 
				
			||||||
	p.mx.RLock()
 | 
						p.mx.RLock()
 | 
				
			||||||
	for _, c := range p.sinks {
 | 
						for _, c := range p.sinks {
 | 
				
			||||||
		// TODO: should this be done in parallel?
 | 
					 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		// Send the log output to the channel
 | 
							// Send the log output to the channel
 | 
				
			||||||
		case c <- v:
 | 
							case c <- v:
 | 
				
			||||||
		// Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled.
 | 
					 | 
				
			||||||
		case <-time.After(100 * time.Millisecond):
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	p.mx.RUnlock()
 | 
						p.mx.RUnlock()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user