core_crypto/mls/conversation/
pending_conversation.rs

//! When a client joins a group via an external commit, it sometimes receives messages
//! (most of the time renewed external proposals) for the new epoch before it has received
//! the confirmation from the DS that the external join commit has been accepted.
//! Such messages are buffered here until the client's own join commit comes back from the DS.
4
5use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
8use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
9use crate::mls::credential::ext::CredentialExt as _;
10use crate::prelude::{
11    ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
12    MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
13};
14use crate::transaction_context::TransactionContext;
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25/// A pending conversation is a conversation that has been created via an external join commit
26/// locally, while this commit has not yet been approved by the DS.
27#[derive(Debug)]
28pub struct PendingConversation {
29    inner: PersistedMlsPendingGroup,
30    context: TransactionContext,
31}
32
33impl PendingConversation {
34    pub(crate) fn new(inner: PersistedMlsPendingGroup, context: TransactionContext) -> Self {
35        Self { inner, context }
36    }
37
38    pub(crate) fn from_mls_group(
39        group: MlsGroup,
40        custom_cfg: MlsCustomConfiguration,
41        context: TransactionContext,
42    ) -> Result<Self> {
43        let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44        let serialized_group =
45            core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46        let group_id = group.group_id().to_vec();
47
48        let inner = PersistedMlsPendingGroup {
49            id: group_id,
50            state: serialized_group,
51            custom_configuration: serialized_cfg,
52            parent_id: None,
53        };
54        Ok(Self::new(inner, context))
55    }
56
57    async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58        self.context
59            .mls_provider()
60            .await
61            .map_err(RecursiveError::transaction("getting mls provider"))
62            .map_err(Into::into)
63    }
64
65    async fn keystore(&self) -> Result<CryptoKeystore> {
66        let backend = self.mls_provider().await?;
67        Ok(backend.keystore())
68    }
69
70    fn id(&self) -> &ConversationId {
71        &self.inner.id
72    }
73
74    pub(crate) async fn save(&self) -> Result<()> {
75        let keystore = self.keystore().await?;
76        keystore
77            .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78            .await
79            .map_err(KeystoreError::wrap("saving mls pending groups"))
80            .map_err(Into::into)
81    }
82
83    /// Send the commit via [crate::MlsTransport] and handle the response.
84    pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85        let transport = self
86            .context
87            .mls_transport()
88            .await
89            .map_err(RecursiveError::transaction("getting mls transport"))?;
90        let transport = transport.as_ref().ok_or::<Error>(
91            RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92        )?;
93
94        match transport
95            .send_commit_bundle(commit.clone())
96            .await
97            .map_err(RecursiveError::root("sending commit bundle"))?
98        {
99            MlsTransportResponse::Success => Ok(()),
100            MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101            MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102        }
103    }
104
105    /// If the given message is the commit generated by [TransactionContext::join_by_external_commit],
106    /// merge the pending group and restore any buffered messages.
107    ///
108    /// Otherwise, the given message will be buffered.
109    pub async fn try_process_own_join_commit(
110        &mut self,
111        message: impl AsRef<[u8]>,
112    ) -> Result<MlsConversationDecryptMessage> {
113        // If the confirmation tag of the pending group and this incoming message are identical, we can merge the pending group.
114        if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115            return self.merge_and_restore_messages().await;
116        }
117
118        let keystore = self.keystore().await?;
119
120        let pending_msg = MlsPendingMessage {
121            foreign_id: self.id().clone(),
122            message: message.as_ref().to_vec(),
123        };
124        keystore
125            .save::<MlsPendingMessage>(pending_msg)
126            .await
127            .map_err(KeystoreError::wrap("saving mls pending message"))?;
128        Err(Error::BufferedForPendingConversation)
129    }
130
131    /// If the message confirmation tag and the group confirmation tag are the same, it means that
132    /// the external join commit has been accepted by the DS and the pending group can be merged.
133    async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134        let backend = self.mls_provider().await?;
135        let keystore = backend.keystore();
136        // Instantiate the pending group
137        let (group, _cfg) = keystore
138            .mls_pending_groups_load(self.id())
139            .await
140            .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141        let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142            .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144        // The commit is only merged on this temporary instance of the pending group, to enable
145        // calculation of the confirmation tag.
146        mls_group
147            .merge_pending_commit(&backend)
148            .await
149            .map_err(MlsError::wrap("merging pending commit"))?;
150        let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151            .map_err(MlsError::wrap("deserializing mls message"))?;
152        let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153            return Ok(false);
154        };
155        let Some(msg_ct) = public_message.confirmation_tag() else {
156            return Ok(false);
157        };
158        let group_ct = mls_group
159            .compute_confirmation_tag(&backend)
160            .map_err(MlsError::wrap("computing confirmation tag"))?;
161        Ok(*msg_ct == group_ct)
162    }
163
164    /// Merges the [Self] instance and restores any buffered messages.
165    async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166        let buffered_messages = self.merge().await?;
167        let context = &self.context;
168        let backend = self.mls_provider().await?;
169        let id = self.id();
170
171        // This is the now merged conversation
172        let conversation = context
173            .conversation(id)
174            .await
175            .map_err(RecursiveError::transaction("getting conversation by id"))?;
176        let conversation = conversation.conversation().await;
177        let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
178
179        // We return self identity here, probably not necessary to check revocation
180        let own_leaf_credential_with_key = CredentialWithKey {
181            credential: own_leaf.credential().clone(),
182            signature_key: own_leaf.signature_key().clone(),
183        };
184        let identity = own_leaf_credential_with_key
185            .extract_identity(conversation.ciphersuite(), None)
186            .map_err(RecursiveError::mls_credential("extracting identity"))?;
187
188        let crl_new_distribution_points = get_new_crl_distribution_points(
189            &backend,
190            extract_crl_uris_from_group(&conversation.group)
191                .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
192        )
193        .await
194        .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
195
196        // Note that though we return `has_epoch_changed: true` here, we don't notify the observer.
197        // This function can only be reached via a code path going through `MlsConversation::decrypt_message`
198        // which already notifies the observer; it would be redundant to notify here also.
199
200        // we still support the `has_epoch_changed` field, though we'll remove it later
201        #[expect(deprecated)]
202        Ok(MlsConversationDecryptMessage {
203            app_msg: None,
204            proposals: vec![],
205            is_active: conversation.group.is_active(),
206            delay: conversation.compute_next_commit_delay(),
207            sender_client_id: None,
208            has_epoch_changed: true,
209            identity,
210            buffered_messages,
211            crl_new_distribution_points,
212        })
213    }
214
215    /// This merges the commit generated by [TransactionContext::join_by_external_commit],
216    /// persists the group permanently and deletes the temporary one. After merging, the group
217    /// is fully functional.
218    ///
219    /// # Errors
220    /// Errors resulting from OpenMls, the KeyStore calls and deserialization
221    pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
222        let mls_provider = self.mls_provider().await?;
223        let id = self.id();
224        let group = self.inner.state.clone();
225        let cfg = self.inner.custom_configuration.clone();
226
227        let mut mls_group =
228            core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
229
230        // Merge it aka bring the MLS group to life and make it usable
231        mls_group
232            .merge_pending_commit(&mls_provider)
233            .await
234            .map_err(MlsError::wrap("merging pending commit"))?;
235
236        // Restore the custom configuration and build a conversation from it
237        let custom_cfg =
238            serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
239        let configuration = MlsConversationConfiguration {
240            ciphersuite: mls_group.ciphersuite().into(),
241            custom: custom_cfg,
242            ..Default::default()
243        };
244
245        // We have to determine the restore policy before we persist the group, because it depends
246        // on whether the group already exists.
247        let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
248            // If the group already exists, it means the external commit is about rejoining the group.
249            // This is most of the time a last resort measure (for example when a commit is dropped,
250            // and you go out of sync), so there's no point in decrypting buffered messages
251            trace!("External commit trying to rejoin group");
252            MessageRestorePolicy::ClearOnly
253        } else {
254            MessageRestorePolicy::DecryptAndClear
255        };
256
257        // Persist the now usable MLS group in the keystore
258        let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
259            .await
260            .map_err(RecursiveError::mls_conversation(
261                "constructing conversation from mls group",
262            ))?;
263
264        let context = &self.context;
265
266        context
267            .mls_groups()
268            .await
269            .map_err(RecursiveError::transaction("getting mls groups"))?
270            .insert(id.clone(), conversation);
271
272        // This is the now merged conversation
273        let mut conversation = context
274            .conversation(id)
275            .await
276            .map_err(RecursiveError::transaction("getting conversation by id"))?;
277        let pending_messages = conversation
278            .restore_pending_messages(restore_policy)
279            .await
280            .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
281
282        if pending_messages.is_some() {
283            mls_provider
284                .key_store()
285                .remove::<MlsPendingMessage, _>(id)
286                .await
287                .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
288        }
289
290        // cleanup the pending group we no longer need
291        self.clear().await?;
292
293        Ok(pending_messages)
294    }
295
296    /// In case the external commit generated by [TransactionContext::join_by_external_commit] is
297    /// rejected by the Delivery Service, and we want to abort this external commit,
298    /// we can wipe out the pending group from the keystore.
299    ///
300    /// # Errors
301    /// Errors resulting from the KeyStore calls
302    pub(crate) async fn clear(&mut self) -> Result<()> {
303        self.keystore()
304            .await?
305            .mls_pending_groups_delete(self.id())
306            .await
307            .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
308        Ok(())
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mls::conversation::Conversation as _;
    use crate::prelude::MlsConversationDecryptMessage;
    use crate::test_utils::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Messages received while the external join commit is still pending must be buffered,
    /// then handed back once the join commit itself is processed.
    #[apply(all_cred_cipher)]
    #[wasm_bindgen_test]
    async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestContext) {
        run_test_with_client_ids(
            case.clone(),
            ["alice", "bob", "charlie", "debbie"],
            move |[alice, bob, charlie, debbie]| {
                Box::pin(async move {
                    let id = conversation_id();
                    alice
                        .transaction
                        .new_conversation(&id, case.credential_type, case.cfg.clone())
                        .await
                        .unwrap();

                    // Bob prepares an external commit to join Alice's group, without merging it
                    let group_info = alice.get_group_info(&id).await;
                    let (external_commit, _) = bob
                        .create_unmerged_external_commit(group_info, case.custom_cfg(), case.credential_type)
                        .await;

                    // Alice processes Bob's external commit...
                    alice
                        .transaction
                        .conversation(&id)
                        .await
                        .unwrap()
                        .decrypt_message(external_commit.commit.to_bytes().unwrap())
                        .await
                        .unwrap();

                    // In the meantime Debbie asks to join through an external proposal
                    let epoch = alice.transaction.conversation(&id).await.unwrap().epoch().await;
                    let external_proposal = debbie
                        .transaction
                        .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
                        .await
                        .unwrap();

                    // ...and Alice produces new messages in this epoch
                    let app_msg = alice
                        .transaction
                        .conversation(&id)
                        .await
                        .unwrap()
                        .encrypt_message(b"Hello Bob !")
                        .await
                        .unwrap();
                    let proposal = alice.transaction.new_update_proposal(&id).await.unwrap().proposal;
                    alice
                        .transaction
                        .conversation(&id)
                        .await
                        .unwrap()
                        .decrypt_message(external_proposal.to_bytes().unwrap())
                        .await
                        .unwrap();
                    let charlie_kp = charlie.rand_key_package(&case).await;
                    alice
                        .transaction
                        .conversation(&id)
                        .await
                        .unwrap()
                        .add_members(vec![charlie_kp])
                        .await
                        .unwrap();
                    let commit = alice.mls_transport.latest_commit_bundle().await;
                    charlie
                        .transaction
                        .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
                        .await
                        .unwrap();
                    debbie
                        .transaction
                        .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
                        .await
                        .unwrap();

                    // Bob has not merged his external commit yet, so on his side the conversation
                    // is still pending.
                    let Err(crate::transaction_context::Error::PendingConversation(mut pending)) =
                        bob.transaction.conversation(&id).await
                    else {
                        panic!("Bob should not have the conversation yet")
                    };

                    // He receives the messages in exactly the wrong order, which lets us verify
                    // that they are reapplied in the right order afterwards.
                    for message in [commit.commit, external_proposal, proposal] {
                        let outcome = pending.try_process_own_join_commit(message.to_bytes().unwrap()).await;
                        assert!(matches!(outcome.unwrap_err(), Error::BufferedForPendingConversation));
                    }
                    let outcome = pending.try_process_own_join_commit(app_msg).await;
                    assert!(matches!(outcome.unwrap_err(), Error::BufferedForPendingConversation));

                    // All four messages must now sit in Bob's buffer
                    assert_eq!(bob.transaction.count_entities().await.pending_messages, 4);

                    let observer = TestEpochObserver::new();
                    bob.session()
                        .await
                        .register_epoch_observer(observer.clone())
                        .await
                        .unwrap();

                    // The DS finally hands Bob his own join commit: the pending group gets merged
                    let MlsConversationDecryptMessage {
                        buffered_messages: Some(restored_messages),
                        ..
                    } = pending
                        .try_process_own_join_commit(external_commit.commit.to_bytes().unwrap())
                        .await
                        .unwrap()
                    else {
                        panic!("Alice's messages should have been restored at this point");
                    };

                    let observed_epochs = observer.observed_epochs().await;
                    assert_eq!(
                        observed_epochs.len(),
                        1,
                        "we should see exactly 1 epoch change in these 4 messages"
                    );
                    assert_eq!(observed_epochs[0].0, id, "conversation id must match");

                    // Only the first restored message is an application message
                    if let Some((first, rest)) = restored_messages.split_first() {
                        assert_eq!(first.app_msg.as_deref(), Some(b"Hello Bob !" as _));
                        for msg in rest {
                            assert!(msg.app_msg.is_none());
                        }
                    }

                    // the external commit got merged
                    assert!(bob.try_talk_to(&id, &alice).await.is_ok());
                    // Alice's commit got merged
                    assert!(bob.try_talk_to(&id, &charlie).await.is_ok());
                    // Debbie's external proposal got merged through that commit
                    assert!(bob.try_talk_to(&id, &debbie).await.is_ok());

                    // Nothing may remain in the buffer once everything has been merged
                    assert_eq!(bob.transaction.count_entities().await.pending_messages, 0);
                })
            },
        )
        .await
    }

    /// When a member rejoins a group it already knows through an external commit, the messages
    /// it had buffered beforehand must be dropped.
    #[apply(all_cred_cipher)]
    #[wasm_bindgen_test]
    async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestContext) {
        use crate::mls;

        run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice, mut bob]| {
            Box::pin(async move {
                let id = conversation_id();
                alice
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice.invite_all(&case, &id, [&mut bob]).await.unwrap();

                // This commit never reaches Alice
                bob.transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();

                let msg1 = bob
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .encrypt_message("A")
                    .await
                    .unwrap();
                let msg2 = bob
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .encrypt_message("B")
                    .await
                    .unwrap();

                // Having missed Bob's commit, Alice has to buffer both messages
                for future_message in [msg1, msg2] {
                    let outcome = alice
                        .transaction
                        .conversation(&id)
                        .await
                        .unwrap()
                        .decrypt_message(future_message)
                        .await;
                    assert!(matches!(
                        outcome.unwrap_err(),
                        mls::conversation::Error::BufferedFutureMessage { .. }
                    ));
                }
                assert_eq!(alice.transaction.count_entities().await.pending_messages, 2);

                // Alice rejoins through an external commit
                let group_info = bob.get_group_info(&id).await;
                alice
                    .transaction
                    .join_by_external_commit(group_info, case.custom_cfg(), case.credential_type)
                    .await
                    .unwrap();

                let ext_commit = alice.mls_transport.latest_commit_bundle().await;

                bob.transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(ext_commit.commit.to_bytes().unwrap())
                    .await
                    .unwrap();
                // Every message Alice had buffered must be gone
                assert_eq!(alice.transaction.count_entities().await.pending_messages, 0);
            })
        })
        .await
    }
}