core_crypto/mls/conversation/
buffer_messages.rs

//! This file is intended to work around an ordering issue we have with the Delivery Service (DS).
//! Sometimes, clients receive messages for the next epoch before receiving the commit for the
//! current epoch.
//!
//! Feel free to delete all of this when the issue is fixed on the DS side!
5
6use crate::context::CentralContext;
7use crate::obfuscate::Obfuscated;
8use crate::{
9    group_store::GroupStoreValue,
10    prelude::{
11        decrypt::MlsBufferedConversationDecryptMessage, Client, ConversationId, CoreCryptoCallbacks, CryptoError,
12        CryptoResult, MlsConversation, MlsError,
13    },
14};
15use core_crypto_keystore::{
16    connection::FetchFromDatabase,
17    entities::{EntityFindParams, MlsPendingMessage},
18};
19use log::{error, info, trace};
20use mls_crypto_provider::MlsCryptoProvider;
21use openmls::prelude::{MlsMessageIn, MlsMessageInBody};
22use tls_codec::Deserialize;
23
24impl CentralContext {
25    pub(crate) async fn handle_future_message(
26        &self,
27        id: &ConversationId,
28        message: impl AsRef<[u8]>,
29    ) -> CryptoResult<()> {
30        let keystore = self.keystore().await?;
31
32        let pending_msg = MlsPendingMessage {
33            foreign_id: id.clone(),
34            message: message.as_ref().to_vec(),
35        };
36        keystore.save::<MlsPendingMessage>(pending_msg).await?;
37        Ok(())
38    }
39
40    pub(crate) async fn restore_pending_messages(
41        &self,
42        conversation: &mut MlsConversation,
43        is_rejoin: bool,
44    ) -> CryptoResult<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
45        let parent_conversation = match &conversation.parent_id {
46            Some(id) => self.get_conversation(id).await.ok(),
47            _ => None,
48        };
49        let guard = self.callbacks().await?;
50        let callbacks = guard.as_ref().map(|boxed| boxed.as_ref());
51        let client = &self.mls_client().await?;
52        let mls_provider = self.mls_provider().await?;
53        conversation
54            .restore_pending_messages(
55                client,
56                &mls_provider,
57                callbacks,
58                parent_conversation.as_ref(),
59                is_rejoin,
60            )
61            .await
62    }
63}
64
65impl MlsConversation {
66    #[cfg_attr(target_family = "wasm", async_recursion::async_recursion(?Send))]
67    #[cfg_attr(not(target_family = "wasm"), async_recursion::async_recursion)]
68    pub(crate) async fn restore_pending_messages<'a>(
69        &'a mut self,
70        client: &'a Client,
71        backend: &'a MlsCryptoProvider,
72        callbacks: Option<&'a dyn CoreCryptoCallbacks>,
73        parent_conversation: Option<&'a GroupStoreValue<Self>>,
74        is_rejoin: bool,
75    ) -> CryptoResult<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
76        // using the macro produces a clippy warning
77        let result = async move {
78            let keystore = backend.keystore();
79            let group_id = self.id().as_slice();
80            if is_rejoin {
81                // This means the external commit is about rejoining the group.
82                // This is most of the time a last resort measure (for example when a commit is dropped)
83                // and you go out of sync so there's no point in decrypting buffered messages
84
85                trace!("External commit trying to rejoin group");
86                if keystore.find::<MlsPendingMessage>(group_id).await?.is_some() {
87                    keystore.remove::<MlsPendingMessage, _>(group_id).await?;
88                }
89                return Ok(None);
90            }
91
92            let mut pending_messages = keystore
93                .find_all::<MlsPendingMessage>(EntityFindParams::default())
94                .await?
95                .into_iter()
96                .filter(|pm| pm.foreign_id == group_id)
97                .try_fold(vec![], |mut acc, m| {
98                    let msg = MlsMessageIn::tls_deserialize(&mut m.message.as_slice()).map_err(MlsError::from)?;
99                    let ct = match msg.body_as_ref() {
100                        MlsMessageInBody::PublicMessage(m) => Ok(m.content_type()),
101                        MlsMessageInBody::PrivateMessage(m) => Ok(m.content_type()),
102                        _ => Err(CryptoError::ConsumerError),
103                    }?;
104                    acc.push((ct as u8, msg));
105                    CryptoResult::Ok(acc)
106                })?;
107
108            // We want to restore application messages first, then Proposals & finally Commits
109            // luckily for us that's the exact same order as the [ContentType] enum
110            pending_messages.sort_by(|(a, _), (b, _)| a.cmp(b));
111
112            info!(group_id = Obfuscated::from(&self.id); "Attempting to restore {} buffered messages", pending_messages.len());
113
114            let mut decrypted_messages = Vec::with_capacity(pending_messages.len());
115            for (_, m) in pending_messages {
116                let parent_conversation = match &self.parent_id {
117                    Some(_) => Some(parent_conversation.ok_or(CryptoError::ParentGroupNotFound)?),
118                    _ => None,
119                };
120                let restore_pending = false; // to prevent infinite recursion
121                let decrypted = self
122                    .decrypt_message(m, parent_conversation, client, backend, callbacks, restore_pending)
123                    .await?;
124                decrypted_messages.push(decrypted.into());
125            }
126
127            let decrypted_messages = (!decrypted_messages.is_empty()).then_some(decrypted_messages);
128
129            Ok(decrypted_messages)
130        }
131        .await;
132        match result {
133            Ok(r) => Ok(r),
134            Err(e) => {
135                error!(error:% = e; "Error restoring pending messages");
136                Err(e)
137            }
138        }
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use crate::{test_utils::*, CryptoError};
145    use wasm_bindgen_test::*;
146
147    wasm_bindgen_test_configure!(run_in_browser);
148
149    #[apply(all_cred_cipher)]
150    #[wasm_bindgen_test]
151    async fn should_buffer_and_reapply_messages_after_commit_merged_for_sender(case: TestCase) {
152        run_test_with_client_ids(
153            case.clone(),
154            ["alice", "bob", "charlie", "debbie"],
155            move |[alice_central, bob_central, charlie_central, debbie_central]| {
156                Box::pin(async move {
157                    let id = conversation_id();
158                    alice_central
159                        .context
160                        .new_conversation(&id, case.credential_type, case.cfg.clone())
161                        .await
162                        .unwrap();
163                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
164
165                    // Bob creates a commit but won't merge it immediately
166                    let unmerged_commit = bob_central.context.update_keying_material(&id).await.unwrap();
167
168                    // Alice decrypts the commit...
169                    alice_central
170                        .context
171                        .decrypt_message(&id, unmerged_commit.commit.to_bytes().unwrap())
172                        .await
173                        .unwrap();
174
175                    // Meanwhile Debbie joins the party by creating an external proposal
176                    let epoch = alice_central.context.conversation_epoch(&id).await.unwrap();
177                    let external_proposal = debbie_central
178                        .context
179                        .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
180                        .await
181                        .unwrap();
182
183                    // ...then Alice generates new messages for this epoch
184                    let app_msg = alice_central
185                        .context
186                        .encrypt_message(&id, b"Hello Bob !")
187                        .await
188                        .unwrap();
189                    let proposal = alice_central.context.new_update_proposal(&id).await.unwrap().proposal;
190                    alice_central
191                        .context
192                        .decrypt_message(&id, external_proposal.to_bytes().unwrap())
193                        .await
194                        .unwrap();
195                    let charlie = charlie_central.rand_key_package(&case).await;
196                    let commit = alice_central
197                        .context
198                        .add_members_to_conversation(&id, vec![charlie])
199                        .await
200                        .unwrap();
201                    alice_central.context.commit_accepted(&id).await.unwrap();
202                    charlie_central
203                        .context
204                        .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
205                        .await
206                        .unwrap();
207                    debbie_central
208                        .context
209                        .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
210                        .await
211                        .unwrap();
212
213                    // And now Bob will have to decrypt those messages while he hasn't yet merged its commit
214                    // To add more fun, he will buffer the messages in exactly the wrong order (to make
215                    // sure he reapplies them in the right order afterwards)
216                    let messages = vec![commit.commit, external_proposal, proposal]
217                        .into_iter()
218                        .map(|m| m.to_bytes().unwrap());
219                    for m in messages {
220                        let decrypt = bob_central.context.decrypt_message(&id, m).await;
221                        assert!(matches!(
222                            decrypt.unwrap_err(),
223                            CryptoError::BufferedFutureMessage { .. }
224                        ));
225                    }
226                    let decrypt = bob_central.context.decrypt_message(&id, app_msg).await;
227                    assert!(matches!(
228                        decrypt.unwrap_err(),
229                        CryptoError::BufferedFutureMessage { .. }
230                    ));
231
232                    // Bob should have buffered the messages
233                    assert_eq!(bob_central.context.count_entities().await.pending_messages, 4);
234
235                    // Finally, Bob receives the green light from the DS and he can merge the external commit
236                    let Some(restored_messages) = bob_central.context.commit_accepted(&id).await.unwrap() else {
237                        panic!("Alice's messages should have been restored at this point");
238                    };
239                    for (i, m) in restored_messages.into_iter().enumerate() {
240                        match i {
241                            0 => {
242                                // this is the application message
243                                assert_eq!(&m.app_msg.unwrap(), b"Hello Bob !");
244                                assert!(!m.has_epoch_changed);
245                            }
246                            1 | 2 => {
247                                // this is either the member or the external proposal
248                                assert!(m.app_msg.is_none());
249                                assert!(!m.has_epoch_changed);
250                            }
251                            3 => {
252                                // this is the commit
253                                assert!(m.app_msg.is_none());
254                                assert!(m.has_epoch_changed);
255                            }
256                            _ => unreachable!(),
257                        }
258                    }
259                    // because external commit got merged
260                    assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
261                    // because Alice's commit got merged
262                    assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
263                    // because Debbie's external proposal got merged through the commit
264                    assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
265
266                    // After merging we should erase all those pending messages
267                    assert_eq!(bob_central.context.count_entities().await.pending_messages, 0);
268                })
269            },
270        )
271        .await
272    }
273
274    #[apply(all_cred_cipher)]
275    #[wasm_bindgen_test]
276    async fn should_buffer_and_reapply_messages_after_commit_merged_for_receivers(case: TestCase) {
277        if !case.is_pure_ciphertext() {
278            run_test_with_client_ids(
279                case.clone(),
280                ["alice", "bob", "charlie", "debbie"],
281                move |[alice_central, bob_central, charlie_central, debbie_central]| {
282                    Box::pin(async move {
283                        let id = conversation_id();
284                        alice_central
285                            .context
286                            .new_conversation(&id, case.credential_type, case.cfg.clone())
287                            .await
288                            .unwrap();
289
290                        // Bob joins the group with an external commit...
291                        let gi = alice_central.get_group_info(&id).await;
292                        let ext_commit = bob_central
293                            .context
294                            .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
295                            .await
296                            .unwrap();
297                        bob_central
298                            .context
299                            .merge_pending_group_from_external_commit(&id)
300                            .await
301                            .unwrap();
302
303                        // And before others had the chance to get the commit, Bob will create & send messages in the next epoch
304                        // which Alice will have to buffer until she receives the commit.
305                        // This simulates what the DS does with unordered messages
306                        let epoch = bob_central.context.conversation_epoch(&id).await.unwrap();
307                        let external_proposal = charlie_central
308                            .context
309                            .new_external_add_proposal(
310                                id.clone(),
311                                epoch.into(),
312                                case.ciphersuite(),
313                                case.credential_type,
314                            )
315                            .await
316                            .unwrap();
317                        let app_msg = bob_central
318                            .context
319                            .encrypt_message(&id, b"Hello Alice !")
320                            .await
321                            .unwrap();
322                        let proposal = bob_central.context.new_update_proposal(&id).await.unwrap().proposal;
323                        bob_central
324                            .context
325                            .decrypt_message(&id, external_proposal.to_bytes().unwrap())
326                            .await
327                            .unwrap();
328                        let debbie = debbie_central.rand_key_package(&case).await;
329                        let commit = bob_central
330                            .context
331                            .add_members_to_conversation(&id, vec![debbie])
332                            .await
333                            .unwrap();
334                        bob_central.context.commit_accepted(&id).await.unwrap();
335                        charlie_central
336                            .context
337                            .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
338                            .await
339                            .unwrap();
340                        debbie_central
341                            .context
342                            .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
343                            .await
344                            .unwrap();
345
346                        // And now Alice will have to decrypt those messages while he hasn't yet merged the commit
347                        // To add more fun, he will buffer the messages in exactly the wrong order (to make
348                        // sure he reapplies them in the right order afterwards)
349                        let messages = vec![commit.commit, external_proposal, proposal]
350                            .into_iter()
351                            .map(|m| m.to_bytes().unwrap());
352                        for m in messages {
353                            let decrypt = alice_central.context.decrypt_message(&id, m).await;
354                            assert!(matches!(
355                                decrypt.unwrap_err(),
356                                CryptoError::BufferedFutureMessage { .. }
357                            ));
358                        }
359                        let decrypt = alice_central.context.decrypt_message(&id, app_msg).await;
360                        assert!(matches!(
361                            decrypt.unwrap_err(),
362                            CryptoError::BufferedFutureMessage { .. }
363                        ));
364
365                        // Alice should have buffered the messages
366                        assert_eq!(alice_central.context.count_entities().await.pending_messages, 4);
367
368                        // Finally, Alice receives the original commit for this epoch
369                        let original_commit = ext_commit.commit.to_bytes().unwrap();
370
371                        let Some(restored_messages) = alice_central
372                            .context
373                            .decrypt_message(&id, original_commit)
374                            .await
375                            .unwrap()
376                            .buffered_messages
377                        else {
378                            panic!("Bob's messages should have been restored at this point");
379                        };
380                        for (i, m) in restored_messages.into_iter().enumerate() {
381                            match i {
382                                0 => {
383                                    // this is the application message
384                                    assert_eq!(&m.app_msg.unwrap(), b"Hello Alice !");
385                                    assert!(!m.has_epoch_changed);
386                                }
387                                1 | 2 => {
388                                    // this is either the member or the external proposal
389                                    assert!(m.app_msg.is_none());
390                                    assert!(!m.has_epoch_changed);
391                                }
392                                3 => {
393                                    // this is the commit
394                                    assert!(m.app_msg.is_none());
395                                    assert!(m.has_epoch_changed);
396                                }
397                                _ => unreachable!(),
398                            }
399                        }
400                        // because external commit got merged
401                        assert!(alice_central.try_talk_to(&id, &bob_central).await.is_ok());
402                        // because Alice's commit got merged
403                        assert!(alice_central.try_talk_to(&id, &charlie_central).await.is_ok());
404                        // because Debbie's external proposal got merged through the commit
405                        assert!(alice_central.try_talk_to(&id, &debbie_central).await.is_ok());
406
407                        // After merging we should erase all those pending messages
408                        assert_eq!(alice_central.context.count_entities().await.pending_messages, 0);
409                    })
410                },
411            )
412            .await
413        }
414    }
415
416    /// Replicating [WPB-15810]
417    ///
418    /// [WPB-15810]: https://wearezeta.atlassian.net/browse/WPB-15810
419    #[apply(all_cred_cipher)]
420    async fn wpb_15810(case: TestCase) {
421        use openmls::{
422            group::GroupId,
423            prelude::{ExternalProposal, SenderExtensionIndex},
424        };
425
426        if case.is_pure_ciphertext() {
427            // The use case tested here requires inspecting your own commit.
428            // Openmls does not support this currently when protocol messages are encrypted.
429            return;
430        }
431        run_test_with_client_ids(
432            case.clone(),
433            ["external_0", "new_member", "member_27", "observer", "114", "115"],
434            move |[external_0, new_member, member_27, observer, member_114, member_115]| {
435                Box::pin(async move {
436                    // scenario start: everyone except "new_member" is in the conversation
437                    let conv_id = conversation_id();
438
439                    // set up external_0 as the backend / delivery service
440                    let signature_key = external_0.client_signature_key(&case).await.as_slice().to_vec();
441                    let mut config = case.cfg.clone();
442                    observer
443                        .context
444                        .set_raw_external_senders(&mut config, vec![signature_key])
445                        .await
446                        .unwrap();
447
448                    // create and initialize the conversation
449                    observer
450                        .context
451                        .new_conversation(&conv_id, case.credential_type, config)
452                        .await
453                        .unwrap();
454
455                    // everyone else except new_member joins (also except observer, who created it)
456                    observer
457                        .invite_all(&case, &conv_id, [&member_114, &member_115, &member_27])
458                        .await
459                        .unwrap();
460
461                    // Everyone should agree on the overall state here, to wit: the group consists of everyone
462                    // except "new_member", and "external_0", and no messages have been sent.
463                    // At this point only the observer is going to receive messages, because that shouldn't impact group state.
464
465                    // external 0 sends a proposal to remove 114
466                    let leaf_of_114 = observer.index_of(&conv_id, member_114.get_client_id().await).await;
467                    let sender_index = SenderExtensionIndex::new(0);
468                    let sc = case.signature_scheme();
469                    let ct = case.credential_type;
470                    let cb = external_0.find_most_recent_credential_bundle(sc, ct).await.unwrap();
471                    let group_id = GroupId::from_slice(&conv_id[..]);
472                    let epoch = observer.get_conversation_unchecked(&conv_id).await.group.epoch();
473                    let proposal_remove_114_1 = ExternalProposal::new_remove(
474                        leaf_of_114,
475                        group_id.clone(),
476                        epoch,
477                        &cb.signature_key,
478                        sender_index,
479                    )
480                    .unwrap();
481
482                    // now bump the epoch in external_0: the new member has joined
483                    let new_member_join_commit = new_member
484                        .context
485                        .join_by_external_commit(
486                            observer.get_group_info(&conv_id).await,
487                            case.custom_cfg(),
488                            case.credential_type,
489                        )
490                        .await
491                        .unwrap()
492                        .commit;
493
494                    new_member
495                        .context
496                        .merge_pending_group_from_external_commit(&conv_id)
497                        .await
498                        .unwrap();
499
500                    // also create the same proposal with the epoch increased by 1
501                    let leaf_of_114 = new_member.index_of(&conv_id, member_114.get_client_id().await).await;
502                    let proposal_remove_114_2 = ExternalProposal::new_remove(
503                        leaf_of_114,
504                        group_id.clone(),
505                        (epoch.as_u64() + 1).into(),
506                        &cb.signature_key,
507                        sender_index,
508                    )
509                    .unwrap();
510
511                    // now our observer receives these messages out of order
512                    println!("observer executing first proposal");
513                    observer
514                        .context
515                        .decrypt_message(&conv_id, &proposal_remove_114_1.to_bytes().unwrap())
516                        .await
517                        .unwrap();
518                    println!("observer executing second proposal");
519                    let result = observer
520                        .context
521                        .decrypt_message(&conv_id, &proposal_remove_114_2.to_bytes().unwrap())
522                        .await;
523                    assert!(matches!(
524                        result.unwrap_err(),
525                        CryptoError::BufferedFutureMessage { message_epoch: 2 }
526                    ));
527                    println!("executing commit adding new user");
528                    observer
529                        .context
530                        .decrypt_message(&conv_id, &new_member_join_commit.to_bytes().unwrap())
531                        .await
532                        .unwrap();
533
534                    // now the new member receives the messages in order
535                    println!("new_member executing first proposal");
536                    assert!(matches!(
537                        new_member
538                            .context
539                            .decrypt_message(&conv_id, &proposal_remove_114_1.to_bytes().unwrap())
540                            .await
541                            .unwrap_err(),
542                        CryptoError::StaleProposal,
543                    ));
544                    println!("new_member executing second proposal");
545                    new_member
546                        .context
547                        .decrypt_message(&conv_id, &proposal_remove_114_2.to_bytes().unwrap())
548                        .await
549                        .unwrap();
550
551                    // now let's switch to the perspective of member 27
552                    // they have observed exactly one of the "remove 114" proposals,
553                    // plus a "remove 115" proposal. We can assume that they observe the 2nd
554                    // "remove 114" proposal because they advanced the epoch correctly when
555                    // the new member was added.
556                    let leaf_of_115 = observer.index_of(&conv_id, member_115.get_client_id().await).await;
557                    let epoch = observer.get_conversation_unchecked(&conv_id).await.group.epoch();
558                    let proposal_remove_115 =
559                        ExternalProposal::new_remove(leaf_of_115, group_id, epoch, &cb.signature_key, sender_index)
560                            .unwrap();
561
562                    member_27
563                        .context
564                        .decrypt_message(&conv_id, &proposal_remove_114_1.to_bytes().unwrap())
565                        .await
566                        .unwrap();
567                    let result = member_27
568                        .context
569                        .decrypt_message(&conv_id, &proposal_remove_114_2.to_bytes().unwrap())
570                        .await;
571                    assert!(matches!(
572                        result.unwrap_err(),
573                        CryptoError::BufferedFutureMessage { message_epoch: 2 }
574                    ));
575                    member_27
576                        .context
577                        .decrypt_message(&conv_id, &new_member_join_commit.to_bytes().unwrap())
578                        .await
579                        .unwrap();
580                    member_27
581                        .context
582                        .decrypt_message(&conv_id, &proposal_remove_115.to_bytes().unwrap())
583                        .await
584                        .unwrap();
585
586                    let remove_two_members_commit = member_27
587                        .context
588                        .commit_pending_proposals(&conv_id)
589                        .await
590                        .unwrap() // result, for errors
591                        .unwrap() // option, in case no proposals
592                        .commit;
593
594                    // member 27 applies its own commit
595                    member_27
596                        .context
597                        .decrypt_message(&conv_id, remove_two_members_commit.to_bytes().unwrap())
598                        .await
599                        .unwrap();
600
601                    // In this case, note that observer receives the proposal before the commit.
602                    // This is the straightforward ordering and easy to deal with.
603                    observer
604                        .context
605                        .decrypt_message(&conv_id, &proposal_remove_115.to_bytes().unwrap())
606                        .await
607                        .unwrap();
608                    observer
609                        .context
610                        .decrypt_message(&conv_id, &remove_two_members_commit.to_bytes().unwrap())
611                        .await
612                        .unwrap();
613
614                    // In this case, new_member receives the commit before the proposal. This means that
615                    // the commit has to be buffered until the proposal it references is received.
616                    let result = new_member
617                        .context
618                        .decrypt_message(&conv_id, &remove_two_members_commit.to_bytes().unwrap())
619                        .await;
620                    assert!(matches!(result.unwrap_err(), CryptoError::BufferedCommit));
621                    new_member
622                        .context
623                        .decrypt_message(&conv_id, &proposal_remove_115.to_bytes().unwrap())
624                        .await
625                        .unwrap();
626
627                    // And communication is possible
628                    observer.try_talk_to(&conv_id, &new_member).await.unwrap();
629                    observer.try_talk_to(&conv_id, &member_27).await.unwrap();
630                    new_member.try_talk_to(&conv_id, &member_27).await.unwrap();
631                })
632            },
633        )
634        .await
635    }
636}