Check for error before prefix; fixes abandoned routine; closes pterodactyl/panel#3911

Due to the order of the previous logic in ScanReader, an error not caused by EOF would effectively get ignored since an error will always be returned with `isPrefix` equal to false, thus triggering the first break, and error checking is not performed beyond that point.

Thus, canceling an installation process for a server while this process was running would hang the routine and cause the loop to run endlessly, even with a canceled context.
This commit is contained in:
Dane Everitt 2022-02-05 11:56:17 -05:00
parent 1372eba84e
commit 9f985ae044
2 changed files with 29 additions and 34 deletions

View File

@ -18,18 +18,18 @@ import (
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/remote" "github.com/pterodactyl/wings/remote"
"github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/system"
) )
// Executes the installation stack for a server process. Bubbles any errors up to the calling // Install executes the installation stack for a server process. Bubbles any
// function which should handle contacting the panel to notify it of the server state. // errors up to the calling function which should handle contacting the panel to
// notify it of the server state.
// //
// Pass true as the first argument in order to execute a server sync before the process to // Pass true as the first argument in order to execute a server sync before the
// ensure the latest information is used. // process to ensure the latest information is used.
func (s *Server) Install(sync bool) error { func (s *Server) Install(sync bool) error {
if sync { if sync {
s.Log().Info("syncing server state with remote source before executing installation process") s.Log().Info("syncing server state with remote source before executing installation process")
@ -111,9 +111,7 @@ func (s *Server) internalInstall() error {
type InstallationProcess struct { type InstallationProcess struct {
Server *Server Server *Server
Script *remote.InstallationScript Script *remote.InstallationScript
client *client.Client client *client.Client
context context.Context
} }
// Generates a new installation process struct that will be used to create containers, // Generates a new installation process struct that will be used to create containers,
@ -128,7 +126,6 @@ func NewInstallationProcess(s *Server, script *remote.InstallationScript) (*Inst
return nil, err return nil, err
} else { } else {
proc.client = c proc.client = c
proc.context = s.Context()
} }
return proc, nil return proc, nil
@ -158,7 +155,7 @@ func (s *Server) SetRestoring(state bool) {
// Removes the installer container for the server. // Removes the installer container for the server.
func (ip *InstallationProcess) RemoveContainer() error { func (ip *InstallationProcess) RemoveContainer() error {
err := ip.client.ContainerRemove(ip.context, ip.Server.ID()+"_installer", types.ContainerRemoveOptions{ err := ip.client.ContainerRemove(ip.Server.Context(), ip.Server.ID()+"_installer", types.ContainerRemoveOptions{
RemoveVolumes: true, RemoveVolumes: true,
Force: true, Force: true,
}) })
@ -168,11 +165,10 @@ func (ip *InstallationProcess) RemoveContainer() error {
return nil return nil
} }
// Runs the installation process, this is done as in a background thread. This will configure // Run runs the installation process, this is done as in a background thread.
// the required environment, and then spin up the installation container. // This will configure the required environment, and then spin up the
// // installation container. Once the container finishes installing the results
// Once the container finishes installing the results will be stored in an installation // are stored in an installation log in the server's configuration directory.
// log in the server's configuration directory.
func (ip *InstallationProcess) Run() error { func (ip *InstallationProcess) Run() error {
ip.Server.Log().Debug("acquiring installation process lock") ip.Server.Log().Debug("acquiring installation process lock")
if !ip.Server.installing.SwapIf(true) { if !ip.Server.installing.SwapIf(true) {
@ -268,9 +264,9 @@ func (ip *InstallationProcess) pullInstallationImage() error {
imagePullOptions.RegistryAuth = b64 imagePullOptions.RegistryAuth = b64
} }
r, err := ip.client.ImagePull(context.Background(), ip.Script.ContainerImage, imagePullOptions) r, err := ip.client.ImagePull(ip.Server.Context(), ip.Script.ContainerImage, imagePullOptions)
if err != nil { if err != nil {
images, ierr := ip.client.ImageList(context.Background(), types.ImageListOptions{}) images, ierr := ip.client.ImageList(ip.Server.Context(), types.ImageListOptions{})
if ierr != nil { if ierr != nil {
// Well damn, something has gone really wrong here, just go ahead and abort there // Well damn, something has gone really wrong here, just go ahead and abort there
// isn't much anything we can do to try and self-recover from this. // isn't much anything we can do to try and self-recover from this.
@ -313,9 +309,10 @@ func (ip *InstallationProcess) pullInstallationImage() error {
return nil return nil
} }
// Runs before the container is executed. This pulls down the required docker container image // BeforeExecute runs before the container is executed. This pulls down the
// as well as writes the installation script to the disk. This process is executed in an async // required docker container image as well as writes the installation script to
// manner, if either one fails the error is returned. // the disk. This process is executed in an async manner, if either one fails
// the error is returned.
func (ip *InstallationProcess) BeforeExecute() error { func (ip *InstallationProcess) BeforeExecute() error {
if err := ip.writeScriptToDisk(); err != nil { if err := ip.writeScriptToDisk(); err != nil {
return errors.WithMessage(err, "failed to write installation script to disk") return errors.WithMessage(err, "failed to write installation script to disk")
@ -341,7 +338,7 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error {
defer ip.RemoveContainer() defer ip.RemoveContainer()
ip.Server.Log().WithField("container_id", containerId).Debug("pulling installation logs for server") ip.Server.Log().WithField("container_id", containerId).Debug("pulling installation logs for server")
reader, err := ip.client.ContainerLogs(ip.context, containerId, types.ContainerLogsOptions{ reader, err := ip.client.ContainerLogs(ip.Server.Context(), containerId, types.ContainerLogsOptions{
ShowStdout: true, ShowStdout: true,
ShowStderr: true, ShowStderr: true,
Follow: false, Follow: false,
@ -396,12 +393,13 @@ func (ip *InstallationProcess) AfterExecute(containerId string) error {
return nil return nil
} }
// Executes the installation process inside a specially created docker container. // Execute executes the installation process inside a specially created docker
// container.
func (ip *InstallationProcess) Execute() (string, error) { func (ip *InstallationProcess) Execute() (string, error) {
// Create a child context that is canceled once this function is done running. This // Create a child context that is canceled once this function is done running. This
// will also be canceled if the parent context (from the Server struct) is canceled // will also be canceled if the parent context (from the Server struct) is canceled
// which occurs if the server is deleted. // which occurs if the server is deleted.
ctx, cancel := context.WithCancel(ip.context) ctx, cancel := context.WithCancel(ip.Server.Context())
defer cancel() defer cancel()
conf := &container.Config{ conf := &container.Config{
@ -512,18 +510,15 @@ func (ip *InstallationProcess) Execute() (string, error) {
// the server configuration directory, as well as to a websocket listener so // the server configuration directory, as well as to a websocket listener so
// that the process can be viewed in the panel by administrators. // that the process can be viewed in the panel by administrators.
func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error { func (ip *InstallationProcess) StreamOutput(ctx context.Context, id string) error {
reader, err := ip.client.ContainerLogs(ctx, id, types.ContainerLogsOptions{ opts := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true}
ShowStdout: true, reader, err := ip.client.ContainerLogs(ctx, id, opts)
ShowStderr: true,
Follow: true,
})
if err != nil { if err != nil {
return err return err
} }
defer reader.Close() defer reader.Close()
err = system.ScanReader(reader, ip.Server.Sink(system.InstallSink).Push) err = system.ScanReader(reader, ip.Server.Sink(system.InstallSink).Push)
if err != nil { if err != nil && !errors.Is(err, context.Canceled) {
ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines") ip.Server.Log().WithFields(log.Fields{"container_id": id, "error": err}).Warn("error processing install output lines")
} }
return nil return nil

View File

@ -88,16 +88,16 @@ func ScanReader(r io.Reader, callback func(line []byte)) error {
} else { } else {
buf.Write(line) buf.Write(line)
} }
// If we encountered an error with something in ReadLine that was not an
// EOF just abort the entire process here.
if err != nil && err != io.EOF {
return err
}
// Finish this loop and begin outputting the line if there is no prefix // Finish this loop and begin outputting the line if there is no prefix
// (the line fit into the default buffer), or if we hit the end of the line. // (the line fit into the default buffer), or if we hit the end of the line.
if !isPrefix || err == io.EOF { if !isPrefix || err == io.EOF {
break break
} }
// If we encountered an error with something in ReadLine that was not an
// EOF just abort the entire process here.
if err != nil {
return err
}
} }
// Send the full buffer length over to the event handler to be emitted in // Send the full buffer length over to the event handler to be emitted in