core_crypto/mls/conversation/
pending_conversation.rs

1//! When a client joins a group via an external commit, it sometimes receives messages
2//! (most of the time renewed external proposals) for the new epoch whereas it does not yet have
3//! the confirmation from the DS that the external join commit has been accepted.
4
5use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
8use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
9use crate::mls::credential::ext::CredentialExt as _;
10use crate::prelude::{
11    ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
12    MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
13};
14use crate::transaction_context::TransactionContext;
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25/// A pending conversation is a conversation that has been created via an external join commit
26/// locally, while this commit has not yet been approved by the DS.
27#[derive(Debug)]
28pub struct PendingConversation {
29    inner: PersistedMlsPendingGroup,
30    context: TransactionContext,
31}
32
33impl PendingConversation {
34    pub(crate) fn new(inner: PersistedMlsPendingGroup, context: TransactionContext) -> Self {
35        Self { inner, context }
36    }
37
38    pub(crate) fn from_mls_group(
39        group: MlsGroup,
40        custom_cfg: MlsCustomConfiguration,
41        context: TransactionContext,
42    ) -> Result<Self> {
43        let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44        let serialized_group =
45            core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46        let group_id = group.group_id().to_vec();
47
48        let inner = PersistedMlsPendingGroup {
49            id: group_id,
50            state: serialized_group,
51            custom_configuration: serialized_cfg,
52            parent_id: None,
53        };
54        Ok(Self::new(inner, context))
55    }
56
57    async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58        self.context
59            .mls_provider()
60            .await
61            .map_err(RecursiveError::transaction("getting mls provider"))
62            .map_err(Into::into)
63    }
64
65    async fn keystore(&self) -> Result<CryptoKeystore> {
66        let backend = self.mls_provider().await?;
67        Ok(backend.keystore())
68    }
69
70    fn id(&self) -> &ConversationId {
71        &self.inner.id
72    }
73
74    pub(crate) async fn save(&self) -> Result<()> {
75        let keystore = self.keystore().await?;
76        keystore
77            .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78            .await
79            .map_err(KeystoreError::wrap("saving mls pending groups"))
80            .map_err(Into::into)
81    }
82
83    /// Send the commit via [crate::MlsTransport] and handle the response.
84    pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85        let transport = self
86            .context
87            .mls_transport()
88            .await
89            .map_err(RecursiveError::transaction("getting mls transport"))?;
90        let transport = transport.as_ref().ok_or::<Error>(
91            RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92        )?;
93
94        match transport
95            .send_commit_bundle(commit.clone())
96            .await
97            .map_err(RecursiveError::root("sending commit bundle"))?
98        {
99            MlsTransportResponse::Success => Ok(()),
100            MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101            MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102        }
103    }
104
105    /// If the given message is the commit generated by [TransactionContext::join_by_external_commit],
106    /// merge the pending group and restore any buffered messages.
107    ///
108    /// Otherwise, the given message will be buffered.
109    pub async fn try_process_own_join_commit(
110        &mut self,
111        message: impl AsRef<[u8]>,
112    ) -> Result<MlsConversationDecryptMessage> {
113        // If the confirmation tag of the pending group and this incoming message are identical, we can merge the pending group.
114        if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115            return self.merge_and_restore_messages().await;
116        }
117
118        let keystore = self.keystore().await?;
119
120        let pending_msg = MlsPendingMessage {
121            foreign_id: self.id().clone(),
122            message: message.as_ref().to_vec(),
123        };
124        keystore
125            .save::<MlsPendingMessage>(pending_msg)
126            .await
127            .map_err(KeystoreError::wrap("saving mls pending message"))?;
128        Err(Error::BufferedForPendingConversation)
129    }
130
131    /// If the message confirmation tag and the group confirmation tag are the same, it means that
132    /// the external join commit has been accepted by the DS and the pending group can be merged.
133    async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134        let backend = self.mls_provider().await?;
135        let keystore = backend.keystore();
136        // Instantiate the pending group
137        let (group, _cfg) = keystore
138            .mls_pending_groups_load(self.id())
139            .await
140            .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141        let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142            .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144        // The commit is only merged on this temporary instance of the pending group, to enable
145        // calculation of the confirmation tag.
146        mls_group
147            .merge_pending_commit(&backend)
148            .await
149            .map_err(MlsError::wrap("merging pending commit"))?;
150        let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151            .map_err(MlsError::wrap("deserializing mls message"))?;
152        let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153            return Ok(false);
154        };
155        let Some(msg_ct) = public_message.confirmation_tag() else {
156            return Ok(false);
157        };
158        let group_ct = mls_group
159            .compute_confirmation_tag(&backend)
160            .map_err(MlsError::wrap("computing confirmation tag"))?;
161        Ok(*msg_ct == group_ct)
162    }
163
164    /// Merges the [Self] instance and restores any buffered messages.
165    async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166        let buffered_messages = self.merge().await?;
167        let context = &self.context;
168        let backend = self.mls_provider().await?;
169        let id = self.id();
170
171        // This is the now merged conversation
172        let conversation = context
173            .conversation(id)
174            .await
175            .map_err(RecursiveError::transaction("getting conversation by id"))?;
176        let conversation = conversation.conversation().await;
177        let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
178
179        // We return self identity here, probably not necessary to check revocation
180        let own_leaf_credential_with_key = CredentialWithKey {
181            credential: own_leaf.credential().clone(),
182            signature_key: own_leaf.signature_key().clone(),
183        };
184        let identity = own_leaf_credential_with_key
185            .extract_identity(conversation.ciphersuite(), None)
186            .map_err(RecursiveError::mls_credential("extracting identity"))?;
187
188        let crl_new_distribution_points = get_new_crl_distribution_points(
189            &backend,
190            extract_crl_uris_from_group(&conversation.group)
191                .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
192        )
193        .await
194        .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
195
196        // Note that though we return `has_epoch_changed: true` here, we don't notify the observer.
197        // This function can only be reached via a code path going through `MlsConversation::decrypt_message`
198        // which already notifies the observer; it would be redundant to notify here also.
199
200        // we still support the `has_epoch_changed` field, though we'll remove it later
201        #[expect(deprecated)]
202        Ok(MlsConversationDecryptMessage {
203            app_msg: None,
204            proposals: vec![],
205            is_active: conversation.group.is_active(),
206            delay: conversation.compute_next_commit_delay(),
207            sender_client_id: None,
208            has_epoch_changed: true,
209            identity,
210            buffered_messages,
211            crl_new_distribution_points,
212        })
213    }
214
215    /// This merges the commit generated by [TransactionContext::join_by_external_commit],
216    /// persists the group permanently and deletes the temporary one. After merging, the group
217    /// is fully functional.
218    ///
219    /// # Errors
220    /// Errors resulting from OpenMls, the KeyStore calls and deserialization
221    pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
222        let mls_provider = self.mls_provider().await?;
223        let id = self.id();
224        let group = self.inner.state.clone();
225        let cfg = self.inner.custom_configuration.clone();
226
227        let mut mls_group =
228            core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
229
230        // Merge it aka bring the MLS group to life and make it usable
231        mls_group
232            .merge_pending_commit(&mls_provider)
233            .await
234            .map_err(MlsError::wrap("merging pending commit"))?;
235
236        // Restore the custom configuration and build a conversation from it
237        let custom_cfg =
238            serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
239        let configuration = MlsConversationConfiguration {
240            ciphersuite: mls_group.ciphersuite().into(),
241            custom: custom_cfg,
242            ..Default::default()
243        };
244
245        // We have to determine the restore policy before we persist the group, because it depends
246        // on whether the group already exists.
247        let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
248            // If the group already exists, it means the external commit is about rejoining the group.
249            // This is most of the time a last resort measure (for example when a commit is dropped,
250            // and you go out of sync), so there's no point in decrypting buffered messages
251            trace!("External commit trying to rejoin group");
252            MessageRestorePolicy::ClearOnly
253        } else {
254            MessageRestorePolicy::DecryptAndClear
255        };
256
257        // Persist the now usable MLS group in the keystore
258        let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
259            .await
260            .map_err(RecursiveError::mls_conversation(
261                "constructing conversation from mls group",
262            ))?;
263
264        let context = &self.context;
265
266        context
267            .mls_groups()
268            .await
269            .map_err(RecursiveError::transaction("getting mls groups"))?
270            .insert(id.clone(), conversation);
271
272        // This is the now merged conversation
273        let mut conversation = context
274            .conversation(id)
275            .await
276            .map_err(RecursiveError::transaction("getting conversation by id"))?;
277        let pending_messages = conversation
278            .restore_pending_messages(restore_policy)
279            .await
280            .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
281
282        if pending_messages.is_some() {
283            mls_provider
284                .key_store()
285                .remove::<MlsPendingMessage, _>(id)
286                .await
287                .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
288        }
289
290        // cleanup the pending group we no longer need
291        self.clear().await?;
292
293        Ok(pending_messages)
294    }
295
296    /// In case the external commit generated by [TransactionContext::join_by_external_commit] is
297    /// rejected by the Delivery Service, and we want to abort this external commit,
298    /// we can wipe out the pending group from the keystore.
299    ///
300    /// # Errors
301    /// Errors resulting from the KeyStore calls
302    pub(crate) async fn clear(&mut self) -> Result<()> {
303        self.keystore()
304            .await?
305            .mls_pending_groups_delete(self.id())
306            .await
307            .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
308        Ok(())
309    }
310}
311
312#[cfg(test)]
313mod tests {
314    use super::*;
315    use crate::prelude::MlsConversationDecryptMessage;
316    use crate::test_utils::*;
317    use wasm_bindgen_test::*;
318
319    wasm_bindgen_test_configure!(run_in_browser);
320
321    #[apply(all_cred_cipher)]
322    #[wasm_bindgen_test]
323    async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestContext) {
324        let [alice, bob, charlie, debbie] = case.sessions().await;
325        Box::pin(async move {
326            let conversation = case.create_conversation([&alice]).await;
327            // Bob tries to join Alice's group with an external commit
328            let (commit_guard, _pending_conversation) = conversation.external_join_unmerged(&bob).await;
329            let external_commit = commit_guard.message();
330
331            // Alice decrypts the external commit...
332            let conversation = commit_guard
333                .notify_member(&alice)
334                .await
335                .process_member_changes()
336                .await
337                .finish();
338
339            // Meanwhile Debbie joins the party by creating an external proposal
340            let proposal_guard = conversation.external_join_proposal(&debbie).await;
341            let external_proposal = proposal_guard.message();
342
343            // ...then Alice generates new messages for this epoch
344            let app_msg = proposal_guard
345                .conversation()
346                .guard()
347                .await
348                .encrypt_message(b"Hello Bob !")
349                .await
350                .unwrap();
351
352            let conversation = proposal_guard.notify_member(&alice).await.finish();
353            let proposal_guard = conversation.update_proposal().await;
354            let proposal = proposal_guard.message();
355            let conversation = proposal_guard.finish();
356
357            let commit_guard = conversation.invite([&charlie]).await;
358            let commit = commit_guard.message();
359            let conversation = commit_guard.process_member_changes().await.finish();
360
361            // And now Bob will have to decrypt those messages while he hasn't yet merged its external commit
362            // To add more fun, he will buffer the messages in exactly the wrong order (to make
363            // sure he reapplies them in the right order afterwards)
364            let messages = vec![commit, external_proposal, proposal]
365                .into_iter()
366                .map(|m| m.to_bytes().unwrap());
367            let Err(crate::transaction_context::Error::PendingConversation(mut pending_conversation)) =
368                bob.transaction.conversation(conversation.id()).await
369            else {
370                panic!("Bob should not have the conversation yet")
371            };
372            for m in messages {
373                let decrypt = pending_conversation.try_process_own_join_commit(m).await;
374                assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
375            }
376            let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
377            assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
378
379            // Bob should have buffered the messages
380            assert_eq!(bob.transaction.count_entities().await.pending_messages, 4);
381
382            let observer = TestEpochObserver::new();
383            bob.session()
384                .await
385                .register_epoch_observer(observer.clone())
386                .await
387                .unwrap();
388
389            // Finally, Bob receives the green light from the DS and he can merge the external commit
390            let MlsConversationDecryptMessage {
391                buffered_messages: Some(restored_messages),
392                ..
393            } = pending_conversation
394                .try_process_own_join_commit(external_commit.to_bytes().unwrap())
395                .await
396                .unwrap()
397            else {
398                panic!("Alice's messages should have been restored at this point");
399            };
400
401            let observed_epochs = observer.observed_epochs().await;
402            assert_eq!(
403                observed_epochs.len(),
404                1,
405                "we should see exactly 1 epoch change in these 4 messages"
406            );
407            assert_eq!(observed_epochs[0].0, *conversation.id(), "conversation id must match");
408
409            for (idx, msg) in restored_messages.iter().enumerate() {
410                if idx == 0 {
411                    // the only application message
412                    assert_eq!(msg.app_msg.as_deref(), Some(b"Hello Bob !" as _));
413                } else {
414                    assert!(msg.app_msg.is_none());
415                }
416            }
417
418            assert_eq!(conversation.member_count().await, 4);
419            assert!(
420                conversation
421                    .is_functional_and_contains([&alice, &bob, &charlie, &debbie])
422                    .await
423            );
424
425            // After merging we should erase all those pending messages
426            assert_eq!(bob.transaction.count_entities().await.pending_messages, 0);
427        })
428        .await
429    }
430
431    #[apply(all_cred_cipher)]
432    #[wasm_bindgen_test]
433    async fn should_not_reapply_buffered_messages_when_rejoining(case: TestContext) {
434        use crate::mls;
435
436        let [alice, bob] = case.sessions().await;
437        Box::pin(async move {
438            let conversation = case.create_conversation([&alice, &bob]).await;
439
440            // Alice will never see this commit
441            let conversation = conversation.acting_as(&bob).await;
442            let commit_guard = conversation.update().await;
443            let conversation = commit_guard.conversation();
444
445            let msg1 = conversation.guard().await.encrypt_message("A").await.unwrap();
446            let msg2 = conversation.guard().await.encrypt_message("B").await.unwrap();
447
448            let conversation = commit_guard.finish();
449            // Since Alice missed Bob's commit she should buffer this message
450            let decrypt = conversation.guard().await.decrypt_message(msg1).await;
451            assert!(matches!(
452                decrypt.unwrap_err(),
453                mls::conversation::Error::BufferedFutureMessage { .. }
454            ));
455            let decrypt = conversation.guard().await.decrypt_message(msg2).await;
456            assert!(matches!(
457                decrypt.unwrap_err(),
458                mls::conversation::Error::BufferedFutureMessage { .. }
459            ));
460            assert_eq!(alice.transaction.count_entities().await.pending_messages, 2);
461
462            let conversation = conversation.acting_as(&bob).await.external_join_notify(&alice).await;
463            // Alice should have deleted all her buffered messages
464            assert_eq!(alice.transaction.count_entities().await.pending_messages, 0);
465            assert!(conversation.is_functional_and_contains([&alice, &bob]).await);
466        })
467        .await
468    }
469}