core_crypto/mls/conversation/
pending_conversation.rs

//! When a client joins a group via an external commit, it sometimes receives messages for the
//! new epoch (most of the time renewed external proposals) before the Delivery Service (DS)
//! has confirmed that the external join commit was accepted.
4
5use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::context::CentralContext;
8use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
9use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
10use crate::mls::credential::ext::CredentialExt as _;
11use crate::prelude::{
12    ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
13    MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
14};
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25/// A pending conversation is a conversation that has been created via an external join commit
26/// locally, while this commit has not yet been approved by the DS.
27#[derive(Debug)]
28pub struct PendingConversation {
29    inner: PersistedMlsPendingGroup,
30    context: CentralContext,
31}
32
33impl PendingConversation {
34    pub(crate) fn new(inner: PersistedMlsPendingGroup, context: CentralContext) -> Self {
35        Self { inner, context }
36    }
37
38    pub(crate) fn from_mls_group(
39        group: MlsGroup,
40        custom_cfg: MlsCustomConfiguration,
41        context: CentralContext,
42    ) -> Result<Self> {
43        let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44        let serialized_group =
45            core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46        let group_id = group.group_id().to_vec();
47
48        let inner = PersistedMlsPendingGroup {
49            id: group_id,
50            state: serialized_group,
51            custom_configuration: serialized_cfg,
52            parent_id: None,
53        };
54        Ok(Self::new(inner, context))
55    }
56
57    async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58        self.context
59            .mls_provider()
60            .await
61            .map_err(RecursiveError::root("getting mls provider"))
62            .map_err(Into::into)
63    }
64
65    async fn keystore(&self) -> Result<CryptoKeystore> {
66        let backend = self.mls_provider().await?;
67        Ok(backend.keystore())
68    }
69
70    fn id(&self) -> &ConversationId {
71        &self.inner.id
72    }
73
74    pub(crate) async fn save(&self) -> Result<()> {
75        let keystore = self.keystore().await?;
76        keystore
77            .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78            .await
79            .map_err(KeystoreError::wrap("saving mls pending groups"))
80            .map_err(Into::into)
81    }
82
83    /// Send the commit via [crate::MlsTransport] and handle the response.
84    pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85        let transport = self
86            .context
87            .mls_transport()
88            .await
89            .map_err(RecursiveError::root("getting mls transport"))?;
90        let transport = transport.as_ref().ok_or::<Error>(
91            RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92        )?;
93
94        match transport
95            .send_commit_bundle(commit.clone())
96            .await
97            .map_err(RecursiveError::root("sending commit bundle"))?
98        {
99            MlsTransportResponse::Success => Ok(()),
100            MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101            MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102        }
103    }
104
105    /// If the given message is the commit generated by [CentralContext::join_by_external_commit],
106    /// merge the pending group and restore any buffered messages.
107    ///
108    /// Otherwise, the given message will be buffered.
109    pub async fn try_process_own_join_commit(
110        &mut self,
111        message: impl AsRef<[u8]>,
112    ) -> Result<MlsConversationDecryptMessage> {
113        // If the confirmation tag of the pending group and this incoming message are identical, we can merge the pending group.
114        if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115            return self.merge_and_restore_messages().await;
116        }
117
118        let keystore = self.keystore().await?;
119
120        let pending_msg = MlsPendingMessage {
121            foreign_id: self.id().clone(),
122            message: message.as_ref().to_vec(),
123        };
124        keystore
125            .save::<MlsPendingMessage>(pending_msg)
126            .await
127            .map_err(KeystoreError::wrap("saving mls pending message"))?;
128        Err(Error::BufferedForPendingConversation)
129    }
130
131    /// If the message confirmation tag and the group confirmation tag are the same, it means that
132    /// the external join commit has been accepted by the DS and the pending group can be merged.
133    async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134        let backend = self.mls_provider().await?;
135        let keystore = backend.keystore();
136        // Instantiate the pending group
137        let (group, _cfg) = keystore
138            .mls_pending_groups_load(self.id())
139            .await
140            .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141        let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142            .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144        // The commit is only merged on this temporary instance of the pending group, to enable
145        // calculation of the confirmation tag.
146        mls_group
147            .merge_pending_commit(&backend)
148            .await
149            .map_err(MlsError::wrap("merging pending commit"))?;
150        let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151            .map_err(MlsError::wrap("deserializing mls message"))?;
152        let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153            return Ok(false);
154        };
155        let Some(msg_ct) = public_message.confirmation_tag() else {
156            return Ok(false);
157        };
158        let group_ct = mls_group
159            .compute_confirmation_tag(&backend)
160            .map_err(MlsError::wrap("computing confirmation tag"))?;
161        Ok(*msg_ct == group_ct)
162    }
163
164    /// Merges the [Self] instance and restores any buffered messages.
165    async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166        let buffered_messages = self.merge().await?;
167        let context = &self.context;
168        let backend = self.mls_provider().await?;
169        let id = self.id();
170
171        // This is the now merged conversation
172        let conversation = context.conversation(id).await?;
173        let conversation = conversation.conversation().await;
174        let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
175
176        // We return self identity here, probably not necessary to check revocation
177        let own_leaf_credential_with_key = CredentialWithKey {
178            credential: own_leaf.credential().clone(),
179            signature_key: own_leaf.signature_key().clone(),
180        };
181        let identity = own_leaf_credential_with_key
182            .extract_identity(conversation.ciphersuite(), None)
183            .map_err(RecursiveError::mls_credential("extracting identity"))?;
184
185        let crl_new_distribution_points = get_new_crl_distribution_points(
186            &backend,
187            extract_crl_uris_from_group(&conversation.group)
188                .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
189        )
190        .await
191        .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
192
193        // Note that though we return `has_epoch_changed: true` here, we don't notify the observer.
194        // This function can only be reached via a code path going through `MlsConversation::decrypt_message`
195        // which already notifies the observer; it would be redundant to notify here also.
196
197        // we still support the `has_epoch_changed` field, though we'll remove it later
198        #[expect(deprecated)]
199        Ok(MlsConversationDecryptMessage {
200            app_msg: None,
201            proposals: vec![],
202            is_active: conversation.group.is_active(),
203            delay: conversation.compute_next_commit_delay(),
204            sender_client_id: None,
205            has_epoch_changed: true,
206            identity,
207            buffered_messages,
208            crl_new_distribution_points,
209        })
210    }
211
212    /// This merges the commit generated by [CentralContext::join_by_external_commit],
213    /// persists the group permanently and deletes the temporary one. After merging, the group
214    /// is fully functional.
215    ///
216    /// # Errors
217    /// Errors resulting from OpenMls, the KeyStore calls and deserialization
218    pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
219        let mls_provider = self.mls_provider().await?;
220        let id = self.id();
221        let group = self.inner.state.clone();
222        let cfg = self.inner.custom_configuration.clone();
223
224        let mut mls_group =
225            core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
226
227        // Merge it aka bring the MLS group to life and make it usable
228        mls_group
229            .merge_pending_commit(&mls_provider)
230            .await
231            .map_err(MlsError::wrap("merging pending commit"))?;
232
233        // Restore the custom configuration and build a conversation from it
234        let custom_cfg =
235            serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
236        let configuration = MlsConversationConfiguration {
237            ciphersuite: mls_group.ciphersuite().into(),
238            custom: custom_cfg,
239            ..Default::default()
240        };
241
242        // We have to determine the restore policy before we persist the group, because it depends
243        // on whether the group already exists.
244        let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
245            // If the group already exists, it means the external commit is about rejoining the group.
246            // This is most of the time a last resort measure (for example when a commit is dropped,
247            // and you go out of sync), so there's no point in decrypting buffered messages
248            trace!("External commit trying to rejoin group");
249            MessageRestorePolicy::ClearOnly
250        } else {
251            MessageRestorePolicy::DecryptAndClear
252        };
253
254        // Persist the now usable MLS group in the keystore
255        let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
256            .await
257            .map_err(RecursiveError::mls_conversation(
258                "constructing conversation from mls group",
259            ))?;
260
261        let context = &self.context;
262
263        context
264            .mls_groups()
265            .await
266            .map_err(RecursiveError::root("getting mls groups"))?
267            .insert(id.clone(), conversation);
268
269        // This is the now merged conversation
270        let mut conversation = context.conversation(id).await?;
271        let pending_messages = conversation
272            .restore_pending_messages(restore_policy)
273            .await
274            .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
275
276        if pending_messages.is_some() {
277            mls_provider
278                .key_store()
279                .remove::<MlsPendingMessage, _>(id)
280                .await
281                .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
282        }
283
284        // cleanup the pending group we no longer need
285        self.clear().await?;
286
287        Ok(pending_messages)
288    }
289
290    /// In case the external commit generated by [CentralContext::join_by_external_commit] is
291    /// rejected by the Delivery Service, and we want to abort this external commit,
292    /// we can wipe out the pending group from the keystore.
293    ///
294    /// # Errors
295    /// Errors resulting from the KeyStore calls
296    pub(crate) async fn clear(&mut self) -> Result<()> {
297        self.keystore()
298            .await?
299            .mls_pending_groups_delete(self.id())
300            .await
301            .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
302        Ok(())
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309    use crate::mls::conversation::Conversation as _;
310    use crate::prelude::MlsConversationDecryptMessage;
311    use crate::test_utils::*;
312    use wasm_bindgen_test::*;
313
314    wasm_bindgen_test_configure!(run_in_browser);
315
316    #[apply(all_cred_cipher)]
317    #[wasm_bindgen_test]
318    async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestCase) {
319        run_test_with_client_ids(
320            case.clone(),
321            ["alice", "bob", "charlie", "debbie"],
322            move |[alice_central, bob_central, charlie_central, debbie_central]| {
323                Box::pin(async move {
324                    let id = conversation_id();
325                    alice_central
326                        .context
327                        .new_conversation(&id, case.credential_type, case.cfg.clone())
328                        .await
329                        .unwrap();
330                    // Bob tries to join Alice's group with an external commit
331                    let gi = alice_central.get_group_info(&id).await;
332                    let (external_commit, _) = bob_central
333                        .create_unmerged_external_commit(gi, case.custom_cfg(), case.credential_type)
334                        .await;
335
336                    // Alice decrypts the external commit...
337                    alice_central
338                        .context
339                        .conversation(&id)
340                        .await
341                        .unwrap()
342                        .decrypt_message(external_commit.commit.to_bytes().unwrap())
343                        .await
344                        .unwrap();
345
346                    // Meanwhile Debbie joins the party by creating an external proposal
347                    let epoch = alice_central.context.conversation(&id).await.unwrap().epoch().await;
348                    let external_proposal = debbie_central
349                        .context
350                        .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
351                        .await
352                        .unwrap();
353
354                    // ...then Alice generates new messages for this epoch
355                    let app_msg = alice_central
356                        .context
357                        .conversation(&id)
358                        .await
359                        .unwrap()
360                        .encrypt_message(b"Hello Bob !")
361                        .await
362                        .unwrap();
363                    let proposal = alice_central.context.new_update_proposal(&id).await.unwrap().proposal;
364                    alice_central
365                        .context
366                        .conversation(&id)
367                        .await
368                        .unwrap()
369                        .decrypt_message(external_proposal.to_bytes().unwrap())
370                        .await
371                        .unwrap();
372                    let charlie = charlie_central.rand_key_package(&case).await;
373                    alice_central
374                        .context
375                        .conversation(&id)
376                        .await
377                        .unwrap()
378                        .add_members(vec![charlie])
379                        .await
380                        .unwrap();
381                    let commit = alice_central.mls_transport.latest_commit_bundle().await;
382                    charlie_central
383                        .context
384                        .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
385                        .await
386                        .unwrap();
387                    debbie_central
388                        .context
389                        .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
390                        .await
391                        .unwrap();
392
393                    // And now Bob will have to decrypt those messages while he hasn't yet merged its external commit
394                    // To add more fun, he will buffer the messages in exactly the wrong order (to make
395                    // sure he reapplies them in the right order afterwards)
396                    let messages = vec![commit.commit, external_proposal, proposal]
397                        .into_iter()
398                        .map(|m| m.to_bytes().unwrap());
399                    let Err(Error::PendingConversation(mut pending_conversation)) =
400                        bob_central.context.conversation(&id).await
401                    else {
402                        panic!("Bob should not have the conversation yet")
403                    };
404                    for m in messages {
405                        let decrypt = pending_conversation.try_process_own_join_commit(m).await;
406                        assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
407                    }
408                    let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
409                    assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
410
411                    // Bob should have buffered the messages
412                    assert_eq!(bob_central.context.count_entities().await.pending_messages, 4);
413
414                    let observer = TestEpochObserver::new();
415                    bob_central
416                        .client()
417                        .await
418                        .register_epoch_observer(observer.clone())
419                        .await
420                        .unwrap();
421
422                    // Finally, Bob receives the green light from the DS and he can merge the external commit
423                    let MlsConversationDecryptMessage {
424                        buffered_messages: Some(restored_messages),
425                        ..
426                    } = pending_conversation
427                        .try_process_own_join_commit(external_commit.commit.to_bytes().unwrap())
428                        .await
429                        .unwrap()
430                    else {
431                        panic!("Alice's messages should have been restored at this point");
432                    };
433
434                    let observed_epochs = observer.observed_epochs().await;
435                    assert_eq!(
436                        observed_epochs.len(),
437                        1,
438                        "we should see exactly 1 epoch change in these 4 messages"
439                    );
440                    assert_eq!(observed_epochs[0].0, id, "conversation id must match");
441
442                    for (idx, msg) in restored_messages.iter().enumerate() {
443                        if idx == 0 {
444                            // the only application message
445                            assert_eq!(msg.app_msg.as_deref(), Some(b"Hello Bob !" as _));
446                        } else {
447                            assert!(msg.app_msg.is_none());
448                        }
449                    }
450
451                    // because external commit got merged
452                    assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
453                    // because Alice's commit got merged
454                    assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
455                    // because Debbie's external proposal got merged through the commit
456                    assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
457
458                    // After merging we should erase all those pending messages
459                    assert_eq!(bob_central.context.count_entities().await.pending_messages, 0);
460                })
461            },
462        )
463        .await
464    }
465
466    #[apply(all_cred_cipher)]
467    #[wasm_bindgen_test]
468    async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestCase) {
469        use crate::mls;
470
471        run_test_with_client_ids(
472            case.clone(),
473            ["alice", "bob"],
474            move |[alice_central, mut bob_central]| {
475                Box::pin(async move {
476                    let id = conversation_id();
477                    alice_central
478                        .context
479                        .new_conversation(&id, case.credential_type, case.cfg.clone())
480                        .await
481                        .unwrap();
482                    alice_central.invite_all(&case, &id, [&mut bob_central]).await.unwrap();
483
484                    // Alice will never see this commit
485                    bob_central
486                        .context
487                        .conversation(&id)
488                        .await
489                        .unwrap()
490                        .update_key_material()
491                        .await
492                        .unwrap();
493
494                    let msg1 = bob_central
495                        .context
496                        .conversation(&id)
497                        .await
498                        .unwrap()
499                        .encrypt_message("A")
500                        .await
501                        .unwrap();
502                    let msg2 = bob_central
503                        .context
504                        .conversation(&id)
505                        .await
506                        .unwrap()
507                        .encrypt_message("B")
508                        .await
509                        .unwrap();
510
511                    // Since Alice missed Bob's commit she should buffer this message
512                    let decrypt = alice_central
513                        .context
514                        .conversation(&id)
515                        .await
516                        .unwrap()
517                        .decrypt_message(msg1)
518                        .await;
519                    assert!(matches!(
520                        decrypt.unwrap_err(),
521                        mls::conversation::Error::BufferedFutureMessage { .. }
522                    ));
523                    let decrypt = alice_central
524                        .context
525                        .conversation(&id)
526                        .await
527                        .unwrap()
528                        .decrypt_message(msg2)
529                        .await;
530                    assert!(matches!(
531                        decrypt.unwrap_err(),
532                        mls::conversation::Error::BufferedFutureMessage { .. }
533                    ));
534                    assert_eq!(alice_central.context.count_entities().await.pending_messages, 2);
535
536                    let gi = bob_central.get_group_info(&id).await;
537                    alice_central
538                        .context
539                        .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
540                        .await
541                        .unwrap();
542
543                    let ext_commit = alice_central.mls_transport.latest_commit_bundle().await;
544
545                    bob_central
546                        .context
547                        .conversation(&id)
548                        .await
549                        .unwrap()
550                        .decrypt_message(ext_commit.commit.to_bytes().unwrap())
551                        .await
552                        .unwrap();
553                    // Alice should have deleted all her buffered messages
554                    assert_eq!(alice_central.context.count_entities().await.pending_messages, 0);
555                })
556            },
557        )
558        .await
559    }
560}