Significant improvements to attaching/console handling

This commit is contained in:
Dane Everitt 2019-04-20 16:20:08 -07:00
parent ebe98aa860
commit 870adffc14
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
6 changed files with 181 additions and 23 deletions

21
server/console.go Normal file
View File

@ -0,0 +1,21 @@
package server
import "io"
type Console struct {
Server *Server
HandlerFunc *func(string)
}
var _ io.Writer = Console{}
func (c Console) Write(b []byte) (int, error) {
if c.HandlerFunc != nil {
l := make([]byte, len(b))
copy(l, b)
(*c.HandlerFunc)(string(l))
}
return len(b), nil
}

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"io"
"os" "os"
) )
@ -11,6 +10,10 @@ type Environment interface {
// Returns the name of the environment. // Returns the name of the environment.
Type() string Type() string
// Determines if the environment is currently active and running a server process
// for this specific server instance.
IsRunning() (bool, error)
// Starts a server instance. If the server instance is not in a state where it // Starts a server instance. If the server instance is not in a state where it
// can be started an error should be returned. // can be started an error should be returned.
Start() error Start() error
@ -22,7 +25,7 @@ type Environment interface {
// Determines if the server instance exists. For example, in a docker environment // Determines if the server instance exists. For example, in a docker environment
// this should confirm that the container is created and in a bootable state. In // this should confirm that the container is created and in a bootable state. In
// a basic CLI environment this can probably just return true right away. // a basic CLI environment this can probably just return true right away.
Exists() bool Exists() (bool, error)
// Terminates a running server instance using the provided signal. If the server // Terminates a running server instance using the provided signal. If the server
// is not running no error should be returned. // is not running no error should be returned.
@ -34,8 +37,16 @@ type Environment interface {
Create() error Create() error
// Attaches to the server console environment and allows piping the output to a // Attaches to the server console environment and allows piping the output to a
// websocket or other internal tool to monitor output. // websocket or other internal tool to monitor output. Also allows you to later
Attach() (io.ReadCloser, error) // send data into the environment's stdin.
Attach() error
// Follows the output from the server console and will begin piping the output to
// the server's emitter.
FollowConsoleOutput() error
// Sends the provided command to the running server instance.
SendCommand(string) error
// Reads the log file for the process from the end backwards until the provided // Reads the log file for the process from the end backwards until the provided
// number of bytes is met. // number of bytes is met.

View File

@ -12,6 +12,7 @@ import (
"github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/go-connections/nat" "github.com/docker/go-connections/nat"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/net/context" "golang.org/x/net/context"
"io" "io"
"os" "os"
@ -33,6 +34,15 @@ type DockerEnvironment struct {
// The Docker client being used for this instance. // The Docker client being used for this instance.
Client *client.Client Client *client.Client
// Tracks if we are currently attached to the server container. This allows us to attach
// once and then just use that attachment to stream logs out of the server and also stream
// commands back into it without constantly attaching and detaching.
attached bool
// Controls the hijacked response stream which exists only when we're attached to
// the running container instance.
stream types.HijackedResponse
} }
// Creates a new base Docker environment. A server must still be attached to it. // Creates a new base Docker environment. A server must still be attached to it.
@ -64,10 +74,39 @@ func (d *DockerEnvironment) Type() string {
} }
// Determines if the container exists in this environment. // Determines if the container exists in this environment.
func (d *DockerEnvironment) Exists() bool { func (d *DockerEnvironment) Exists() (bool, error) {
_, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid) _, err := d.Client.ContainerInspect(context.Background(), d.Server.Uuid)
return err == nil if err != nil {
// If this error is because the container instance wasn't found via Docker we
// can safely ignore the error and just return false.
if client.IsErrNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}
// Determines if the server's docker container is currently running. If there is no container
// present, an error will be raised (since this shouldn't be a case that ever happens under
// correctly developed circumstances).
//
// You can confirm if the instance wasn't found by using client.IsErrNotFound from the Docker
// API.
//
// @see docker/client/errors.go
func (d *DockerEnvironment) IsRunning() (bool, error) {
ctx := context.Background()
c, err := d.Client.ContainerInspect(ctx, d.Server.Uuid)
if err != nil {
return false, err
}
return c.State.Running, nil
} }
// Checks if there is a container that already exists for the server. If so that // Checks if there is a container that already exists for the server. If so that
@ -81,12 +120,21 @@ func (d *DockerEnvironment) Start() error {
// No reason to try starting a container that is already running. // No reason to try starting a container that is already running.
if c.State.Running { if c.State.Running {
if !d.attached {
return d.Attach()
}
return nil return nil
} }
opts := types.ContainerStartOptions{} opts := types.ContainerStartOptions{}
return d.Client.ContainerStart(context.Background(), d.Server.Uuid, opts) if err := d.Client.ContainerStart(context.Background(), d.Server.Uuid, opts); err != nil {
return err
}
d.FollowConsoleOutput()
return d.Attach()
} }
// Stops the container that the server is running in. This will allow up to 10 // Stops the container that the server is running in. This will allow up to 10
@ -113,17 +161,57 @@ func (d *DockerEnvironment) Terminate(signal os.Signal) error {
return d.Client.ContainerKill(ctx, d.Server.Uuid, signal.String()) return d.Client.ContainerKill(ctx, d.Server.Uuid, signal.String())
} }
// Contrary to the name, this doesn't actually attach to the Docker container itself, // Attaches to the docker container itself and ensures that we can pipe data in and out
// but rather attaches to the log for the container and then pipes that output to // of the process stream. This should not be used for reading console data as you *will*
// a websocket. // miss important output at the beginning because of the time delay with attaching to the
// // output.
// This avoids us missing cruicial output that happens in the split seconds before the func (d *DockerEnvironment) Attach() error {
// code moves from 'Starting' to 'Attaching' on the process. if d.attached {
// return nil
// @todo add throttle code }
func (d *DockerEnvironment) Attach() (io.ReadCloser, error) {
if !d.Exists() { ctx := context.Background()
return nil, errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid))
var err error
d.stream, err = d.Client.ContainerAttach(ctx, d.Server.Uuid, types.ContainerAttachOptions{
Stdin: true,
Stdout: true,
Stderr: true,
Stream: true,
})
if err != nil {
return err
}
console := Console{
Server: d.Server,
}
d.attached = true
go func() {
defer d.stream.Close()
defer func() {
d.attached = false
}()
io.Copy(console, d.stream.Reader)
}()
return nil
}
// Attaches to the log for the container. This avoids us missing cruicial output that
// happens in the split seconds before the code moves from 'Starting' to 'Attaching'
// on the process.
func (d *DockerEnvironment) FollowConsoleOutput() error {
if exists, err := d.Exists(); !exists {
if err != nil {
return err
}
return errors.New(fmt.Sprintf("no such container: %s", d.Server.Uuid))
} }
ctx := context.Background() ctx := context.Background()
@ -133,9 +221,22 @@ func (d *DockerEnvironment) Attach() (io.ReadCloser, error) {
Follow: true, Follow: true,
} }
r, err := d.Client.ContainerLogs(ctx, d.Server.Uuid, opts) reader, err := d.Client.ContainerLogs(ctx, d.Server.Uuid, opts)
return r, err go func(r io.ReadCloser) {
defer r.Close()
s := bufio.NewScanner(r)
for s.Scan() {
fmt.Println(s.Text())
}
if err := s.Err(); err != nil {
zap.S().Errorw("error in scanner", zap.Error(err))
}
}(reader)
return err
} }
// Creates a new container for the server using all of the data that is currently // Creates a new container for the server using all of the data that is currently
@ -154,6 +255,8 @@ func (d *DockerEnvironment) Create() error {
// container anyways. // container anyways.
if _, err := cli.ContainerInspect(ctx, d.Server.Uuid); err == nil { if _, err := cli.ContainerInspect(ctx, d.Server.Uuid); err == nil {
return nil return nil
} else if !client.IsErrNotFound(err) {
return err
} }
conf := &container.Config{ conf := &container.Config{
@ -250,6 +353,18 @@ func (d *DockerEnvironment) Create() error {
return nil return nil
} }
// Sends the specified command to the stdin of the running container instance. There is no
// confirmation that this data is sent successfully, only that it gets pushed into the stdin.
func (d *DockerEnvironment) SendCommand(c string) error {
if !d.attached {
return errors.New("attempting to send command to non-attached instance")
}
_, err := d.stream.Conn.Write([]byte(c + "\n"))
return err
}
// Reads the log file for the server. This does not care if the server is running or not, it will // Reads the log file for the server. This does not care if the server is running or not, it will
// simply try to read the last X bytes of the file and return them. // simply try to read the last X bytes of the file and return them.
func (d *DockerEnvironment) Readlog(len int64) ([]string, error) { func (d *DockerEnvironment) Readlog(len int64) ([]string, error) {
@ -392,4 +507,4 @@ func (d *DockerEnvironment) exposedPorts() nat.PortSet {
} }
return out return out
} }

View File

@ -252,7 +252,9 @@ func (s *Server) SetState(state ProcessState) error {
// Determine if the server is bootable in it's current state or not. This will not // Determine if the server is bootable in it's current state or not. This will not
// indicate why a server is not bootable, only if it is. // indicate why a server is not bootable, only if it is.
func (s *Server) IsBootable() bool { func (s *Server) IsBootable() bool {
return s.environment.Exists() exists, _ := s.environment.Exists()
return exists
} }
// Initalizes a server instance. This will run through and ensure that the environment // Initalizes a server instance. This will run through and ensure that the environment

View File

@ -110,7 +110,7 @@ func (wsm *WebsocketMessage) HandleInbound(c *websocket.Conn) error {
} }
case "send command": case "send command":
{ {
return nil return wsm.server.Environment().SendCommand(strings.Join(wsm.Args, ""))
} }
} }

View File

@ -67,6 +67,15 @@ func main() {
if err := s.CreateEnvironment(); err != nil { if err := s.CreateEnvironment(); err != nil {
zap.S().Errorw("failed to create an environment for server", zap.String("server", s.Uuid), zap.Error(err)) zap.S().Errorw("failed to create an environment for server", zap.String("server", s.Uuid), zap.Error(err))
} }
if r, err := s.Environment().IsRunning(); err != nil {
zap.S().Errorw("error checking server environment status", zap.String("server", s.Uuid), zap.Error(err))
} else if r {
zap.S().Infow("detected server is running, re-attaching to process", zap.String("server", s.Uuid))
if err := s.Environment().Attach(); err != nil {
zap.S().Errorw("error attaching to server environment", zap.String("server", s.Uuid), zap.Error(err))
}
}
} }
r := &Router{ r := &Router{