core_crypto/mls/conversation/
pending_conversation.rs

1//! When a client joins a group via an external commit, it sometimes receives messages
2//! (most of the time renewed external proposals) for the new epoch whereas it does not yet have
3//! the confirmation from the DS that the external join commit has been accepted.
4
5use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
8use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
9use crate::mls::credential::ext::CredentialExt as _;
10use crate::prelude::{
11    ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
12    MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
13};
14use crate::transaction_context::TransactionContext;
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25/// A pending conversation is a conversation that has been created via an external join commit
26/// locally, while this commit has not yet been approved by the DS.
27#[derive(Debug)]
28pub struct PendingConversation {
29    inner: PersistedMlsPendingGroup,
30    context: TransactionContext,
31}
32
33impl PendingConversation {
34    pub(crate) fn new(inner: PersistedMlsPendingGroup, context: TransactionContext) -> Self {
35        Self { inner, context }
36    }
37
38    pub(crate) fn from_mls_group(
39        group: MlsGroup,
40        custom_cfg: MlsCustomConfiguration,
41        context: TransactionContext,
42    ) -> Result<Self> {
43        let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44        let serialized_group =
45            core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46        let group_id = group.group_id().to_vec();
47
48        let inner = PersistedMlsPendingGroup {
49            id: group_id,
50            state: serialized_group,
51            custom_configuration: serialized_cfg,
52            parent_id: None,
53        };
54        Ok(Self::new(inner, context))
55    }
56
57    async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58        self.context
59            .mls_provider()
60            .await
61            .map_err(RecursiveError::transaction("getting mls provider"))
62            .map_err(Into::into)
63    }
64
65    async fn keystore(&self) -> Result<CryptoKeystore> {
66        let backend = self.mls_provider().await?;
67        Ok(backend.keystore())
68    }
69
70    fn id(&self) -> &ConversationId {
71        &self.inner.id
72    }
73
74    pub(crate) async fn save(&self) -> Result<()> {
75        let keystore = self.keystore().await?;
76        keystore
77            .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78            .await
79            .map_err(KeystoreError::wrap("saving mls pending groups"))
80            .map_err(Into::into)
81    }
82
83    /// Send the commit via [crate::MlsTransport] and handle the response.
84    pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85        let transport = self
86            .context
87            .mls_transport()
88            .await
89            .map_err(RecursiveError::transaction("getting mls transport"))?;
90        let transport = transport.as_ref().ok_or::<Error>(
91            RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92        )?;
93
94        match transport
95            .send_commit_bundle(commit.clone())
96            .await
97            .map_err(RecursiveError::root("sending commit bundle"))?
98        {
99            MlsTransportResponse::Success => Ok(()),
100            MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101            MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102        }
103    }
104
105    /// If the given message is the commit generated by [TransactionContext::join_by_external_commit],
106    /// merge the pending group and restore any buffered messages.
107    ///
108    /// Otherwise, the given message will be buffered.
109    pub async fn try_process_own_join_commit(
110        &mut self,
111        message: impl AsRef<[u8]>,
112    ) -> Result<MlsConversationDecryptMessage> {
113        // If the confirmation tag of the pending group and this incoming message are identical, we can merge the pending group.
114        if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115            return self.merge_and_restore_messages().await;
116        }
117
118        let keystore = self.keystore().await?;
119
120        let pending_msg = MlsPendingMessage {
121            foreign_id: self.id().clone(),
122            message: message.as_ref().to_vec(),
123        };
124        keystore
125            .save::<MlsPendingMessage>(pending_msg)
126            .await
127            .map_err(KeystoreError::wrap("saving mls pending message"))?;
128        Err(Error::BufferedForPendingConversation)
129    }
130
131    /// If the message confirmation tag and the group confirmation tag are the same, it means that
132    /// the external join commit has been accepted by the DS and the pending group can be merged.
133    async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134        let backend = self.mls_provider().await?;
135        let keystore = backend.keystore();
136        // Instantiate the pending group
137        let (group, _cfg) = keystore
138            .mls_pending_groups_load(self.id())
139            .await
140            .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141        let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142            .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144        // The commit is only merged on this temporary instance of the pending group, to enable
145        // calculation of the confirmation tag.
146        mls_group
147            .merge_pending_commit(&backend)
148            .await
149            .map_err(MlsError::wrap("merging pending commit"))?;
150        let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151            .map_err(MlsError::wrap("deserializing mls message"))?;
152        let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153            return Ok(false);
154        };
155        let Some(msg_ct) = public_message.confirmation_tag() else {
156            return Ok(false);
157        };
158        let group_ct = mls_group
159            .compute_confirmation_tag(&backend)
160            .map_err(MlsError::wrap("computing confirmation tag"))?;
161        Ok(*msg_ct == group_ct)
162    }
163
164    /// Merges the [Self] instance and restores any buffered messages.
165    async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166        let buffered_messages = self.merge().await?;
167        let context = &self.context;
168        let backend = self.mls_provider().await?;
169        let id = self.id();
170
171        // This is the now merged conversation
172        let conversation = context
173            .conversation(id)
174            .await
175            .map_err(RecursiveError::transaction("getting conversation by id"))?;
176        let conversation = conversation.conversation().await;
177        let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
178
179        // We return self identity here, probably not necessary to check revocation
180        let own_leaf_credential_with_key = CredentialWithKey {
181            credential: own_leaf.credential().clone(),
182            signature_key: own_leaf.signature_key().clone(),
183        };
184        let identity = own_leaf_credential_with_key
185            .extract_identity(conversation.ciphersuite(), None)
186            .map_err(RecursiveError::mls_credential("extracting identity"))?;
187
188        let crl_new_distribution_points = get_new_crl_distribution_points(
189            &backend,
190            extract_crl_uris_from_group(&conversation.group)
191                .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
192        )
193        .await
194        .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
195
196        // Note that though we return `has_epoch_changed: true` here, we don't notify the observer.
197        // This function can only be reached via a code path going through `MlsConversation::decrypt_message`
198        // which already notifies the observer; it would be redundant to notify here also.
199
200        // we still support the `has_epoch_changed` field, though we'll remove it later
201        #[expect(deprecated)]
202        Ok(MlsConversationDecryptMessage {
203            app_msg: None,
204            proposals: vec![],
205            is_active: conversation.group.is_active(),
206            delay: conversation.compute_next_commit_delay(),
207            sender_client_id: None,
208            has_epoch_changed: true,
209            identity,
210            buffered_messages,
211            crl_new_distribution_points,
212        })
213    }
214
215    /// This merges the commit generated by [TransactionContext::join_by_external_commit],
216    /// persists the group permanently and deletes the temporary one. After merging, the group
217    /// is fully functional.
218    ///
219    /// # Errors
220    /// Errors resulting from OpenMls, the KeyStore calls and deserialization
221    pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
222        let mls_provider = self.mls_provider().await?;
223        let id = self.id();
224        let group = self.inner.state.clone();
225        let cfg = self.inner.custom_configuration.clone();
226
227        let mut mls_group =
228            core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
229
230        // Merge it aka bring the MLS group to life and make it usable
231        mls_group
232            .merge_pending_commit(&mls_provider)
233            .await
234            .map_err(MlsError::wrap("merging pending commit"))?;
235
236        // Restore the custom configuration and build a conversation from it
237        let custom_cfg =
238            serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
239        let configuration = MlsConversationConfiguration {
240            ciphersuite: mls_group.ciphersuite().into(),
241            custom: custom_cfg,
242            ..Default::default()
243        };
244
245        // We have to determine the restore policy before we persist the group, because it depends
246        // on whether the group already exists.
247        let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
248            // If the group already exists, it means the external commit is about rejoining the group.
249            // This is most of the time a last resort measure (for example when a commit is dropped,
250            // and you go out of sync), so there's no point in decrypting buffered messages
251            trace!("External commit trying to rejoin group");
252            MessageRestorePolicy::ClearOnly
253        } else {
254            MessageRestorePolicy::DecryptAndClear
255        };
256
257        // Persist the now usable MLS group in the keystore
258        let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
259            .await
260            .map_err(RecursiveError::mls_conversation(
261                "constructing conversation from mls group",
262            ))?;
263
264        let context = &self.context;
265
266        context
267            .mls_groups()
268            .await
269            .map_err(RecursiveError::transaction("getting mls groups"))?
270            .insert(id.clone(), conversation);
271
272        // This is the now merged conversation
273        let mut conversation = context
274            .conversation(id)
275            .await
276            .map_err(RecursiveError::transaction("getting conversation by id"))?;
277        let pending_messages = conversation
278            .restore_pending_messages(restore_policy)
279            .await
280            .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
281
282        if pending_messages.is_some() {
283            mls_provider
284                .key_store()
285                .remove::<MlsPendingMessage, _>(id)
286                .await
287                .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
288        }
289
290        // cleanup the pending group we no longer need
291        self.clear().await?;
292
293        Ok(pending_messages)
294    }
295
296    /// In case the external commit generated by [TransactionContext::join_by_external_commit] is
297    /// rejected by the Delivery Service, and we want to abort this external commit,
298    /// we can wipe out the pending group from the keystore.
299    ///
300    /// # Errors
301    /// Errors resulting from the KeyStore calls
302    pub(crate) async fn clear(&mut self) -> Result<()> {
303        self.keystore()
304            .await?
305            .mls_pending_groups_delete(self.id())
306            .await
307            .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
308        Ok(())
309    }
310}
311
312#[cfg(test)]
313mod tests {
314    use super::*;
315    use crate::mls::conversation::Conversation as _;
316    use crate::prelude::MlsConversationDecryptMessage;
317    use crate::test_utils::*;
318    use wasm_bindgen_test::*;
319
320    wasm_bindgen_test_configure!(run_in_browser);
321
322    #[apply(all_cred_cipher)]
323    #[wasm_bindgen_test]
324    async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestContext) {
325        let [alice_central, bob_central, charlie_central, debbie_central] = case.sessions().await;
326        Box::pin(async move {
327            let id = conversation_id();
328            alice_central
329                .transaction
330                .new_conversation(&id, case.credential_type, case.cfg.clone())
331                .await
332                .unwrap();
333            // Bob tries to join Alice's group with an external commit
334            let gi = alice_central.get_group_info(&id).await;
335            let (external_commit, _) = bob_central
336                .create_unmerged_external_commit(gi, case.custom_cfg(), case.credential_type)
337                .await;
338
339            // Alice decrypts the external commit...
340            alice_central
341                .transaction
342                .conversation(&id)
343                .await
344                .unwrap()
345                .decrypt_message(external_commit.commit.to_bytes().unwrap())
346                .await
347                .unwrap();
348
349            // Meanwhile Debbie joins the party by creating an external proposal
350            let epoch = alice_central.transaction.conversation(&id).await.unwrap().epoch().await;
351            let external_proposal = debbie_central
352                .transaction
353                .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
354                .await
355                .unwrap();
356
357            // ...then Alice generates new messages for this epoch
358            let app_msg = alice_central
359                .transaction
360                .conversation(&id)
361                .await
362                .unwrap()
363                .encrypt_message(b"Hello Bob !")
364                .await
365                .unwrap();
366            let proposal = alice_central
367                .transaction
368                .new_update_proposal(&id)
369                .await
370                .unwrap()
371                .proposal;
372            alice_central
373                .transaction
374                .conversation(&id)
375                .await
376                .unwrap()
377                .decrypt_message(external_proposal.to_bytes().unwrap())
378                .await
379                .unwrap();
380            let charlie = charlie_central.rand_key_package(&case).await;
381            alice_central
382                .transaction
383                .conversation(&id)
384                .await
385                .unwrap()
386                .add_members(vec![charlie])
387                .await
388                .unwrap();
389            let commit = alice_central.mls_transport().await.latest_commit_bundle().await;
390            charlie_central
391                .transaction
392                .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
393                .await
394                .unwrap();
395            debbie_central
396                .transaction
397                .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
398                .await
399                .unwrap();
400
401            // And now Bob will have to decrypt those messages while he hasn't yet merged its external commit
402            // To add more fun, he will buffer the messages in exactly the wrong order (to make
403            // sure he reapplies them in the right order afterwards)
404            let messages = vec![commit.commit, external_proposal, proposal]
405                .into_iter()
406                .map(|m| m.to_bytes().unwrap());
407            let Err(crate::transaction_context::Error::PendingConversation(mut pending_conversation)) =
408                bob_central.transaction.conversation(&id).await
409            else {
410                panic!("Bob should not have the conversation yet")
411            };
412            for m in messages {
413                let decrypt = pending_conversation.try_process_own_join_commit(m).await;
414                assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
415            }
416            let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
417            assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
418
419            // Bob should have buffered the messages
420            assert_eq!(bob_central.transaction.count_entities().await.pending_messages, 4);
421
422            let observer = TestEpochObserver::new();
423            bob_central
424                .session()
425                .await
426                .register_epoch_observer(observer.clone())
427                .await
428                .unwrap();
429
430            // Finally, Bob receives the green light from the DS and he can merge the external commit
431            let MlsConversationDecryptMessage {
432                buffered_messages: Some(restored_messages),
433                ..
434            } = pending_conversation
435                .try_process_own_join_commit(external_commit.commit.to_bytes().unwrap())
436                .await
437                .unwrap()
438            else {
439                panic!("Alice's messages should have been restored at this point");
440            };
441
442            let observed_epochs = observer.observed_epochs().await;
443            assert_eq!(
444                observed_epochs.len(),
445                1,
446                "we should see exactly 1 epoch change in these 4 messages"
447            );
448            assert_eq!(observed_epochs[0].0, id, "conversation id must match");
449
450            for (idx, msg) in restored_messages.iter().enumerate() {
451                if idx == 0 {
452                    // the only application message
453                    assert_eq!(msg.app_msg.as_deref(), Some(b"Hello Bob !" as _));
454                } else {
455                    assert!(msg.app_msg.is_none());
456                }
457            }
458
459            // because external commit got merged
460            assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
461            // because Alice's commit got merged
462            assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
463            // because Debbie's external proposal got merged through the commit
464            assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
465
466            // After merging we should erase all those pending messages
467            assert_eq!(bob_central.transaction.count_entities().await.pending_messages, 0);
468        })
469        .await
470    }
471
472    #[apply(all_cred_cipher)]
473    #[wasm_bindgen_test]
474    async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestContext) {
475        use crate::mls;
476
477        let [alice_central, bob_central] = case.sessions().await;
478        Box::pin(async move {
479            let id = conversation_id();
480            alice_central
481                .transaction
482                .new_conversation(&id, case.credential_type, case.cfg.clone())
483                .await
484                .unwrap();
485            alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
486
487            // Alice will never see this commit
488            bob_central
489                .transaction
490                .conversation(&id)
491                .await
492                .unwrap()
493                .update_key_material()
494                .await
495                .unwrap();
496
497            let msg1 = bob_central
498                .transaction
499                .conversation(&id)
500                .await
501                .unwrap()
502                .encrypt_message("A")
503                .await
504                .unwrap();
505            let msg2 = bob_central
506                .transaction
507                .conversation(&id)
508                .await
509                .unwrap()
510                .encrypt_message("B")
511                .await
512                .unwrap();
513
514            // Since Alice missed Bob's commit she should buffer this message
515            let decrypt = alice_central
516                .transaction
517                .conversation(&id)
518                .await
519                .unwrap()
520                .decrypt_message(msg1)
521                .await;
522            assert!(matches!(
523                decrypt.unwrap_err(),
524                mls::conversation::Error::BufferedFutureMessage { .. }
525            ));
526            let decrypt = alice_central
527                .transaction
528                .conversation(&id)
529                .await
530                .unwrap()
531                .decrypt_message(msg2)
532                .await;
533            assert!(matches!(
534                decrypt.unwrap_err(),
535                mls::conversation::Error::BufferedFutureMessage { .. }
536            ));
537            assert_eq!(alice_central.transaction.count_entities().await.pending_messages, 2);
538
539            let gi = bob_central.get_group_info(&id).await;
540            alice_central
541                .transaction
542                .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
543                .await
544                .unwrap();
545
546            let ext_commit = alice_central.mls_transport().await.latest_commit_bundle().await;
547
548            bob_central
549                .transaction
550                .conversation(&id)
551                .await
552                .unwrap()
553                .decrypt_message(ext_commit.commit.to_bytes().unwrap())
554                .await
555                .unwrap();
556            // Alice should have deleted all her buffered messages
557            assert_eq!(alice_central.transaction.count_entities().await.pending_messages, 0);
558        })
559        .await
560    }
561}