From 870adffc1482872f7583508ada46cffa4296c3bc Mon Sep 17 00:00:00 2001 From: Dane Everitt Date: Sat, 20 Apr 2019 16:20:08 -0700 Subject: [PATCH] Significant improvements to attaching/console handling --- server/console.go | 21 +++++ server/environment.go | 19 ++++- server/environment_docker.go | 149 +++++++++++++++++++++++++++++++---- server/server.go | 4 +- websocket.go | 2 +- wings.go | 9 +++ 6 files changed, 181 insertions(+), 23 deletions(-) create mode 100644 server/console.go diff --git a/server/console.go b/server/console.go new file mode 100644 index 0000000..1832dc0 --- /dev/null +++ b/server/console.go @@ -0,0 +1,21 @@ +package server + +import "io" + +type Console struct { + Server *Server + HandlerFunc *func(string) +} + +var _ io.Writer = Console{} + +func (c Console) Write(b []byte) (int, error) { + if c.HandlerFunc != nil { + l := make([]byte, len(b)) + copy(l, b) + + (*c.HandlerFunc)(string(l)) + } + + return len(b), nil +} \ No newline at end of file diff --git a/server/environment.go b/server/environment.go index c0fde11..1e66a05 100644 --- a/server/environment.go +++ b/server/environment.go @@ -1,7 +1,6 @@ package server import ( - "io" "os" ) @@ -11,6 +10,10 @@ type Environment interface { // Returns the name of the environment. Type() string + // Determines if the environment is currently active and running a server process + // for this specific server instance. + IsRunning() (bool, error) + // Starts a server instance. If the server instance is not in a state where it // can be started an error should be returned. Start() error @@ -22,7 +25,7 @@ type Environment interface { // Determines if the server instance exists. For example, in a docker environment // this should confirm that the container is created and in a bootable state. In // a basic CLI environment this can probably just return true right away. - Exists() bool + Exists() (bool, error) // Terminates a running server instance using the provided signal. If the server // is not running no error should be returned. @@ -34,8 +37,16 @@ type Environment interface { Create() error // Attaches to the server console environment and allows piping the output to a - // websocket or other internal tool to monitor output. - Attach() (io.ReadCloser, error) + // websocket or other internal tool to monitor output. Also allows you to later + // send data into the environment's stdin. + Attach() error + + // Follows the output from the server console and will begin piping the output to + // the server's emitter. + FollowConsoleOutput() error + + // Sends the provided command to the running server instance. + SendCommand(string) error // Reads the log file for the process from the end backwards until the provided // number of bytes is met. diff --git a/server/environment_docker.go b/server/environment_docker.go index 1b08310..e498684 100644 --- a/server/environment_docker.go +++ b/server/environment_docker.go @@ -12,6 +12,7 @@ import ( "github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/go-connections/nat" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/net/context" "io" "os" @@ -33,6 +34,15 @@ type DockerEnvironment struct { // The Docker client being used for this instance. Client *client.Client + + // Tracks if we are currently attached to the server container. This allows us to attach + // once and then just use that attachment to stream logs out of the server and also stream + // commands back into it without constantly attaching and detaching. + attached bool + + // Controls the hijacked response stream which exists only when we're attached to + // the running container instance. + stream types.HijackedResponse } // Creates a new base Docker environment. A server must still be attached to it. @@ -64,10 +74,39 @@ func (d *DockerEnvironment) Type() string { } // Determines if the container exists in this environment. -func (d *DockerEnvironment) Exists() bool { +func (d *DockerEnvironment) Exists() (bool, error) { _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) - return err == nil + if err != nil { + // If this error is because the container instance wasn't found via Docker we + // can safely ignore the error and just return false. + if client.IsErrNotFound(err) { + return false, nil + } + + return false, err + } + + return true, nil +} + +// Determines if the server's docker container is currently running. If there is no container +// present, an error will be raised (since this shouldn't be a case that ever happens under +// correctly developed circumstances). +// +// You can confirm if the instance wasn't found by using client.IsErrNotFound from the Docker +// API. +// +// @see docker/client/errors.go +func (d *DockerEnvironment) IsRunning() (bool, error) { + ctx := context.Background() + + c, err := d.Client.ContainerInspect(ctx, d.Server.Uuid) + if err != nil { + return false, err + } + + return c.State.Running, nil } // Checks if there is a container that already exists for the server. If so that @@ -81,12 +120,21 @@ func (d *DockerEnvironment) Start() error { // No reason to try starting a container that is already running. if c.State.Running { + if !d.attached { + return d.Attach() + } + return nil } opts := types.ContainerStartOptions{} - return d.Client.ContainerStart(context.Background(), d.Server.Uuid, opts) + if err := d.Client.ContainerStart(context.Background(), d.Server.Uuid, opts); err != nil { + return err + } + + d.FollowConsoleOutput() + return d.Attach() } // Stops the container that the server is running in. This will allow up to 10 @@ -113,17 +161,57 @@ func (d *DockerEnvironment) Terminate(signal os.Signal) error { return d.Client.ContainerKill(ctx, d.Server.Uuid, signal.String()) } -// Contrary to the name, this doesn't actually attach to the Docker container itself, -// but rather attaches to the log for the container and then pipes that output to -// a websocket. -// -// This avoids us missing cruicial output that happens in the split seconds before the -// code moves from 'Starting' to 'Attaching' on the process. -// -// @todo add throttle code -func (d *DockerEnvironment) Attach() (io.ReadCloser, error) { - if !d.Exists() { - return nil, errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid)) +// Attaches to the docker container itself and ensures that we can pipe data in and out +// of the process stream. This should not be used for reading console data as you *will* +// miss important output at the beginning because of the time delay with attaching to the +// output. +func (d *DockerEnvironment) Attach() error { + if d.attached { + return nil + } + + ctx := context.Background() + + var err error + d.stream, err = d.Client.ContainerAttach(ctx, d.Server.Uuid, types.ContainerAttachOptions{ + Stdin: true, + Stdout: true, + Stderr: true, + Stream: true, + }) + + if err != nil { + return err + } + + console := Console{ + Server: d.Server, + } + + d.attached = true + + go func() { + defer d.stream.Close() + defer func() { + d.attached = false + }() + + io.Copy(console, d.stream.Reader) + }() + + return nil +} + +// Attaches to the log for the container. This avoids us missing cruicial output that +// happens in the split seconds before the code moves from 'Starting' to 'Attaching' +// on the process. +func (d *DockerEnvironment) FollowConsoleOutput() error { + if exists, err := d.Exists(); !exists { + if err != nil { + return err + } + + return errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid)) } ctx := context.Background() @@ -133,9 +221,22 @@ func (d *DockerEnvironment) Attach() (io.ReadCloser, error) { Follow: true, } - r, err := d.Client.ContainerLogs(ctx, d.Server.Uuid, opts) + reader, err := d.Client.ContainerLogs(ctx, d.Server.Uuid, opts) - return r, err + go func(r io.ReadCloser) { + defer r.Close() + + s := bufio.NewScanner(r) + for s.Scan() { + fmt.Println(s.Text()) + } + + if err := s.Err(); err != nil { + zap.S().Errorw("error in scanner", zap.Error(err)) + } + }(reader) + + return err } // Creates a new container for the server using all of the data that is currently @@ -154,6 +255,8 @@ func (d *DockerEnvironment) Create() error { // container anyways. if _, err := cli.ContainerInspect(ctx, d.Server.Uuid); err == nil { return nil + } else if !client.IsErrNotFound(err) { + return err } conf := &container.Config{ @@ -250,6 +353,18 @@ func (d *DockerEnvironment) Create() error { return nil } +// Sends the specified command to the stdin of the running container instance. There is no +// confirmation that this data is sent successfully, only that it gets pushed into the stdin. +func (d *DockerEnvironment) SendCommand(c string) error { + if !d.attached { + return errors.New("attempting to send command to non-attached instance") + } + + _, err := d.stream.Conn.Write([]byte(c + "\n")) + + return err +} + // Reads the log file for the server. This does not care if the server is running or not, it will // simply try to read the last X bytes of the file and return them. func (d *DockerEnvironment) Readlog(len int64) ([]string, error) { @@ -392,4 +507,4 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet { } return out -} +} \ No newline at end of file diff --git a/server/server.go b/server/server.go index f610b7c..b54ca5e 100644 --- a/server/server.go +++ b/server/server.go @@ -252,7 +252,9 @@ func (s *Server) SetState(state ProcessState) error { // Determine if the server is bootable in it's current state or not. This will not // indicate why a server is not bootable, only if it is. func (s *Server) IsBootable() bool { - return s.environment.Exists() + exists, _ := s.environment.Exists() + + return exists } // Initalizes a server instance. This will run through and ensure that the environment diff --git a/websocket.go b/websocket.go index 3a7ecae..cd139e5 100644 --- a/websocket.go +++ b/websocket.go @@ -110,7 +110,7 @@ func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error { } case "send command": { - return nil + return wsm.server.Environment().SendCommand(strings.Join(wsm.Args, "")) } } diff --git a/wings.go b/wings.go index 6824cb1..900d6bd 100644 --- a/wings.go +++ b/wings.go @@ -67,6 +67,15 @@ func main() { if err := s.CreateEnvironment(); err != nil { zap.S().Errorw("failed to create an environment for server", zap.String("server", s.Uuid), zap.Error(err)) } + + if r, err := s.Environment().IsRunning(); err != nil { + zap.S().Errorw("error checking server environment status", zap.String("server", s.Uuid), zap.Error(err)) + } else if r { + zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid)) + if err := s.Environment().Attach(); err != nil { + zap.S().Errorw("error attaching to server environment", zap.String("server", s.Uuid), zap.Error(err)) + } + } } r := &Router{