Avoid race conditions due to stream not being completely detached correctly
This commit is contained in:
parent
78c5fd219a
commit
834bcf251e
|
@ -34,14 +34,9 @@ type DockerEnvironment struct {
|
||||||
// The Docker client being used for this instance.
|
// The Docker client being used for this instance.
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
|
|
||||||
// Tracks if we are currently attached to the server container. This allows us to attach
|
|
||||||
// once and then just use that attachment to stream logs out of the server and also stream
|
|
||||||
// commands back into it without constantly attaching and detaching.
|
|
||||||
attached bool
|
|
||||||
|
|
||||||
// Controls the hijacked response stream which exists only when we're attached to
|
// Controls the hijacked response stream which exists only when we're attached to
|
||||||
// the running container instance.
|
// the running container instance.
|
||||||
stream types.HijackedResponse
|
stream *types.HijackedResponse
|
||||||
|
|
||||||
// Holds the stats stream used by the polling commands so that we can easily close
|
// Holds the stats stream used by the polling commands so that we can easily close
|
||||||
// it out.
|
// it out.
|
||||||
|
@ -49,9 +44,9 @@ type DockerEnvironment struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set if this process is currently attached to the process.
|
// Set if this process is currently attached to the process.
|
||||||
func (d *DockerEnvironment) SetAttached(a bool) {
|
func (d *DockerEnvironment) SetStream(s *types.HijackedResponse) {
|
||||||
d.Lock()
|
d.Lock()
|
||||||
d.attached = a
|
d.stream = s
|
||||||
d.Unlock()
|
d.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +55,7 @@ func (d *DockerEnvironment) IsAttached() bool {
|
||||||
d.RLock()
|
d.RLock()
|
||||||
defer d.RUnlock()
|
defer d.RUnlock()
|
||||||
|
|
||||||
return d.attached
|
return d.stream != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new base Docker environment. A server must still be attached to it.
|
// Creates a new base Docker environment. A server must still be attached to it.
|
||||||
|
@ -308,7 +303,7 @@ func (d *DockerEnvironment) Stop() error {
|
||||||
// If the container does not exist just mark the process as stopped and return without
|
// If the container does not exist just mark the process as stopped and return without
|
||||||
// an error.
|
// an error.
|
||||||
if client.IsErrNotFound(err) {
|
if client.IsErrNotFound(err) {
|
||||||
d.SetAttached(false)
|
d.SetStream(nil)
|
||||||
d.Server.SetState(ProcessOfflineState)
|
d.Server.SetState(ProcessOfflineState)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -449,25 +444,24 @@ func (d *DockerEnvironment) Attach() error {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
opts := types.ContainerAttachOptions{
|
||||||
d.Lock()
|
|
||||||
d.stream, err = d.Client.ContainerAttach(context.Background(), d.Server.Id(), types.ContainerAttachOptions{
|
|
||||||
Stdin: true,
|
Stdin: true,
|
||||||
Stdout: true,
|
Stdout: true,
|
||||||
Stderr: true,
|
Stderr: true,
|
||||||
Stream: true,
|
Stream: true,
|
||||||
})
|
}
|
||||||
d.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
// Set the stream again with the container.
|
||||||
|
if st, err := d.Client.ContainerAttach(context.Background(), d.Server.Id(), opts); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
|
} else {
|
||||||
|
d.SetStream(&st)
|
||||||
}
|
}
|
||||||
|
|
||||||
console := Console{
|
console := Console{
|
||||||
Server: d.Server,
|
Server: d.Server,
|
||||||
}
|
}
|
||||||
|
|
||||||
d.SetAttached(true)
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := d.EnableResourcePolling(); err != nil {
|
if err := d.EnableResourcePolling(); err != nil {
|
||||||
d.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to enable resource polling on server")
|
d.Server.Log().WithField("error", errors.WithStack(err)).Warn("failed to enable resource polling on server")
|
||||||
|
@ -478,7 +472,7 @@ func (d *DockerEnvironment) Attach() error {
|
||||||
defer d.stream.Close()
|
defer d.stream.Close()
|
||||||
defer func() {
|
defer func() {
|
||||||
d.Server.SetState(ProcessOfflineState)
|
d.Server.SetState(ProcessOfflineState)
|
||||||
d.SetAttached(false)
|
d.SetStream(nil)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
io.Copy(console, d.stream.Reader)
|
io.Copy(console, d.stream.Reader)
|
||||||
|
@ -818,6 +812,9 @@ func (d *DockerEnvironment) Create() error {
|
||||||
// Sends the specified command to the stdin of the running container instance. There is no
|
// Sends the specified command to the stdin of the running container instance. There is no
|
||||||
// confirmation that this data is sent successfully, only that it gets pushed into the stdin.
|
// confirmation that this data is sent successfully, only that it gets pushed into the stdin.
|
||||||
func (d *DockerEnvironment) SendCommand(c string) error {
|
func (d *DockerEnvironment) SendCommand(c string) error {
|
||||||
|
d.RLock()
|
||||||
|
defer d.RUnlock()
|
||||||
|
|
||||||
if !d.IsAttached() {
|
if !d.IsAttached() {
|
||||||
return errors.New("attempting to send command to non-attached instance")
|
return errors.New("attempting to send command to non-attached instance")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user