diff --git a/internal/cron/activity_cron.go b/internal/cron/activity_cron.go index 6ffd474..472e6db 100644 --- a/internal/cron/activity_cron.go +++ b/internal/cron/activity_cron.go @@ -12,16 +12,16 @@ import ( ) var key = []byte("events") -var processing system.AtomicBool +var activityCron system.AtomicBool func processActivityLogs(m *server.Manager, c int64) error { // Don't execute this cron if there is currently one running. Once this task is completed // go ahead and mark it as no longer running. - if !processing.SwapIf(true) { - log.WithField("subsystem", "cron").Warn("cron: process overlap detected, skipping this run") + if !activityCron.SwapIf(true) { + log.WithField("subsystem", "cron").WithField("cron", "activity_logs").Warn("cron: process overlap detected, skipping this run") return nil } - defer processing.Store(false) + defer activityCron.Store(false) var list [][]byte err := database.DB().View(func(tx *nutsdb.Tx) error { @@ -30,6 +30,9 @@ func processActivityLogs(m *server.Manager, c int64) error { // release the lock on this process. end := int(c) if s, err := tx.LSize(database.ServerActivityBucket, key); err != nil { + if errors.Is(err, nutsdb.ErrBucket) { + return nil + } return errors.WithStackIf(err) } else if s < end || s == 0 { if s == 0 { diff --git a/internal/cron/cron.go b/internal/cron/cron.go index be3de53..44ebf24 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -10,16 +10,17 @@ import ( "time" ) +const ErrCronRunning = errors.Sentinel("cron: job already running") + var o system.AtomicBool // Scheduler configures the internal cronjob system for Wings and returns the scheduler // instance to the caller. This should only be called once per application lifecycle, additional // calls will result in an error being returned. func Scheduler(m *server.Manager) (*gocron.Scheduler, error) { - if o.Load() { + if !o.SwapIf(true) { return nil, errors.New("cron: cannot call scheduler more than once in application lifecycle") } - o.Store(true) l, err := time.LoadLocation(config.Get().System.Timezone) if err != nil { return nil, errors.Wrap(err, "cron: failed to parse configured system timezone") @@ -32,5 +33,16 @@ func Scheduler(m *server.Manager) (*gocron.Scheduler, error) { } }) + _, _ = s.Tag("sftp").Every(20).Seconds().Do(func() { + runner := sftpEventProcessor{mu: system.NewAtomicBool(false), manager: m} + if err := runner.Run(); err != nil { + if errors.Is(err, ErrCronRunning) { + log.WithField("cron", "sftp_events").Warn("cron: job already running, skipping...") + } else { + log.WithField("error", err).Error("cron: failed to process sftp events") + } + } + }) + return s, nil } diff --git a/internal/cron/sftp_cron.go b/internal/cron/sftp_cron.go new file mode 100644 index 0000000..bd05d85 --- /dev/null +++ b/internal/cron/sftp_cron.go @@ -0,0 +1,174 @@ +package cron + +import ( + "bytes" + "emperror.dev/errors" + "encoding/gob" + "github.com/pterodactyl/wings/internal/database" + "github.com/pterodactyl/wings/server" + "github.com/pterodactyl/wings/sftp" + "github.com/pterodactyl/wings/system" + "github.com/xujiajun/nutsdb" + "path/filepath" +) + +type UserDetail struct { + UUID string + IP string +} + +type Users map[UserDetail][]sftp.EventRecord +type Events map[sftp.Event]Users + +type sftpEventProcessor struct { + mu *system.AtomicBool + manager *server.Manager +} + +// Run executes the cronjob and processes sftp activities into normal activity log entries +// by merging together similar records. This helps to reduce the sheer amount of data that +// gets passed back to the Panel and provides simpler activity logging. +func (sep *sftpEventProcessor) Run() error { + if !sep.mu.SwapIf(true) { + return errors.WithStack(ErrCronRunning) + } + defer sep.mu.Store(false) + + set, err := sep.Events() + if err != nil { + return err + } + + for s, el := range set { + events := make(Events) + // Take all of the events that we've pulled out of the system for every server and then + // parse them into a more usable format in order to create activity log entries for each + // user, ip, and server instance. + for _, e := range el { + u := UserDetail{UUID: e.User, IP: e.IP} + if _, ok := events[e.Event]; !ok { + events[e.Event] = make(Users) + } + if _, ok := events[e.Event][u]; !ok { + events[e.Event][u] = []sftp.EventRecord{} + } + events[e.Event][u] = append(events[e.Event][u], e) + } + + // Now that we have all of the events, go ahead and create a normal activity log entry + // for each instance grouped by user & IP for easier Panel reporting. + for k, v := range events { + for u, records := range v { + files := make([]interface{}, len(records)) + for i, r := range records { + if r.Action.Target != "" { + files[i] = map[string]string{ + "from": filepath.Clean(r.Action.Entity), + "to": filepath.Clean(r.Action.Target), + } + } else { + files[i] = filepath.Clean(r.Action.Entity) + } + } + + entry := server.Activity{ + Server: s, + User: u.UUID, + Event: server.Event("server:sftp." + string(k)), + Metadata: server.ActivityMeta{"files": files}, + IP: u.IP, + // Just assume that the first record in the set is the oldest and the most relevant + // of the timestamps to use. + Timestamp: records[0].Timestamp, + } + + if err := entry.Save(); err != nil { + return errors.Wrap(err, "cron: failed to save new event for server") + } + + if err := sep.Cleanup([]byte(s)); err != nil { + return errors.Wrap(err, "cron: failed to cleanup events") + } + } + } + } + + return nil +} + +// Cleanup runs through all of the events we have currently tracked in the bucket and removes +// them once we've managed to process them and created the associated server activity events. +func (sep *sftpEventProcessor) Cleanup(key []byte) error { + return database.DB().Update(func(tx *nutsdb.Tx) error { + s, err := sep.sizeOf(tx, key) + if err != nil { + return err + } + if s == 0 { + return nil + } else if s < sep.limit() { + for i := 0; i < s; i++ { + if _, err := tx.LPop(database.SftpActivityBucket, key); err != nil { + return errors.WithStack(err) + } + } + } else { + if err := tx.LTrim(database.ServerActivityBucket, key, sep.limit()-1, -1); err != nil { + return errors.WithStack(err) + } + } + return nil + }) +} + +// Events pulls all of the events in the SFTP event bucket and parses them into an iterable +// set allowing Wings to process the events and send them back to the Panel instance. +func (sep *sftpEventProcessor) Events() (map[string][]sftp.EventRecord, error) { + set := make(map[string][]sftp.EventRecord, len(sep.manager.Keys())) + err := database.DB().View(func(tx *nutsdb.Tx) error { + for _, k := range sep.manager.Keys() { + lim := sep.limit() + if s, err := sep.sizeOf(tx, []byte(k)); err != nil { + return err + } else if s == 0 { + continue + } else if s < lim { + lim = -1 + } + list, err := tx.LRange(database.SftpActivityBucket, []byte(k), 0, lim) + if err != nil { + return errors.WithStack(err) + } + set[k] = make([]sftp.EventRecord, len(list)) + for i, l := range list { + if err := gob.NewDecoder(bytes.NewBuffer(l)).Decode(&set[k][i]); err != nil { + return errors.WithStack(err) + } + } + } + return nil + }) + + return set, err +} + +// sizeOf is a wrapper around a nutsdb transaction to get the size of a key in the +// bucket while also accounting for some expected error conditions and handling those +// automatically. +func (sep *sftpEventProcessor) sizeOf(tx *nutsdb.Tx, key []byte) (int, error) { + s, err := tx.LSize(database.SftpActivityBucket, key) + if err != nil { + if errors.Is(err, nutsdb.ErrBucket) { + return 0, nil + } + return 0, errors.WithStack(err) + } + return s, nil +} + +// limit returns the number of records that are processed for each server at +// once. This will then be translated into a variable number of activity log +// events, with the worst case being a single event with "n" associated files. +func (sep *sftpEventProcessor) limit() int { + return 500 +} diff --git a/internal/database/database.go b/internal/database/database.go index 27180d9..e8c994c 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -14,6 +14,7 @@ var syncer sync.Once const ( ServerActivityBucket = "server_activity" + SftpActivityBucket = "sftp_activity" ) func initialize() error { diff --git a/server/activity.go b/server/activity.go index 309ce21..75df7aa 100644 --- a/server/activity.go +++ b/server/activity.go @@ -82,6 +82,10 @@ func (ra RequestActivity) IP() string { return ra.ip } +func (ra *RequestActivity) User() string { + return ra.user +} + // SetUser clones the RequestActivity struct and sets a new user value on the copy // before returning it. func (ra RequestActivity) SetUser(u string) RequestActivity { diff --git a/server/manager.go b/server/manager.go index 8b3ed2d..6733b35 100644 --- a/server/manager.go +++ b/server/manager.go @@ -52,6 +52,24 @@ func (m *Manager) Client() remote.Client { return m.client } +// Len returns the count of servers stored in the manager instance. +func (m *Manager) Len() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.servers) +} + +// Keys returns all of the server UUIDs stored in the manager set. +func (m *Manager) Keys() []string { + m.mu.RLock() + defer m.mu.RUnlock() + keys := make([]string, len(m.servers)) + for i, s := range m.servers { + keys[i] = s.ID() + } + return keys +} + // Put replaces all the current values in the collection with the value that // is passed through. func (m *Manager) Put(s []*Server) { diff --git a/sftp/event.go b/sftp/event.go new file mode 100644 index 0000000..1b2e2f2 --- /dev/null +++ b/sftp/event.go @@ -0,0 +1,82 @@ +package sftp + +import ( + "bytes" + "emperror.dev/errors" + "encoding/gob" + "github.com/apex/log" + "github.com/pterodactyl/wings/internal/database" + "github.com/xujiajun/nutsdb" + "regexp" + "time" +) + +type eventHandler struct { + ip string + user string + server string +} + +type Event string +type FileAction struct { + // Entity is the targeted file or directory (depending on the event) that the action + // is being performed _against_, such as "/foo/test.txt". This will always be the full + // path to the element. + Entity string + // Target is an optional (often blank) field that only has a value in it when the event + // is specifically modifying the entity, such as a rename or move event. In that case + // the Target field will be the final value, such as "/bar/new.txt" + Target string +} + +type EventRecord struct { + Event Event + Action FileAction + IP string + User string + Timestamp time.Time +} + +const ( + EventWrite = Event("write") + EventCreate = Event("create") + EventCreateDirectory = Event("create-directory") + EventRename = Event("rename") + EventDelete = Event("delete") +) + +var ipTrimRegex = regexp.MustCompile(`(:\d*)?$`) + +// Log logs an event into the Wings bucket for SFTP activity which then allows a seperate +// cron to run and parse the events into a more manageable stream of event data to send +// back to the Panel instance. +func (eh *eventHandler) Log(e Event, fa FileAction) error { + r := EventRecord{ + Event: e, + Action: fa, + IP: ipTrimRegex.ReplaceAllString(eh.ip, ""), + User: eh.user, + Timestamp: time.Now().UTC(), + } + + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(r); err != nil { + return errors.Wrap(err, "sftp: failed to encode event") + } + + return database.DB().Update(func(tx *nutsdb.Tx) error { + if err := tx.RPush(database.SftpActivityBucket, []byte(eh.server), buf.Bytes()); err != nil { + return errors.Wrap(err, "sftp: failed to push event to stack") + } + return nil + }) +} + +// MustLog is a wrapper around log that will trigger a fatal error and exit the application +// if an error is encountered during the logging of the event. +func (eh *eventHandler) MustLog(e Event, fa FileAction) { + if err := eh.Log(e, fa); err != nil { + log.WithField("error", err).Fatal("sftp: failed to log event") + } +} diff --git a/sftp/handler.go b/sftp/handler.go index 5c2535d..e2f4e1b 100644 --- a/sftp/handler.go +++ b/sftp/handler.go @@ -26,16 +26,12 @@ const ( PermissionFileDelete = "file.delete" ) -type handlerMeta struct { - ra server.RequestActivity - server *server.Server -} - type Handler struct { mu sync.Mutex - meta handlerMeta - permissions []string + server *server.Server fs *filesystem.Filesystem + events *eventHandler + permissions []string logger *log.Entry ro bool } @@ -48,19 +44,23 @@ func NewHandler(sc *ssh.ServerConn, srv *server.Server) (*Handler, error) { return nil, errors.New("sftp: mismatched Wings and Panel versions — Panel 1.10 is required for this version of Wings.") } + events := eventHandler{ + ip: sc.RemoteAddr().String(), + user: uuid, + server: srv.ID(), + } + return &Handler{ permissions: strings.Split(sc.Permissions.Extensions["permissions"], ","), - meta: handlerMeta{ - server: srv, - ra: srv.NewRequestActivity(uuid, sc.RemoteAddr().String()), - }, - fs: srv.Filesystem(), - ro: config.Get().System.Sftp.ReadOnly, - logger: log.WithFields(log.Fields{"subsystem": "sftp", "user": uuid, "ip": sc.RemoteAddr()}), + server: srv, + fs: srv.Filesystem(), + events: &events, + ro: config.Get().System.Sftp.ReadOnly, + logger: log.WithFields(log.Fields{"subsystem": "sftp", "user": uuid, "ip": sc.RemoteAddr()}), }, nil } -// Returns the sftp.Handlers for this struct. +// Handlers returns the sftp.Handlers for this struct. func (h *Handler) Handlers() sftp.Handlers { return sftp.Handlers{ FileGet: h, @@ -88,7 +88,6 @@ func (h *Handler) Fileread(request *sftp.Request) (io.ReaderAt, error) { } return nil, sftp.ErrSSHFxNoSuchFile } - h.event(server.ActivityFileRead, server.ActivityMeta{"file": filepath.Clean(request.Filepath)}) return f, nil } @@ -131,7 +130,11 @@ func (h *Handler) Filewrite(request *sftp.Request) (io.WriterAt, error) { // Chown may or may not have been called in the touch function, so always do // it at this point to avoid the file being improperly owned. _ = h.fs.Chown(request.Filepath) - h.event(server.ActivityFileWrite, server.ActivityMeta{"file": filepath.Clean(request.Filepath)}) + event := EventWrite + if permission == PermissionFileCreate { + event = EventCreate + } + h.events.MustLog(event, FileAction{Entity: request.Filepath}) return f, nil } @@ -175,21 +178,14 @@ func (h *Handler) Filecmd(request *sftp.Request) error { if !h.can(PermissionFileUpdate) { return sftp.ErrSSHFxPermissionDenied } - p := filepath.Clean(request.Filepath) - t := filepath.Clean(request.Target) - if err := h.fs.Rename(p, t); err != nil { + if err := h.fs.Rename(request.Filepath, request.Target); err != nil { if errors.Is(err, os.ErrNotExist) { return sftp.ErrSSHFxNoSuchFile } l.WithField("error", err).Error("failed to rename file") return sftp.ErrSSHFxFailure } - h.event(server.ActivityFileRename, server.ActivityMeta{ - "directory": filepath.Dir(p), - "files": []map[string]string{ - {"from": filepath.Base(p), "to": t}, - }, - }) + h.events.MustLog(EventRename, FileAction{Entity: request.Filepath, Target: request.Target}) break // Handle deletion of a directory. This will properly delete all of the files and // folders within that directory if it is not already empty (unlike a lot of SFTP @@ -203,10 +199,7 @@ func (h *Handler) Filecmd(request *sftp.Request) error { l.WithField("error", err).Error("failed to remove directory") return sftp.ErrSSHFxFailure } - h.event(server.ActivityFileDeleted, server.ActivityMeta{ - "directory": filepath.Dir(p), - "files": []string{filepath.Base(p)}, - }) + h.events.MustLog(EventDelete, FileAction{Entity: request.Filepath}) return sftp.ErrSSHFxOk // Handle requests to create a new Directory. case "Mkdir": @@ -219,10 +212,7 @@ func (h *Handler) Filecmd(request *sftp.Request) error { l.WithField("error", err).Error("failed to create directory") return sftp.ErrSSHFxFailure } - h.event(server.ActivityFileCreateDirectory, server.ActivityMeta{ - "directory": p, - "name": name[len(name)-1], - }) + h.events.MustLog(EventCreateDirectory, FileAction{Entity: request.Filepath}) break // Support creating symlinks between files. The source and target must resolve within // the server home directory. @@ -248,18 +238,14 @@ func (h *Handler) Filecmd(request *sftp.Request) error { if !h.can(PermissionFileDelete) { return sftp.ErrSSHFxPermissionDenied } - p := filepath.Clean(request.Filepath) - if err := h.fs.Delete(p); err != nil { + if err := h.fs.Delete(request.Filepath); err != nil { if errors.Is(err, os.ErrNotExist) { return sftp.ErrSSHFxNoSuchFile } l.WithField("error", err).Error("failed to remove a file") return sftp.ErrSSHFxFailure } - h.event(server.ActivityFileDeleted, server.ActivityMeta{ - "directory": filepath.Dir(p), - "files": []string{filepath.Base(p)}, - }) + h.events.MustLog(EventDelete, FileAction{Entity: request.Filepath}) return sftp.ErrSSHFxOk default: return sftp.ErrSSHFxOpUnsupported @@ -316,7 +302,7 @@ func (h *Handler) Filelist(request *sftp.Request) (sftp.ListerAt, error) { // Determines if a user has permission to perform a specific action on the SFTP server. These // permissions are defined and returned by the Panel API. func (h *Handler) can(permission string) bool { - if h.meta.server.IsSuspended() { + if h.server.IsSuspended() { return false } for _, p := range h.permissions { @@ -328,16 +314,3 @@ func (h *Handler) can(permission string) bool { } return false } - -// event is a wrapper around the server.RequestActivity struct for this handler to -// make logging events a little less noisy for SFTP. This also tags every event logged -// using it with a "{using_sftp: true}" metadata field to make this easier to understand -// in the Panel's activity logs. -func (h *Handler) event(event server.Event, metadata server.ActivityMeta) { - m := metadata - if m == nil { - m = make(map[string]interface{}) - } - m["using_sftp"] = true - _ = h.meta.ra.Save(h.meta.server, event, m) -} diff --git a/wings.go b/wings.go index 52ea91b..416b23e 100644 --- a/wings.go +++ b/wings.go @@ -1,12 +1,16 @@ package main import ( + "encoding/gob" "github.com/pterodactyl/wings/cmd" + "github.com/pterodactyl/wings/sftp" "math/rand" "time" ) func main() { + gob.Register(sftp.EventRecord{}) + // Since we make use of the math/rand package in the code, especially for generating // non-cryptographically secure random strings we need to seed the RNG. Just make use // of the current time for this.