End to bridge encryption implementation
So far this is passing my basic tests, but could use some testing from people that are much more familiar with how this is supposed to work. Refs #27
This commit is contained in:
@@ -48,6 +48,8 @@ type Bridge struct {
|
||||
puppetsLock sync.Mutex
|
||||
|
||||
StateStore *database.SQLStateStore
|
||||
|
||||
crypto Crypto
|
||||
}
|
||||
|
||||
func New(cfg *config.Config) (*Bridge, error) {
|
||||
@@ -104,6 +106,8 @@ func New(cfg *config.Config) (*Bridge, error) {
|
||||
StateStore: stateStore,
|
||||
}
|
||||
|
||||
bridge.crypto = NewCryptoHelper(bridge)
|
||||
|
||||
if cfg.Appservice.Provisioning.Enabled() {
|
||||
bridge.provisioning = newProvisioningAPI(bridge)
|
||||
}
|
||||
@@ -151,6 +155,13 @@ func (b *Bridge) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if b.crypto != nil {
|
||||
if err := b.crypto.Init(); err != nil {
|
||||
b.log.Fatalln("Error initializing end-to-bridge encryption:", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
b.log.Debugln("Starting application service HTTP server")
|
||||
go b.as.Start()
|
||||
|
||||
@@ -159,6 +170,10 @@ func (b *Bridge) Start() error {
|
||||
|
||||
go b.updateBotProfile()
|
||||
|
||||
if b.crypto != nil {
|
||||
go b.crypto.Start()
|
||||
}
|
||||
|
||||
go b.startUsers()
|
||||
|
||||
// Finally tell the appservice we're ready
|
||||
@@ -168,5 +183,21 @@ func (b *Bridge) Start() error {
|
||||
}
|
||||
|
||||
func (b *Bridge) Stop() {
|
||||
if b.crypto != nil {
|
||||
b.crypto.Stop()
|
||||
}
|
||||
|
||||
b.as.Stop()
|
||||
b.eventProcessor.Stop()
|
||||
|
||||
for _, user := range b.usersByMXID {
|
||||
if user.Session == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
b.log.Debugln("Disconnecting", user.MXID)
|
||||
user.Session.Close()
|
||||
}
|
||||
|
||||
b.log.Infoln("Bridge stopped")
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package bridge
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/alecthomas/kong"
|
||||
|
||||
@@ -33,11 +34,17 @@ func (g *globals) reply(msg string) {
|
||||
content.MsgType = event.MsgNotice
|
||||
intent := g.bot
|
||||
|
||||
if g.portal != nil && g.portal.IsPrivateChat() {
|
||||
if g.portal == nil {
|
||||
g.handler.log.Errorfln("we don't have a portal for this command")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if g.portal.IsPrivateChat() {
|
||||
intent = g.portal.MainIntent()
|
||||
}
|
||||
|
||||
_, err := intent.SendMessageEvent(g.roomID, event.EventMessage, content)
|
||||
_, err := g.portal.sendMatrixMessage(intent, event.EventMessage, &content, nil, time.Now().UTC().UnixMilli())
|
||||
if err != nil {
|
||||
g.handler.log.Warnfln("Failed to reply to command from %q: %v", g.user.MXID, err)
|
||||
}
|
||||
|
||||
339
bridge/crypto.go
Normal file
339
bridge/crypto.go
Normal file
@@ -0,0 +1,339 @@
|
||||
package bridge
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"maunium.net/go/maulogger/v2"
|
||||
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/crypto"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/id"
|
||||
|
||||
"gitlab.com/beeper/discord/database"
|
||||
)
|
||||
|
||||
var NoSessionFound = crypto.NoSessionFound
|
||||
|
||||
var levelTrace = maulogger.Level{
|
||||
Name: "TRACE",
|
||||
Severity: -10,
|
||||
Color: -1,
|
||||
}
|
||||
|
||||
type Crypto interface {
|
||||
HandleMemberEvent(*event.Event)
|
||||
Decrypt(*event.Event) (*event.Event, error)
|
||||
Encrypt(id.RoomID, event.Type, event.Content) (*event.EncryptedEventContent, error)
|
||||
WaitForSession(id.RoomID, id.SenderKey, id.SessionID, time.Duration) bool
|
||||
RequestSession(id.RoomID, id.SenderKey, id.SessionID, id.UserID, id.DeviceID)
|
||||
ResetSession(id.RoomID)
|
||||
Init() error
|
||||
Start()
|
||||
Stop()
|
||||
}
|
||||
|
||||
type CryptoHelper struct {
|
||||
bridge *Bridge
|
||||
client *mautrix.Client
|
||||
mach *crypto.OlmMachine
|
||||
store *database.SQLCryptoStore
|
||||
log maulogger.Logger
|
||||
baseLog maulogger.Logger
|
||||
}
|
||||
|
||||
func NewCryptoHelper(bridge *Bridge) Crypto {
|
||||
if !bridge.Config.Bridge.Encryption.Allow {
|
||||
bridge.log.Debugln("Bridge built with end-to-bridge encryption, but disabled in config")
|
||||
return nil
|
||||
}
|
||||
|
||||
baseLog := bridge.log.Sub("Crypto")
|
||||
return &CryptoHelper{
|
||||
bridge: bridge,
|
||||
log: baseLog.Sub("Helper"),
|
||||
baseLog: baseLog,
|
||||
}
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) Init() error {
|
||||
helper.log.Debugln("Initializing end-to-bridge encryption...")
|
||||
|
||||
helper.store = database.NewSQLCryptoStore(helper.bridge.db, helper.bridge.as.BotMXID(),
|
||||
fmt.Sprintf("@%s:%s", helper.bridge.Config.Bridge.FormatUsername("%"), helper.bridge.as.HomeserverDomain))
|
||||
|
||||
var err error
|
||||
helper.client, err = helper.loginBot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
helper.log.Debugln("Logged in as bridge bot with device ID", helper.client.DeviceID)
|
||||
|
||||
logger := &cryptoLogger{helper.baseLog}
|
||||
stateStore := &cryptoStateStore{helper.bridge}
|
||||
helper.mach = crypto.NewOlmMachine(helper.client, logger, helper.store, stateStore)
|
||||
helper.mach.AllowKeyShare = helper.allowKeyShare
|
||||
|
||||
helper.client.Syncer = &cryptoSyncer{helper.mach}
|
||||
helper.client.Store = &cryptoClientStore{helper.store}
|
||||
|
||||
return helper.mach.Load()
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) allowKeyShare(device *crypto.DeviceIdentity, info event.RequestedKeyInfo) *crypto.KeyShareRejection {
|
||||
cfg := helper.bridge.Config.Bridge.Encryption.KeySharing
|
||||
if !cfg.Allow {
|
||||
return &crypto.KeyShareRejectNoResponse
|
||||
} else if device.Trust == crypto.TrustStateBlacklisted {
|
||||
return &crypto.KeyShareRejectBlacklisted
|
||||
} else if device.Trust == crypto.TrustStateVerified || !cfg.RequireVerification {
|
||||
portal := helper.bridge.GetPortalByMXID(info.RoomID)
|
||||
if portal == nil {
|
||||
helper.log.Debugfln("Rejecting key request for %s from %s/%s: room is not a portal", info.SessionID, device.UserID, device.DeviceID)
|
||||
|
||||
return &crypto.KeyShareRejection{Code: event.RoomKeyWithheldUnavailable, Reason: "Requested room is not a portal room"}
|
||||
}
|
||||
user := helper.bridge.GetUserByMXID(device.UserID)
|
||||
// FIXME reimplement IsInPortal
|
||||
if !user.Admin /*&& !user.IsInPortal(portal.Key)*/ {
|
||||
helper.log.Debugfln("Rejecting key request for %s from %s/%s: user is not in portal", info.SessionID, device.UserID, device.DeviceID)
|
||||
|
||||
return &crypto.KeyShareRejection{Code: event.RoomKeyWithheldUnauthorized, Reason: "You're not in that portal"}
|
||||
}
|
||||
helper.log.Debugfln("Accepting key request for %s from %s/%s", info.SessionID, device.UserID, device.DeviceID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return &crypto.KeyShareRejectUnverified
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) loginBot() (*mautrix.Client, error) {
|
||||
deviceID := helper.store.FindDeviceID()
|
||||
if len(deviceID) > 0 {
|
||||
helper.log.Debugln("Found existing device ID for bot in database:", deviceID)
|
||||
}
|
||||
|
||||
client, err := mautrix.NewClient(helper.bridge.as.HomeserverURL, "", "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize client: %w", err)
|
||||
}
|
||||
|
||||
client.Logger = helper.baseLog.Sub("Bot")
|
||||
client.Client = helper.bridge.as.HTTPClient
|
||||
client.DefaultHTTPRetries = helper.bridge.as.DefaultHTTPRetries
|
||||
flows, err := client.GetLoginFlows()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get supported login flows: %w", err)
|
||||
}
|
||||
|
||||
flow := flows.FirstFlowOfType(mautrix.AuthTypeAppservice, mautrix.AuthTypeHalfyAppservice)
|
||||
if flow == nil {
|
||||
return nil, fmt.Errorf("homeserver does not support appservice login")
|
||||
}
|
||||
|
||||
// We set the API token to the AS token here to authenticate the appservice login
|
||||
// It'll get overridden after the login
|
||||
client.AccessToken = helper.bridge.as.Registration.AppToken
|
||||
resp, err := client.Login(&mautrix.ReqLogin{
|
||||
Type: flow.Type,
|
||||
Identifier: mautrix.UserIdentifier{Type: mautrix.IdentifierTypeUser, User: string(helper.bridge.as.BotMXID())},
|
||||
DeviceID: deviceID,
|
||||
InitialDeviceDisplayName: "Discord Bridge",
|
||||
StoreCredentials: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to log in as bridge bot: %w", err)
|
||||
}
|
||||
|
||||
helper.store.DeviceID = resp.DeviceID
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) Start() {
|
||||
helper.log.Debugln("Starting syncer for receiving to-device messages")
|
||||
|
||||
err := helper.client.Sync()
|
||||
if err != nil {
|
||||
helper.log.Errorln("Fatal error syncing:", err)
|
||||
} else {
|
||||
helper.log.Infoln("Bridge bot to-device syncer stopped without error")
|
||||
}
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) Stop() {
|
||||
helper.log.Debugln("CryptoHelper.Stop() called, stopping bridge bot sync")
|
||||
helper.client.StopSync()
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) Decrypt(evt *event.Event) (*event.Event, error) {
|
||||
return helper.mach.DecryptMegolmEvent(evt)
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) Encrypt(roomID id.RoomID, evtType event.Type, content event.Content) (*event.EncryptedEventContent, error) {
|
||||
encrypted, err := helper.mach.EncryptMegolmEvent(roomID, evtType, &content)
|
||||
|
||||
if err != nil {
|
||||
if err != crypto.SessionExpired && err != crypto.SessionNotShared && err != crypto.NoGroupSession {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
helper.log.Debugfln("Got %v while encrypting event for %s, sharing group session and trying again...", err, roomID)
|
||||
users, err := helper.store.GetRoomMembers(roomID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get room member list: %w", err)
|
||||
}
|
||||
|
||||
err = helper.mach.ShareGroupSession(roomID, users)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to share group session: %w", err)
|
||||
}
|
||||
|
||||
encrypted, err = helper.mach.EncryptMegolmEvent(roomID, evtType, &content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to encrypt event after re-sharing group session: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return encrypted, nil
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) WaitForSession(roomID id.RoomID, senderKey id.SenderKey, sessionID id.SessionID, timeout time.Duration) bool {
|
||||
return helper.mach.WaitForSession(roomID, senderKey, sessionID, timeout)
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) RequestSession(roomID id.RoomID, senderKey id.SenderKey, sessionID id.SessionID, userID id.UserID, deviceID id.DeviceID) {
|
||||
err := helper.mach.SendRoomKeyRequest(roomID, senderKey, sessionID, "", map[id.UserID][]id.DeviceID{userID: {deviceID}})
|
||||
if err != nil {
|
||||
helper.log.Warnfln("Failed to send key request to %s/%s for %s in %s: %v", userID, deviceID, sessionID, roomID, err)
|
||||
} else {
|
||||
helper.log.Debugfln("Sent key request to %s/%s for %s in %s", userID, deviceID, sessionID, roomID)
|
||||
}
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) ResetSession(roomID id.RoomID) {
|
||||
err := helper.mach.CryptoStore.RemoveOutboundGroupSession(roomID)
|
||||
if err != nil {
|
||||
helper.log.Debugfln("Error manually removing outbound group session in %s: %v", roomID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (helper *CryptoHelper) HandleMemberEvent(evt *event.Event) {
|
||||
helper.mach.HandleMemberEvent(evt)
|
||||
}
|
||||
|
||||
type cryptoSyncer struct {
|
||||
*crypto.OlmMachine
|
||||
}
|
||||
|
||||
func (syncer *cryptoSyncer) ProcessResponse(resp *mautrix.RespSync, since string) error {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
syncer.Log.Error("Processing sync response (%s) panicked: %v\n%s", since, err, debug.Stack())
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
syncer.Log.Trace("Starting sync response handling (%s)", since)
|
||||
syncer.ProcessSyncResponse(resp, since)
|
||||
syncer.Log.Trace("Successfully handled sync response (%s)", since)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(30 * time.Second):
|
||||
syncer.Log.Warn("Handling sync response (%s) is taking unusually long", since)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (syncer *cryptoSyncer) OnFailedSync(_ *mautrix.RespSync, err error) (time.Duration, error) {
|
||||
syncer.Log.Error("Error /syncing, waiting 10 seconds: %v", err)
|
||||
|
||||
return 10 * time.Second, nil
|
||||
}
|
||||
|
||||
func (syncer *cryptoSyncer) GetFilterJSON(_ id.UserID) *mautrix.Filter {
|
||||
everything := []event.Type{{Type: "*"}}
|
||||
|
||||
return &mautrix.Filter{
|
||||
Presence: mautrix.FilterPart{NotTypes: everything},
|
||||
AccountData: mautrix.FilterPart{NotTypes: everything},
|
||||
Room: mautrix.RoomFilter{
|
||||
IncludeLeave: false,
|
||||
Ephemeral: mautrix.FilterPart{NotTypes: everything},
|
||||
AccountData: mautrix.FilterPart{NotTypes: everything},
|
||||
State: mautrix.FilterPart{NotTypes: everything},
|
||||
Timeline: mautrix.FilterPart{NotTypes: everything},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type cryptoLogger struct {
|
||||
int maulogger.Logger
|
||||
}
|
||||
|
||||
func (c *cryptoLogger) Error(message string, args ...interface{}) {
|
||||
c.int.Errorfln(message, args...)
|
||||
}
|
||||
|
||||
func (c *cryptoLogger) Warn(message string, args ...interface{}) {
|
||||
c.int.Warnfln(message, args...)
|
||||
}
|
||||
|
||||
func (c *cryptoLogger) Debug(message string, args ...interface{}) {
|
||||
c.int.Debugfln(message, args...)
|
||||
}
|
||||
|
||||
func (c *cryptoLogger) Trace(message string, args ...interface{}) {
|
||||
c.int.Logfln(levelTrace, message, args...)
|
||||
}
|
||||
|
||||
type cryptoClientStore struct {
|
||||
int *database.SQLCryptoStore
|
||||
}
|
||||
|
||||
func (c cryptoClientStore) SaveFilterID(_ id.UserID, _ string) {}
|
||||
func (c cryptoClientStore) LoadFilterID(_ id.UserID) string { return "" }
|
||||
func (c cryptoClientStore) SaveRoom(_ *mautrix.Room) {}
|
||||
func (c cryptoClientStore) LoadRoom(_ id.RoomID) *mautrix.Room { return nil }
|
||||
|
||||
func (c cryptoClientStore) SaveNextBatch(_ id.UserID, nextBatchToken string) {
|
||||
c.int.PutNextBatch(nextBatchToken)
|
||||
}
|
||||
|
||||
func (c cryptoClientStore) LoadNextBatch(_ id.UserID) string {
|
||||
return c.int.GetNextBatch()
|
||||
}
|
||||
|
||||
var _ mautrix.Storer = (*cryptoClientStore)(nil)
|
||||
|
||||
type cryptoStateStore struct {
|
||||
bridge *Bridge
|
||||
}
|
||||
|
||||
var _ crypto.StateStore = (*cryptoStateStore)(nil)
|
||||
|
||||
func (c *cryptoStateStore) IsEncrypted(id id.RoomID) bool {
|
||||
portal := c.bridge.GetPortalByMXID(id)
|
||||
if portal != nil {
|
||||
return portal.Encrypted
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *cryptoStateStore) FindSharedRooms(id id.UserID) []id.RoomID {
|
||||
return c.bridge.StateStore.FindSharedRooms(id)
|
||||
}
|
||||
|
||||
func (c *cryptoStateStore) GetEncryptionEvent(id.RoomID) *event.EncryptionEventContent {
|
||||
// TODO implement
|
||||
return nil
|
||||
}
|
||||
119
bridge/matrix.go
119
bridge/matrix.go
@@ -1,7 +1,10 @@
|
||||
package bridge
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"maunium.net/go/maulogger/v2"
|
||||
"maunium.net/go/mautrix"
|
||||
@@ -29,9 +32,11 @@ func (b *Bridge) setupEvents() {
|
||||
}
|
||||
|
||||
b.eventProcessor.On(event.EventMessage, b.matrixHandler.handleMessage)
|
||||
b.eventProcessor.On(event.EventEncrypted, b.matrixHandler.handleEncrypted)
|
||||
b.eventProcessor.On(event.EventReaction, b.matrixHandler.handleReaction)
|
||||
b.eventProcessor.On(event.EventRedaction, b.matrixHandler.handleRedaction)
|
||||
b.eventProcessor.On(event.StateMember, b.matrixHandler.handleMembership)
|
||||
b.eventProcessor.On(event.StateEncryption, b.matrixHandler.handleEncryption)
|
||||
}
|
||||
|
||||
func (mh *matrixHandler) join(evt *event.Event, intent *appservice.IntentAPI) *mautrix.RespJoinedMembers {
|
||||
@@ -185,6 +190,10 @@ func (mh *matrixHandler) handleMembership(evt *event.Event) {
|
||||
return
|
||||
}
|
||||
|
||||
if mh.bridge.crypto != nil {
|
||||
mh.bridge.crypto.HandleMemberEvent(evt)
|
||||
}
|
||||
|
||||
// Grab the content of the event.
|
||||
content := evt.Content.AsMember()
|
||||
|
||||
@@ -255,3 +264,113 @@ func (mh *matrixHandler) handleRedaction(evt *event.Event) {
|
||||
portal.handleMatrixRedaction(evt)
|
||||
}
|
||||
}
|
||||
|
||||
func (mh *matrixHandler) handleEncryption(evt *event.Event) {
|
||||
if evt.Content.AsEncryption().Algorithm != id.AlgorithmMegolmV1 {
|
||||
return
|
||||
}
|
||||
|
||||
portal := mh.bridge.GetPortalByMXID(evt.RoomID)
|
||||
if portal != nil && !portal.Encrypted {
|
||||
mh.log.Debugfln("%s enabled encryption in %s", evt.Sender, evt.RoomID)
|
||||
portal.Encrypted = true
|
||||
portal.Update()
|
||||
}
|
||||
}
|
||||
|
||||
const sessionWaitTimeout = 5 * time.Second
|
||||
|
||||
func (mh *matrixHandler) handleEncrypted(evt *event.Event) {
|
||||
if mh.ignoreEvent(evt) || mh.bridge.crypto == nil {
|
||||
return
|
||||
}
|
||||
|
||||
decrypted, err := mh.bridge.crypto.Decrypt(evt)
|
||||
decryptionRetryCount := 0
|
||||
if errors.Is(err, NoSessionFound) {
|
||||
content := evt.Content.AsEncrypted()
|
||||
mh.log.Debugfln("Couldn't find session %s trying to decrypt %s, waiting %d seconds...", content.SessionID, evt.ID, int(sessionWaitTimeout.Seconds()))
|
||||
mh.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, false, decryptionRetryCount)
|
||||
decryptionRetryCount++
|
||||
|
||||
if mh.bridge.crypto.WaitForSession(evt.RoomID, content.SenderKey, content.SessionID, sessionWaitTimeout) {
|
||||
mh.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID)
|
||||
decrypted, err = mh.bridge.crypto.Decrypt(evt)
|
||||
} else {
|
||||
mh.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf("didn't receive encryption keys"), false, decryptionRetryCount)
|
||||
|
||||
go mh.waitLongerForSession(evt)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
mh.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true, decryptionRetryCount)
|
||||
|
||||
mh.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
|
||||
_, _ = mh.bridge.bot.SendNotice(evt.RoomID, fmt.Sprintf(
|
||||
"\u26a0 Your message was not bridged: %v", err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
mh.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted, decryptionRetryCount)
|
||||
mh.bridge.eventProcessor.Dispatch(decrypted)
|
||||
}
|
||||
|
||||
func (mh *matrixHandler) waitLongerForSession(evt *event.Event) {
|
||||
const extendedTimeout = sessionWaitTimeout * 3
|
||||
|
||||
content := evt.Content.AsEncrypted()
|
||||
mh.log.Debugfln("Couldn't find session %s trying to decrypt %s, waiting %d more seconds...",
|
||||
content.SessionID, evt.ID, int(extendedTimeout.Seconds()))
|
||||
|
||||
go mh.bridge.crypto.RequestSession(evt.RoomID, content.SenderKey, content.SessionID, evt.Sender, content.DeviceID)
|
||||
|
||||
resp, err := mh.bridge.bot.SendNotice(evt.RoomID, fmt.Sprintf(
|
||||
"\u26a0 Your message was not bridged: the bridge hasn't received the decryption keys. "+
|
||||
"The bridge will retry for %d seconds. If this error keeps happening, try restarting your client.",
|
||||
int(extendedTimeout.Seconds())))
|
||||
if err != nil {
|
||||
mh.log.Errorfln("Failed to send decryption error to %s: %v", evt.RoomID, err)
|
||||
}
|
||||
|
||||
update := event.MessageEventContent{MsgType: event.MsgNotice}
|
||||
|
||||
if mh.bridge.crypto.WaitForSession(evt.RoomID, content.SenderKey, content.SessionID, extendedTimeout) {
|
||||
mh.log.Debugfln("Got session %s after waiting more, trying to decrypt %s again", content.SessionID, evt.ID)
|
||||
|
||||
decrypted, err := mh.bridge.crypto.Decrypt(evt)
|
||||
if err == nil {
|
||||
mh.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted, 2)
|
||||
mh.bridge.eventProcessor.Dispatch(decrypted)
|
||||
_, _ = mh.bridge.bot.RedactEvent(evt.RoomID, resp.EventID)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
mh.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
|
||||
mh.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true, 2)
|
||||
update.Body = fmt.Sprintf("\u26a0 Your message was not bridged: %v", err)
|
||||
} else {
|
||||
mh.log.Debugfln("Didn't get %s, giving up on %s", content.SessionID, evt.ID)
|
||||
mh.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf("didn't receive encryption keys"), true, 2)
|
||||
update.Body = "\u26a0 Your message was not bridged: the bridge hasn't received the decryption keys. " +
|
||||
"If this error keeps happening, try restarting your client."
|
||||
}
|
||||
|
||||
newContent := update
|
||||
update.NewContent = &newContent
|
||||
if resp != nil {
|
||||
update.RelatesTo = &event.RelatesTo{
|
||||
Type: event.RelReplace,
|
||||
EventID: resp.EventID,
|
||||
}
|
||||
}
|
||||
|
||||
_, err = mh.bridge.bot.SendMessageEvent(evt.RoomID, event.EventMessage, &update)
|
||||
if err != nil {
|
||||
mh.log.Debugfln("Failed to update decryption error notice %s: %v", resp.EventID, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ type Portal struct {
|
||||
log log.Logger
|
||||
|
||||
roomCreateLock sync.Mutex
|
||||
encryptLock sync.Mutex
|
||||
|
||||
discordMessages chan portalDiscordMessage
|
||||
matrixMessages chan portalMatrixMessage
|
||||
@@ -144,7 +145,7 @@ func (p *Portal) handleMatrixInvite(sender *User, evt *event.Event) {
|
||||
p.log.Infoln("no puppet for %v", sender)
|
||||
// Open a conversation on the discord side?
|
||||
}
|
||||
p.log.Infoln("puppet:", puppet)
|
||||
p.log.Infoln("matrixInvite: puppet:", puppet)
|
||||
}
|
||||
|
||||
func (p *Portal) messageLoop() {
|
||||
@@ -212,14 +213,25 @@ func (p *Portal) createMatrixRoom(user *User, channel *discordgo.Channel) error
|
||||
|
||||
var invite []id.UserID
|
||||
|
||||
if p.IsPrivateChat() {
|
||||
invite = append(invite, p.bridge.bot.UserID)
|
||||
if p.bridge.Config.Bridge.Encryption.Default {
|
||||
initialState = append(initialState, &event.Event{
|
||||
Type: event.StateEncryption,
|
||||
Content: event.Content{
|
||||
Parsed: event.EncryptionEventContent{Algorithm: id.AlgorithmMegolmV1},
|
||||
},
|
||||
})
|
||||
p.Encrypted = true
|
||||
|
||||
if p.IsPrivateChat() {
|
||||
invite = append(invite, p.bridge.bot.UserID)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := intent.CreateRoom(&mautrix.ReqCreateRoom{
|
||||
Visibility: "private",
|
||||
Name: p.Name,
|
||||
Topic: p.Topic,
|
||||
Invite: invite,
|
||||
Preset: "private_chat",
|
||||
IsDirect: p.IsPrivateChat(),
|
||||
InitialState: initialState,
|
||||
@@ -325,7 +337,7 @@ func (p *Portal) sendMediaFailedMessage(intent *appservice.IntentAPI, bridgeErr
|
||||
MsgType: event.MsgNotice,
|
||||
}
|
||||
|
||||
_, err := intent.SendMessageEvent(p.MXID, event.EventMessage, content)
|
||||
_, err := p.sendMatrixMessage(intent, event.EventMessage, content, nil, time.Now().UTC().UnixMilli())
|
||||
if err != nil {
|
||||
p.log.Warnfln("failed to send error message to matrix: %v", err)
|
||||
}
|
||||
@@ -379,7 +391,7 @@ func (p *Portal) handleDiscordAttachment(intent *appservice.IntentAPI, msgID str
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := intent.SendMessageEvent(p.MXID, event.EventMessage, content)
|
||||
resp, err := p.sendMatrixMessage(intent, event.EventMessage, content, nil, time.Now().UTC().UnixMilli())
|
||||
if err != nil {
|
||||
p.log.Warnfln("failed to send media message to matrix: %v", err)
|
||||
}
|
||||
@@ -426,7 +438,7 @@ func (p *Portal) handleDiscordMessageCreate(user *User, msg *discordgo.Message)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := intent.SendMessageEvent(p.MXID, event.EventMessage, content)
|
||||
resp, err := p.sendMatrixMessage(intent, event.EventMessage, content, nil, time.Now().UTC().UnixMilli())
|
||||
if err != nil {
|
||||
p.log.Warnfln("failed to send message %q to matrix: %v", msg.ID, err)
|
||||
|
||||
@@ -498,7 +510,7 @@ func (p *Portal) handleDiscordMessagesUpdate(user *User, msg *discordgo.Message)
|
||||
|
||||
content.SetEdit(existing.MatrixID)
|
||||
|
||||
resp, err := intent.SendMessageEvent(p.MXID, event.EventMessage, content)
|
||||
resp, err := p.sendMatrixMessage(intent, event.EventMessage, content, nil, time.Now().UTC().UnixMilli())
|
||||
if err != nil {
|
||||
p.log.Warnfln("failed to send message %q to matrix: %v", msg.ID, err)
|
||||
|
||||
@@ -567,6 +579,57 @@ func (p *Portal) syncParticipants(source *User, participants []*discordgo.User)
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) encrypt(content *event.Content, eventType event.Type) (event.Type, error) {
|
||||
if portal.Encrypted && portal.bridge.crypto != nil {
|
||||
// TODO maybe the locking should be inside mautrix-go?
|
||||
portal.encryptLock.Lock()
|
||||
encrypted, err := portal.bridge.crypto.Encrypt(portal.MXID, eventType, *content)
|
||||
portal.encryptLock.Unlock()
|
||||
if err != nil {
|
||||
return eventType, fmt.Errorf("failed to encrypt event: %w", err)
|
||||
}
|
||||
eventType = event.EventEncrypted
|
||||
content.Parsed = encrypted
|
||||
}
|
||||
return eventType, nil
|
||||
}
|
||||
|
||||
const doublePuppetKey = "fi.mau.double_puppet_source"
|
||||
const doublePuppetValue = "mautrix-discord"
|
||||
|
||||
func (portal *Portal) sendMatrixMessage(intent *appservice.IntentAPI, eventType event.Type, content *event.MessageEventContent, extraContent map[string]interface{}, timestamp int64) (*mautrix.RespSendEvent, error) {
|
||||
wrappedContent := event.Content{Parsed: content, Raw: extraContent}
|
||||
if timestamp != 0 && intent.IsCustomPuppet {
|
||||
if wrappedContent.Raw == nil {
|
||||
wrappedContent.Raw = map[string]interface{}{}
|
||||
}
|
||||
if intent.IsCustomPuppet {
|
||||
wrappedContent.Raw[doublePuppetKey] = doublePuppetValue
|
||||
}
|
||||
}
|
||||
var err error
|
||||
eventType, err = portal.encrypt(&wrappedContent, eventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if eventType == event.EventEncrypted {
|
||||
// Clear other custom keys if the event was encrypted, but keep the double puppet identifier
|
||||
if intent.IsCustomPuppet {
|
||||
wrappedContent.Raw = map[string]interface{}{doublePuppetKey: doublePuppetValue}
|
||||
} else {
|
||||
wrappedContent.Raw = nil
|
||||
}
|
||||
}
|
||||
|
||||
_, _ = intent.UserTyping(portal.MXID, false, 0)
|
||||
if timestamp == 0 {
|
||||
return intent.SendMessageEvent(portal.MXID, eventType, &wrappedContent)
|
||||
} else {
|
||||
return intent.SendMassagedMessageEvent(portal.MXID, eventType, &wrappedContent, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Portal) handleMatrixMessages(msg portalMatrixMessage) {
|
||||
switch msg.evt.Type {
|
||||
case event.EventMessage:
|
||||
|
||||
@@ -32,6 +32,9 @@ type User struct {
|
||||
bridge *Bridge
|
||||
log log.Logger
|
||||
|
||||
// TODO finish implementing
|
||||
Admin bool
|
||||
|
||||
guilds map[string]*database.Guild
|
||||
guildsLock sync.Mutex
|
||||
|
||||
|
||||
Reference in New Issue
Block a user