Add backoff retries to API calls from Wings

This commit is contained in:
Dane Everitt 2021-05-02 15:16:30 -07:00
parent ddfd6d9cce
commit 3f47bfd292
3 changed files with 124 additions and 56 deletions

View File

@ -8,11 +8,13 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv"
"strings" "strings"
"time" "time"
"emperror.dev/errors" "emperror.dev/errors"
"github.com/apex/log" "github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"github.com/pterodactyl/wings/system" "github.com/pterodactyl/wings/system"
) )
@ -31,11 +33,11 @@ type Client interface {
} }
type client struct { type client struct {
httpClient *http.Client httpClient *http.Client
baseUrl string baseUrl string
tokenId string tokenId string
token string token string
attempts int maxAttempts int
} }
// New returns a new HTTP request client that is used for making authenticated // New returns a new HTTP request client that is used for making authenticated
@ -46,7 +48,7 @@ func New(base string, opts ...ClientOption) Client {
httpClient: &http.Client{ httpClient: &http.Client{
Timeout: time.Second * 15, Timeout: time.Second * 15,
}, },
attempts: 1, maxAttempts: 0,
} }
for _, opt := range opts { for _, opt := range opts {
opt(&c) opt(&c)
@ -71,6 +73,26 @@ func WithHttpClient(httpClient *http.Client) ClientOption {
} }
} }
// Get executes a HTTP GET request.
func (c *client) Get(ctx context.Context, path string, query q) (*Response, error) {
return c.request(ctx, http.MethodGet, path, nil, func(r *http.Request) {
q := r.URL.Query()
for k, v := range query {
q.Set(k, v)
}
r.URL.RawQuery = q.Encode()
})
}
// Post executes a HTTP POST request.
func (c *client) Post(ctx context.Context, path string, data interface{}) (*Response, error) {
b, err := json.Marshal(data)
if err != nil {
return nil, err
}
return c.request(ctx, http.MethodPost, path, bytes.NewBuffer(b))
}
// requestOnce creates a http request and executes it once. Prefer request() // requestOnce creates a http request and executes it once. Prefer request()
// over this method when possible. It appends the path to the endpoint of the // over this method when possible. It appends the path to the endpoint of the
// client and adds the authentication token to the request. // client and adds the authentication token to the request.
@ -96,41 +118,82 @@ func (c *client) requestOnce(ctx context.Context, method, path string, body io.R
return &Response{res}, err return &Response{res}, err
} }
// request executes a http request and attempts when errors occur. // request executes a HTTP request against the Panel API. If there is an error
// It appends the path to the endpoint of the client and adds the authentication token to the request. // encountered with the request it will be retried using an exponential backoff.
func (c *client) request(ctx context.Context, method, path string, body io.Reader, opts ...func(r *http.Request)) (res *Response, err error) { // If the error returned from the Panel is due to API throttling or because there
for i := 0; i < c.attempts; i++ { // are invalid authentication credentials provided the request will _not_ be
res, err = c.requestOnce(ctx, method, path, body, opts...) // retried by the backoff.
if err == nil && //
res.StatusCode < http.StatusInternalServerError && // This function automatically appends the path to the current client endpoint
res.StatusCode != http.StatusTooManyRequests { // and adds the required authentication headers to the request that is being
break // created. Errors returned will be of the RequestError type if there was some
// type of response from the API that can be parsed.
func (c *client) request(ctx context.Context, method, path string, body io.Reader, opts ...func(r *http.Request)) (*Response, error) {
var res *Response
err := backoff.Retry(func() error {
r, err := c.requestOnce(ctx, method, path, body, opts...)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return backoff.Permanent(err)
}
return errors.WrapIf(err, "http: request creation failed")
} }
} res = r
if err != nil { if r.HasError() {
return nil, errors.WithStack(err) // Don't keep spamming the endpoint if we've already made too many requests or
} // if we're not even authenticated correctly. Retrying generally won't fix either
return // of these issues.
} if r.StatusCode == http.StatusTooManyRequests || r.StatusCode == http.StatusUnauthorized {
return backoff.Permanent(r.Error())
// get executes a http get request. }
func (c *client) get(ctx context.Context, path string, query q) (*Response, error) { return r.Error()
return c.request(ctx, http.MethodGet, path, nil, func(r *http.Request) {
q := r.URL.Query()
for k, v := range query {
q.Set(k, v)
} }
r.URL.RawQuery = q.Encode() return nil
}) }, c.backoff(ctx))
}
// post executes a http post request.
func (c *client) post(ctx context.Context, path string, data interface{}) (*Response, error) {
b, err := json.Marshal(data)
if err != nil { if err != nil {
var rerr *RequestError
if errors.As(err, &rerr) {
return res, nil
}
if v, ok := err.(*backoff.PermanentError); ok {
return nil, v.Unwrap()
}
return nil, err return nil, err
} }
return c.request(ctx, http.MethodPost, path, bytes.NewBuffer(b)) return res, nil
}
// backoff returns an exponential backoff function for use with remote API
// requests. This will allow an API call to be executed approximately 10 times
// before it is finally reported back as an error.
//
// This allows for issues with DNS resolution, or rare race conditions due to
// slower SQL queries on the Panel to potentially self-resolve without just
// immediately failing the first request. The example below shows the amount of
// time that has ellapsed between each call to the handler when an error is
// returned. You can tweak these values as needed to get the effect you desire.
//
// If maxAttempts is a value greater than 0 the backoff will be capped at a total
// number of executions, or the MaxElapsedTime, whichever comes first.
//
// call(): 0s
// call(): 552.330144ms
// call(): 1.63271196s
// call(): 2.94284202s
// call(): 4.525234711s
// call(): 6.865723375s
// call(): 11.37194223s
// call(): 14.593421816s
// call(): 20.202045293s
// call(): 27.36567952s <-- Stops here as MaxElapsedTime is 30 seconds
func (c *client) backoff(ctx context.Context) backoff.BackOffContext {
b := backoff.NewExponentialBackOff()
b.MaxInterval = time.Second * 12
b.MaxElapsedTime = time.Second * 30
if c.maxAttempts > 0 {
return backoff.WithContext(backoff.WithMaxRetries(b, uint64(c.maxAttempts)), ctx)
}
return backoff.WithContext(b, ctx)
} }
// Response is a custom response type that allows for commonly used error // Response is a custom response type that allows for commonly used error
@ -185,7 +248,9 @@ func (r *Response) BindJSON(v interface{}) error {
} }
// Returns the first error message from the API call as a string. The error // Returns the first error message from the API call as a string. The error
// message will be formatted similar to the below example: // message will be formatted similar to the below example. If there is no error
// that can be parsed out of the API you'll still get a RequestError returned
// but the RequestError.Code will be "_MissingResponseCode".
// //
// HttpNotFoundException: The requested resource does not exist. (HTTP/404) // HttpNotFoundException: The requested resource does not exist. (HTTP/404)
func (r *Response) Error() error { func (r *Response) Error() error {
@ -196,7 +261,11 @@ func (r *Response) Error() error {
var errs RequestErrors var errs RequestErrors
_ = r.BindJSON(&errs) _ = r.BindJSON(&errs)
e := &RequestError{} e := &RequestError{
Code: "_MissingResponseCode",
Status: strconv.Itoa(r.StatusCode),
Detail: "No error response returned from API endpoint.",
}
if len(errs.Errors) > 0 { if len(errs.Errors) > 0 {
e = &errs.Errors[0] e = &errs.Errors[0]
} }

View File

@ -14,8 +14,7 @@ func createTestClient(h http.HandlerFunc) (*client, *httptest.Server) {
c := &client{ c := &client{
httpClient: s.Client(), httpClient: s.Client(),
baseUrl: s.URL, baseUrl: s.URL,
maxAttempts: 1,
attempts: 1,
tokenId: "testid", tokenId: "testid",
token: "testtoken", token: "testtoken",
} }
@ -47,7 +46,7 @@ func TestRequestRetry(t *testing.T) {
} }
i++ i++
}) })
c.attempts = 2 c.maxAttempts = 2
r, err := c.request(context.Background(), "", "", nil) r, err := c.request(context.Background(), "", "", nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, r) assert.NotNil(t, r)
@ -60,12 +59,12 @@ func TestRequestRetry(t *testing.T) {
rw.WriteHeader(http.StatusInternalServerError) rw.WriteHeader(http.StatusInternalServerError)
i++ i++
}) })
c.attempts = 2 c.maxAttempts = 2
r, err = c.request(context.Background(), "get", "", nil) r, err = c.request(context.Background(), "get", "", nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, r) assert.NotNil(t, r)
assert.Equal(t, http.StatusInternalServerError, r.StatusCode) assert.Equal(t, http.StatusInternalServerError, r.StatusCode)
assert.Equal(t, 2, i) assert.Equal(t, 3, i)
} }
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
@ -74,7 +73,7 @@ func TestGet(t *testing.T) {
assert.Len(t, r.URL.Query(), 1) assert.Len(t, r.URL.Query(), 1)
assert.Equal(t, "world", r.URL.Query().Get("hello")) assert.Equal(t, "world", r.URL.Query().Get("hello"))
}) })
r, err := c.get(context.Background(), "/test", q{"hello": "world"}) r, err := c.Get(context.Background(), "/test", q{"hello": "world"})
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, r) assert.NotNil(t, r)
} }
@ -87,7 +86,7 @@ func TestPost(t *testing.T) {
assert.Equal(t, http.MethodPost, r.Method) assert.Equal(t, http.MethodPost, r.Method)
}) })
r, err := c.post(context.Background(), "/test", test) r, err := c.Post(context.Background(), "/test", test)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, r) assert.NotNil(t, r)
} }

View File

@ -58,7 +58,7 @@ func (c *client) GetServers(ctx context.Context, limit int) ([]RawServerData, er
// things in a bad state within the Panel. This API call is executed once Wings // things in a bad state within the Panel. This API call is executed once Wings
// has fully booted all of the servers. // has fully booted all of the servers.
func (c *client) ResetServersState(ctx context.Context) error { func (c *client) ResetServersState(ctx context.Context) error {
res, err := c.post(ctx, "/servers/reset", nil) res, err := c.Post(ctx, "/servers/reset", nil)
if err != nil { if err != nil {
return errors.WrapIf(err, "remote/servers: failed to reset server state on Panel") return errors.WrapIf(err, "remote/servers: failed to reset server state on Panel")
} }
@ -68,7 +68,7 @@ func (c *client) ResetServersState(ctx context.Context) error {
func (c *client) GetServerConfiguration(ctx context.Context, uuid string) (ServerConfigurationResponse, error) { func (c *client) GetServerConfiguration(ctx context.Context, uuid string) (ServerConfigurationResponse, error) {
var config ServerConfigurationResponse var config ServerConfigurationResponse
res, err := c.get(ctx, fmt.Sprintf("/servers/%s", uuid), nil) res, err := c.Get(ctx, fmt.Sprintf("/servers/%s", uuid), nil)
if err != nil { if err != nil {
return config, err return config, err
} }
@ -83,7 +83,7 @@ func (c *client) GetServerConfiguration(ctx context.Context, uuid string) (Serve
} }
func (c *client) GetInstallationScript(ctx context.Context, uuid string) (InstallationScript, error) { func (c *client) GetInstallationScript(ctx context.Context, uuid string) (InstallationScript, error) {
res, err := c.get(ctx, fmt.Sprintf("/servers/%s/install", uuid), nil) res, err := c.Get(ctx, fmt.Sprintf("/servers/%s/install", uuid), nil)
if err != nil { if err != nil {
return InstallationScript{}, err return InstallationScript{}, err
} }
@ -99,7 +99,7 @@ func (c *client) GetInstallationScript(ctx context.Context, uuid string) (Instal
} }
func (c *client) SetInstallationStatus(ctx context.Context, uuid string, successful bool) error { func (c *client) SetInstallationStatus(ctx context.Context, uuid string, successful bool) error {
resp, err := c.post(ctx, fmt.Sprintf("/servers/%s/install", uuid), d{"successful": successful}) resp, err := c.Post(ctx, fmt.Sprintf("/servers/%s/install", uuid), d{"successful": successful})
if err != nil { if err != nil {
return err return err
} }
@ -108,7 +108,7 @@ func (c *client) SetInstallationStatus(ctx context.Context, uuid string, success
} }
func (c *client) SetArchiveStatus(ctx context.Context, uuid string, successful bool) error { func (c *client) SetArchiveStatus(ctx context.Context, uuid string, successful bool) error {
resp, err := c.post(ctx, fmt.Sprintf("/servers/%s/archive", uuid), d{"successful": successful}) resp, err := c.Post(ctx, fmt.Sprintf("/servers/%s/archive", uuid), d{"successful": successful})
if err != nil { if err != nil {
return err return err
} }
@ -121,7 +121,7 @@ func (c *client) SetTransferStatus(ctx context.Context, uuid string, successful
if successful { if successful {
state = "success" state = "success"
} }
resp, err := c.get(ctx, fmt.Sprintf("/servers/%s/transfer/%s", uuid, state), nil) resp, err := c.Get(ctx, fmt.Sprintf("/servers/%s/transfer/%s", uuid, state), nil)
if err != nil { if err != nil {
return err return err
} }
@ -136,7 +136,7 @@ func (c *client) SetTransferStatus(ctx context.Context, uuid string, successful
// all of the authorization security logic to the Panel. // all of the authorization security logic to the Panel.
func (c *client) ValidateSftpCredentials(ctx context.Context, request SftpAuthRequest) (SftpAuthResponse, error) { func (c *client) ValidateSftpCredentials(ctx context.Context, request SftpAuthRequest) (SftpAuthResponse, error) {
var auth SftpAuthResponse var auth SftpAuthResponse
res, err := c.post(ctx, "/sftp/auth", request) res, err := c.Post(ctx, "/sftp/auth", request)
if err != nil { if err != nil {
return auth, err return auth, err
} }
@ -163,7 +163,7 @@ func (c *client) ValidateSftpCredentials(ctx context.Context, request SftpAuthRe
func (c *client) GetBackupRemoteUploadURLs(ctx context.Context, backup string, size int64) (BackupRemoteUploadResponse, error) { func (c *client) GetBackupRemoteUploadURLs(ctx context.Context, backup string, size int64) (BackupRemoteUploadResponse, error) {
var data BackupRemoteUploadResponse var data BackupRemoteUploadResponse
res, err := c.get(ctx, fmt.Sprintf("/backups/%s", backup), q{"size": strconv.FormatInt(size, 10)}) res, err := c.Get(ctx, fmt.Sprintf("/backups/%s", backup), q{"size": strconv.FormatInt(size, 10)})
if err != nil { if err != nil {
return data, err return data, err
} }
@ -178,7 +178,7 @@ func (c *client) GetBackupRemoteUploadURLs(ctx context.Context, backup string, s
} }
func (c *client) SetBackupStatus(ctx context.Context, backup string, data BackupRequest) error { func (c *client) SetBackupStatus(ctx context.Context, backup string, data BackupRequest) error {
resp, err := c.post(ctx, fmt.Sprintf("/backups/%s", backup), data) resp, err := c.Post(ctx, fmt.Sprintf("/backups/%s", backup), data)
if err != nil { if err != nil {
return err return err
} }
@ -190,7 +190,7 @@ func (c *client) SetBackupStatus(ctx context.Context, backup string, data Backup
// restoration has been completed and the server should be marked as being // restoration has been completed and the server should be marked as being
// activated again. // activated again.
func (c *client) SendRestorationStatus(ctx context.Context, backup string, successful bool) error { func (c *client) SendRestorationStatus(ctx context.Context, backup string, successful bool) error {
resp, err := c.post(ctx, fmt.Sprintf("/backups/%s/restore", backup), d{"successful": successful}) resp, err := c.Post(ctx, fmt.Sprintf("/backups/%s/restore", backup), d{"successful": successful})
if err != nil { if err != nil {
return err return err
} }
@ -206,7 +206,7 @@ func (c *client) getServersPaged(ctx context.Context, page, limit int) ([]RawSer
Meta Pagination `json:"meta"` Meta Pagination `json:"meta"`
} }
res, err := c.get(ctx, "/servers", q{ res, err := c.Get(ctx, "/servers", q{
"page": strconv.Itoa(page), "page": strconv.Itoa(page),
"per_page": strconv.Itoa(limit), "per_page": strconv.Itoa(limit),
}) })