Compare commits
9 Commits
v1.5.4
...
matthewpi/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
99ed8dc9a9 | ||
|
|
86f41c8027 | ||
|
|
766692bfe6 | ||
|
|
764aed89ae | ||
|
|
45418c86dd | ||
|
|
71e56c7da6 | ||
|
|
4ba5fe2866 | ||
|
|
6d8c1d2225 | ||
|
|
a6b77a31dc |
@@ -1,5 +1,11 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## v1.5.5
|
||||||
|
### Fixed
|
||||||
|
* Fixes sending to a closed channel when sending server logs over the websocket
|
||||||
|
* Fixes `wings diagnostics` uploading no content
|
||||||
|
* Fixes a panic caused by the event bus closing channels multiple times when a server is deleted
|
||||||
|
|
||||||
## v1.5.4
|
## v1.5.4
|
||||||
### Fixed
|
### Fixed
|
||||||
* Fixes SSL paths being improperly converted to lowercase in environments where the path is case-sensitive.
|
* Fixes SSL paths being improperly converted to lowercase in environments where the path is case-sensitive.
|
||||||
|
|||||||
@@ -179,6 +179,17 @@ func diagnosticsCmdRun(cmd *cobra.Command, args []string) {
|
|||||||
fmt.Fprintln(output, "Logs redacted.")
|
fmt.Fprintln(output, "Logs redacted.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !diagnosticsArgs.IncludeEndpoints {
|
||||||
|
s := output.String()
|
||||||
|
output.Reset()
|
||||||
|
s = strings.ReplaceAll(s, cfg.PanelLocation, "{redacted}")
|
||||||
|
s = strings.ReplaceAll(s, cfg.Api.Host, "{redacted}")
|
||||||
|
s = strings.ReplaceAll(s, cfg.Api.Ssl.CertificateFile, "{redacted}")
|
||||||
|
s = strings.ReplaceAll(s, cfg.Api.Ssl.KeyFile, "{redacted}")
|
||||||
|
s = strings.ReplaceAll(s, cfg.System.Sftp.Address, "{redacted}")
|
||||||
|
output.WriteString(s)
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Println("\n--------------- generated report ---------------")
|
fmt.Println("\n--------------- generated report ---------------")
|
||||||
fmt.Println(output.String())
|
fmt.Println(output.String())
|
||||||
fmt.Print("--------------- end of report ---------------\n\n")
|
fmt.Print("--------------- end of report ---------------\n\n")
|
||||||
@@ -188,16 +199,6 @@ func diagnosticsCmdRun(cmd *cobra.Command, args []string) {
|
|||||||
survey.AskOne(&survey.Confirm{Message: "Upload to " + diagnosticsArgs.HastebinURL + "?", Default: false}, &upload)
|
survey.AskOne(&survey.Confirm{Message: "Upload to " + diagnosticsArgs.HastebinURL + "?", Default: false}, &upload)
|
||||||
}
|
}
|
||||||
if upload {
|
if upload {
|
||||||
if !diagnosticsArgs.IncludeEndpoints {
|
|
||||||
s := output.String()
|
|
||||||
output.Reset()
|
|
||||||
a := strings.ReplaceAll(cfg.PanelLocation, s, "{redacted}")
|
|
||||||
a = strings.ReplaceAll(cfg.Api.Host, a, "{redacted}")
|
|
||||||
a = strings.ReplaceAll(cfg.Api.Ssl.CertificateFile, a, "{redacted}")
|
|
||||||
a = strings.ReplaceAll(cfg.Api.Ssl.KeyFile, a, "{redacted}")
|
|
||||||
a = strings.ReplaceAll(cfg.System.Sftp.Address, a, "{redacted}")
|
|
||||||
output.WriteString(a)
|
|
||||||
}
|
|
||||||
u, err := uploadToHastebin(diagnosticsArgs.HastebinURL, output.String())
|
u, err := uploadToHastebin(diagnosticsArgs.HastebinURL, output.String())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fmt.Println("Your report is available here: ", u)
|
fmt.Println("Your report is available here: ", u)
|
||||||
|
|||||||
@@ -31,8 +31,14 @@ func (b *Bus) Off(listener Listener, topics ...string) {
|
|||||||
b.listenersMx.Lock()
|
b.listenersMx.Lock()
|
||||||
defer b.listenersMx.Unlock()
|
defer b.listenersMx.Unlock()
|
||||||
|
|
||||||
|
var closed bool
|
||||||
|
|
||||||
for _, topic := range topics {
|
for _, topic := range topics {
|
||||||
b.off(topic, listener)
|
ok := b.off(topic, listener)
|
||||||
|
if !closed && ok {
|
||||||
|
close(listener)
|
||||||
|
closed = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,17 +104,13 @@ func (b *Bus) Publish(topic string, data interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
event := Event{Topic: topic, Data: data}
|
event := Event{Topic: topic, Data: data}
|
||||||
for _, listener := range listeners {
|
for _, listener := range listeners {
|
||||||
l := listener
|
l := listener
|
||||||
wg.Add(1)
|
|
||||||
go func(l Listener, event Event) {
|
go func(l Listener, event Event) {
|
||||||
defer wg.Done()
|
|
||||||
l <- event
|
l <- event
|
||||||
}(l, event)
|
}(l, event)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Destroy destroys the Event Bus by unregistering and closing all listeners.
|
// Destroy destroys the Event Bus by unregistering and closing all listeners.
|
||||||
@@ -116,11 +118,30 @@ func (b *Bus) Destroy() {
|
|||||||
b.listenersMx.Lock()
|
b.listenersMx.Lock()
|
||||||
defer b.listenersMx.Unlock()
|
defer b.listenersMx.Unlock()
|
||||||
|
|
||||||
|
// Track what listeners have already been closed. Because the same listener
|
||||||
|
// can be listening on multiple topics, we need a way to essentially
|
||||||
|
// "de-duplicate" all the listeners across all the topics.
|
||||||
|
var closed []Listener
|
||||||
|
|
||||||
for _, listeners := range b.listeners {
|
for _, listeners := range b.listeners {
|
||||||
for _, listener := range listeners {
|
for _, listener := range listeners {
|
||||||
|
if contains(closed, listener) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
close(listener)
|
close(listener)
|
||||||
|
closed = append(closed, listener)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
b.listeners = make(map[string][]Listener)
|
b.listeners = make(map[string][]Listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func contains(closed []Listener, listener Listener) bool {
|
||||||
|
for _, c := range closed {
|
||||||
|
if c == listener {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|||||||
@@ -36,8 +36,6 @@ func TestBus_Off(t *testing.T) {
|
|||||||
|
|
||||||
bus.Off(listener, topic)
|
bus.Off(listener, topic)
|
||||||
g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners")
|
g.Assert(len(bus.listeners[topic])).Equal(0, "Topic still has one or more listeners")
|
||||||
|
|
||||||
close(listener)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
g.It("unregisters correct listener", func() {
|
g.It("unregisters correct listener", func() {
|
||||||
@@ -62,9 +60,6 @@ func TestBus_Off(t *testing.T) {
|
|||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
bus.Off(listener2, topic)
|
bus.Off(listener2, topic)
|
||||||
close(listener)
|
|
||||||
close(listener2)
|
|
||||||
close(listener3)
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -91,7 +86,6 @@ func TestBus_On(t *testing.T) {
|
|||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
bus.Off(listener, topic)
|
bus.Off(listener, topic)
|
||||||
close(listener)
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -127,7 +121,6 @@ func TestBus_Publish(t *testing.T) {
|
|||||||
<-done
|
<-done
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
close(listener)
|
|
||||||
bus.Off(listener, topic)
|
bus.Off(listener, topic)
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -172,9 +165,61 @@ func TestBus_Publish(t *testing.T) {
|
|||||||
bus.Off(listener, topic)
|
bus.Off(listener, topic)
|
||||||
bus.Off(listener2, topic)
|
bus.Off(listener2, topic)
|
||||||
bus.Off(listener3, topic)
|
bus.Off(listener3, topic)
|
||||||
close(listener)
|
})
|
||||||
close(listener2)
|
})
|
||||||
close(listener3)
|
}
|
||||||
|
|
||||||
|
func TestBus_Destroy(t *testing.T) {
|
||||||
|
g := Goblin(t)
|
||||||
|
|
||||||
|
g.Describe("Destroy", func() {
|
||||||
|
g.It("unsubscribes and closes all listeners", func() {
|
||||||
|
bus := NewBus()
|
||||||
|
|
||||||
|
listener := make(chan Event)
|
||||||
|
bus.On(listener, "test")
|
||||||
|
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case m := <-listener:
|
||||||
|
g.Assert(m).IsZero()
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
g.Fail("listener did not receive message in time")
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
bus.Destroy()
|
||||||
|
<-done
|
||||||
|
|
||||||
|
g.Assert(bus.listeners).Equal(map[string][]Listener{})
|
||||||
|
})
|
||||||
|
|
||||||
|
// This is a check that ensures Destroy only closes each listener
|
||||||
|
// channel once, even if it is subscribed to multiple topics.
|
||||||
|
//
|
||||||
|
// Closing a channel multiple times will cause a runtime panic, which
|
||||||
|
// I'm pretty sure we don't want.
|
||||||
|
g.It("unsubscribes and closes channel only once", func() {
|
||||||
|
bus := NewBus()
|
||||||
|
|
||||||
|
listener := make(chan Event)
|
||||||
|
bus.On(listener, "test", "test2", "test3", "test4", "test5")
|
||||||
|
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case m := <-listener:
|
||||||
|
g.Assert(m).IsZero()
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
g.Fail("listener did not receive message in time")
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
bus.Destroy()
|
||||||
|
<-done
|
||||||
|
|
||||||
|
g.Assert(bus.listeners).Equal(map[string][]Listener{})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -146,12 +146,10 @@ func (h *Handler) listenForServerEvents(ctx context.Context) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// These functions will automatically close the channel if it hasn't been already.
|
||||||
h.server.Events().Off(eventChan, e...)
|
h.server.Events().Off(eventChan, e...)
|
||||||
h.server.InstallSink().Off(logOutput)
|
h.server.LogSink().Off(logOutput)
|
||||||
h.server.InstallSink().Off(installOutput)
|
h.server.InstallSink().Off(installOutput)
|
||||||
close(eventChan)
|
|
||||||
close(logOutput)
|
|
||||||
close(installOutput)
|
|
||||||
|
|
||||||
// If the internal context is stopped it is either because the parent context
|
// If the internal context is stopped it is either because the parent context
|
||||||
// got canceled or because we ran into an error. If the "err" variable is nil
|
// got canceled or because we ran into an error. If the "err" variable is nil
|
||||||
|
|||||||
@@ -267,11 +267,18 @@ func (h *Handler) setJwt(token *tokens.WebsocketPayload) {
|
|||||||
h.Unlock()
|
h.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var actions = map[server.PowerAction]string{
|
||||||
|
server.PowerActionStart: PermissionSendPowerStart,
|
||||||
|
server.PowerActionStop: PermissionSendPowerStop,
|
||||||
|
server.PowerActionRestart: PermissionSendPowerRestart,
|
||||||
|
server.PowerActionTerminate: PermissionSendPowerStop,
|
||||||
|
}
|
||||||
|
|
||||||
// HandleInbound handles an inbound socket request and route it to the proper action.
|
// HandleInbound handles an inbound socket request and route it to the proper action.
|
||||||
func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
|
func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
|
||||||
if m.Event != AuthenticationEvent {
|
if m.Event != AuthenticationEvent {
|
||||||
if err := h.TokenValid(); err != nil {
|
if err := h.TokenValid(); err != nil {
|
||||||
h.unsafeSendJson(Message{
|
_ = h.unsafeSendJson(Message{
|
||||||
Event: JwtErrorEvent,
|
Event: JwtErrorEvent,
|
||||||
Args: []string{err.Error()},
|
Args: []string{err.Error()},
|
||||||
})
|
})
|
||||||
@@ -339,12 +346,6 @@ func (h *Handler) HandleInbound(ctx context.Context, m Message) error {
|
|||||||
{
|
{
|
||||||
action := server.PowerAction(strings.Join(m.Args, ""))
|
action := server.PowerAction(strings.Join(m.Args, ""))
|
||||||
|
|
||||||
actions := make(map[server.PowerAction]string)
|
|
||||||
actions[server.PowerActionStart] = PermissionSendPowerStart
|
|
||||||
actions[server.PowerActionStop] = PermissionSendPowerStop
|
|
||||||
actions[server.PowerActionRestart] = PermissionSendPowerRestart
|
|
||||||
actions[server.PowerActionTerminate] = PermissionSendPowerStop
|
|
||||||
|
|
||||||
// Check that they have permission to perform this action if it is needed.
|
// Check that they have permission to perform this action if it is needed.
|
||||||
if permission, exists := actions[action]; exists {
|
if permission, exists := actions[action]; exists {
|
||||||
if !h.GetJwt().HasPermission(permission) {
|
if !h.GetJwt().HasPermission(permission) {
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// sinkPool represents a pool with sinks.
|
// sinkPool represents a pool with sinks.
|
||||||
@@ -16,14 +15,6 @@ func newSinkPool() *sinkPool {
|
|||||||
return &sinkPool{}
|
return &sinkPool{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// On adds a sink on the pool.
|
|
||||||
func (p *sinkPool) On(c chan []byte) {
|
|
||||||
p.mx.Lock()
|
|
||||||
defer p.mx.Unlock()
|
|
||||||
|
|
||||||
p.sinks = append(p.sinks, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Off removes a sink from the pool.
|
// Off removes a sink from the pool.
|
||||||
func (p *sinkPool) Off(c chan []byte) {
|
func (p *sinkPool) Off(c chan []byte) {
|
||||||
p.mx.Lock()
|
p.mx.Lock()
|
||||||
@@ -39,10 +30,19 @@ func (p *sinkPool) Off(c chan []byte) {
|
|||||||
sinks[len(sinks)-1] = nil
|
sinks[len(sinks)-1] = nil
|
||||||
sinks = sinks[:len(sinks)-1]
|
sinks = sinks[:len(sinks)-1]
|
||||||
p.sinks = sinks
|
p.sinks = sinks
|
||||||
|
close(c)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// On adds a sink on the pool.
|
||||||
|
func (p *sinkPool) On(c chan []byte) {
|
||||||
|
p.mx.Lock()
|
||||||
|
defer p.mx.Unlock()
|
||||||
|
|
||||||
|
p.sinks = append(p.sinks, c)
|
||||||
|
}
|
||||||
|
|
||||||
// Destroy destroys the pool by removing and closing all sinks.
|
// Destroy destroys the pool by removing and closing all sinks.
|
||||||
func (p *sinkPool) Destroy() {
|
func (p *sinkPool) Destroy() {
|
||||||
p.mx.Lock()
|
p.mx.Lock()
|
||||||
@@ -59,12 +59,9 @@ func (p *sinkPool) Destroy() {
|
|||||||
func (p *sinkPool) Push(v []byte) {
|
func (p *sinkPool) Push(v []byte) {
|
||||||
p.mx.RLock()
|
p.mx.RLock()
|
||||||
for _, c := range p.sinks {
|
for _, c := range p.sinks {
|
||||||
// TODO: should this be done in parallel?
|
|
||||||
select {
|
select {
|
||||||
// Send the log output to the channel
|
// Send the log output to the channel
|
||||||
case c <- v:
|
case c <- v:
|
||||||
// Timeout after 100 milliseconds, this will cause the write to the channel to be cancelled.
|
|
||||||
case <-time.After(100 * time.Millisecond):
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.mx.RUnlock()
|
p.mx.RUnlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user