Configure cron to actually send to endpoint
This commit is contained in:
parent
28137c4c14
commit
49f3a61d16
|
@ -3,6 +3,8 @@ package cron
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"emperror.dev/errors"
|
"emperror.dev/errors"
|
||||||
|
"github.com/apex/log"
|
||||||
|
"github.com/goccy/go-json"
|
||||||
"github.com/pterodactyl/wings/internal/database"
|
"github.com/pterodactyl/wings/internal/database"
|
||||||
"github.com/pterodactyl/wings/server"
|
"github.com/pterodactyl/wings/server"
|
||||||
"github.com/pterodactyl/wings/system"
|
"github.com/pterodactyl/wings/system"
|
||||||
|
@ -19,12 +21,12 @@ func processActivityLogs(m *server.Manager) error {
|
||||||
}
|
}
|
||||||
defer processing.Store(false)
|
defer processing.Store(false)
|
||||||
|
|
||||||
var b [][]byte
|
var list [][]byte
|
||||||
err := database.DB().View(func(tx *nutsdb.Tx) error {
|
err := database.DB().View(func(tx *nutsdb.Tx) error {
|
||||||
// Grab the oldest 100 activity events that have been logged and send them back to the
|
// Grab the oldest 100 activity events that have been logged and send them back to the
|
||||||
// Panel for processing. Once completed, delete those events from the database and then
|
// Panel for processing. Once completed, delete those events from the database and then
|
||||||
// release the lock on this process.
|
// release the lock on this process.
|
||||||
list, err := tx.LRange(database.ServerActivityBucket, []byte("events"), 0, 1)
|
l, err := tx.LRange(database.ServerActivityBucket, []byte("events"), 0, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This error is returned when the bucket doesn't exist, which is likely on the
|
// This error is returned when the bucket doesn't exist, which is likely on the
|
||||||
// first invocations of Wings since we haven't yet logged any data. There is nothing
|
// first invocations of Wings since we haven't yet logged any data. There is nothing
|
||||||
|
@ -34,18 +36,25 @@ func processActivityLogs(m *server.Manager) error {
|
||||||
}
|
}
|
||||||
return errors.WithStackIf(err)
|
return errors.WithStackIf(err)
|
||||||
}
|
}
|
||||||
b = list
|
list = l
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// If there is an error, return it. If there is no data to send to the Panel don't waste
|
if err != nil || len(list) == 0 {
|
||||||
// an API call, just return here. WithStackIf will return "nil" when the value provided to
|
|
||||||
// it is also nil.
|
|
||||||
if err != nil || len(b) == 0 {
|
|
||||||
return errors.WithStackIf(err)
|
return errors.WithStackIf(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.Client().SendActivityLogs(context.Background(), b); err != nil {
|
var processed []json.RawMessage
|
||||||
|
for _, l := range list {
|
||||||
|
var v json.RawMessage
|
||||||
|
if err := json.Unmarshal(l, &v); err != nil {
|
||||||
|
log.WithField("error", errors.WithStack(err)).Warn("failed to parse activity event json, skipping entry")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
processed = append(processed, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.Client().SendActivityLogs(context.Background(), processed); err != nil {
|
||||||
return errors.WrapIf(err, "cron: failed to send activity events to Panel")
|
return errors.WrapIf(err, "cron: failed to send activity events to Panel")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ type Client interface {
|
||||||
SetInstallationStatus(ctx context.Context, uuid string, successful bool) error
|
SetInstallationStatus(ctx context.Context, uuid string, successful bool) error
|
||||||
SetTransferStatus(ctx context.Context, uuid string, successful bool) error
|
SetTransferStatus(ctx context.Context, uuid string, successful bool) error
|
||||||
ValidateSftpCredentials(ctx context.Context, request SftpAuthRequest) (SftpAuthResponse, error)
|
ValidateSftpCredentials(ctx context.Context, request SftpAuthRequest) (SftpAuthResponse, error)
|
||||||
SendActivityLogs(ctx context.Context, activity [][]byte) error
|
SendActivityLogs(ctx context.Context, activity []json.RawMessage) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type client struct {
|
type client struct {
|
||||||
|
@ -134,6 +134,9 @@ func (c *client) request(ctx context.Context, method, path string, body *bytes.B
|
||||||
err := backoff.Retry(func() error {
|
err := backoff.Retry(func() error {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if body != nil {
|
if body != nil {
|
||||||
|
// We have to create a copy of the body, otherwise attempting this request again will
|
||||||
|
// send no data if there was initially a body since the "requestOnce" method will read
|
||||||
|
// the whole buffer, thus leaving it empty at the end.
|
||||||
if _, err := b.Write(body.Bytes()); err != nil {
|
if _, err := b.Write(body.Bytes()); err != nil {
|
||||||
return backoff.Permanent(errors.Wrap(err, "http: failed to copy body buffer"))
|
return backoff.Permanent(errors.Wrap(err, "http: failed to copy body buffer"))
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package remote
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/goccy/go-json"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -179,8 +180,8 @@ func (c *client) SendRestorationStatus(ctx context.Context, backup string, succe
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendActivityLogs sends activity logs back to the Panel for processing.
|
// SendActivityLogs sends activity logs back to the Panel for processing.
|
||||||
func (c *client) SendActivityLogs(ctx context.Context, activity [][]byte) error {
|
func (c *client) SendActivityLogs(ctx context.Context, activity []json.RawMessage) error {
|
||||||
resp, err := c.Post(ctx, "/activty", d{"data": activity})
|
resp, err := c.Post(ctx, "/activity", d{"data": activity})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WithStackIf(err)
|
return errors.WithStackIf(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"github.com/goccy/go-json"
|
"github.com/goccy/go-json"
|
||||||
"github.com/pterodactyl/wings/internal/database"
|
"github.com/pterodactyl/wings/internal/database"
|
||||||
"github.com/xujiajun/nutsdb"
|
"github.com/xujiajun/nutsdb"
|
||||||
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,6 +18,8 @@ const (
|
||||||
ActivityConsoleCommand = Event("console_command")
|
ActivityConsoleCommand = Event("console_command")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ipTrimRegex = regexp.MustCompile(`(:\d*)?$`)
|
||||||
|
|
||||||
type Activity struct {
|
type Activity struct {
|
||||||
// User is UUID of the user that triggered this event, or an empty string if the event
|
// User is UUID of the user that triggered this event, or an empty string if the event
|
||||||
// cannot be tied to a specific user, in which case we will assume it was the system
|
// cannot be tied to a specific user, in which case we will assume it was the system
|
||||||
|
@ -89,6 +92,10 @@ func (a Activity) Save() error {
|
||||||
a.Timestamp = time.Now().UTC()
|
a.Timestamp = time.Now().UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Since the "RemoteAddr" field can often include a port on the end we need to
|
||||||
|
// trim that off, otherwise it'll fail validation when sent to the Panel.
|
||||||
|
a.IP = ipTrimRegex.ReplaceAllString(a.IP, "")
|
||||||
|
|
||||||
value, err := json.Marshal(a)
|
value, err := json.Marshal(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "database: failed to marshal activity into json bytes")
|
return errors.Wrap(err, "database: failed to marshal activity into json bytes")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user