Improve performance of console output watcher; work directly with bytes rather than string conversions

This commit is contained in:
Dane Everitt 2022-01-30 11:28:06 -05:00
parent fab88a380e
commit 11ae5e69ed
3 changed files with 46 additions and 29 deletions

View File

@ -1,6 +1,7 @@
package remote package remote
import ( import (
"bytes"
"regexp" "regexp"
"strings" "strings"
@ -85,37 +86,38 @@ type SftpAuthResponse struct {
type OutputLineMatcher struct { type OutputLineMatcher struct {
// The raw string to match against. This may or may not be prefixed with // The raw string to match against. This may or may not be prefixed with
// regex: which indicates we want to match against the regex expression. // regex: which indicates we want to match against the regex expression.
raw string raw []byte
reg *regexp.Regexp reg *regexp.Regexp
} }
// Matches determines if a given string "s" matches the given line. // Matches determines if the provided byte string matches the given regex or
func (olm *OutputLineMatcher) Matches(s string) bool { // raw string provided to the matcher.
func (olm *OutputLineMatcher) Matches(s []byte) bool {
if olm.reg == nil { if olm.reg == nil {
return strings.Contains(s, olm.raw) return bytes.Contains(s, olm.raw)
} }
return olm.reg.Match(s)
return olm.reg.MatchString(s)
} }
// String returns the matcher's raw comparison string. // String returns the matcher's raw comparison string.
func (olm *OutputLineMatcher) String() string { func (olm *OutputLineMatcher) String() string {
return olm.raw return string(olm.raw)
} }
// UnmarshalJSON unmarshals the startup lines into individual structs for easier // UnmarshalJSON unmarshals the startup lines into individual structs for easier
// matching abilities. // matching abilities.
func (olm *OutputLineMatcher) UnmarshalJSON(data []byte) error { func (olm *OutputLineMatcher) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &olm.raw); err != nil { var r string
if err := json.Unmarshal(data, &r); err != nil {
return err return err
} }
if strings.HasPrefix(olm.raw, "regex:") && len(olm.raw) > 6 { olm.raw = []byte(r)
r, err := regexp.Compile(strings.TrimPrefix(olm.raw, "regex:")) if bytes.HasPrefix(olm.raw, []byte("regex:")) && len(olm.raw) > 6 {
r, err := regexp.Compile(strings.TrimPrefix(string(olm.raw), "regex:"))
if err != nil { if err != nil {
log.WithField("error", err).WithField("raw", olm.raw).Warn("failed to compile output line marked as being regex") log.WithField("error", err).WithField("raw", string(olm.raw)).Warn("failed to compile output line marked as being regex")
} }
olm.reg = r olm.reg = r
} }

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"bytes"
"regexp" "regexp"
"strconv" "strconv"
"sync" "sync"
@ -50,6 +51,11 @@ func (dsl *diskSpaceLimiter) Trigger() {
}) })
} }
// processConsoleOutputEvent handles output from a server's Docker container
// and runs through different limiting logic to ensure that spam console output
// does not cause negative effects to the system. This will also monitor the
// output lines to determine if the server is started yet, and if the output is
// not being throttled, will send the data over to the websocket.
func (s *Server) processConsoleOutputEvent(v []byte) { func (s *Server) processConsoleOutputEvent(v []byte) {
t := s.Throttler() t := s.Throttler()
err := t.Increment(func() { err := t.Increment(func() {
@ -81,13 +87,15 @@ func (s *Server) processConsoleOutputEvent(v []byte) {
} }
} }
// Always process the console output, but do this in a seperate thread since we
// don't really care about side-effects from this call, and don't want it to block
// the console sending logic.
go s.onConsoleOutput(v)
// If we are not throttled, go ahead and output the data. // If we are not throttled, go ahead and output the data.
if !t.Throttled() { if !t.Throttled() {
s.Sink(LogSink).Push(v) s.Sink(LogSink).Push(v)
} }
// Also pass the data along to the console output channel.
s.onConsoleOutput(string(v))
} }
// StartEventListeners adds all the internal event listeners we want to use for a server. These listeners can only be // StartEventListeners adds all the internal event listeners we want to use for a server. These listeners can only be
@ -153,27 +161,34 @@ var stripAnsiRegex = regexp.MustCompile("[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-z
// Custom listener for console output events that will check if the given line // Custom listener for console output events that will check if the given line
// of output matches one that should mark the server as started or not. // of output matches one that should mark the server as started or not.
func (s *Server) onConsoleOutput(data string) { func (s *Server) onConsoleOutput(data []byte) {
// Get the server's process configuration. if s.Environment.State() != environment.ProcessStartingState && !s.IsRunning() {
return
}
processConfiguration := s.ProcessConfiguration() processConfiguration := s.ProcessConfiguration()
// Make a copy of the data provided since it is by reference, otherwise you'll
// potentially introduce a race condition by modifying the value.
v := make([]byte, len(data))
copy(v, data)
// Check if the server is currently starting. // Check if the server is currently starting.
if s.Environment.State() == environment.ProcessStartingState { if s.Environment.State() == environment.ProcessStartingState {
// Check if we should strip ansi color codes. // Check if we should strip ansi color codes.
if processConfiguration.Startup.StripAnsi { if processConfiguration.Startup.StripAnsi {
// Strip ansi color codes from the data string. v = stripAnsiRegex.ReplaceAll(v, []byte(""))
data = stripAnsiRegex.ReplaceAllString(data, "")
} }
// Iterate over all the done lines. // Iterate over all the done lines.
for _, l := range processConfiguration.Startup.Done { for _, l := range processConfiguration.Startup.Done {
if !l.Matches(data) { if !l.Matches(v) {
continue continue
} }
s.Log().WithFields(log.Fields{ s.Log().WithFields(log.Fields{
"match": l.String(), "match": l.String(),
"against": strconv.QuoteToASCII(data), "against": strconv.QuoteToASCII(string(v)),
}).Debug("detected server in running state based on console line output") }).Debug("detected server in running state based on console line output")
// If the specific line of output is one that would mark the server as started, // If the specific line of output is one that would mark the server as started,
@ -190,7 +205,7 @@ func (s *Server) onConsoleOutput(data string) {
if s.IsRunning() { if s.IsRunning() {
stop := processConfiguration.Stop stop := processConfiguration.Stop
if stop.Type == remote.ProcessStopCommand && data == stop.Value { if stop.Type == remote.ProcessStopCommand && bytes.Equal(v, []byte(stop.Value)) {
s.Environment.SetState(environment.ProcessOfflineState) s.Environment.SetState(environment.ProcessOfflineState)
} }
} }

View File

@ -66,10 +66,10 @@ func TestSink(t *testing.T) {
g.It("removes a channel and maintains the order", func() { g.It("removes a channel and maintains the order", func() {
channels := make([]chan []byte, 8) channels := make([]chan []byte, 8)
for i := 0; i < len(channels); i++ { for i := 0; i < len(channels); i++ {
channels[i] = make(chan []byte, 1) channels[i] = make(chan []byte, 1)
pool.On(channels[i]) pool.On(channels[i])
} }
g.Assert(len(pool.sinks)).Equal(8) g.Assert(len(pool.sinks)).Equal(8)
@ -83,10 +83,10 @@ func TestSink(t *testing.T) {
g.It("does not panic if a nil channel is provided", func() { g.It("does not panic if a nil channel is provided", func() {
ch := make([]chan []byte, 1) ch := make([]chan []byte, 1)
defer func () { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
g.Fail("removing a nil channel should not cause a panic") g.Fail("removing a nil channel should not cause a panic")
} }
}() }()
pool.On(ch[0]) pool.On(ch[0])
@ -233,4 +233,4 @@ func TestSink(t *testing.T) {
g.Assert(MutexLocked(&pool.mu)).IsFalse() g.Assert(MutexLocked(&pool.mu)).IsFalse()
}) })
}) })
} }