From 61baccb1a3891bfa25e5b866a61b47cba08145a4 Mon Sep 17 00:00:00 2001 From: DaneEveritt Date: Sun, 24 Jul 2022 10:28:42 -0400 Subject: [PATCH] Push draft of sftp reconcilation details --- internal/cron/activity_cron.go | 2 +- internal/cron/cron.go | 19 +++- internal/cron/sftp_activity_cron.go | 145 ++++++++++++++++++++++++++++ internal/sqlite/database.go | 2 +- server/activity.go | 2 +- 5 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 internal/cron/sftp_activity_cron.go diff --git a/internal/cron/activity_cron.go b/internal/cron/activity_cron.go index 14cce40..59ec98f 100644 --- a/internal/cron/activity_cron.go +++ b/internal/cron/activity_cron.go @@ -79,7 +79,7 @@ func (ac *activityCron) Run(ctx context.Context) error { if len(logs) == 0 { return nil } - if err := ac.manager.Client().SendActivityLogs(context.Background(), logs); err != nil { + if err := ac.manager.Client().SendActivityLogs(ctx, logs); err != nil { return errors.WrapIf(err, "cron: failed to send activity events to Panel") } diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 04dc98b..7af15e0 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -33,9 +33,14 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error max: config.Get().System.ActivitySendCount, } + sftp := sftpActivityCron{ + mu: system.NewAtomicBool(false), + manager: m, + max: config.Get().System.ActivitySendCount, + } + s := gocron.NewScheduler(l) - // int(config.Get().System.ActivitySendInterval) - _, _ = s.Tag("activity").Every(5).Seconds().Do(func() { + _, _ = s.Tag("activity").Every(config.Get().System.ActivitySendInterval).Seconds().Do(func() { if err := activity.Run(ctx); err != nil { if errors.Is(err, ErrCronRunning) { log.WithField("cron", "activity").Warn("cron: process is already running, skipping...") @@ -45,5 +50,15 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error } }) + _, _ = s.Tag("sftp_activity").Every(5).Seconds().Do(func() { + if err := sftp.Run(ctx); err != nil { + if errors.Is(err, ErrCronRunning) { + log.WithField("cron", "sftp").Warn("cron: process is already running, skipping...") + } else { + log.WithField("error", err).Error("cron: failed to process sftp events") + } + } + }) + return s, nil } diff --git a/internal/cron/sftp_activity_cron.go b/internal/cron/sftp_activity_cron.go new file mode 100644 index 0000000..46a4170 --- /dev/null +++ b/internal/cron/sftp_activity_cron.go @@ -0,0 +1,145 @@ +package cron + +import ( + "bytes" + "context" + "emperror.dev/errors" + "encoding/gob" + "github.com/pterodactyl/wings/internal/sqlite" + "github.com/pterodactyl/wings/server" + "github.com/pterodactyl/wings/system" + "time" +) + +const querySftpActivity = ` +SELECT + event, + user_uuid, + server_uuid, + ip, + GROUP_CONCAT(metadata, '::') AS metadata, + MIN(timestamp) AS first_timestamp +FROM activity_logs +WHERE event LIKE 'server:sftp.%' +GROUP BY event, STRFTIME('%Y-%m-%d %H:%M:00', DATETIME(timestamp, 'unixepoch', 'utc')), user_uuid, server_uuid, ip +LIMIT ? +` + +type sftpActivityGroup struct { + Event server.Event + User string + Server string + IP string + Metadata []byte + Timestamp int64 +} + +// Activity takes the struct and converts it into a single activity entity to +// process and send back over to the Panel. +func (g *sftpActivityGroup) Activity() (server.Activity, error) { + m, err := g.processMetadata() + if err != nil { + return server.Activity{}, err + } + t := time.Unix(g.Timestamp, 0) + a := server.Activity{ + User: g.User, + Server: g.Server, + Event: g.Event, + Metadata: m, + IP: g.IP, + Timestamp: t.UTC(), + } + return a, nil +} + +// processMetadata takes all of the concatenated metadata returned by the SQL +// query and then processes it all into individual entity records before then +// merging them into a final, single metadata, object. +func (g *sftpActivityGroup) processMetadata() (server.ActivityMeta, error) { + b := bytes.Split(g.Metadata, []byte("::")) + if len(b) == 0 { + return server.ActivityMeta{}, nil + } + entities := make([]server.ActivityMeta, len(b)) + for i, v := range b { + if len(v) == 0 { + continue + } + if err := gob.NewDecoder(bytes.NewBuffer(v)).Decode(&entities[i]); err != nil { + return nil, errors.Wrap(err, "could not decode metadata bytes") + } + } + var files []interface{} + // Iterate over every entity that we've gotten back from the database's metadata fields + // and merge them all into a single entity by checking what the data type returned is and + // going from there. + // + // We only store a slice of strings, or a string/string map value in the database for SFTP + // actions, hence the case statement. + for _, e := range entities { + if e == nil { + continue + } + if f, ok := e["files"]; ok { + var a []interface{} + switch f.(type) { + case []string: + for _, v := range f.([]string) { + a = append(a, v) + } + case map[string]string: + a = append(a, f) + } + files = append(files, a) + } + } + return server.ActivityMeta{"files": files}, nil +} + +type sftpActivityCron struct { + mu *system.AtomicBool + manager *server.Manager + max int64 +} + +// Run executes the cronjob and finds all associated SFTP events, bundles them up so +// that multiple events in the same timespan are recorded as a single event, and then +// cleans up the database. +func (sac *sftpActivityCron) Run(ctx context.Context) error { + if !sac.mu.SwapIf(true) { + return errors.WithStack(ErrCronRunning) + } + defer sac.mu.Store(false) + + rows, err := sqlite.Instance().QueryContext(ctx, querySftpActivity, sac.max) + if err != nil { + return errors.Wrap(err, "cron: failed to query sftp activity") + } + defer rows.Close() + + if err := rows.Err(); err != nil { + return errors.WithStack(err) + } + + var out []server.Activity + for rows.Next() { + v := sftpActivityGroup{} + if err := rows.Scan(&v.Event, &v.User, &v.Server, &v.IP, &v.Metadata, &v.Timestamp); err != nil { + return errors.Wrap(err, "failed to scan row") + } + if a, err := v.Activity(); err != nil { + return errors.Wrap(err, "could not parse data into activity type") + } else { + out = append(out, a) + } + } + + if len(out) == 0 { + return nil + } + if err := sac.manager.Client().SendActivityLogs(ctx, out); err != nil { + return errors.Wrap(err, "could not send activity logs to Panel") + } + return nil +} diff --git a/internal/sqlite/database.go b/internal/sqlite/database.go index 9ad0f18..5061f1d 100644 --- a/internal/sqlite/database.go +++ b/internal/sqlite/database.go @@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS "activity_logs" ( "server_uuid" varchar NOT NULL, "metadata" blob, "ip" varchar, - "timestamp" datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + "timestamp" integer NOT NULL, PRIMARY KEY (id) ); diff --git a/server/activity.go b/server/activity.go index 4d54d9a..96b9bc4 100644 --- a/server/activity.go +++ b/server/activity.go @@ -117,7 +117,7 @@ func (a Activity) Save() error { Debug("saving activity to database") stmt := `INSERT INTO activity_logs(event, user_uuid, server_uuid, metadata, ip, timestamp) VALUES(?, ?, ?, ?, ?, ?)` - if _, err := sqlite.Instance().Exec(stmt, a.Event, a.User, a.Server, buf.Bytes(), a.IP, a.Timestamp); err != nil { + if _, err := sqlite.Instance().Exec(stmt, a.Event, a.User, a.Server, buf.Bytes(), a.IP, a.Timestamp.UTC().Unix()); err != nil { return errors.WithStack(err) } return nil