Switch to gorm for activity logging

DaneEveritt
2022-07-24 11:43:48 -04:00
parent 61baccb1a3
commit 8a867ccc44
17 changed files with 514 additions and 421 deletions

View File

@@ -1,15 +1,12 @@
package cron
import (
"bytes"
"context"
"database/sql"
"emperror.dev/errors"
"encoding/gob"
"github.com/pterodactyl/wings/internal/sqlite"
"github.com/pterodactyl/wings/internal/database"
"github.com/pterodactyl/wings/internal/models"
"github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/system"
"strings"
)
type activityCron struct {
@@ -18,32 +15,6 @@ type activityCron struct {
max int64
}
const queryRegularActivity = `
SELECT id, event, user_uuid, server_uuid, metadata, ip, timestamp FROM activity_logs
WHERE event NOT LIKE 'server:sftp.%'
ORDER BY timestamp
LIMIT ?
`
type QueriedActivity struct {
id int
b []byte
server.Activity
}
// Parse parses the internal query results into the QueriedActivity type and then sets the
// Metadata on it. It also captures the ID that was returned so that all of the matching rows
// can be deleted from the database once we're done.
func (qa *QueriedActivity) Parse(r *sql.Rows) error {
if err := r.Scan(&qa.id, &qa.Event, &qa.User, &qa.Server, &qa.b, &qa.IP, &qa.Timestamp); err != nil {
return errors.Wrap(err, "cron: failed to parse activity log")
}
if err := gob.NewDecoder(bytes.NewBuffer(qa.b)).Decode(&qa.Metadata); err != nil {
return errors.WithStack(err)
}
return nil
}
// Run executes the cronjob and ensures we fetch and send all of the stored activity to the
// Panel instance. Once activity is sent it is deleted from the local database instance. Any
// SFTP-specific events are not handled in this cron; they're handled separately to account
@@ -56,47 +27,31 @@ func (ac *activityCron) Run(ctx context.Context) error {
}
defer ac.mu.Store(false)
rows, err := sqlite.Instance().QueryContext(ctx, queryRegularActivity, ac.max)
if err != nil {
return errors.Wrap(err, "cron: failed to query activity logs")
}
defer rows.Close()
var activity []models.Activity
tx := database.Instance().WithContext(ctx).
Where("event NOT LIKE ?", "server:sftp.%").
Limit(int(ac.max)).
Find(&activity)
var logs []server.Activity
var ids []int
for rows.Next() {
var qa QueriedActivity
if err := qa.Parse(rows); err != nil {
return err
}
ids = append(ids, qa.id)
logs = append(logs, qa.Activity)
if tx.Error != nil {
return errors.WithStack(tx.Error)
}
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
if len(logs) == 0 {
if len(activity) == 0 {
return nil
}
if err := ac.manager.Client().SendActivityLogs(ctx, logs); err != nil {
if err := ac.manager.Client().SendActivityLogs(ctx, activity); err != nil {
return errors.WrapIf(err, "cron: failed to send activity events to Panel")
}
if tx, err := sqlite.Instance().Begin(); err != nil {
return err
} else {
t := make([]string, len(ids))
params := make([]interface{}, len(ids))
for i := 0; i < len(ids); i++ {
t[i] = "?"
params[i] = ids[i]
}
q := strings.Join(t, ",")
_, err := tx.Exec(`DELETE FROM activity_logs WHERE id IN(`+q+`)`, params...)
if err != nil {
return errors.Combine(errors.WithStack(err), tx.Rollback())
}
return errors.WithStack(tx.Commit())
var ids []int
for _, v := range activity {
ids = append(ids, v.ID)
}
tx = database.Instance().WithContext(ctx).Where("id IN ?", ids).Delete(&models.Activity{})
if tx.Error != nil {
return errors.WithStack(tx.Error)
}
return nil
}
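// --- Editor's sketch (not part of this commit's diff) ---------------------
// The rewritten Run above leans on two pieces defined elsewhere in the
// commit: a gorm model for the activity_logs table and a shared *gorm.DB
// handle behind database.Instance(). A plausible shape for both, inferred
// from the columns in the old raw query (id, event, user_uuid, server_uuid,
// metadata, ip, timestamp); the real internal/models and internal/database
// code may differ in the details.
package sketch

import (
	"time"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// Activity mirrors what models.Activity is assumed to look like.
type Activity struct {
	ID        int                    `gorm:"primaryKey"`
	Event     string                 `gorm:"column:event"`
	User      string                 `gorm:"column:user_uuid"`
	Server    string                 `gorm:"column:server_uuid"`
	Metadata  map[string]interface{} `gorm:"column:metadata;serializer:json"`
	IP        string                 `gorm:"column:ip"`
	Timestamp time.Time              `gorm:"column:timestamp"`
}

// TableName keeps gorm pointed at the existing activity_logs table.
func (Activity) TableName() string { return "activity_logs" }

// open stands in for database.Instance(): a single *gorm.DB bound to the
// local SQLite file that the cron code queries and deletes through.
func open(path string) (*gorm.DB, error) {
	return gorm.Open(sqlite.Open(path), &gorm.Config{})
}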

View File

@@ -33,14 +33,8 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error
max: config.Get().System.ActivitySendCount,
}
sftp := sftpActivityCron{
mu: system.NewAtomicBool(false),
manager: m,
max: config.Get().System.ActivitySendCount,
}
s := gocron.NewScheduler(l)
_, _ = s.Tag("activity").Every(config.Get().System.ActivitySendInterval).Seconds().Do(func() {
_, _ = s.Tag("activity").Every(5).Seconds().Do(func() {
if err := activity.Run(ctx); err != nil {
if errors.Is(err, ErrCronRunning) {
log.WithField("cron", "activity").Warn("cron: process is already running, skipping...")
@@ -50,15 +44,5 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error
}
})
_, _ = s.Tag("sftp_activity").Every(5).Seconds().Do(func() {
if err := sftp.Run(ctx); err != nil {
if errors.Is(err, ErrCronRunning) {
log.WithField("cron", "sftp").Warn("cron: process is already running, skipping...")
} else {
log.WithField("error", err).Error("cron: failed to process sftp events")
}
}
})
return s, nil
}
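// --- Editor's sketch (not part of this commit's diff) ---------------------
// Scheduler only registers the jobs; whoever calls it still has to start
// them. A minimal usage sketch, assuming the go-co-op/gocron v1 API
// (StartAsync/Stop) and that the package is importable as internal/cron;
// wings' actual bootstrap code may wire this up differently.
package sketch

import (
	"context"

	"github.com/pterodactyl/wings/internal/cron"
	"github.com/pterodactyl/wings/server"
)

func startCrons(ctx context.Context, m *server.Manager) error {
	s, err := cron.Scheduler(ctx, m)
	if err != nil {
		return err
	}
	// Run the registered jobs on their own goroutines.
	s.StartAsync()
	// Stop scheduling once the daemon's context is cancelled.
	go func() {
		<-ctx.Done()
		s.Stop()
	}()
	return nil
}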

View File

@@ -1,145 +0,0 @@
package cron
import (
"bytes"
"context"
"emperror.dev/errors"
"encoding/gob"
"github.com/pterodactyl/wings/internal/sqlite"
"github.com/pterodactyl/wings/server"
"github.com/pterodactyl/wings/system"
"time"
)
const querySftpActivity = `
SELECT
event,
user_uuid,
server_uuid,
ip,
GROUP_CONCAT(metadata, '::') AS metadata,
MIN(timestamp) AS first_timestamp
FROM activity_logs
WHERE event LIKE 'server:sftp.%'
GROUP BY event, STRFTIME('%Y-%m-%d %H:%M:00', DATETIME(timestamp, 'unixepoch', 'utc')), user_uuid, server_uuid, ip
LIMIT ?
`
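// --- Editor's sketch (not part of this commit's diff) ---------------------
// The GROUP BY above buckets SFTP events per calendar minute (UTC) so that a
// burst of identical operations collapses into one activity entry. The same
// bucket expressed in Go, assuming timestamps are stored as unix epoch
// seconds:
package sketch

import "time"

// minuteBucket returns the UTC minute a stored timestamp falls into,
// matching STRFTIME('%Y-%m-%d %H:%M:00', DATETIME(ts, 'unixepoch', 'utc')).
func minuteBucket(ts int64) time.Time {
	return time.Unix(ts, 0).UTC().Truncate(time.Minute)
}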
type sftpActivityGroup struct {
Event server.Event
User string
Server string
IP string
Metadata []byte
Timestamp int64
}
// Activity takes the struct and converts it into a single activity entity to
// process and send back over to the Panel.
func (g *sftpActivityGroup) Activity() (server.Activity, error) {
m, err := g.processMetadata()
if err != nil {
return server.Activity{}, err
}
t := time.Unix(g.Timestamp, 0)
a := server.Activity{
User: g.User,
Server: g.Server,
Event: g.Event,
Metadata: m,
IP: g.IP,
Timestamp: t.UTC(),
}
return a, nil
}
// processMetadata takes all of the concatenated metadata returned by the SQL
// query and then processes it into individual entity records before merging
// them into a single, final metadata object.
func (g *sftpActivityGroup) processMetadata() (server.ActivityMeta, error) {
b := bytes.Split(g.Metadata, []byte("::"))
if len(b) == 0 {
return server.ActivityMeta{}, nil
}
entities := make([]server.ActivityMeta, len(b))
for i, v := range b {
if len(v) == 0 {
continue
}
if err := gob.NewDecoder(bytes.NewBuffer(v)).Decode(&entities[i]); err != nil {
return nil, errors.Wrap(err, "could not decode metadata bytes")
}
}
var files []interface{}
// Iterate over every entity we've gotten back from the database's metadata fields
// and merge them all into a single entity by switching on the data type that was
// returned.
//
// We only store a slice of strings or a string/string map in the database for SFTP
// actions, hence the case statement.
for _, e := range entities {
if e == nil {
continue
}
if f, ok := e["files"]; ok {
var a []interface{}
switch f.(type) {
case []string:
for _, v := range f.([]string) {
a = append(a, v)
}
case map[string]string:
a = append(a, f)
}
files = append(files, a)
}
}
return server.ActivityMeta{"files": files}, nil
}
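// --- Editor's sketch (not part of this commit's diff) ---------------------
// processMetadata decodes each gob blob back into a metadata map before
// merging. A round-trip sketch of that encoding, assuming the metadata
// behaves like a map[string]interface{} whose values are []string or
// map[string]string (as the comment above says); gob needs those concrete
// types registered before it can carry them behind an interface.
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

func main() {
	gob.Register([]string{})
	gob.Register(map[string]string{})

	meta := map[string]interface{}{"files": []string{"old.txt", "new.txt"}}

	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(meta); err != nil {
		panic(err)
	}

	var decoded map[string]interface{}
	if err := gob.NewDecoder(&buf).Decode(&decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded["files"]) // [old.txt new.txt]
}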
type sftpActivityCron struct {
mu *system.AtomicBool
manager *server.Manager
max int64
}
// Run executes the cronjob and finds all associated SFTP events, bundles them up so
// that multiple events in the same timespan are recorded as a single event, and then
// cleans up the database.
func (sac *sftpActivityCron) Run(ctx context.Context) error {
if !sac.mu.SwapIf(true) {
return errors.WithStack(ErrCronRunning)
}
defer sac.mu.Store(false)
rows, err := sqlite.Instance().QueryContext(ctx, querySftpActivity, sac.max)
if err != nil {
return errors.Wrap(err, "cron: failed to query sftp activity")
}
defer rows.Close()
if err := rows.Err(); err != nil {
return errors.WithStack(err)
}
var out []server.Activity
for rows.Next() {
v := sftpActivityGroup{}
if err := rows.Scan(&v.Event, &v.User, &v.Server, &v.IP, &v.Metadata, &v.Timestamp); err != nil {
return errors.Wrap(err, "failed to scan row")
}
if a, err := v.Activity(); err != nil {
return errors.Wrap(err, "could not parse data into activity type")
} else {
out = append(out, a)
}
}
if len(out) == 0 {
return nil
}
if err := sac.manager.Client().SendActivityLogs(ctx, out); err != nil {
return errors.Wrap(err, "could not send activity logs to Panel")
}
return nil
}