From 58befb3f9686e011e1ebefc80d7ac5a0fce65c37 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 16 Apr 2023 15:19:24 +0300 Subject: [PATCH] Add initial backfilling on portal creation --- backfill.go | 47 ++++++++++++++++++++++++++++++++++----------- example-config.yaml | 9 +++++++-- portal.go | 20 ++++++++++++++++--- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/backfill.go b/backfill.go index 93396ae..bcad0c0 100644 --- a/backfill.go +++ b/backfill.go @@ -16,6 +16,29 @@ import ( "go.mau.fi/mautrix-discord/database" ) +func (portal *Portal) forwardBackfillInitial(source *User) { + defer portal.forwardBackfillLock.Unlock() + // This should only be called from CreateMatrixRoom which locks forwardBackfillLock before creating the room. + if portal.forwardBackfillLock.TryLock() { + panic("forwardBackfillInitial() called without locking forwardBackfillLock") + } + + limit := portal.bridge.Config.Bridge.Backfill.Limits.Initial.Channel + if portal.GuildID == "" { + limit = portal.bridge.Config.Bridge.Backfill.Limits.Initial.DM + } + if limit == 0 { + return + } + + log := portal.zlog.With(). + Str("action", "initial backfill"). + Int("limit", limit). + Logger() + + portal.backfillLimited(log, source, limit, "") +} + func (portal *Portal) ForwardBackfillMissed(source *User, meta *discordgo.Channel) { limit := portal.bridge.Config.Bridge.Backfill.Limits.Missed.Channel if portal.GuildID == "" { @@ -50,7 +73,7 @@ func (portal *Portal) ForwardBackfillMissed(source *User, meta *discordgo.Channe if limit < 0 { portal.backfillUnlimitedMissed(log, source, lastMessage.DiscordID) } else { - portal.backfillLimitedMissed(log, source, limit, lastMessage.DiscordID) + portal.backfillLimited(log, source, limit, lastMessage.DiscordID) } } @@ -66,15 +89,17 @@ func (portal *Portal) collectBackfillMessages(log zerolog.Logger, source *User, if err != nil { return nil, false, err } - for i, msg := range newMessages { - if compareMessageIDs(msg.ID, until) <= 0 { - log.Debug(). - Str("message_id", msg.ID). - Str("until_id", until). - Msg("Found message that was already bridged") - newMessages = newMessages[:i] - foundAll = true - break + if until != "" { + for i, msg := range newMessages { + if compareMessageIDs(msg.ID, until) <= 0 { + log.Debug(). + Str("message_id", msg.ID). + Str("until_id", until). + Msg("Found message that was already bridged") + newMessages = newMessages[:i] + foundAll = true + break + } } } messages = append(messages, newMessages...) @@ -90,7 +115,7 @@ func (portal *Portal) collectBackfillMessages(log zerolog.Logger, source *User, return messages, foundAll, nil } -func (portal *Portal) backfillLimitedMissed(log zerolog.Logger, source *User, limit int, after string) { +func (portal *Portal) backfillLimited(log zerolog.Logger, source *User, limit int, after string) { messages, foundAll, err := portal.collectBackfillMessages(log, source, limit, after) if err != nil { log.Err(err).Msg("Error collecting messages to forward backfill") diff --git a/example-config.yaml b/example-config.yaml index 472ecfd..c420304 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -189,13 +189,18 @@ bridge: backfill: # Should backfill be enabled at all? enabled: false - # Limits for backfilling. Set to 0 to disable that type of backfill, or -1 for unlimited. + # Limits for backfilling. limits: - # Initial backfill (when creating portal). + # Initial backfill (when creating portal). 0 means backfill is disabled. + # A special unlimited value is not supported, you must set a limit. Initial backfill will + # fetch all messages first before backfilling anything, so high limits can take a lot of time. initial: dm: 50 channel: 0 # Missed message backfill (on startup). + # 0 means backfill is disabled, -1 means fetch all messages since last bridged message. + # When using unlimited backfill (-1), messages are backfilled as they are fetched. + # With limits, all messages up to the limit are fetched first and backfilled afterwards. missed: dm: 50 channel: 0 diff --git a/portal.go b/portal.go index 20ced51..903dea5 100644 --- a/portal.go +++ b/portal.go @@ -261,13 +261,10 @@ func (portal *Portal) messageLoop() { for { select { case msg := <-portal.matrixMessages: - portal.forwardBackfillLock.Lock() portal.handleMatrixMessages(msg) case msg := <-portal.discordMessages: - portal.forwardBackfillLock.Lock() portal.handleDiscordMessages(msg) } - portal.forwardBackfillLock.Unlock() } } @@ -468,6 +465,16 @@ func (portal *Portal) CreateMatrixRoom(user *User, channel *discordgo.Channel) e if !portal.shouldSetDMRoomMetadata() { req.Name = "" } + + var backfillStarted bool + portal.forwardBackfillLock.Lock() + defer func() { + if !backfillStarted { + portal.log.Debugln("Backfill wasn't started, unlocking forward backfill lock") + portal.forwardBackfillLock.Unlock() + } + }() + resp, err := intent.CreateRoom(req) if err != nil { portal.log.Warnln("Failed to create room:", err) @@ -515,6 +522,9 @@ func (portal *Portal) CreateMatrixRoom(user *User, channel *discordgo.Channel) e portal.Update() } + go portal.forwardBackfillInitial(user) + backfillStarted = true + return nil } @@ -532,6 +542,8 @@ func (portal *Portal) handleDiscordMessages(msg portalDiscordMessage) { return } } + portal.forwardBackfillLock.Lock() + defer portal.forwardBackfillLock.Unlock() switch convertedMsg := msg.msg.(type) { case *discordgo.MessageCreate: @@ -918,6 +930,8 @@ func (portal *Portal) sendMatrixMessage(intent *appservice.IntentAPI, eventType } func (portal *Portal) handleMatrixMessages(msg portalMatrixMessage) { + portal.forwardBackfillLock.Lock() + defer portal.forwardBackfillLock.Unlock() switch msg.evt.Type { case event.EventMessage, event.EventSticker: portal.handleMatrixMessage(msg.user, msg.evt)