Quick little code cleanup and adding some commentary

This commit is contained in:
Dane Everitt 2021-03-07 17:31:45 -08:00
parent 81a411a42c
commit 5f5b2bc84e
2 changed files with 127 additions and 175 deletions

View File

@ -3,9 +3,14 @@ package docker
import ( import (
"bufio" "bufio"
"context" "context"
"emperror.dev/errors"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"strconv"
"strings"
"time"
"emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
@ -15,16 +20,9 @@ import (
"github.com/pterodactyl/wings/config" "github.com/pterodactyl/wings/config"
"github.com/pterodactyl/wings/environment" "github.com/pterodactyl/wings/environment"
"github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/system"
"io"
"strconv"
"strings"
"time"
) )
type imagePullStatus struct { var ErrNotAttached = errors.Sentinel("not attached to instance")
Status string `json:"status"`
Progress string `json:"progress"`
}
// A custom console writer that allows us to keep a function blocked until the // A custom console writer that allows us to keep a function blocked until the
// given stream is properly closed. This does nothing special, only exists to // given stream is properly closed. This does nothing special, only exists to
@ -38,14 +36,14 @@ func (nw noopWriter) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
// Attaches to the docker container itself and ensures that we can pipe data in and out // Attach attaches to the docker container itself and ensures that we can pipe
// of the process stream. This should not be used for reading console data as you *will* // data in and out of the process stream. This should not be used for reading
// miss important output at the beginning because of the time delay with attaching to the // console data as you *will* miss important output at the beginning because of
// output. // the time delay with attaching to the output.
// //
// Calling this function will poll resources for the container in the background until the // Calling this function will poll resources for the container in the background
// provided context is canceled by the caller. Failure to cancel said context will cause // until the provided context is canceled by the caller. Failure to cancel said
// background memory leaks as the goroutine will not exit. // context will cause background memory leaks as the goroutine will not exit.
func (e *Environment) Attach() error { func (e *Environment) Attach() error {
if e.IsAttached() { if e.IsAttached() {
return nil return nil
@ -108,27 +106,15 @@ func (e *Environment) Attach() error {
return nil return nil
} }
func (e *Environment) resources() container.Resources { // InSituUpdate performs an in-place update of the Docker container's resource
l := e.Configuration.Limits() // limits without actually making any changes to the operational state of the
// container. This allows memory, cpu, and IO limitations to be adjusted on the
return container.Resources{ // fly for individual instances.
Memory: l.BoundedMemoryLimit(),
MemoryReservation: l.MemoryLimit * 1_000_000,
MemorySwap: l.ConvertedSwap(),
CPUQuota: l.ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: l.IoWeight,
OomKillDisable: &l.OOMDisabled,
CpusetCpus: l.Threads,
}
}
// Performs an in-place update of the Docker container's resource limits without actually
// making any changes to the operational state of the container. This allows memory, cpu,
// and IO limitations to be adjusted on the fly for individual instances.
func (e *Environment) InSituUpdate() error { func (e *Environment) InSituUpdate() error {
if _, err := e.client.ContainerInspect(context.Background(), e.Id); err != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if _, err := e.client.ContainerInspect(ctx, e.Id); err != nil {
// If the container doesn't exist for some reason there really isn't anything // If the container doesn't exist for some reason there really isn't anything
// we can do to fix that in this process (it doesn't make sense at least). In those // we can do to fix that in this process (it doesn't make sense at least). In those
// cases just return without doing anything since we still want to save the configuration // cases just return without doing anything since we still want to save the configuration
@ -138,25 +124,24 @@ func (e *Environment) InSituUpdate() error {
if client.IsErrNotFound(err) { if client.IsErrNotFound(err) {
return nil return nil
} }
return errors.Wrap(err, "environment/docker: could not inspect container")
return err
} }
u := container.UpdateConfig{ // CPU pinning cannot be removed once it is applied to a container. The same is true
// for removing memory limits, a container must be re-created.
//
// @see https://github.com/moby/moby/issues/41946
if _, err := e.client.ContainerUpdate(ctx, e.Id, container.UpdateConfig{
Resources: e.resources(), Resources: e.resources(),
}); err != nil {
return errors.Wrap(err, "environment/docker: could not update container")
} }
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if _, err := e.client.ContainerUpdate(ctx, e.Id, u); err != nil {
return err
}
return nil return nil
} }
// Creates a new container for the server using all of the data that is currently // Create creates a new container for the server using all of the data that is
// available for it. If the container already exists it will be returnee. // currently available for it. If the container already exists it will be
// returned.
func (e *Environment) Create() error { func (e *Environment) Create() error {
// If the container already exists don't hit the user with an error, just return // If the container already exists don't hit the user with an error, just return
// the current information about it which is what we would do when creating the // the current information about it which is what we would do when creating the
@ -251,23 +236,8 @@ func (e *Environment) Create() error {
return nil return nil
} }
func (e *Environment) convertMounts() []mount.Mount { // Destroy will remove the Docker container from the server. If the container
var out []mount.Mount // is currently running it will be forcibly stopped by Docker.
for _, m := range e.Configuration.Mounts() {
out = append(out, mount.Mount{
Type: mount.TypeBind,
Source: m.Source,
Target: m.Target,
ReadOnly: m.ReadOnly,
})
}
return out
}
// Remove the Docker container from the machine. If the container is currently running
// it will be forcibly stopped by Docker.
func (e *Environment) Destroy() error { func (e *Environment) Destroy() error {
// We set it to stopping than offline to prevent crash detection from being triggered. // We set it to stopping than offline to prevent crash detection from being triggered.
e.SetState(environment.ProcessStoppingState) e.SetState(environment.ProcessStoppingState)
@ -291,9 +261,55 @@ func (e *Environment) Destroy() error {
return err return err
} }
// Attaches to the log for the container. This avoids us missing crucial output that // SendCommand sends the specified command to the stdin of the running container
// happens in the split seconds before the code moves from 'Starting' to 'Attaching' // instance. There is no confirmation that this data is sent successfully, only
// on the process. // that it gets pushed into the stdin.
func (e *Environment) SendCommand(c string) error {
if !e.IsAttached() {
return errors.Wrap(ErrNotAttached, "environment/docker: cannot send command to container")
}
e.mu.RLock()
defer e.mu.RUnlock()
// If the command being processed is the same as the process stop command then we
// want to mark the server as entering the stopping state otherwise the process will
// stop and Wings will think it has crashed and attempt to restart it.
if e.meta.Stop.Type == "command" && c == e.meta.Stop.Value {
e.SetState(environment.ProcessStoppingState)
}
_, err := e.stream.Conn.Write([]byte(c + "\n"))
return errors.Wrap(err, "environment/docker: could not write to container stream")
}
// Readlog reads the log file for the server. This does not care if the server
// is running or not, it will simply try to read the last X bytes of the file
// and return them.
func (e *Environment) Readlog(lines int) ([]string, error) {
r, err := e.client.ContainerLogs(context.Background(), e.Id, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Tail: strconv.Itoa(lines),
})
if err != nil {
return nil, errors.WithStack(err)
}
defer r.Close()
var out []string
scanner := bufio.NewScanner(r)
for scanner.Scan() {
out = append(out, scanner.Text())
}
return out, nil
}
// Attaches to the log for the container. This avoids us missing crucial output
// that happens in the split seconds before the code moves from 'Starting' to
// 'Attaching' on the process.
func (e *Environment) followOutput() error { func (e *Environment) followOutput() error {
if exists, err := e.Exists(); !exists { if exists, err := e.Exists(); !exists {
if err != nil { if err != nil {
@ -346,14 +362,19 @@ func (e *Environment) scanOutput(reader io.ReadCloser) {
go e.followOutput() go e.followOutput()
} }
// Pulls the image from Docker. If there is an error while pulling the image from the source type imagePullStatus struct {
// but the image already exists locally, we will report that error to the logger but continue Status string `json:"status"`
// with the process. Progress string `json:"progress"`
}
// Pulls the image from Docker. If there is an error while pulling the image
// from the source but the image already exists locally, we will report that
// error to the logger but continue with the process.
// //
// The reasoning behind this is that Quay has had some serious outages as of late, and we don't // The reasoning behind this is that Quay has had some serious outages as of
// need to block all of the servers from booting just because of that. I'd imagine in a lot of // late, and we don't need to block all of the servers from booting just because
// cases an outage shouldn't affect users too badly. It'll at least keep existing servers working // of that. I'd imagine in a lot of cases an outage shouldn't affect users too
// correctly if anything. // badly. It'll at least keep existing servers working correctly if anything.
func (e *Environment) ensureImageExists(image string) error { func (e *Environment) ensureImageExists(image string) error {
e.Events().Publish(environment.DockerImagePullStarted, "") e.Events().Publish(environment.DockerImagePullStarted, "")
defer e.Events().Publish(environment.DockerImagePullCompleted, "") defer e.Events().Publish(environment.DockerImagePullCompleted, "")
@ -447,3 +468,34 @@ func (e *Environment) ensureImageExists(image string) error {
return nil return nil
} }
func (e *Environment) convertMounts() []mount.Mount {
var out []mount.Mount
for _, m := range e.Configuration.Mounts() {
out = append(out, mount.Mount{
Type: mount.TypeBind,
Source: m.Source,
Target: m.Target,
ReadOnly: m.ReadOnly,
})
}
return out
}
func (e *Environment) resources() container.Resources {
l := e.Configuration.Limits()
return container.Resources{
Memory: l.BoundedMemoryLimit(),
MemoryReservation: l.MemoryLimit * 1_000_000,
MemorySwap: l.ConvertedSwap(),
CPUQuota: l.ConvertedCpuLimit(),
CPUPeriod: 100_000,
CPUShares: 1024,
BlkioWeight: l.IoWeight,
OomKillDisable: &l.OOMDisabled,
CpusetCpus: l.Threads,
}
}

View File

@ -1,100 +0,0 @@
package docker
import (
"bufio"
"bytes"
"context"
"emperror.dev/errors"
"encoding/json"
"github.com/docker/docker/api/types"
"github.com/pterodactyl/wings/environment"
"strconv"
)
type dockerLogLine struct {
Log string `json:"log"`
}
var ErrNotAttached = errors.New("not attached to instance")
func (e *Environment) setStream(s *types.HijackedResponse) {
e.mu.Lock()
defer e.mu.Unlock()
e.stream = s
}
// Sends the specified command to the stdin of the running container instance. There is no
// confirmation that this data is sent successfully, only that it gets pushed into the stdin.
func (e *Environment) SendCommand(c string) error {
if !e.IsAttached() {
return ErrNotAttached
}
e.mu.RLock()
defer e.mu.RUnlock()
// If the command being processed is the same as the process stop command then we want to mark
// the server as entering the stopping state otherwise the process will stop and Wings will think
// it has crashed and attempt to restart it.
if e.meta.Stop.Type == "command" && c == e.meta.Stop.Value {
e.SetState(environment.ProcessStoppingState)
}
_, err := e.stream.Conn.Write([]byte(c + "\n"))
return err
}
// Reads the log file for the server. This does not care if the server is running or not, it will
// simply try to read the last X bytes of the file and return them.
func (e *Environment) Readlog(lines int) ([]string, error) {
r, err := e.client.ContainerLogs(context.Background(), e.Id, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Tail: strconv.Itoa(lines),
})
if err != nil {
return nil, err
}
defer r.Close()
var out []string
scanner := bufio.NewScanner(r)
for scanner.Scan() {
out = append(out, scanner.Text())
}
return out, nil
}
// Docker stores the logs for server output in a JSON format. This function will iterate over the JSON
// that was read from the log file and parse it into a more human readable format.
func (e *Environment) parseLogToStrings(b []byte) ([]string, error) {
hasError := false
var out []string
scanner := bufio.NewScanner(bytes.NewReader(b))
for scanner.Scan() {
var l dockerLogLine
// Unmarshal the contents and allow up to a single error before bailing out of the process. We
// do this because if you're arbitrarily reading a length of the file you'll likely end up
// with the first line in the output being improperly formatted JSON. In those cases we want to
// just skip over it. However if we see another error we're going to bail out because that is an
// abnormal situation.
if err := json.Unmarshal([]byte(scanner.Text()), &l); err != nil {
if hasError {
return nil, err
}
hasError = true
continue
}
out = append(out, l.Log)
}
return out, nil
}