Add initial backfilling on portal creation

This commit is contained in:
Tulir Asokan
2023-04-16 15:19:24 +03:00
parent 4194b4dfd9
commit 58befb3f96
3 changed files with 60 additions and 16 deletions

View File

@@ -16,6 +16,29 @@ import (
"go.mau.fi/mautrix-discord/database" "go.mau.fi/mautrix-discord/database"
) )
func (portal *Portal) forwardBackfillInitial(source *User) {
defer portal.forwardBackfillLock.Unlock()
// This should only be called from CreateMatrixRoom which locks forwardBackfillLock before creating the room.
if portal.forwardBackfillLock.TryLock() {
panic("forwardBackfillInitial() called without locking forwardBackfillLock")
}
limit := portal.bridge.Config.Bridge.Backfill.Limits.Initial.Channel
if portal.GuildID == "" {
limit = portal.bridge.Config.Bridge.Backfill.Limits.Initial.DM
}
if limit == 0 {
return
}
log := portal.zlog.With().
Str("action", "initial backfill").
Int("limit", limit).
Logger()
portal.backfillLimited(log, source, limit, "")
}
func (portal *Portal) ForwardBackfillMissed(source *User, meta *discordgo.Channel) { func (portal *Portal) ForwardBackfillMissed(source *User, meta *discordgo.Channel) {
limit := portal.bridge.Config.Bridge.Backfill.Limits.Missed.Channel limit := portal.bridge.Config.Bridge.Backfill.Limits.Missed.Channel
if portal.GuildID == "" { if portal.GuildID == "" {
@@ -50,7 +73,7 @@ func (portal *Portal) ForwardBackfillMissed(source *User, meta *discordgo.Channe
if limit < 0 { if limit < 0 {
portal.backfillUnlimitedMissed(log, source, lastMessage.DiscordID) portal.backfillUnlimitedMissed(log, source, lastMessage.DiscordID)
} else { } else {
portal.backfillLimitedMissed(log, source, limit, lastMessage.DiscordID) portal.backfillLimited(log, source, limit, lastMessage.DiscordID)
} }
} }
@@ -66,15 +89,17 @@ func (portal *Portal) collectBackfillMessages(log zerolog.Logger, source *User,
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
for i, msg := range newMessages { if until != "" {
if compareMessageIDs(msg.ID, until) <= 0 { for i, msg := range newMessages {
log.Debug(). if compareMessageIDs(msg.ID, until) <= 0 {
Str("message_id", msg.ID). log.Debug().
Str("until_id", until). Str("message_id", msg.ID).
Msg("Found message that was already bridged") Str("until_id", until).
newMessages = newMessages[:i] Msg("Found message that was already bridged")
foundAll = true newMessages = newMessages[:i]
break foundAll = true
break
}
} }
} }
messages = append(messages, newMessages...) messages = append(messages, newMessages...)
@@ -90,7 +115,7 @@ func (portal *Portal) collectBackfillMessages(log zerolog.Logger, source *User,
return messages, foundAll, nil return messages, foundAll, nil
} }
func (portal *Portal) backfillLimitedMissed(log zerolog.Logger, source *User, limit int, after string) { func (portal *Portal) backfillLimited(log zerolog.Logger, source *User, limit int, after string) {
messages, foundAll, err := portal.collectBackfillMessages(log, source, limit, after) messages, foundAll, err := portal.collectBackfillMessages(log, source, limit, after)
if err != nil { if err != nil {
log.Err(err).Msg("Error collecting messages to forward backfill") log.Err(err).Msg("Error collecting messages to forward backfill")

View File

@@ -189,13 +189,18 @@ bridge:
backfill: backfill:
# Should backfill be enabled at all? # Should backfill be enabled at all?
enabled: false enabled: false
# Limits for backfilling. Set to 0 to disable that type of backfill, or -1 for unlimited. # Limits for backfilling.
limits: limits:
# Initial backfill (when creating portal). # Initial backfill (when creating portal). 0 means backfill is disabled.
# A special unlimited value is not supported, you must set a limit. Initial backfill will
# fetch all messages first before backfilling anything, so high limits can take a lot of time.
initial: initial:
dm: 50 dm: 50
channel: 0 channel: 0
# Missed message backfill (on startup). # Missed message backfill (on startup).
# 0 means backfill is disabled, -1 means fetch all messages since last bridged message.
# When using unlimited backfill (-1), messages are backfilled as they are fetched.
# With limits, all messages up to the limit are fetched first and backfilled afterwards.
missed: missed:
dm: 50 dm: 50
channel: 0 channel: 0

View File

@@ -261,13 +261,10 @@ func (portal *Portal) messageLoop() {
for { for {
select { select {
case msg := <-portal.matrixMessages: case msg := <-portal.matrixMessages:
portal.forwardBackfillLock.Lock()
portal.handleMatrixMessages(msg) portal.handleMatrixMessages(msg)
case msg := <-portal.discordMessages: case msg := <-portal.discordMessages:
portal.forwardBackfillLock.Lock()
portal.handleDiscordMessages(msg) portal.handleDiscordMessages(msg)
} }
portal.forwardBackfillLock.Unlock()
} }
} }
@@ -468,6 +465,16 @@ func (portal *Portal) CreateMatrixRoom(user *User, channel *discordgo.Channel) e
if !portal.shouldSetDMRoomMetadata() { if !portal.shouldSetDMRoomMetadata() {
req.Name = "" req.Name = ""
} }
var backfillStarted bool
portal.forwardBackfillLock.Lock()
defer func() {
if !backfillStarted {
portal.log.Debugln("Backfill wasn't started, unlocking forward backfill lock")
portal.forwardBackfillLock.Unlock()
}
}()
resp, err := intent.CreateRoom(req) resp, err := intent.CreateRoom(req)
if err != nil { if err != nil {
portal.log.Warnln("Failed to create room:", err) portal.log.Warnln("Failed to create room:", err)
@@ -515,6 +522,9 @@ func (portal *Portal) CreateMatrixRoom(user *User, channel *discordgo.Channel) e
portal.Update() portal.Update()
} }
go portal.forwardBackfillInitial(user)
backfillStarted = true
return nil return nil
} }
@@ -532,6 +542,8 @@ func (portal *Portal) handleDiscordMessages(msg portalDiscordMessage) {
return return
} }
} }
portal.forwardBackfillLock.Lock()
defer portal.forwardBackfillLock.Unlock()
switch convertedMsg := msg.msg.(type) { switch convertedMsg := msg.msg.(type) {
case *discordgo.MessageCreate: case *discordgo.MessageCreate:
@@ -918,6 +930,8 @@ func (portal *Portal) sendMatrixMessage(intent *appservice.IntentAPI, eventType
} }
func (portal *Portal) handleMatrixMessages(msg portalMatrixMessage) { func (portal *Portal) handleMatrixMessages(msg portalMatrixMessage) {
portal.forwardBackfillLock.Lock()
defer portal.forwardBackfillLock.Unlock()
switch msg.evt.Type { switch msg.evt.Type {
case event.EventMessage, event.EventSticker: case event.EventMessage, event.EventSticker:
portal.handleMatrixMessage(msg.user, msg.evt) portal.handleMatrixMessage(msg.user, msg.evt)