Finalize activity event sending logic and cron config
This commit is contained in:
parent
9830387f21
commit
f5baab4e88
|
@ -163,6 +163,15 @@ type SystemConfiguration struct {
|
||||||
// disk usage is not a concern.
|
// disk usage is not a concern.
|
||||||
DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
|
DiskCheckInterval int64 `default:"150" yaml:"disk_check_interval"`
|
||||||
|
|
||||||
|
// ActivitySendInterval is the amount of time that should ellapse between aggregated server activity
|
||||||
|
// being sent to the Panel. By default this will send activity collected over the last minute. Keep
|
||||||
|
// in mind that only a fixed number of activity log entries, defined by ActivitySendCount, will be sent
|
||||||
|
// in each run.
|
||||||
|
ActivitySendInterval int64 `default:"60" yaml:"activity_send_interval"`
|
||||||
|
|
||||||
|
// ActivitySendCount is the number of activity events to send per batch.
|
||||||
|
ActivitySendCount int64 `default:"100" yaml:"activity_send_count"`
|
||||||
|
|
||||||
// If set to true, file permissions for a server will be checked when the process is
|
// If set to true, file permissions for a server will be checked when the process is
|
||||||
// booted. This can cause boot delays if the server has a large amount of files. In most
|
// booted. This can cause boot delays if the server has a large amount of files. In most
|
||||||
// cases disabling this should not have any major impact unless external processes are
|
// cases disabling this should not have any major impact unless external processes are
|
||||||
|
|
|
@ -11,9 +11,10 @@ import (
|
||||||
"github.com/xujiajun/nutsdb"
|
"github.com/xujiajun/nutsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var key = []byte("events")
|
||||||
var processing system.AtomicBool
|
var processing system.AtomicBool
|
||||||
|
|
||||||
func processActivityLogs(m *server.Manager) error {
|
func processActivityLogs(m *server.Manager, c int64) error {
|
||||||
// Don't execute this cron if there is currently one running. Once this task is completed
|
// Don't execute this cron if there is currently one running. Once this task is completed
|
||||||
// go ahead and mark it as no longer running.
|
// go ahead and mark it as no longer running.
|
||||||
if !processing.SwapIf(true) {
|
if !processing.SwapIf(true) {
|
||||||
|
@ -26,7 +27,16 @@ func processActivityLogs(m *server.Manager) error {
|
||||||
// Grab the oldest 100 activity events that have been logged and send them back to the
|
// Grab the oldest 100 activity events that have been logged and send them back to the
|
||||||
// Panel for processing. Once completed, delete those events from the database and then
|
// Panel for processing. Once completed, delete those events from the database and then
|
||||||
// release the lock on this process.
|
// release the lock on this process.
|
||||||
l, err := tx.LRange(database.ServerActivityBucket, []byte("events"), 0, 1)
|
end := int(c)
|
||||||
|
if s, err := tx.LSize(database.ServerActivityBucket, key); err != nil {
|
||||||
|
return errors.WithStackIf(err)
|
||||||
|
} else if s < end || s == 0 {
|
||||||
|
if s == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
end = s
|
||||||
|
}
|
||||||
|
l, err := tx.LRange(database.ServerActivityBucket, key, 0, end)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This error is returned when the bucket doesn't exist, which is likely on the
|
// This error is returned when the bucket doesn't exist, which is likely on the
|
||||||
// first invocations of Wings since we haven't yet logged any data. There is nothing
|
// first invocations of Wings since we haven't yet logged any data. There is nothing
|
||||||
|
@ -59,6 +69,30 @@ func processActivityLogs(m *server.Manager) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
return database.DB().Update(func(tx *nutsdb.Tx) error {
|
return database.DB().Update(func(tx *nutsdb.Tx) error {
|
||||||
return tx.LTrim(database.ServerActivityBucket, []byte("events"), 2, -1)
|
if m, err := tx.LSize(database.ServerActivityBucket, key); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
} else if m > len(list) {
|
||||||
|
// As long as there are more elements than we have in the length of our list
|
||||||
|
// we can just use the existing `LTrim` functionality of nutsdb. This will remove
|
||||||
|
// all of the values we've already pulled and sent to the API.
|
||||||
|
return errors.WithStack(tx.LTrim(database.ServerActivityBucket, key, len(list), -1))
|
||||||
|
} else {
|
||||||
|
i := 0
|
||||||
|
// This is the only way I can figure out to actually empty the items out of the list
|
||||||
|
// because you cannot use `LTrim` (or I cannot for the life of me figure out how) to
|
||||||
|
// trim the slice down to 0 items without it triggering an internal logic error. Perhaps
|
||||||
|
// in a future release they'll have a function to do this (based on my skimming of issues
|
||||||
|
// on GitHub that I cannot read due to translation barriers).
|
||||||
|
for {
|
||||||
|
if i >= m {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if _, err := tx.LPop(database.ServerActivityBucket, key); err != nil {
|
||||||
|
return errors.WithStack(err)
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,8 @@ func Scheduler(m *server.Manager) (*gocron.Scheduler, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
s := gocron.NewScheduler(l)
|
s := gocron.NewScheduler(l)
|
||||||
_, _ = s.Every(5).Seconds().Do(func() {
|
_, _ = s.Tag("activity").Every(config.Get().System.ActivitySendInterval).Seconds().Do(func() {
|
||||||
if err := processActivityLogs(m); err != nil {
|
if err := processActivityLogs(m, config.Get().System.ActivitySendCount); err != nil {
|
||||||
log.WithField("error", err).Error("cron: failed to process activity events")
|
log.WithField("error", err).Error("cron: failed to process activity events")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue
Block a user