1use crate::context::CentralContext;
7use crate::obfuscate::Obfuscated;
8use crate::{
9 group_store::GroupStoreValue,
10 prelude::{
11 decrypt::MlsBufferedConversationDecryptMessage, Client, ConversationId, CoreCryptoCallbacks, CryptoError,
12 CryptoResult, MlsConversation, MlsError,
13 },
14};
15use core_crypto_keystore::{
16 connection::FetchFromDatabase,
17 entities::{EntityFindParams, MlsPendingMessage},
18};
19use log::{error, info, trace};
20use mls_crypto_provider::MlsCryptoProvider;
21use openmls::prelude::{MlsMessageIn, MlsMessageInBody};
22use tls_codec::Deserialize;
23
24impl CentralContext {
25 pub(crate) async fn handle_future_message(
26 &self,
27 id: &ConversationId,
28 message: impl AsRef<[u8]>,
29 ) -> CryptoResult<()> {
30 let keystore = self.keystore().await?;
31
32 let pending_msg = MlsPendingMessage {
33 foreign_id: id.clone(),
34 message: message.as_ref().to_vec(),
35 };
36 keystore.save::<MlsPendingMessage>(pending_msg).await?;
37 Ok(())
38 }
39
40 pub(crate) async fn restore_pending_messages(
41 &self,
42 conversation: &mut MlsConversation,
43 is_rejoin: bool,
44 ) -> CryptoResult<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
45 let parent_conversation = match &conversation.parent_id {
46 Some(id) => self.get_conversation(id).await.ok(),
47 _ => None,
48 };
49 let guard = self.callbacks().await?;
50 let callbacks = guard.as_ref().map(|boxed| boxed.as_ref());
51 let client = &self.mls_client().await?;
52 let mls_provider = self.mls_provider().await?;
53 conversation
54 .restore_pending_messages(
55 client,
56 &mls_provider,
57 callbacks,
58 parent_conversation.as_ref(),
59 is_rejoin,
60 )
61 .await
62 }
63}
64
65impl MlsConversation {
66 #[cfg_attr(target_family = "wasm", async_recursion::async_recursion(?Send))]
67 #[cfg_attr(not(target_family = "wasm"), async_recursion::async_recursion)]
68 pub(crate) async fn restore_pending_messages<'a>(
69 &'a mut self,
70 client: &'a Client,
71 backend: &'a MlsCryptoProvider,
72 callbacks: Option<&'a dyn CoreCryptoCallbacks>,
73 parent_conversation: Option<&'a GroupStoreValue<Self>>,
74 is_rejoin: bool,
75 ) -> CryptoResult<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
76 let result = async move {
78 let keystore = backend.keystore();
79 let group_id = self.id().as_slice();
80 if is_rejoin {
81 trace!("External commit trying to rejoin group");
86 if keystore.find::<MlsPendingMessage>(group_id).await?.is_some() {
87 keystore.remove::<MlsPendingMessage, _>(group_id).await?;
88 }
89 return Ok(None);
90 }
91
92 let mut pending_messages = keystore
93 .find_all::<MlsPendingMessage>(EntityFindParams::default())
94 .await?
95 .into_iter()
96 .filter(|pm| pm.foreign_id == group_id)
97 .try_fold(vec![], |mut acc, m| {
98 let msg = MlsMessageIn::tls_deserialize(&mut m.message.as_slice()).map_err(MlsError::from)?;
99 let ct = match msg.body_as_ref() {
100 MlsMessageInBody::PublicMessage(m) => Ok(m.content_type()),
101 MlsMessageInBody::PrivateMessage(m) => Ok(m.content_type()),
102 _ => Err(CryptoError::ConsumerError),
103 }?;
104 acc.push((ct as u8, msg));
105 CryptoResult::Ok(acc)
106 })?;
107
108 pending_messages.sort_by(|(a, _), (b, _)| a.cmp(b));
111
112 info!(group_id = Obfuscated::from(&self.id); "Attempting to restore {} buffered messages", pending_messages.len());
113
114 let mut decrypted_messages = Vec::with_capacity(pending_messages.len());
115 for (_, m) in pending_messages {
116 let parent_conversation = match &self.parent_id {
117 Some(_) => Some(parent_conversation.ok_or(CryptoError::ParentGroupNotFound)?),
118 _ => None,
119 };
120 let restore_pending = false; let decrypted = self
122 .decrypt_message(m, parent_conversation, client, backend, callbacks, restore_pending)
123 .await?;
124 decrypted_messages.push(decrypted.into());
125 }
126
127 let decrypted_messages = (!decrypted_messages.is_empty()).then_some(decrypted_messages);
128
129 Ok(decrypted_messages)
130 }
131 .await;
132 match result {
133 Ok(r) => Ok(r),
134 Err(e) => {
135 error!(error:% = e; "Error restoring pending messages");
136 Err(e)
137 }
138 }
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use crate::{test_utils::*, CryptoError};
145 use wasm_bindgen_test::*;
146
147 wasm_bindgen_test_configure!(run_in_browser);
148
// Scenario (commit sender side): Bob creates a commit but only accepts it at the very end.
// Meanwhile Alice processes that commit and produces further messages. Bob must reject each of
// them with `BufferedFutureMessage`, and get them back once he calls `commit_accepted`.
149 #[apply(all_cred_cipher)]
150 #[wasm_bindgen_test]
151 async fn should_buffer_and_reapply_messages_after_commit_merged_for_sender(case: TestCase) {
152 run_test_with_client_ids(
153 case.clone(),
154 ["alice", "bob", "charlie", "debbie"],
155 move |[alice_central, bob_central, charlie_central, debbie_central]| {
156 Box::pin(async move {
157 let id = conversation_id();
158 alice_central
159 .context
160 .new_conversation(&id, case.credential_type, case.cfg.clone())
161 .await
162 .unwrap();
163 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
164
 // Bob creates a commit; he does not call `commit_accepted` for it until much later.
165 let unmerged_commit = bob_central.context.update_keying_material(&id).await.unwrap();
167
 // Alice processes Bob's commit right away.
168 alice_central
170 .context
171 .decrypt_message(&id, unmerged_commit.commit.to_bytes().unwrap())
172 .await
173 .unwrap();
174
 // Debbie builds an external add proposal for the epoch Alice is currently in.
175 let epoch = alice_central.context.conversation_epoch(&id).await.unwrap();
177 let external_proposal = debbie_central
178 .context
179 .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
180 .await
181 .unwrap();
182
 // Alice produces an application message, an update proposal and a commit adding Charlie.
183 let app_msg = alice_central
185 .context
186 .encrypt_message(&id, b"Hello Bob !")
187 .await
188 .unwrap();
189 let proposal = alice_central.context.new_update_proposal(&id).await.unwrap().proposal;
190 alice_central
191 .context
192 .decrypt_message(&id, external_proposal.to_bytes().unwrap())
193 .await
194 .unwrap();
195 let charlie = charlie_central.rand_key_package(&case).await;
196 let commit = alice_central
197 .context
198 .add_members_to_conversation(&id, vec![charlie])
199 .await
200 .unwrap();
201 alice_central.context.commit_accepted(&id).await.unwrap();
 // Both Charlie and Debbie process the welcome produced by Alice's commit.
202 charlie_central
203 .context
204 .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
205 .await
206 .unwrap();
207 debbie_central
208 .context
209 .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
210 .await
211 .unwrap();
212
 // Bob receives Alice's messages before accepting his own commit: every one of them
 // must be reported as a buffered future message.
213 let messages = vec![commit.commit, external_proposal, proposal]
217 .into_iter()
218 .map(|m| m.to_bytes().unwrap());
219 for m in messages {
220 let decrypt = bob_central.context.decrypt_message(&id, m).await;
221 assert!(matches!(
222 decrypt.unwrap_err(),
223 CryptoError::BufferedFutureMessage { .. }
224 ));
225 }
226 let decrypt = bob_central.context.decrypt_message(&id, app_msg).await;
227 assert!(matches!(
228 decrypt.unwrap_err(),
229 CryptoError::BufferedFutureMessage { .. }
230 ));
231
 // 3 handshake messages + 1 application message are now pending for Bob.
232 assert_eq!(bob_central.context.count_entities().await.pending_messages, 4);
234
 // Accepting the commit must hand back the buffered messages.
235 let Some(restored_messages) = bob_central.context.commit_accepted(&id).await.unwrap() else {
237 panic!("Alice's messages should have been restored at this point");
238 };
 // Expected replay order: the application message first, then the two proposals
 // (no epoch change), and finally the commit (epoch change).
239 for (i, m) in restored_messages.into_iter().enumerate() {
240 match i {
241 0 => {
242 assert_eq!(&m.app_msg.unwrap(), b"Hello Bob !");
244 assert!(!m.has_epoch_changed);
245 }
246 1 | 2 => {
247 assert!(m.app_msg.is_none());
249 assert!(!m.has_epoch_changed);
250 }
251 3 => {
252 assert!(m.app_msg.is_none());
254 assert!(m.has_epoch_changed);
255 }
256 _ => unreachable!(),
257 }
258 }
 // After the replay Bob can exchange messages with every other member.
259 assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
261 assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
263 assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
265
 // Nothing is left in the pending-message store.
266 assert_eq!(bob_central.context.count_entities().await.pending_messages, 0);
268 })
269 },
270 )
271 .await
272 }
273
// Scenario (commit receiver side): Bob joins through an external commit that Alice has not
// processed yet. Everything Bob produces afterwards must be buffered by Alice and handed back
// in `buffered_messages` once she finally decrypts the external commit.
// The whole scenario is skipped when `case.is_pure_ciphertext()` is true.
274 #[apply(all_cred_cipher)]
275 #[wasm_bindgen_test]
276 async fn should_buffer_and_reapply_messages_after_commit_merged_for_receivers(case: TestCase) {
277 if !case.is_pure_ciphertext() {
278 run_test_with_client_ids(
279 case.clone(),
280 ["alice", "bob", "charlie", "debbie"],
281 move |[alice_central, bob_central, charlie_central, debbie_central]| {
282 Box::pin(async move {
283 let id = conversation_id();
284 alice_central
285 .context
286 .new_conversation(&id, case.credential_type, case.cfg.clone())
287 .await
288 .unwrap();
289
 // Bob joins with an external commit built from Alice's group info and merges it locally.
290 let gi = alice_central.get_group_info(&id).await;
292 let ext_commit = bob_central
293 .context
294 .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
295 .await
296 .unwrap();
297 bob_central
298 .context
299 .merge_pending_group_from_external_commit(&id)
300 .await
301 .unwrap();
302
 // Charlie builds an external add proposal for the epoch Bob is currently in.
303 let epoch = bob_central.context.conversation_epoch(&id).await.unwrap();
307 let external_proposal = charlie_central
308 .context
309 .new_external_add_proposal(
310 id.clone(),
311 epoch.into(),
312 case.ciphersuite(),
313 case.credential_type,
314 )
315 .await
316 .unwrap();
 // Bob produces an application message, an update proposal and a commit adding Debbie.
317 let app_msg = bob_central
318 .context
319 .encrypt_message(&id, b"Hello Alice !")
320 .await
321 .unwrap();
322 let proposal = bob_central.context.new_update_proposal(&id).await.unwrap().proposal;
323 bob_central
324 .context
325 .decrypt_message(&id, external_proposal.to_bytes().unwrap())
326 .await
327 .unwrap();
328 let debbie = debbie_central.rand_key_package(&case).await;
329 let commit = bob_central
330 .context
331 .add_members_to_conversation(&id, vec![debbie])
332 .await
333 .unwrap();
334 bob_central.context.commit_accepted(&id).await.unwrap();
 // Both Charlie and Debbie process the welcome produced by Bob's commit.
335 charlie_central
336 .context
337 .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
338 .await
339 .unwrap();
340 debbie_central
341 .context
342 .process_welcome_message(commit.welcome.clone().into(), case.custom_cfg())
343 .await
344 .unwrap();
345
 // Alice receives Bob's messages before his external commit: every one of them
 // must be reported as a buffered future message.
346 let messages = vec![commit.commit, external_proposal, proposal]
350 .into_iter()
351 .map(|m| m.to_bytes().unwrap());
352 for m in messages {
353 let decrypt = alice_central.context.decrypt_message(&id, m).await;
354 assert!(matches!(
355 decrypt.unwrap_err(),
356 CryptoError::BufferedFutureMessage { .. }
357 ));
358 }
359 let decrypt = alice_central.context.decrypt_message(&id, app_msg).await;
360 assert!(matches!(
361 decrypt.unwrap_err(),
362 CryptoError::BufferedFutureMessage { .. }
363 ));
364
 // 3 handshake messages + 1 application message are now pending for Alice.
365 assert_eq!(alice_central.context.count_entities().await.pending_messages, 4);
367
 // Alice finally receives Bob's external commit.
368 let original_commit = ext_commit.commit.to_bytes().unwrap();
370
 // Decrypting it must return the buffered messages alongside the result.
371 let Some(restored_messages) = alice_central
372 .context
373 .decrypt_message(&id, original_commit)
374 .await
375 .unwrap()
376 .buffered_messages
377 else {
378 panic!("Bob's messages should have been restored at this point");
379 };
 // Expected replay order: the application message first, then the two proposals
 // (no epoch change), and finally the commit (epoch change).
380 for (i, m) in restored_messages.into_iter().enumerate() {
381 match i {
382 0 => {
383 assert_eq!(&m.app_msg.unwrap(), b"Hello Alice !");
385 assert!(!m.has_epoch_changed);
386 }
387 1 | 2 => {
388 assert!(m.app_msg.is_none());
390 assert!(!m.has_epoch_changed);
391 }
392 3 => {
393 assert!(m.app_msg.is_none());
395 assert!(m.has_epoch_changed);
396 }
397 _ => unreachable!(),
398 }
399 }
 // After the replay Alice can exchange messages with every other member.
400 assert!(alice_central.try_talk_to(&id, &bob_central).await.is_ok());
402 assert!(alice_central.try_talk_to(&id, &charlie_central).await.is_ok());
404 assert!(alice_central.try_talk_to(&id, &debbie_central).await.is_ok());
406
 // Nothing is left in the pending-message store.
407 assert_eq!(alice_central.context.count_entities().await.pending_messages, 0);
409 })
410 },
411 )
412 .await
413 }
414 }
415
// Scenario: an external sender issues remove proposals for two different epochs while a new
// member joins through an external commit. Each client receives the proposals and commits in a
// different order; the test pins which calls succeed and which fail with
// `BufferedFutureMessage`, `StaleProposal` or `BufferedCommit`.
// The scenario is skipped when `case.is_pure_ciphertext()` is true.
416 #[apply(all_cred_cipher)]
420 async fn wpb_15810(case: TestCase) {
421 use openmls::{
422 group::GroupId,
423 prelude::{ExternalProposal, SenderExtensionIndex},
424 };
425
426 if case.is_pure_ciphertext() {
427 return;
430 }
431 run_test_with_client_ids(
432 case.clone(),
433 ["external_0", "new_member", "member_27", "observer", "114", "115"],
434 move |[external_0, new_member, member_27, observer, member_114, member_115]| {
435 Box::pin(async move {
436 let conv_id = conversation_id();
438
 // Register external_0's signature key as an external sender in the conversation config.
439 let signature_key = external_0.client_signature_key(&case).await.as_slice().to_vec();
441 let mut config = case.cfg.clone();
442 observer
443 .context
444 .set_raw_external_senders(&mut config, vec![signature_key])
445 .await
446 .unwrap();
447
 // The observer creates the conversation with that config...
448 observer
450 .context
451 .new_conversation(&conv_id, case.credential_type, config)
452 .await
453 .unwrap();
454
 // ...and invites members 114, 115 and 27.
455 observer
457 .invite_all(&case, &conv_id, [&member_114, &member_115, &member_27])
458 .await
459 .unwrap();
460
 // First external remove proposal for member 114, built for the observer's current epoch
 // and signed with external_0's credential.
461 let leaf_of_114 = observer.index_of(&conv_id, member_114.get_client_id().await).await;
467 let sender_index = SenderExtensionIndex::new(0);
468 let sc = case.signature_scheme();
469 let ct = case.credential_type;
470 let cb = external_0.find_most_recent_credential_bundle(sc, ct).await.unwrap();
471 let group_id = GroupId::from_slice(&conv_id[..]);
472 let epoch = observer.get_conversation_unchecked(&conv_id).await.group.epoch();
473 let proposal_remove_114_1 = ExternalProposal::new_remove(
474 leaf_of_114,
475 group_id.clone(),
476 epoch,
477 &cb.signature_key,
478 sender_index,
479 )
480 .unwrap();
481
 // new_member joins with an external commit built from the observer's group info.
482 let new_member_join_commit = new_member
484 .context
485 .join_by_external_commit(
486 observer.get_group_info(&conv_id).await,
487 case.custom_cfg(),
488 case.credential_type,
489 )
490 .await
491 .unwrap()
492 .commit;
493
494 new_member
495 .context
496 .merge_pending_group_from_external_commit(&conv_id)
497 .await
498 .unwrap();
499
 // Second external remove proposal for member 114, this time for `epoch + 1`, using the
 // leaf index as seen by new_member.
500 let leaf_of_114 = new_member.index_of(&conv_id, member_114.get_client_id().await).await;
502 let proposal_remove_114_2 = ExternalProposal::new_remove(
503 leaf_of_114,
504 group_id.clone(),
505 (epoch.as_u64() + 1).into(),
506 &cb.signature_key,
507 sender_index,
508 )
509 .unwrap();
510
 // Observer: the first proposal is accepted, the second one is buffered as a future
 // message, then the join commit is processed.
511 println!("observer executing first proposal");
513 observer
514 .context
515 .decrypt_message(&conv_id, &proposal_remove_114_1.to_bytes().unwrap())
516 .await
517 .unwrap();
518 println!("observer executing second proposal");
519 let result = observer
520 .context
521 .decrypt_message(&conv_id, &proposal_remove_114_2.to_bytes().unwrap())
522 .await;
523 assert!(matches!(
524 result.unwrap_err(),
525 CryptoError::BufferedFutureMessage { message_epoch: 2 }
526 ));
527 println!("executing commit adding new user");
528 observer
529 .context
530 .decrypt_message(&conv_id, &new_member_join_commit.to_bytes().unwrap())
531 .await
532 .unwrap();
533
 // new_member: the first proposal is rejected as stale, the second one is accepted.
534 println!("new_member executing first proposal");
536 assert!(matches!(
537 new_member
538 .context
539 .decrypt_message(&conv_id, &proposal_remove_114_1.to_bytes().unwrap())
540 .await
541 .unwrap_err(),
542 CryptoError::StaleProposal,
543 ));
544 println!("new_member executing second proposal");
545 new_member
546 .context
547 .decrypt_message(&conv_id, &proposal_remove_114_2.to_bytes().unwrap())
548 .await
549 .unwrap();
550
 // External remove proposal for member 115, built for the observer's current epoch.
551 let leaf_of_115 = observer.index_of(&conv_id, member_115.get_client_id().await).await;
557 let epoch = observer.get_conversation_unchecked(&conv_id).await.group.epoch();
558 let proposal_remove_115 =
559 ExternalProposal::new_remove(leaf_of_115, group_id, epoch, &cb.signature_key, sender_index)
560 .unwrap();
561
 // member_27 goes through the same sequence as the observer, then also receives the
 // proposal removing member 115.
562 member_27
563 .context
564 .decrypt_message(&conv_id, &proposal_remove_114_1.to_bytes().unwrap())
565 .await
566 .unwrap();
567 let result = member_27
568 .context
569 .decrypt_message(&conv_id, &proposal_remove_114_2.to_bytes().unwrap())
570 .await;
571 assert!(matches!(
572 result.unwrap_err(),
573 CryptoError::BufferedFutureMessage { message_epoch: 2 }
574 ));
575 member_27
576 .context
577 .decrypt_message(&conv_id, &new_member_join_commit.to_bytes().unwrap())
578 .await
579 .unwrap();
580 member_27
581 .context
582 .decrypt_message(&conv_id, &proposal_remove_115.to_bytes().unwrap())
583 .await
584 .unwrap();
585
 // member_27 commits the pending proposals; the first `unwrap` is on the result, the
 // second on the optional commit bundle.
586 let remove_two_members_commit = member_27
587 .context
588 .commit_pending_proposals(&conv_id)
589 .await
590 .unwrap() .unwrap() .commit;
593
 // member_27 feeds the resulting commit back through `decrypt_message`.
594 member_27
596 .context
597 .decrypt_message(&conv_id, remove_two_members_commit.to_bytes().unwrap())
598 .await
599 .unwrap();
600
 // Observer receives the proposal for 115 first, then the commit: both succeed.
601 observer
604 .context
605 .decrypt_message(&conv_id, &proposal_remove_115.to_bytes().unwrap())
606 .await
607 .unwrap();
608 observer
609 .context
610 .decrypt_message(&conv_id, &remove_two_members_commit.to_bytes().unwrap())
611 .await
612 .unwrap();
613
 // new_member receives the commit before the proposal for 115: the commit is reported
 // as buffered, and the proposal delivered afterwards is accepted.
614 let result = new_member
617 .context
618 .decrypt_message(&conv_id, &remove_two_members_commit.to_bytes().unwrap())
619 .await;
620 assert!(matches!(result.unwrap_err(), CryptoError::BufferedCommit));
621 new_member
622 .context
623 .decrypt_message(&conv_id, &proposal_remove_115.to_bytes().unwrap())
624 .await
625 .unwrap();
626
 // Finally the three clients can all exchange messages with each other.
627 observer.try_talk_to(&conv_id, &new_member).await.unwrap();
629 observer.try_talk_to(&conv_id, &member_27).await.unwrap();
630 new_member.try_talk_to(&conv_id, &member_27).await.unwrap();
631 })
632 },
633 )
634 .await
635 }
636}