wings/internal/cron/activity_cron.go
Ethan Alicea 4390bad36b
Please enter the commit message for your changes. Lines starting
with '#' will be ignored, and an empty message aborts the commit.

 Author:    Ethan Alicea <64653625+Tech-Gamer@users.noreply.github.com>

 On branch develop
 Your branch is up to date with 'origin/develop'.

 Changes to be committed:
	modified:   .github/workflows/push.yaml
	modified:   .github/workflows/release.yaml
	modified:   CHANGELOG.md
	modified:   Dockerfile
	modified:   Makefile
	modified:   README.md
	modified:   cmd/configure.go
	modified:   cmd/diagnostics.go
	modified:   cmd/root.go
	modified:   config/config.go
	modified:   environment/allocations.go
	modified:   environment/docker.go
	modified:   environment/docker/api.go
	modified:   environment/docker/container.go
	modified:   environment/docker/environment.go
	modified:   environment/docker/power.go
	modified:   environment/docker/stats.go
	modified:   environment/environment.go
	modified:   environment/settings.go
	modified:   events/events.go
	modified:   go.mod
	modified:   internal/cron/activity_cron.go
	modified:   internal/cron/cron.go
	modified:   internal/cron/sftp_cron.go
	modified:   internal/database/database.go
	modified:   internal/progress/progress.go
	modified:   internal/progress/progress_test.go
	modified:   loggers/cli/cli.go
	new file:   oryxBuildBinary
	modified:   parser/parser.go
	modified:   remote/http.go
	modified:   remote/servers.go
	modified:   remote/types.go
	modified:   router/downloader/downloader.go
	modified:   router/middleware.go
	modified:   router/middleware/middleware.go
	modified:   router/middleware/request_error.go
	modified:   router/router.go
	modified:   router/router_download.go
	modified:   router/router_server.go
	modified:   router/router_server_backup.go
	modified:   router/router_server_files.go
	modified:   router/router_server_transfer.go
	modified:   router/router_server_ws.go
	modified:   router/router_system.go
	modified:   router/router_transfer.go
	modified:   router/tokens/parser.go
	modified:   router/websocket/listeners.go
	modified:   router/websocket/websocket.go
	modified:   server/activity.go
	modified:   server/backup.go
	modified:   server/backup/backup.go
	modified:   server/backup/backup_local.go
	modified:   server/backup/backup_s3.go
	modified:   server/configuration.go
	modified:   server/console.go
	modified:   server/crash.go
	modified:   server/events.go
	modified:   server/filesystem/archive.go
	modified:   server/filesystem/filesystem.go
	modified:   server/filesystem/filesystem_test.go
	modified:   server/install.go
	modified:   server/installer/installer.go
	modified:   server/listeners.go
	modified:   server/manager.go
	modified:   server/mounts.go
	modified:   server/power.go
	modified:   server/power_test.go
	modified:   server/resources.go
	modified:   server/server.go
	modified:   server/transfer/archive.go
	modified:   server/transfer/source.go
	modified:   server/transfer/transfer.go
	modified:   server/update.go
	modified:   sftp/event.go
	modified:   sftp/handler.go
	modified:   sftp/server.go
	modified:   wings.go
2023-09-11 17:22:09 +00:00

87 lines
2.4 KiB
Go

package cron
import (
"context"
"net"
"emperror.dev/errors"
"github.com/Tech-Gamer/nwy-wings/internal/database"
"github.com/Tech-Gamer/nwy-wings/internal/models"
"github.com/Tech-Gamer/nwy-wings/server"
"github.com/Tech-Gamer/nwy-wings/system"
)
// activityCron holds the state used by Run to forward locally stored activity
// records to the Panel.
type activityCron struct {
// mu is the "currently running" flag: Run claims it with SwapIf(true) and
// releases it with Store(false) when it returns.
mu *system.AtomicBool
// manager provides the remote client (manager.Client()) that Run uses to
// send activity logs.
manager *server.Manager
// max is passed to the query's Limit, capping how many activity records are
// loaded in a single run.
max int
}
// Run performs a single pass of the activity cron. It loads up to ac.max stored
// activity records whose event does not match "server:sftp.%", drops any record
// whose IP address cannot be parsed, sends the remaining records to the Panel
// and finally deletes the records that were sent from the local database. SFTP
// events are excluded here; they are handled separately to account for
// de-duplication and event merging.
//
// ErrCronRunning (with a stack attached) is returned when a previous pass is
// still flagged as running. Any database or Panel error aborts the pass and is
// returned to the caller.
func (ac *activityCron) Run(ctx context.Context) error {
	// Claim the running flag, bailing out if another pass already holds it. The
	// flag is released again once this pass returns.
	if claimed := ac.mu.SwapIf(true); !claimed {
		return errors.WithStack(ErrCronRunning)
	}
	defer ac.mu.Store(false)

	var rows []models.Activity
	query := database.Instance().WithContext(ctx).
		Where("event NOT LIKE ?", "server:sftp.%").
		Limit(ac.max).
		Find(&rows)
	if err := query.Error; err != nil {
		return errors.WithStack(err)
	}
	if len(rows) == 0 {
		return nil
	}

	// Split the batch into records that can be sent and IDs that must be purged.
	invalidIDs := make([]int, 0, len(rows))
	valid := make([]models.Activity, 0, len(rows))
	for _, row := range rows {
		if net.ParseIP(row.IP) != nil {
			valid = append(valid, row)
			continue
		}
		// An unparsable IP marks the record for deletion. This works around a
		// bug that truncated the last octet of an IPv6 address in the database.
		invalidIDs = append(invalidIDs, row.ID)
	}

	// Purge the invalid records up front, before anything is sent.
	if len(invalidIDs) != 0 {
		purge := database.Instance().WithContext(ctx).Where("id IN ?", invalidIDs).Delete(&models.Activity{})
		if err := purge.Error; err != nil {
			return errors.WithStack(err)
		}
	}

	if len(valid) == 0 {
		return nil
	}

	if err := ac.manager.Client().SendActivityLogs(ctx, valid); err != nil {
		return errors.WrapIf(err, "cron: failed to send activity events to Panel")
	}

	// Everything in valid was accepted by the Panel; collect the IDs so the
	// corresponding rows can be removed locally.
	sentIDs := make([]int, 0, len(valid))
	for _, row := range valid {
		sentIDs = append(sentIDs, row.ID)
	}

	cleanup := database.Instance().WithContext(ctx).Where("id IN ?", sentIDs).Delete(&models.Activity{})
	if err := cleanup.Error; err != nil {
		return errors.WithStack(err)
	}
	return nil
}