From 4634c931829938ffe7eeb4fca4611fe9c9b151ed Mon Sep 17 00:00:00 2001 From: DaneEveritt Date: Sun, 24 Jul 2022 14:40:06 -0400 Subject: [PATCH] Add cron to handle parsing SFTP events --- internal/cron/cron.go | 22 ++++- internal/cron/sftp_cron.go | 180 +++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 3 deletions(-) create mode 100644 internal/cron/sftp_cron.go diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 05187c9..a754e0b 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -33,13 +33,29 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error max: config.Get().System.ActivitySendCount, } + sftp := sftpCron{ + mu: system.NewAtomicBool(false), + manager: m, + max: config.Get().System.ActivitySendCount, + } + s := gocron.NewScheduler(l) - _, _ = s.Tag("activity").Every(5).Seconds().Do(func() { + _, _ = s.Tag("activity").Every(config.Get().System.ActivitySendInterval).Seconds().Do(func() { if err := activity.Run(ctx); err != nil { if errors.Is(err, ErrCronRunning) { - log.WithField("cron", "activity").Warn("cron: process is already running, skipping...") + log.WithField("cron", "activity").Warn("activity process is already running, skipping...") } else { - log.WithField("error", err).Error("cron: failed to process activity events") + log.WithField("cron", "activity").WithField("error", err).Error("activity process failed to execute") + } + } + }) + + _, _ = s.Tag("sftp").Every(config.Get().System.ActivitySendInterval).Seconds().Do(func() { + if err := sftp.Run(ctx); err != nil { + if errors.Is(err, ErrCronRunning) { + log.WithField("cron", "sftp").Warn("sftp events process already running, skipping...") + } else { + log.WithField("cron", "sftp").WithField("error", err).Error("sftp events process failed to execute") } } }) diff --git a/internal/cron/sftp_cron.go b/internal/cron/sftp_cron.go new file mode 100644 index 0000000..f11bd8c --- /dev/null +++ b/internal/cron/sftp_cron.go @@ -0,0 +1,180 @@ +package cron + +import ( + "context" + "emperror.dev/errors" + "github.com/pterodactyl/wings/internal/database" + "github.com/pterodactyl/wings/internal/models" + "github.com/pterodactyl/wings/server" + "github.com/pterodactyl/wings/system" + "gorm.io/gorm" + "reflect" +) + +type sftpCron struct { + mu *system.AtomicBool + manager *server.Manager + max int64 +} + +type mapKey struct { + User string + Server string + IP string + Event models.Event + Timestamp string +} + +type eventMap struct { + max int + ids []int + m map[mapKey]*models.Activity +} + +// Run executes the SFTP reconciliation cron. This job will pull all of the SFTP specific events +// and merge them together across user, server, ip, and event. This allows a SFTP event that deletes +// tens or hundreds of files to be tracked as a single "deletion" event so long as they all occur +// within the same one minute period of time (starting at the first timestamp for the group). Without +// this we'd end up flooding the Panel event log with excessive data that is of no use to end users. +func (sc *sftpCron) Run(ctx context.Context) error { + if !sc.mu.SwapIf(true) { + return errors.WithStack(ErrCronRunning) + } + defer sc.mu.Store(false) + + var o int + activity, err := sc.fetchRecords(ctx, o) + if err != nil { + return err + } + o += len(activity) + + events := &eventMap{ + m: map[mapKey]*models.Activity{}, + ids: []int{}, + max: int(sc.max), + } + + for { + if len(activity) == 0 { + break + } + slen := len(events.ids) + for _, a := range activity { + events.Push(a) + } + if len(events.ids) > slen { + // Execute the query again, we found some events so we want to continue + // with this. Start at the next offset. + activity, err = sc.fetchRecords(ctx, o) + if err != nil { + return errors.WithStack(err) + } + o += len(activity) + } else { + break + } + } + + if len(events.m) == 0 { + return nil + } + + err = database.Instance().Transaction(func(tx *gorm.DB) error { + tx.Where("id IN ?", events.ids).Delete(&models.Activity{}) + if tx.Error != nil { + return tx.Error + } + + return sc.manager.Client().SendActivityLogs(ctx, events.Elements()) + }) + + return errors.WithStack(err) +} + +// fetchRecords returns a group of activity events starting at the given offset. This is used +// since we might need to make multiple database queries to select enough events to properly +// fill up our request to the given maximum. This is due to the fact that this cron merges any +// activity that line up across user, server, ip, and event into a single activity record when +// sending the data to the Panel. +func (sc *sftpCron) fetchRecords(ctx context.Context, offset int) (activity []models.Activity, err error) { + tx := database.Instance().WithContext(ctx). + Where("event LIKE ?", "server:sftp.%"). + Order("event DESC"). + Offset(offset). + Limit(int(sc.max)). + Find(&activity) + if tx.Error != nil { + err = errors.WithStack(tx.Error) + } + return +} + +// Push adds an activity to the event mapping, or de-duplicates it and merges the files metadata +// into the existing entity that exists. +func (em *eventMap) Push(a models.Activity) { + m := em.forActivity(a) + // If no activity entity is returned we've hit the cap for the number of events to + // send along to the Panel. Just skip over this record and we'll account for it in + // the next iteration. + if m == nil { + return + } + em.ids = append(em.ids, a.ID) + // Always reduce this to the first timestamp that was recorded for the set + // of events, and not + if a.Timestamp.Before(m.Timestamp) { + m.Timestamp = a.Timestamp + } + list := m.Metadata["files"].([]interface{}) + if s, ok := a.Metadata["files"]; ok { + v := reflect.ValueOf(s) + if v.Kind() != reflect.Slice || v.IsNil() { + return + } + for i := 0; i < v.Len(); i++ { + list = append(list, v.Index(i).Interface()) + } + // You must set it again at the end of the process, otherwise you've only updated the file + // slice in this one loop since it isn't passed by reference. This is just shorter than having + // to explicitly keep casting it to the slice. + m.Metadata["files"] = list + } +} + +// Elements returns the finalized activity models. +func (em *eventMap) Elements() (out []models.Activity) { + for _, v := range em.m { + out = append(out, *v) + } + return +} + +// forActivity returns an event entity from our map which allows existing matches to be +// updated with additional files. +func (em *eventMap) forActivity(a models.Activity) *models.Activity { + key := mapKey{ + User: a.User.String, + Server: a.Server, + IP: a.IP, + Event: a.Event, + // We group by the minute, don't care about the seconds for this logic. + Timestamp: a.Timestamp.Format("2006-01-02_15:04"), + } + if v, ok := em.m[key]; ok { + return v + } + // Cap the size of the events map at the defined maximum events to send to the Panel. Just + // return nil and let the caller handle it. + if len(em.m) >= em.max { + return nil + } + // Doesn't exist in our map yet, create a copy of the activity passed into this + // function and then assign it into the map with an empty metadata value. + v := a + v.Metadata = models.ActivityMeta{ + "files": make([]interface{}, 0), + } + em.m[key] = &v + return &v +}