From 11ae5e69ed84c17c5658c5dd24d8e2104dbdce2f Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sun, 30 Jan 2022 11:28:06 -0500 Subject: [PATCH] Improve performance of console output watcher; work directly with bytes rather than string conversions --- remote/types.go | 26 ++++++++++++++------------ server/listeners.go | 35 +++++++++++++++++++++++++---------- server/sink_test.go | 14 +++++++------- 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/remote/types.go b/remote/types.go index 37ef462..f37baf0 100644 --- a/remote/types.go +++ b/remote/types.go @@ -1,6 +1,7 @@ package remote import ( + "bytes" "regexp" "strings" @@ -85,37 +86,38 @@ type SftpAuthResponse struct { type OutputLineMatcher struct { // The raw string to match against. This may or may not be prefixed with // regex: which indicates we want to match against the regex expression. - raw string + raw []byte reg *regexp.Regexp } -// Matches determines if a given string "s" matches the given line. -func (olm *OutputLineMatcher) Matches(s string) bool { +// Matches determines if the provided byte string matches the given regex or +// raw string provided to the matcher. +func (olm *OutputLineMatcher) Matches(s []byte) bool { if olm.reg == nil { - return strings.Contains(s, olm.raw) + return bytes.Contains(s, olm.raw) } - - return olm.reg.MatchString(s) + return olm.reg.Match(s) } // String returns the matcher's raw comparison string. func (olm *OutputLineMatcher) String() string { - return olm.raw + return string(olm.raw) } // UnmarshalJSON unmarshals the startup lines into individual structs for easier // matching abilities. func (olm *OutputLineMatcher) UnmarshalJSON(data []byte) error { - if err := json.Unmarshal(data, &olm.raw); err != nil { + var r string + if err := json.Unmarshal(data, &r); err != nil { return err } - if strings.HasPrefix(olm.raw, "regex:") && len(olm.raw) > 6 { - r, err := regexp.Compile(strings.TrimPrefix(olm.raw, "regex:")) + olm.raw = []byte(r) + if bytes.HasPrefix(olm.raw, []byte("regex:")) && len(olm.raw) > 6 { + r, err := regexp.Compile(strings.TrimPrefix(string(olm.raw), "regex:")) if err != nil { - log.WithField("error", err).WithField("raw", olm.raw).Warn("failed to compile output line marked as being regex") + log.WithField("error", err).WithField("raw", string(olm.raw)).Warn("failed to compile output line marked as being regex") } - olm.reg = r } diff --git a/server/listeners.go b/server/listeners.go index ea36373..6cac576 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -1,6 +1,7 @@ package server import ( + "bytes" "regexp" "strconv" "sync" @@ -50,6 +51,11 @@ func (dsl *diskSpaceLimiter) Trigger() { }) } +// processConsoleOutputEvent handles output from a server's Docker container +// and runs through different limiting logic to ensure that spam console output +// does not cause negative effects to the system. This will also monitor the +// output lines to determine if the server is started yet, and if the output is +// not being throttled, will send the data over to the websocket. func (s *Server) processConsoleOutputEvent(v []byte) { t := s.Throttler() err := t.Increment(func() { @@ -81,13 +87,15 @@ func (s *Server) processConsoleOutputEvent(v []byte) { } } + // Always process the console output, but do this in a seperate thread since we + // don't really care about side-effects from this call, and don't want it to block + // the console sending logic. + go s.onConsoleOutput(v) + // If we are not throttled, go ahead and output the data. if !t.Throttled() { s.Sink(LogSink).Push(v) } - - // Also pass the data along to the console output channel. - s.onConsoleOutput(string(v)) } // StartEventListeners adds all the internal event listeners we want to use for a server. These listeners can only be @@ -153,27 +161,34 @@ var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-z // Custom listener for console output events that will check if the given line // of output matches one that should mark the server as started or not. -func (s *Server) onConsoleOutput(data string) { - // Get the server's process configuration. +func (s *Server) onConsoleOutput(data []byte) { + if s.Environment.State() != environment.ProcessStartingState && !s.IsRunning() { + return + } + processConfiguration := s.ProcessConfiguration() + // Make a copy of the data provided since it is by reference, otherwise you'll + // potentially introduce a race condition by modifying the value. + v := make([]byte, len(data)) + copy(v, data) + // Check if the server is currently starting. if s.Environment.State() == environment.ProcessStartingState { // Check if we should strip ansi color codes. if processConfiguration.Startup.StripAnsi { - // Strip ansi color codes from the data string. - data = stripAnsiRegex.ReplaceAllString(data, "") + v = stripAnsiRegex.ReplaceAll(v, []byte("")) } // Iterate over all the done lines. for _, l := range processConfiguration.Startup.Done { - if !l.Matches(data) { + if !l.Matches(v) { continue } s.Log().WithFields(log.Fields{ "match": l.String(), - "against": strconv.QuoteToASCII(data), + "against": strconv.QuoteToASCII(string(v)), }).Debug("detected server in running state based on console line output") // If the specific line of output is one that would mark the server as started, @@ -190,7 +205,7 @@ func (s *Server) onConsoleOutput(data string) { if s.IsRunning() { stop := processConfiguration.Stop - if stop.Type == remote.ProcessStopCommand && data == stop.Value { + if stop.Type == remote.ProcessStopCommand && bytes.Equal(v, []byte(stop.Value)) { s.Environment.SetState(environment.ProcessOfflineState) } } diff --git a/server/sink_test.go b/server/sink_test.go index 0d6c495..d309244 100644 --- a/server/sink_test.go +++ b/server/sink_test.go @@ -66,10 +66,10 @@ func TestSink(t *testing.T) { g.It("removes a channel and maintains the order", func() { channels := make([]chan []byte, 8) - for i := 0; i < len(channels); i++ { + for i := 0; i < len(channels); i++ { channels[i] = make(chan []byte, 1) - pool.On(channels[i]) - } + pool.On(channels[i]) + } g.Assert(len(pool.sinks)).Equal(8) @@ -83,10 +83,10 @@ func TestSink(t *testing.T) { g.It("does not panic if a nil channel is provided", func() { ch := make([]chan []byte, 1) - defer func () { + defer func() { if r := recover(); r != nil { - g.Fail("removing a nil channel should not cause a panic") - } + g.Fail("removing a nil channel should not cause a panic") + } }() pool.On(ch[0]) @@ -233,4 +233,4 @@ func TestSink(t *testing.T) { g.Assert(MutexLocked(&pool.mu)).IsFalse() }) }) -} \ No newline at end of file +}