Push draft of sftp reconcilation details
This commit is contained in:
parent
7bd11c1c28
commit
61baccb1a3
|
@ -79,7 +79,7 @@ func (ac *activityCron) Run(ctx context.Context) error {
|
||||||
if len(logs) == 0 {
|
if len(logs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := ac.manager.Client().SendActivityLogs(context.Background(), logs); err != nil {
|
if err := ac.manager.Client().SendActivityLogs(ctx, logs); err != nil {
|
||||||
return errors.WrapIf(err, "cron: failed to send activity events to Panel")
|
return errors.WrapIf(err, "cron: failed to send activity events to Panel")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,9 +33,14 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error
|
||||||
max: config.Get().System.ActivitySendCount,
|
max: config.Get().System.ActivitySendCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sftp := sftpActivityCron{
|
||||||
|
mu: system.NewAtomicBool(false),
|
||||||
|
manager: m,
|
||||||
|
max: config.Get().System.ActivitySendCount,
|
||||||
|
}
|
||||||
|
|
||||||
s := gocron.NewScheduler(l)
|
s := gocron.NewScheduler(l)
|
||||||
// int(config.Get().System.ActivitySendInterval)
|
_, _ = s.Tag("activity").Every(config.Get().System.ActivitySendInterval).Seconds().Do(func() {
|
||||||
_, _ = s.Tag("activity").Every(5).Seconds().Do(func() {
|
|
||||||
if err := activity.Run(ctx); err != nil {
|
if err := activity.Run(ctx); err != nil {
|
||||||
if errors.Is(err, ErrCronRunning) {
|
if errors.Is(err, ErrCronRunning) {
|
||||||
log.WithField("cron", "activity").Warn("cron: process is already running, skipping...")
|
log.WithField("cron", "activity").Warn("cron: process is already running, skipping...")
|
||||||
|
@ -45,5 +50,15 @@ func Scheduler(ctx context.Context, m *server.Manager) (*gocron.Scheduler, error
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
_, _ = s.Tag("sftp_activity").Every(5).Seconds().Do(func() {
|
||||||
|
if err := sftp.Run(ctx); err != nil {
|
||||||
|
if errors.Is(err, ErrCronRunning) {
|
||||||
|
log.WithField("cron", "sftp").Warn("cron: process is already running, skipping...")
|
||||||
|
} else {
|
||||||
|
log.WithField("error", err).Error("cron: failed to process sftp events")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
145
internal/cron/sftp_activity_cron.go
Normal file
145
internal/cron/sftp_activity_cron.go
Normal file
|
@ -0,0 +1,145 @@
|
||||||
|
package cron
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"emperror.dev/errors"
|
||||||
|
"encoding/gob"
|
||||||
|
"github.com/pterodactyl/wings/internal/sqlite"
|
||||||
|
"github.com/pterodactyl/wings/server"
|
||||||
|
"github.com/pterodactyl/wings/system"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const querySftpActivity = `
|
||||||
|
SELECT
|
||||||
|
event,
|
||||||
|
user_uuid,
|
||||||
|
server_uuid,
|
||||||
|
ip,
|
||||||
|
GROUP_CONCAT(metadata, '::') AS metadata,
|
||||||
|
MIN(timestamp) AS first_timestamp
|
||||||
|
FROM activity_logs
|
||||||
|
WHERE event LIKE 'server:sftp.%'
|
||||||
|
GROUP BY event, STRFTIME('%Y-%m-%d %H:%M:00', DATETIME(timestamp, 'unixepoch', 'utc')), user_uuid, server_uuid, ip
|
||||||
|
LIMIT ?
|
||||||
|
`
|
||||||
|
|
||||||
|
type sftpActivityGroup struct {
|
||||||
|
Event server.Event
|
||||||
|
User string
|
||||||
|
Server string
|
||||||
|
IP string
|
||||||
|
Metadata []byte
|
||||||
|
Timestamp int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Activity takes the struct and converts it into a single activity entity to
|
||||||
|
// process and send back over to the Panel.
|
||||||
|
func (g *sftpActivityGroup) Activity() (server.Activity, error) {
|
||||||
|
m, err := g.processMetadata()
|
||||||
|
if err != nil {
|
||||||
|
return server.Activity{}, err
|
||||||
|
}
|
||||||
|
t := time.Unix(g.Timestamp, 0)
|
||||||
|
a := server.Activity{
|
||||||
|
User: g.User,
|
||||||
|
Server: g.Server,
|
||||||
|
Event: g.Event,
|
||||||
|
Metadata: m,
|
||||||
|
IP: g.IP,
|
||||||
|
Timestamp: t.UTC(),
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processMetadata takes all of the concatenated metadata returned by the SQL
|
||||||
|
// query and then processes it all into individual entity records before then
|
||||||
|
// merging them into a final, single metadata, object.
|
||||||
|
func (g *sftpActivityGroup) processMetadata() (server.ActivityMeta, error) {
|
||||||
|
b := bytes.Split(g.Metadata, []byte("::"))
|
||||||
|
if len(b) == 0 {
|
||||||
|
return server.ActivityMeta{}, nil
|
||||||
|
}
|
||||||
|
entities := make([]server.ActivityMeta, len(b))
|
||||||
|
for i, v := range b {
|
||||||
|
if len(v) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := gob.NewDecoder(bytes.NewBuffer(v)).Decode(&entities[i]); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not decode metadata bytes")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var files []interface{}
|
||||||
|
// Iterate over every entity that we've gotten back from the database's metadata fields
|
||||||
|
// and merge them all into a single entity by checking what the data type returned is and
|
||||||
|
// going from there.
|
||||||
|
//
|
||||||
|
// We only store a slice of strings, or a string/string map value in the database for SFTP
|
||||||
|
// actions, hence the case statement.
|
||||||
|
for _, e := range entities {
|
||||||
|
if e == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if f, ok := e["files"]; ok {
|
||||||
|
var a []interface{}
|
||||||
|
switch f.(type) {
|
||||||
|
case []string:
|
||||||
|
for _, v := range f.([]string) {
|
||||||
|
a = append(a, v)
|
||||||
|
}
|
||||||
|
case map[string]string:
|
||||||
|
a = append(a, f)
|
||||||
|
}
|
||||||
|
files = append(files, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return server.ActivityMeta{"files": files}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type sftpActivityCron struct {
|
||||||
|
mu *system.AtomicBool
|
||||||
|
manager *server.Manager
|
||||||
|
max int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run executes the cronjob and finds all associated SFTP events, bundles them up so
|
||||||
|
// that multiple events in the same timespan are recorded as a single event, and then
|
||||||
|
// cleans up the database.
|
||||||
|
func (sac *sftpActivityCron) Run(ctx context.Context) error {
|
||||||
|
if !sac.mu.SwapIf(true) {
|
||||||
|
return errors.WithStack(ErrCronRunning)
|
||||||
|
}
|
||||||
|
defer sac.mu.Store(false)
|
||||||
|
|
||||||
|
rows, err := sqlite.Instance().QueryContext(ctx, querySftpActivity, sac.max)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "cron: failed to query sftp activity")
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var out []server.Activity
|
||||||
|
for rows.Next() {
|
||||||
|
v := sftpActivityGroup{}
|
||||||
|
if err := rows.Scan(&v.Event, &v.User, &v.Server, &v.IP, &v.Metadata, &v.Timestamp); err != nil {
|
||||||
|
return errors.Wrap(err, "failed to scan row")
|
||||||
|
}
|
||||||
|
if a, err := v.Activity(); err != nil {
|
||||||
|
return errors.Wrap(err, "could not parse data into activity type")
|
||||||
|
} else {
|
||||||
|
out = append(out, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(out) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := sac.manager.Client().SendActivityLogs(ctx, out); err != nil {
|
||||||
|
return errors.Wrap(err, "could not send activity logs to Panel")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS "activity_logs" (
|
||||||
"server_uuid" varchar NOT NULL,
|
"server_uuid" varchar NOT NULL,
|
||||||
"metadata" blob,
|
"metadata" blob,
|
||||||
"ip" varchar,
|
"ip" varchar,
|
||||||
"timestamp" datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
"timestamp" integer NOT NULL,
|
||||||
PRIMARY KEY (id)
|
PRIMARY KEY (id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ func (a Activity) Save() error {
|
||||||
Debug("saving activity to database")
|
Debug("saving activity to database")
|
||||||
|
|
||||||
stmt := `INSERT INTO activity_logs(event, user_uuid, server_uuid, metadata, ip, timestamp) VALUES(?, ?, ?, ?, ?, ?)`
|
stmt := `INSERT INTO activity_logs(event, user_uuid, server_uuid, metadata, ip, timestamp) VALUES(?, ?, ?, ?, ?, ?)`
|
||||||
if _, err := sqlite.Instance().Exec(stmt, a.Event, a.User, a.Server, buf.Bytes(), a.IP, a.Timestamp); err != nil {
|
if _, err := sqlite.Instance().Exec(stmt, a.Event, a.User, a.Server, buf.Bytes(), a.IP, a.Timestamp.UTC().Unix()); err != nil {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue
Block a user