core_crypto/mls/conversation/
merge.rs

//! An MLS group can be merged (aka committed) when it has a pending commit. The latter is a commit
//! we created which is still waiting to be "committed". Merging it applies all the modifications
//! present in the commit to the ratchet tree and also persists the new group in the keystore.
//! This way, even if the application crashes, we will be able to restore the group.
//!
//! This table summarizes when an MLS group can be merged:
//!
//! | can be merged ?   | 0 pend. Commit | 1 pend. Commit |
//! |-------------------|----------------|----------------|
//! | 0 pend. Proposal  | ❌              | ✅              |
//! | 1+ pend. Proposal | ❌              | ✅              |
//!
13
14use core_crypto_keystore::entities::{MlsEncryptionKeyPair, MlsPendingMessage};
15use openmls::prelude::MlsGroupStateError;
16use openmls_traits::OpenMlsCryptoProvider;
17
18use mls_crypto_provider::MlsCryptoProvider;
19
20use crate::context::CentralContext;
21use crate::{
22    mls::{ConversationId, MlsConversation},
23    prelude::{decrypt::MlsBufferedConversationDecryptMessage, MlsProposalRef},
24    CryptoError, CryptoResult, MlsError,
25};
26
27/// Abstraction over a MLS group capable of merging a commit
28impl MlsConversation {
29    /// see [CentralContext::commit_accepted]
30    #[cfg_attr(test, crate::durable)]
31    pub async fn commit_accepted(&mut self, backend: &MlsCryptoProvider) -> CryptoResult<()> {
32        // openmls stores here all the encryption keypairs used for update proposals..
33        let previous_own_leaf_nodes = self.group.own_leaf_nodes.clone();
34
35        self.group.merge_pending_commit(backend).await.map_err(MlsError::from)?;
36        self.persist_group_when_changed(&backend.keystore(), false).await?;
37
38        // ..so if there's any, we clear them after the commit is merged
39        for oln in &previous_own_leaf_nodes {
40            let ek = oln.encryption_key().as_slice();
41            let _ = backend.key_store().remove::<MlsEncryptionKeyPair, _>(ek).await;
42        }
43
44        Ok(())
45    }
46
47    /// see [CentralContext::clear_pending_proposal]
48    #[cfg_attr(test, crate::durable)]
49    pub async fn clear_pending_proposal(
50        &mut self,
51        proposal_ref: MlsProposalRef,
52        backend: &MlsCryptoProvider,
53    ) -> CryptoResult<()> {
54        self.group
55            .remove_pending_proposal(backend.key_store(), &proposal_ref)
56            .await
57            .map_err(|e| match e {
58                MlsGroupStateError::PendingProposalNotFound => CryptoError::PendingProposalNotFound(proposal_ref),
59                _ => CryptoError::from(MlsError::from(e)),
60            })?;
61        self.persist_group_when_changed(&backend.keystore(), true).await?;
62        Ok(())
63    }
64
65    /// see [CentralContext::clear_pending_commit]
66    #[cfg_attr(test, crate::durable)]
67    pub async fn clear_pending_commit(&mut self, backend: &MlsCryptoProvider) -> CryptoResult<()> {
68        if self.group.pending_commit().is_some() {
69            self.group.clear_pending_commit();
70            self.persist_group_when_changed(&backend.keystore(), true).await?;
71            Ok(())
72        } else {
73            Err(CryptoError::PendingCommitNotFound)
74        }
75    }
76}
77
78/// A MLS group is a distributed object scattered across many parties. We use a Delivery Service
79/// to orchestrate those parties. So when we create a commit, a mutable operation, it has to be
80/// validated by the Delivery Service. But it might occur that another group member did the
81/// exact same thing at the same time. So if we arrive second in this race, we must "rollback" the commit
82/// we created and accept ("merge") the other one.
83/// A client would
84/// * Create a commit
85/// * Send the commit to the Delivery Service
86/// * When Delivery Service responds
87///     * 200 OK --> use [CentralContext::commit_accepted] to merge the commit
88///     * 409 CONFLICT --> do nothing. [CentralContext::decrypt_message] will restore the proposals not committed
89///     * 5xx --> retry
90impl CentralContext {
91    /// The commit we created has been accepted by the Delivery Service. Hence it is guaranteed
92    /// to be used for the new epoch.
93    /// We can now safely "merge" it (effectively apply the commit to the group) and update it
94    /// in the keystore. The previous can be discarded to respect Forward Secrecy.
95    pub async fn commit_accepted(
96        &self,
97        id: &ConversationId,
98    ) -> CryptoResult<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
99        let conv = self.get_conversation(id).await?;
100        let mut conv = conv.write().await;
101        conv.commit_accepted(&self.mls_provider().await?).await?;
102
103        let pending_messages = self.restore_pending_messages(&mut conv, false).await?;
104        if pending_messages.is_some() {
105            self.keystore().await?.remove::<MlsPendingMessage, _>(id).await?;
106        }
107        Ok(pending_messages)
108    }
109
110    /// Allows to remove a pending (uncommitted) proposal. Use this when backend rejects the proposal
111    /// you just sent e.g. if permissions have changed meanwhile.
112    ///
113    /// **CAUTION**: only use this when you had an explicit response from the Delivery Service
114    /// e.g. 403 or 409. Do not use otherwise e.g. 5xx responses, timeout etc..
115    ///
116    /// # Arguments
117    /// * `conversation_id` - the group/conversation id
118    /// * `proposal_ref` - unique proposal identifier which is present in [crate::prelude::MlsProposalBundle]
119    ///   and returned from all operation creating a proposal
120    ///
121    /// # Errors
122    /// When the conversation is not found or the proposal reference does not identify a proposal
123    /// in the local pending proposal store
124    pub async fn clear_pending_proposal(
125        &self,
126        conversation_id: &ConversationId,
127        proposal_ref: MlsProposalRef,
128    ) -> CryptoResult<()> {
129        self.get_conversation(conversation_id)
130            .await?
131            .write()
132            .await
133            .clear_pending_proposal(proposal_ref, &self.mls_provider().await?)
134            .await
135    }
136
137    /// Allows to remove a pending commit. Use this when backend rejects the commit
138    /// you just sent e.g. if permissions have changed meanwhile.
139    ///
140    /// **CAUTION**: only use this when you had an explicit response from the Delivery Service
141    /// e.g. 403. Do not use otherwise e.g. 5xx responses, timeout etc..
142    /// **DO NOT** use when Delivery Service responds 409, pending state will be renewed
143    /// in [CentralContext::decrypt_message]
144    ///
145    /// # Arguments
146    /// * `conversation_id` - the group/conversation id
147    ///
148    /// # Errors
149    /// When the conversation is not found or there is no pending commit
150    #[cfg_attr(test, crate::idempotent)]
151    pub async fn clear_pending_commit(&self, conversation_id: &ConversationId) -> CryptoResult<()> {
152        self.get_conversation(conversation_id)
153            .await?
154            .write()
155            .await
156            .clear_pending_commit(&self.mls_provider().await?)
157            .await
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use openmls::prelude::Proposal;
164    use wasm_bindgen_test::*;
165
166    use crate::test_utils::*;
167
168    use super::*;
169
170    wasm_bindgen_test_configure!(run_in_browser);
171
172    mod commit_accepted {
173        use super::*;
174
175        #[apply(all_cred_cipher)]
176        #[wasm_bindgen_test]
177        async fn should_apply_pending_commit(case: TestCase) {
178            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
179                Box::pin(async move {
180                    let id = conversation_id();
181                    alice_central
182                        .context
183                        .new_conversation(&id, case.credential_type, case.cfg.clone())
184                        .await
185                        .unwrap();
186                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
187                    assert_eq!(alice_central.get_conversation_unchecked(&id).await.members().len(), 2);
188                    alice_central
189                        .context
190                        .remove_members_from_conversation(&id, &[bob_central.get_client_id().await])
191                        .await
192                        .unwrap();
193                    assert_eq!(alice_central.get_conversation_unchecked(&id).await.members().len(), 2);
194                    alice_central.context.commit_accepted(&id).await.unwrap();
195                    assert_eq!(alice_central.get_conversation_unchecked(&id).await.members().len(), 1);
196                })
197            })
198            .await
199        }
200
201        #[apply(all_cred_cipher)]
202        #[wasm_bindgen_test]
203        async fn should_clear_pending_commit_and_proposals(case: TestCase) {
204            run_test_with_client_ids(
205                case.clone(),
206                ["alice", "bob"],
207                move |[mut alice_central, bob_central]| {
208                    Box::pin(async move {
209                        let id = conversation_id();
210                        alice_central
211                            .context
212                            .new_conversation(&id, case.credential_type, case.cfg.clone())
213                            .await
214                            .unwrap();
215                        alice_central.context.new_update_proposal(&id).await.unwrap();
216                        let bob = bob_central.rand_key_package(&case).await;
217                        alice_central
218                            .context
219                            .add_members_to_conversation(&id, vec![bob])
220                            .await
221                            .unwrap();
222                        assert!(!alice_central.pending_proposals(&id).await.is_empty());
223                        assert!(alice_central.pending_commit(&id).await.is_some());
224                        alice_central.context.commit_accepted(&id).await.unwrap();
225                        assert!(alice_central.pending_commit(&id).await.is_none());
226                        assert!(alice_central.pending_proposals(&id).await.is_empty());
227                    })
228                },
229            )
230            .await
231        }
232
233        #[apply(all_cred_cipher)]
234        #[wasm_bindgen_test]
235        async fn should_clean_associated_key_material(case: TestCase) {
236            run_test_with_client_ids(case.clone(), ["alice"], move |[alice_central]| {
237                Box::pin(async move {
238                    let id = conversation_id();
239                    alice_central
240                        .context
241                        .new_conversation(&id, case.credential_type, case.cfg.clone())
242                        .await
243                        .unwrap();
244
245                    let initial_count = alice_central.context.count_entities().await;
246
247                    alice_central.context.new_update_proposal(&id).await.unwrap();
248                    let post_proposal_count = alice_central.context.count_entities().await;
249                    assert_eq!(
250                        post_proposal_count.encryption_keypair,
251                        initial_count.encryption_keypair + 1
252                    );
253
254                    alice_central.context.commit_pending_proposals(&id).await.unwrap();
255                    alice_central.context.commit_accepted(&id).await.unwrap();
256
257                    let final_count = alice_central.context.count_entities().await;
258                    assert_eq!(initial_count, final_count);
259                })
260            })
261            .await
262        }
263    }
264
265    mod clear_pending_proposal {
266        use super::*;
267
268        #[apply(all_cred_cipher)]
269        #[wasm_bindgen_test]
270        pub async fn should_remove_proposal(case: TestCase) {
271            run_test_with_client_ids(
272                case.clone(),
273                ["alice", "bob", "charlie"],
274                move |[mut alice_central, bob_central, charlie_central]| {
275                    Box::pin(async move {
276                        let id = conversation_id();
277                        alice_central
278                            .context
279                            .new_conversation(&id, case.credential_type, case.cfg.clone())
280                            .await
281                            .unwrap();
282                        alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
283                        assert!(alice_central.pending_proposals(&id).await.is_empty());
284
285                        let charlie_kp = charlie_central.get_one_key_package(&case).await;
286                        let add_ref = alice_central
287                            .context
288                            .new_add_proposal(&id, charlie_kp)
289                            .await
290                            .unwrap()
291                            .proposal_ref;
292
293                        let remove_ref = alice_central
294                            .context
295                            .new_remove_proposal(&id, bob_central.get_client_id().await)
296                            .await
297                            .unwrap()
298                            .proposal_ref;
299
300                        let update_ref = alice_central
301                            .context
302                            .new_update_proposal(&id)
303                            .await
304                            .unwrap()
305                            .proposal_ref;
306
307                        assert_eq!(alice_central.pending_proposals(&id).await.len(), 3);
308                        alice_central
309                            .context
310                            .clear_pending_proposal(&id, add_ref)
311                            .await
312                            .unwrap();
313                        assert_eq!(alice_central.pending_proposals(&id).await.len(), 2);
314                        assert!(!alice_central
315                            .pending_proposals(&id)
316                            .await
317                            .into_iter()
318                            .any(|p| matches!(p.proposal(), Proposal::Add(_))));
319
320                        alice_central
321                            .context
322                            .clear_pending_proposal(&id, remove_ref)
323                            .await
324                            .unwrap();
325                        assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
326                        assert!(!alice_central
327                            .pending_proposals(&id)
328                            .await
329                            .into_iter()
330                            .any(|p| matches!(p.proposal(), Proposal::Remove(_))));
331
332                        alice_central
333                            .context
334                            .clear_pending_proposal(&id, update_ref)
335                            .await
336                            .unwrap();
337                        assert!(alice_central.pending_proposals(&id).await.is_empty());
338                        assert!(!alice_central
339                            .pending_proposals(&id)
340                            .await
341                            .into_iter()
342                            .any(|p| matches!(p.proposal(), Proposal::Update(_))));
343                    })
344                },
345            )
346            .await
347        }
348
349        #[apply(all_cred_cipher)]
350        #[wasm_bindgen_test]
351        pub async fn should_fail_when_conversation_not_found(case: TestCase) {
352            run_test_with_client_ids(case.clone(), ["alice"], move |[alice_central]| {
353                Box::pin(async move {
354                    let id = conversation_id();
355                    let simple_ref = MlsProposalRef::from(vec![0; case.ciphersuite().hash_length()]);
356                    let clear = alice_central.context.clear_pending_proposal(&id, simple_ref).await;
357                    assert!(matches!(clear.unwrap_err(), CryptoError::ConversationNotFound(conv_id) if conv_id == id))
358                })
359            })
360            .await
361        }
362
363        #[apply(all_cred_cipher)]
364        #[wasm_bindgen_test]
365        pub async fn should_fail_when_proposal_ref_not_found(case: TestCase) {
366            run_test_with_client_ids(case.clone(), ["alice"], move |[mut alice_central]| {
367                Box::pin(async move {
368                    let id = conversation_id();
369                    alice_central
370                        .context.new_conversation(&id, case.credential_type, case.cfg.clone())
371                        .await
372                        .unwrap();
373                    assert!(alice_central.pending_proposals(&id).await.is_empty());
374                    let any_ref = MlsProposalRef::from(vec![0; case.ciphersuite().hash_length()]);
375                    let clear = alice_central.context.clear_pending_proposal(&id, any_ref.clone()).await;
376                    assert!(matches!(clear.unwrap_err(), CryptoError::PendingProposalNotFound(prop_ref) if prop_ref == any_ref))
377                })
378            })
379            .await
380        }
381
382        #[apply(all_cred_cipher)]
383        #[wasm_bindgen_test]
384        pub async fn should_clean_associated_key_material(case: TestCase) {
385            run_test_with_client_ids(case.clone(), ["alice"], move |[mut cc]| {
386                Box::pin(async move {
387                    let id = conversation_id();
388                    cc.context
389                        .new_conversation(&id, case.credential_type, case.cfg.clone())
390                        .await
391                        .unwrap();
392                    assert!(cc.pending_proposals(&id).await.is_empty());
393
394                    let init = cc.context.count_entities().await;
395
396                    let proposal_ref = cc.context.new_update_proposal(&id).await.unwrap().proposal_ref;
397                    assert_eq!(cc.pending_proposals(&id).await.len(), 1);
398
399                    cc.context.clear_pending_proposal(&id, proposal_ref).await.unwrap();
400                    assert!(cc.pending_proposals(&id).await.is_empty());
401
402                    // This whole flow should be idempotent.
403                    // Here we verify that we are indeed deleting the `EncryptionKeyPair` created
404                    // for the Update proposal
405                    let after_clear_proposal = cc.context.count_entities().await;
406                    assert_eq!(init, after_clear_proposal);
407                })
408            })
409            .await
410        }
411    }
412
413    mod clear_pending_commit {
414        use super::*;
415
416        #[apply(all_cred_cipher)]
417        #[wasm_bindgen_test]
418        pub async fn should_remove_commit(case: TestCase) {
419            run_test_with_client_ids(case.clone(), ["alice"], move |[alice_central]| {
420                Box::pin(async move {
421                    let id = conversation_id();
422                    alice_central
423                        .context
424                        .new_conversation(&id, case.credential_type, case.cfg.clone())
425                        .await
426                        .unwrap();
427                    assert!(alice_central.pending_commit(&id).await.is_none());
428
429                    alice_central.context.update_keying_material(&id).await.unwrap();
430                    assert!(alice_central.pending_commit(&id).await.is_some());
431                    alice_central.context.clear_pending_commit(&id).await.unwrap();
432                    assert!(alice_central.pending_commit(&id).await.is_none());
433                })
434            })
435            .await
436        }
437
438        #[apply(all_cred_cipher)]
439        #[wasm_bindgen_test]
440        pub async fn should_fail_when_conversation_not_found(case: TestCase) {
441            run_test_with_client_ids(case.clone(), ["alice"], move |[alice_central]| {
442                Box::pin(async move {
443                    let id = conversation_id();
444                    let clear = alice_central.context.clear_pending_commit(&id).await;
445                    assert!(matches!(clear.unwrap_err(), CryptoError::ConversationNotFound(conv_id) if conv_id == id))
446                })
447            })
448            .await
449        }
450
451        #[apply(all_cred_cipher)]
452        #[wasm_bindgen_test]
453        pub async fn should_fail_when_pending_commit_absent(case: TestCase) {
454            run_test_with_client_ids(case.clone(), ["alice"], move |[alice_central]| {
455                Box::pin(async move {
456                    let id = conversation_id();
457                    alice_central
458                        .context
459                        .new_conversation(&id, case.credential_type, case.cfg.clone())
460                        .await
461                        .unwrap();
462                    assert!(alice_central.pending_commit(&id).await.is_none());
463                    let clear = alice_central.context.clear_pending_commit(&id).await;
464                    assert!(matches!(clear.unwrap_err(), CryptoError::PendingCommitNotFound))
465                })
466            })
467            .await
468        }
469
470        #[apply(all_cred_cipher)]
471        #[wasm_bindgen_test]
472        pub async fn should_clean_associated_key_material(case: TestCase) {
473            run_test_with_client_ids(case.clone(), ["alice"], move |[cc]| {
474                Box::pin(async move {
475                    let id = conversation_id();
476                    cc.context
477                        .new_conversation(&id, case.credential_type, case.cfg.clone())
478                        .await
479                        .unwrap();
480                    assert!(cc.pending_commit(&id).await.is_none());
481
482                    let init = cc.context.count_entities().await;
483
484                    cc.context.update_keying_material(&id).await.unwrap();
485                    assert!(cc.pending_commit(&id).await.is_some());
486
487                    cc.context.clear_pending_commit(&id).await.unwrap();
488                    assert!(cc.pending_commit(&id).await.is_none());
489
490                    // This whole flow should be idempotent.
491                    // Here we verify that we are indeed deleting the `EncryptionKeyPair` created
492                    // for the Update commit
493                    let after_clear_commit = cc.context.count_entities().await;
494                    assert_eq!(init, after_clear_commit);
495                })
496            })
497            .await
498        }
499    }
500}