Send installation data over socket when running

This commit is contained in:
Dane Everitt 2020-01-18 13:05:44 -08:00
parent 5c3823de9a
commit c6fcd8cabb
No known key found for this signature in database
GPG Key ID: EEA66103B3D71F53
4 changed files with 128 additions and 60 deletions

View File

@ -12,6 +12,8 @@ type EventListenerFunction *func(string)
// Defines all of the possible output events for a server. // Defines all of the possible output events for a server.
// noinspection GoNameStartsWithPackageName // noinspection GoNameStartsWithPackageName
const ( const (
DaemonMessageEvent = "daemon message"
InstallOutputEvent = "install output"
ConsoleOutputEvent = "console output" ConsoleOutputEvent = "console output"
StatusEvent = "status" StatusEvent = "status"
StatsEvent = "stats" StatsEvent = "stats"

View File

@ -32,26 +32,7 @@ func (s *Server) Install() error {
return errors.WithStack(err) return errors.WithStack(err)
} }
go func(proc *InstallationProcess) { go p.Run()
installPath, err := proc.BeforeExecute()
if err != nil {
zap.S().Errorw(
"failed to complete BeforeExecute step of installation process",
zap.String("server", proc.Server.Uuid),
zap.Error(errors.WithStack(err)),
)
return
}
if err := proc.Execute(installPath); err != nil {
zap.S().Errorw(
"failed to complete Execute step of installation process",
zap.String("server", proc.Server.Uuid),
zap.Error(errors.WithStack(err)),
)
}
}(p)
return nil return nil
} }
@ -82,6 +63,34 @@ func NewInstallationProcess(s *Server, script *api.InstallationScript) (*Install
return proc, nil return proc, nil
} }
// Runs the installation process, this is done as a backgrounded thread. This will configure
// the required environment, and then spin up the installation container.
//
// Once the container finishes installing the results will be stored in an installation
// log in the server's configuration directory.
func (ip *InstallationProcess) Run() {
installPath, err := ip.BeforeExecute()
if err != nil {
zap.S().Errorw(
"failed to complete BeforeExecute step of installation process",
zap.String("server", ip.Server.Uuid),
zap.Error(errors.WithStack(err)),
)
return
}
if _, err := ip.Execute(installPath); err != nil {
zap.S().Errorw(
"failed to complete Execute step of installation process",
zap.String("server", ip.Server.Uuid),
zap.Error(errors.WithStack(err)),
)
}
zap.S().Infow("completed installation process for server", zap.String("server", ip.Server.Uuid))
}
// Writes the installation script to a temporary file on the host machine so that it // Writes the installation script to a temporary file on the host machine so that it
// can be properly mounted into the installation container and then executed. // can be properly mounted into the installation container and then executed.
func (ip *InstallationProcess) writeScriptToDisk() (string, error) { func (ip *InstallationProcess) writeScriptToDisk() (string, error) {
@ -187,7 +196,7 @@ func (ip *InstallationProcess) BeforeExecute() (string, error) {
} }
// Executes the installation process inside a specially created docker container. // Executes the installation process inside a specially created docker container.
func (ip *InstallationProcess) Execute(installPath string) error { func (ip *InstallationProcess) Execute(installPath string) (string, error) {
ctx := context.Background() ctx := context.Background()
zap.S().Debugw( zap.S().Debugw(
@ -234,7 +243,7 @@ func (ip *InstallationProcess) Execute(installPath string) error {
LogConfig: container.LogConfig{ LogConfig: container.LogConfig{
Type: "local", Type: "local",
Config: map[string]string{ Config: map[string]string{
"max-size": "20m", "max-size": "5m",
"max-file": "1", "max-file": "1",
"compress": "false", "compress": "false",
}, },
@ -246,24 +255,71 @@ func (ip *InstallationProcess) Execute(installPath string) error {
zap.S().Infow("creating installer container for server process", zap.String("server", ip.Server.Uuid)) zap.S().Infow("creating installer container for server process", zap.String("server", ip.Server.Uuid))
r, err := ip.client.ContainerCreate(ctx, conf, hostConf, nil, ip.Server.Uuid+"_installer") r, err := ip.client.ContainerCreate(ctx, conf, hostConf, nil, ip.Server.Uuid+"_installer")
if err != nil { if err != nil {
return errors.WithStack(err) return "", errors.WithStack(err)
} }
zap.S().Infow("running installation process for server", zap.String("server", ip.Server.Uuid)) zap.S().Infow(
"running installation process for server...",
zap.String("server", ip.Server.Uuid),
zap.String("container_id", r.ID),
)
if err := ip.client.ContainerStart(ctx, r.ID, types.ContainerStartOptions{}); err != nil { if err := ip.client.ContainerStart(ctx, r.ID, types.ContainerStartOptions{}); err != nil {
return err return "", err
} }
go func(id string) {
ip.Server.Emit(DaemonMessageEvent, "Starting installation process, this could take a few minutes...")
if err := ip.StreamOutput(id); err != nil {
zap.S().Errorw(
"error handling streaming output for server install process",
zap.String("container_id", id),
zap.Error(err),
)
}
ip.Server.Emit(DaemonMessageEvent, "Installation process completed.")
}(r.ID)
sChann, eChann := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning) sChann, eChann := ip.client.ContainerWait(ctx, r.ID, container.WaitConditionNotRunning)
select { select {
case err := <-eChann: case err := <-eChann:
if err != nil { if err != nil {
return errors.WithStack(err) return "", errors.WithStack(err)
} }
case <-sChann: case <-sChann:
} }
zap.S().Infow("completed installation process", zap.String("server", ip.Server.Uuid)) return r.ID, nil
}
// Streams the output of the installation process to a log file in the server configuration
// directory, as well as to a websocket listener so that the process can be viewed in
// the panel by administrators.
func (ip *InstallationProcess) StreamOutput(id string) error {
reader, err := ip.client.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil {
return errors.WithStack(err)
}
defer reader.Close()
s := bufio.NewScanner(reader)
for s.Scan() {
ip.Server.Emit(InstallOutputEvent, s.Text())
}
if err := s.Err(); err != nil {
zap.S().Warnw(
"error processing scanner line in installation output for server",
zap.String("server", ip.Server.Uuid),
zap.String("container_id", id),
zap.Error(errors.WithStack(err)),
)
}
return nil return nil
} }

View File

@ -66,6 +66,7 @@ const (
PermissionSendCommand = "send-command" PermissionSendCommand = "send-command"
PermissionSendPower = "send-power" PermissionSendPower = "send-power"
PermissionReceiveErrors = "receive-errors" PermissionReceiveErrors = "receive-errors"
PermissionReceiveInstall = "receive-install"
) )
// Checks if the given token payload has a permission string. // Checks if the given token payload has a permission string.
@ -163,35 +164,35 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http
JWT: nil, JWT: nil,
} }
handleOutput := func(data string) { // Register all of the event handlers.
events := []string{
server.StatsEvent,
server.StatusEvent,
server.ConsoleOutputEvent,
server.InstallOutputEvent,
server.DaemonMessageEvent,
}
var eventFuncs = make(map[string]*func(string))
for _, event := range events {
var e = event
var fn = func(data string) {
handler.SendJson(&WebsocketMessage{ handler.SendJson(&WebsocketMessage{
Event: server.ConsoleOutputEvent, Event: e,
Args: []string{data}, Args: []string{data},
}) })
} }
handleServerStatus := func(data string) { eventFuncs[event] = &fn
handler.SendJson(&WebsocketMessage{ s.AddListener(event, &fn)
Event: server.StatusEvent,
Args: []string{data},
})
} }
handleResourceUse := func(data string) { // When done with the socket, remove all of the event handlers we had registered.
handler.SendJson(&WebsocketMessage{ defer func() {
Event: server.StatsEvent, for event, action := range eventFuncs {
Args: []string{data}, s.RemoveListener(event, action)
})
} }
}()
s.AddListener(server.StatusEvent, &handleServerStatus)
defer s.RemoveListener(server.StatusEvent, &handleServerStatus)
s.AddListener(server.ConsoleOutputEvent, &handleOutput)
defer s.RemoveListener(server.ConsoleOutputEvent, &handleOutput)
s.AddListener(server.StatsEvent, &handleResourceUse)
defer s.RemoveListener(server.StatsEvent, &handleResourceUse)
// Sit here and check the time to expiration on the JWT every 30 seconds until // Sit here and check the time to expiration on the JWT every 30 seconds until
// the token has expired. If we are within 3 minutes of the token expiring, send // the token has expired. If we are within 3 minutes of the token expiring, send
@ -250,13 +251,22 @@ func (rt *Router) routeWebsocket(w http.ResponseWriter, r *http.Request, ps http
// Perform a blocking send operation on the websocket since we want to avoid any // Perform a blocking send operation on the websocket since we want to avoid any
// concurrent writes to the connection, which would cause a runtime panic and cause // concurrent writes to the connection, which would cause a runtime panic and cause
// the program to crash out. // the program to crash out.
func (wsh *WebsocketHandler) SendJson(v interface{}) error { func (wsh *WebsocketHandler) SendJson(v *WebsocketMessage) error {
// Do not send JSON down the line if the JWT on the connection is not // Do not send JSON down the line if the JWT on the connection is not
// valid! // valid!
if err := wsh.TokenValid(); err != nil { if err := wsh.TokenValid(); err != nil {
return nil return nil
} }
// If we're sending installation output but the user does not have the required
// permissions to see the output, don't send it down the line.
if v.Event == server.InstallOutputEvent {
zap.S().Debugf("%+v", v.Args)
if wsh.JWT != nil && !wsh.JWT.HasPermission(PermissionReceiveInstall) {
return nil
}
}
return wsh.unsafeSendJson(v) return wsh.unsafeSendJson(v)
} }