1mod buffer_commit;
12pub(crate) mod buffer_messages;
13
14use super::{ConversationGuard, Result};
15use crate::e2e_identity::NewCrlDistributionPoints;
16use crate::mls::conversation::renew::Renew;
17use crate::mls::conversation::{Conversation, ConversationWithMls, Error};
18use crate::mls::credential::crl::{
19 extract_crl_uris_from_proposals, extract_crl_uris_from_update_path, get_new_crl_distribution_points,
20};
21use crate::mls::credential::ext::CredentialExt as _;
22use crate::obfuscate::Obfuscated;
23use crate::prelude::{ClientId, E2eiConversationState, Session};
24use crate::prelude::{MlsProposalBundle, WireIdentity};
25use crate::{MlsError, RecursiveError};
26use log::{debug, info};
27use openmls::framing::errors::{MessageDecryptionError, SecretTreeError};
28use openmls::framing::{MlsMessageIn, MlsMessageInBody, ProcessedMessage, ProtocolMessage};
29use openmls::prelude::{
30 ContentType, CredentialType, ProcessMessageError, ProcessedMessageContent, Proposal, StageCommitError,
31 StagedCommit, ValidationError,
32};
33use openmls_traits::OpenMlsCryptoProvider as _;
34use tls_codec::Deserialize as _;
35
36#[derive(Debug)]
39pub struct MlsConversationDecryptMessage {
40 pub app_msg: Option<Vec<u8>>,
42 pub proposals: Vec<MlsProposalBundle>,
47 pub is_active: bool,
49 pub delay: Option<u64>,
51 pub sender_client_id: Option<ClientId>,
53 #[deprecated = "This member will be removed in the future. Prefer using the `EpochObserver` interface."]
55 pub has_epoch_changed: bool,
56 pub identity: WireIdentity,
59 pub buffered_messages: Option<Vec<MlsBufferedConversationDecryptMessage>>,
63 pub crl_new_distribution_points: NewCrlDistributionPoints,
65}
66
67#[derive(Debug)]
69pub struct MlsBufferedConversationDecryptMessage {
70 pub app_msg: Option<Vec<u8>>,
72 pub proposals: Vec<MlsProposalBundle>,
74 pub is_active: bool,
76 pub delay: Option<u64>,
78 pub sender_client_id: Option<ClientId>,
80 #[deprecated = "This member will be removed in the future. Prefer using the `EpochObserver` interface."]
82 pub has_epoch_changed: bool,
83 pub identity: WireIdentity,
85 pub crl_new_distribution_points: NewCrlDistributionPoints,
87}
88
89impl From<MlsConversationDecryptMessage> for MlsBufferedConversationDecryptMessage {
90 fn from(from: MlsConversationDecryptMessage) -> Self {
91 #[expect(deprecated)]
93 Self {
94 app_msg: from.app_msg,
95 proposals: from.proposals,
96 is_active: from.is_active,
97 delay: from.delay,
98 sender_client_id: from.sender_client_id,
99 has_epoch_changed: from.has_epoch_changed,
100 identity: from.identity,
101 crl_new_distribution_points: from.crl_new_distribution_points,
102 }
103 }
104}
105
106struct ParsedMessage {
107 is_duplicate: bool,
108 protocol_message: ProtocolMessage,
109 content_type: ContentType,
110}
111
112#[derive(Clone, Copy, PartialEq, Eq)]
113enum RecursionPolicy {
114 AsNecessary,
115 None,
116}
117
118impl ConversationGuard {
119 pub async fn decrypt_message(&mut self, message: impl AsRef<[u8]>) -> Result<MlsConversationDecryptMessage> {
131 let mls_message_in =
132 MlsMessageIn::tls_deserialize(&mut message.as_ref()).map_err(Error::tls_deserialize("mls message in"))?;
133
134 let decrypt_message_result = self
135 .decrypt_message_inner(mls_message_in, RecursionPolicy::AsNecessary)
136 .await;
137
138 if let Err(Error::BufferedFutureMessage { message_epoch }) = decrypt_message_result {
143 self.buffer_future_message(message.as_ref()).await?;
144 let conversation = self.conversation().await;
145 info!(group_id = Obfuscated::from(conversation.id()); "Buffered future message from epoch {message_epoch}");
146 }
147 if let Err(Error::BufferedCommit) = decrypt_message_result {
148 self.buffer_commit(message).await?;
149 }
150
151 let decrypt_message = decrypt_message_result?;
152
153 if !decrypt_message.is_active {
154 self.wipe().await?;
155 }
156 Ok(decrypt_message)
157 }
158
159 async fn decrypt_message_inner(
161 &mut self,
162 message: MlsMessageIn,
163 recursion_policy: RecursionPolicy,
164 ) -> Result<MlsConversationDecryptMessage> {
165 let client = &self.session().await?;
166 let backend = &self.crypto_provider().await?;
167 let parsed_message = self.parse_message(message.clone()).await?;
168
169 let message_result = self.process_message(parsed_message).await;
170
171 if let Err(Error::Mls(crate::MlsError {
173 source:
174 crate::MlsErrorKind::MlsMessageError(ProcessMessageError::InvalidCommit(StageCommitError::OwnCommit)),
175 ..
176 })) = message_result
177 {
178 let mut conversation = self.conversation_mut().await;
179 let ct = conversation.extract_confirmation_tag_from_own_commit(&message)?;
180 let mut decrypted_message = conversation.handle_own_commit(client, backend, ct).await?;
181 debug_assert!(
182 decrypted_message.buffered_messages.is_none(),
183 "decrypted message should be constructed with empty buffer"
184 );
185 if recursion_policy == RecursionPolicy::AsNecessary {
186 drop(conversation);
187 decrypted_message.buffered_messages = self.restore_and_clear_pending_messages().await?;
188 }
189
190 return Ok(decrypted_message);
191 }
192
193 if let Err(Error::Mls(crate::MlsError {
197 source:
198 crate::MlsErrorKind::MlsMessageError(ProcessMessageError::InvalidCommit(StageCommitError::MissingProposal)),
199 ..
200 })) = message_result
201 {
202 return Err(Error::BufferedCommit);
203 }
204
205 let message = message_result?;
206
207 let credential = message.credential();
208 let epoch = message.epoch();
209
210 let identity = credential
211 .extract_identity(
212 self.ciphersuite().await,
213 backend.authentication_service().borrow().await.as_ref(),
214 )
215 .map_err(RecursiveError::mls_credential("extracting identity"))?;
216
217 let sender_client_id: ClientId = credential.credential.identity().into();
218
219 let decrypted = match message.into_content() {
220 ProcessedMessageContent::ApplicationMessage(app_msg) => {
221 let conversation = self.conversation().await;
222 debug!(
223 group_id = Obfuscated::from(&conversation.id),
224 epoch = epoch.as_u64(),
225 sender_client_id = Obfuscated::from(&sender_client_id);
226 "Application message"
227 );
228
229 #[expect(deprecated)]
231 MlsConversationDecryptMessage {
232 app_msg: Some(app_msg.into_bytes()),
233 proposals: vec![],
234 is_active: true,
235 delay: None,
236 sender_client_id: Some(sender_client_id),
237 has_epoch_changed: false,
238 identity,
239 buffered_messages: None,
240 crl_new_distribution_points: None.into(),
241 }
242 }
243 ProcessedMessageContent::ProposalMessage(proposal) => {
244 let mut conversation = self.conversation_mut().await;
245 let crl_dps = extract_crl_uris_from_proposals(&[proposal.proposal().clone()])
246 .map_err(RecursiveError::mls_credential("extracting crl urls from proposals"))?;
247 let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
248 .await
249 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
250
251 info!(
252 group_id = Obfuscated::from(&conversation.id),
253 sender = Obfuscated::from(proposal.sender()),
254 proposals = Obfuscated::from(&proposal.proposal);
255 "Received proposal"
256 );
257
258 conversation.group.store_pending_proposal(*proposal);
259 drop(conversation);
260 if let Some(commit) =
261 self.retrieve_buffered_commit()
262 .await
263 .map_err(RecursiveError::mls_conversation(
264 "retrieving buffered commit while handling proposal",
265 ))?
266 {
267 let process_result = self.try_process_buffered_commit(commit, recursion_policy).await;
268
269 if process_result.is_ok() {
270 self.clear_buffered_commit()
271 .await
272 .map_err(RecursiveError::mls_conversation(
273 "clearing buffered commit after successful application",
274 ))?;
275 }
276 if !matches!(process_result, Err(Error::BufferedCommit)) {
281 return process_result
285 .map_err(RecursiveError::mls_conversation("processing buffered commit"))
286 .map_err(Into::into);
287 }
288 }
289
290 let conversation = self.conversation().await;
291 let delay = conversation.compute_next_commit_delay();
292
293 #[expect(deprecated)]
295 MlsConversationDecryptMessage {
296 app_msg: None,
297 proposals: vec![],
298 is_active: true,
299 delay,
300 sender_client_id: None,
301 has_epoch_changed: false,
302 identity,
303 buffered_messages: None,
304 crl_new_distribution_points,
305 }
306 }
307 ProcessedMessageContent::StagedCommitMessage(staged_commit) => {
308 self.validate_commit(&staged_commit).await?;
309 let mut conversation = self.conversation_mut().await;
310
311 let pending_proposals = conversation.self_pending_proposals().cloned().collect::<Vec<_>>();
312
313 let proposal_refs: Vec<Proposal> = pending_proposals
314 .iter()
315 .map(|p| p.proposal().clone())
316 .chain(
317 staged_commit
318 .add_proposals()
319 .map(|p| Proposal::Add(p.add_proposal().clone())),
320 )
321 .chain(
322 staged_commit
323 .update_proposals()
324 .map(|p| Proposal::Update(p.update_proposal().clone())),
325 )
326 .collect();
327
328 let mut crl_dps = extract_crl_uris_from_proposals(&proposal_refs)
330 .map_err(RecursiveError::mls_credential("extracting crl urls from proposals"))?;
331 crl_dps.extend(
332 extract_crl_uris_from_update_path(&staged_commit)
333 .map_err(RecursiveError::mls_credential("extracting crl urls from update path"))?,
334 );
335
336 let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
337 .await
338 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
339
340 let pending_commit = conversation.group.pending_commit().cloned();
342
343 conversation
344 .group
345 .merge_staged_commit(backend, *staged_commit.clone())
346 .await
347 .map_err(MlsError::wrap("merge staged commit"))?;
348
349 let (proposals_to_renew, needs_update) = Renew::renew(
350 &conversation.group.own_leaf_index(),
351 pending_proposals.into_iter(),
352 pending_commit.as_ref(),
353 staged_commit.as_ref(),
354 );
355 let proposals = conversation
356 .renew_proposals_for_current_epoch(client, backend, proposals_to_renew.into_iter(), needs_update)
357 .await?;
358
359 let mut buffered_messages = None;
361 drop(conversation);
363 if recursion_policy == RecursionPolicy::AsNecessary {
364 buffered_messages = self.restore_and_clear_pending_messages().await?;
365 }
366
367 let conversation = self.conversation().await;
368 let epoch = staged_commit.staged_context().epoch().as_u64();
369 info!(
370 group_id = Obfuscated::from(&conversation.id),
371 epoch,
372 proposals:? = staged_commit.queued_proposals().map(Obfuscated::from).collect::<Vec<_>>();
373 "Epoch advanced"
374 );
375 client.notify_epoch_changed(conversation.id.clone(), epoch).await;
376
377 #[expect(deprecated)]
379 MlsConversationDecryptMessage {
380 app_msg: None,
381 proposals,
382 is_active: conversation.group.is_active(),
383 delay: conversation.compute_next_commit_delay(),
384 sender_client_id: None,
385 has_epoch_changed: true,
386 identity,
387 buffered_messages,
388 crl_new_distribution_points,
389 }
390 }
391 ProcessedMessageContent::ExternalJoinProposalMessage(proposal) => {
392 let mut conversation = self.conversation_mut().await;
393 info!(
394 group_id = Obfuscated::from(&conversation.id),
395 sender = Obfuscated::from(proposal.sender());
396 "Received external join proposal"
397 );
398
399 let crl_dps = extract_crl_uris_from_proposals(&[proposal.proposal().clone()])
400 .map_err(RecursiveError::mls_credential("extracting crl uris from proposals"))?;
401 let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
402 .await
403 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
404 conversation.group.store_pending_proposal(*proposal);
405
406 #[expect(deprecated)]
408 MlsConversationDecryptMessage {
409 app_msg: None,
410 proposals: vec![],
411 is_active: true,
412 delay: conversation.compute_next_commit_delay(),
413 sender_client_id: None,
414 has_epoch_changed: false,
415 identity,
416 buffered_messages: None,
417 crl_new_distribution_points,
418 }
419 }
420 };
421
422 let mut conversation = self.conversation_mut().await;
423
424 conversation
425 .persist_group_when_changed(&backend.keystore(), false)
426 .await?;
427
428 Ok(decrypted)
429 }
430
431 async fn parse_message(&self, msg_in: MlsMessageIn) -> Result<ParsedMessage> {
432 let mut is_duplicate = false;
433 let conversation = self.conversation().await;
434 let backend = self.crypto_provider().await?;
435 let (protocol_message, content_type) = match msg_in.extract() {
436 MlsMessageInBody::PublicMessage(m) => {
437 is_duplicate = conversation.is_duplicate_message(&backend, &m)?;
438 let ct = m.content_type();
439 (ProtocolMessage::PublicMessage(m), ct)
440 }
441 MlsMessageInBody::PrivateMessage(m) => {
442 let ct = m.content_type();
443 (ProtocolMessage::PrivateMessage(m), ct)
444 }
445 _ => {
446 return Err(
447 MlsError::wrap("parsing inbound message")(ProcessMessageError::IncompatibleWireFormat).into(),
448 );
449 }
450 };
451 Ok(ParsedMessage {
452 is_duplicate,
453 protocol_message,
454 content_type,
455 })
456 }
457
458 async fn process_message(
459 &mut self,
460 ParsedMessage {
461 is_duplicate,
462 protocol_message,
463 content_type,
464 }: ParsedMessage,
465 ) -> Result<ProcessedMessage> {
466 let msg_epoch = protocol_message.epoch().as_u64();
467 let backend = self.crypto_provider().await?;
468 let mut conversation = self.conversation_mut().await;
469 let group_epoch = conversation.group.epoch().as_u64();
470 let processed_msg = conversation
471 .group
472 .process_message(&backend, protocol_message)
473 .await
474 .map_err(|e| match e {
475 ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
476 MessageDecryptionError::GenerationOutOfBound,
477 )) => Error::DuplicateMessage,
478 ProcessMessageError::ValidationError(ValidationError::WrongEpoch) => {
479 if is_duplicate {
480 Error::DuplicateMessage
481 } else if msg_epoch == group_epoch + 1 {
482 Error::BufferedFutureMessage {
491 message_epoch: msg_epoch,
492 }
493 } else if msg_epoch < group_epoch {
494 match content_type {
495 ContentType::Application => Error::StaleMessage,
496 ContentType::Commit => Error::StaleCommit,
497 ContentType::Proposal => Error::StaleProposal,
498 }
499 } else {
500 Error::UnbufferedFarFutureMessage
501 }
502 }
503 ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
504 MessageDecryptionError::AeadError,
505 )) => Error::DecryptionError,
506 ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
507 MessageDecryptionError::SecretTreeError(SecretTreeError::TooDistantInThePast),
508 )) => Error::MessageEpochTooOld,
509 _ => MlsError::wrap("processing message")(e).into(),
510 })?;
511 if is_duplicate {
512 return Err(Error::DuplicateMessage);
513 }
514 Ok(processed_msg)
515 }
516
517 async fn validate_commit(&self, commit: &StagedCommit) -> Result<()> {
518 let backend = self.crypto_provider().await?;
519 if backend.authentication_service().is_env_setup().await {
520 let credentials: Vec<_> = commit
521 .add_proposals()
522 .filter_map(|add_proposal| {
523 let credential = add_proposal.add_proposal().key_package().leaf_node().credential();
524
525 matches!(credential.credential_type(), CredentialType::X509).then(|| credential.clone())
526 })
527 .collect();
528 let state = Session::compute_conversation_state(
529 self.ciphersuite().await,
530 credentials.iter(),
531 crate::prelude::MlsCredentialType::X509,
532 backend.authentication_service().borrow().await.as_ref(),
533 )
534 .await;
535 if state != E2eiConversationState::Verified {
536 }
539 }
540 Ok(())
541 }
542}
543
544#[cfg(test)]
545mod tests {
546 use wasm_bindgen_test::*;
547
548 use crate::{
549 mls::conversation::{config::MAX_PAST_EPOCHS, error::Error},
550 test_utils::*,
551 };
552
553 use super::*;
554
555 wasm_bindgen_test_configure!(run_in_browser);
556
557 mod is_active {
558 use super::*;
559
560 #[apply(all_cred_cipher)]
561 #[wasm_bindgen_test]
562 pub async fn decrypting_a_regular_commit_should_leave_conversation_active(case: TestContext) {
563 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
564 Box::pin(async move {
565 let id = conversation_id();
566 alice_central
567 .transaction
568 .new_conversation(&id, case.credential_type, case.cfg.clone())
569 .await
570 .unwrap();
571 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
572
573 bob_central
574 .transaction
575 .conversation(&id)
576 .await
577 .unwrap()
578 .update_key_material()
579 .await
580 .unwrap();
581 let commit = bob_central.mls_transport.latest_commit().await;
582 let MlsConversationDecryptMessage { is_active, .. } = alice_central
583 .transaction
584 .conversation(&id)
585 .await
586 .unwrap()
587 .decrypt_message(commit.to_bytes().unwrap())
588 .await
589 .unwrap();
590 assert!(is_active)
591 })
592 })
593 .await
594 }
595
596 #[apply(all_cred_cipher)]
597 #[wasm_bindgen_test]
598 pub async fn decrypting_a_commit_removing_self_should_set_conversation_inactive(case: TestContext) {
599 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
600 Box::pin(async move {
601 let id = conversation_id();
602 alice_central
603 .transaction
604 .new_conversation(&id, case.credential_type, case.cfg.clone())
605 .await
606 .unwrap();
607 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
608
609 bob_central
610 .transaction
611 .conversation(&id)
612 .await
613 .unwrap()
614 .remove_members(&[alice_central.get_client_id().await])
615 .await
616 .unwrap();
617 let commit = bob_central.mls_transport.latest_commit().await;
618 let MlsConversationDecryptMessage { is_active, .. } = alice_central
619 .transaction
620 .conversation(&id)
621 .await
622 .unwrap()
623 .decrypt_message(commit.to_bytes().unwrap())
624 .await
625 .unwrap();
626 assert!(!is_active)
627 })
628 })
629 .await
630 }
631 }
632
633 mod commit {
634 use super::*;
635
636 #[apply(all_cred_cipher)]
637 #[wasm_bindgen_test]
638 pub async fn decrypting_a_commit_should_succeed(case: TestContext) {
639 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
640 Box::pin(async move {
641 let id = conversation_id();
642 alice_central
643 .transaction
644 .new_conversation(&id, case.credential_type, case.cfg.clone())
645 .await
646 .unwrap();
647 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
648
649 let bob_observer = TestEpochObserver::new();
650 bob_central
651 .session()
652 .await
653 .register_epoch_observer(bob_observer.clone())
654 .await
655 .unwrap();
656
657 let epoch_before = alice_central.transaction.conversation(&id).await.unwrap().epoch().await;
658
659 alice_central
660 .transaction
661 .conversation(&id)
662 .await
663 .unwrap()
664 .update_key_material()
665 .await
666 .unwrap();
667 let commit = alice_central.mls_transport.latest_commit().await;
668
669 let decrypted = bob_central
670 .transaction
671 .conversation(&id)
672 .await
673 .unwrap()
674 .decrypt_message(commit.to_bytes().unwrap())
675 .await
676 .unwrap();
677 let epoch_after = bob_central.transaction.conversation(&id).await.unwrap().epoch().await;
678 assert_eq!(epoch_after, epoch_before + 1);
679 assert!(bob_observer.has_changed().await);
680 assert!(decrypted.delay.is_none());
681 assert!(decrypted.app_msg.is_none());
682
683 alice_central.verify_sender_identity(&case, &decrypted).await;
684 })
685 })
686 .await
687 }
688
689 #[apply(all_cred_cipher)]
690 #[wasm_bindgen_test]
691 pub async fn decrypting_a_commit_should_not_renew_proposals_in_valid_commit(case: TestContext) {
692 run_test_with_client_ids(
693 case.clone(),
694 ["alice", "bob", "charlie"],
695 move |[mut alice_central, bob_central, charlie_central]| {
696 Box::pin(async move {
697 let id = conversation_id();
698 alice_central
699 .transaction
700 .new_conversation(&id, case.credential_type, case.cfg.clone())
701 .await
702 .unwrap();
703 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
704
705 let alice_observer = TestEpochObserver::new();
706 alice_central
707 .session()
708 .await
709 .register_epoch_observer(alice_observer.clone())
710 .await
711 .unwrap();
712
713 let charlie_kp = charlie_central.get_one_key_package(&case).await;
718
719 let add_charlie_proposal =
720 bob_central.transaction.new_add_proposal(&id, charlie_kp).await.unwrap();
721 alice_central
722 .transaction
723 .conversation(&id)
724 .await
725 .unwrap()
726 .decrypt_message(add_charlie_proposal.proposal.to_bytes().unwrap())
727 .await
728 .unwrap();
729
730 alice_observer.reset().await;
731
732 bob_central
733 .transaction
734 .conversation(&id)
735 .await
736 .unwrap()
737 .update_key_material()
738 .await
739 .unwrap();
740 let commit = bob_central.mls_transport.latest_commit().await;
741 let MlsConversationDecryptMessage { proposals, delay, .. } = alice_central
742 .transaction
743 .conversation(&id)
744 .await
745 .unwrap()
746 .decrypt_message(commit.to_bytes().unwrap())
747 .await
748 .unwrap();
749 assert!(proposals.is_empty());
750 assert!(delay.is_none());
751 assert!(alice_central.pending_proposals(&id).await.is_empty());
752 assert!(alice_observer.has_changed().await);
753 })
754 },
755 )
756 .await
757 }
758
759 #[apply(all_cred_cipher)]
761 #[wasm_bindgen_test]
762 pub async fn decrypting_a_commit_should_renew_orphan_pending_proposals(case: TestContext) {
763 run_test_with_client_ids(
764 case.clone(),
765 ["alice", "bob", "charlie"],
766 move |[mut alice_central, mut bob_central, charlie_central]| {
767 Box::pin(async move {
768 let id = conversation_id();
769 alice_central
770 .transaction
771 .new_conversation(&id, case.credential_type, case.cfg.clone())
772 .await
773 .unwrap();
774 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
775
776 let alice_observer = TestEpochObserver::new();
777 alice_central
778 .session()
779 .await
780 .register_epoch_observer(alice_observer.clone())
781 .await
782 .unwrap();
783
784 bob_central
788 .transaction
789 .conversation(&id)
790 .await
791 .unwrap()
792 .update_key_material()
793 .await
794 .unwrap();
795 let bob_commit = bob_central.mls_transport.latest_commit().await;
796 let commit_epoch = bob_commit.epoch().unwrap();
797
798 let charlie_kp = charlie_central.get_one_key_package(&case).await;
800 alice_central
801 .transaction
802 .new_add_proposal(&id, charlie_kp)
803 .await
804 .unwrap();
805 assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
806
807 let MlsConversationDecryptMessage { proposals, delay, .. } = alice_central
809 .transaction
810 .conversation(&id)
811 .await
812 .unwrap()
813 .decrypt_message(bob_commit.to_bytes().unwrap())
814 .await
815 .unwrap();
816 assert!(
818 !alice_central
819 .get_conversation_unchecked(&id)
820 .await
821 .members()
822 .contains_key(b"charlie".as_slice())
823 );
824 assert!(delay.is_some());
826
827 assert!(!proposals.is_empty());
829 assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
830 let renewed_proposal = proposals.first().unwrap();
831 assert_eq!(
832 commit_epoch.as_u64() + 1,
833 renewed_proposal.proposal.epoch().unwrap().as_u64()
834 );
835
836 alice_observer.reset().await;
839
840 bob_central
842 .transaction
843 .conversation(&id)
844 .await
845 .unwrap()
846 .decrypt_message(renewed_proposal.proposal.to_bytes().unwrap())
847 .await
848 .unwrap();
849 assert_eq!(bob_central.pending_proposals(&id).await.len(), 1);
850 bob_central
851 .transaction
852 .conversation(&id)
853 .await
854 .unwrap()
855 .commit_pending_proposals()
856 .await
857 .unwrap();
858 let commit = bob_central.mls_transport.latest_commit().await;
859 let _decrypted = alice_central
860 .transaction
861 .conversation(&id)
862 .await
863 .unwrap()
864 .decrypt_message(commit.to_bytes().unwrap())
865 .await
866 .unwrap();
867 assert!(
869 alice_central
870 .get_conversation_unchecked(&id)
871 .await
872 .members()
873 .contains_key::<Vec<u8>>(&charlie_central.get_client_id().await.to_vec())
874 );
875
876 assert!(
878 bob_central
879 .get_conversation_unchecked(&id)
880 .await
881 .members()
882 .contains_key::<Vec<u8>>(&charlie_central.get_client_id().await.to_vec())
883 );
884 assert!(alice_observer.has_changed().await);
885 })
886 },
887 )
888 .await
889 }
890
891 #[apply(all_cred_cipher)]
892 #[wasm_bindgen_test]
893 pub async fn decrypting_a_commit_should_discard_pending_external_proposals(case: TestContext) {
894 run_test_with_client_ids(
895 case.clone(),
896 ["alice", "bob", "charlie"],
897 move |[mut alice_central, bob_central, charlie_central]| {
898 Box::pin(async move {
899 let id = conversation_id();
900 alice_central
901 .transaction
902 .new_conversation(&id, case.credential_type, case.cfg.clone())
903 .await
904 .unwrap();
905 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
906
907 let ext_proposal = charlie_central
912 .transaction
913 .new_external_add_proposal(
914 id.clone(),
915 alice_central.get_conversation_unchecked(&id).await.group.epoch(),
916 case.ciphersuite(),
917 case.credential_type,
918 )
919 .await
920 .unwrap();
921 assert!(alice_central.pending_proposals(&id).await.is_empty());
922 alice_central
923 .transaction
924 .conversation(&id)
925 .await
926 .unwrap()
927 .decrypt_message(ext_proposal.to_bytes().unwrap())
928 .await
929 .unwrap();
930 assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
931
932 bob_central
933 .transaction
934 .conversation(&id)
935 .await
936 .unwrap()
937 .update_key_material()
938 .await
939 .unwrap();
940 let commit = bob_central.mls_transport.latest_commit().await;
941 let alice_renewed_proposals = alice_central
942 .transaction
943 .conversation(&id)
944 .await
945 .unwrap()
946 .decrypt_message(commit.to_bytes().unwrap())
947 .await
948 .unwrap()
949 .proposals;
950 assert!(alice_renewed_proposals.is_empty());
951 assert!(alice_central.pending_proposals(&id).await.is_empty());
952 })
953 },
954 )
955 .await
956 }
957
958 #[apply(all_cred_cipher)]
959 #[wasm_bindgen_test]
960 async fn should_not_return_sender_client_id(case: TestContext) {
961 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
962 Box::pin(async move {
963 let id = conversation_id();
964 alice_central
965 .transaction
966 .new_conversation(&id, case.credential_type, case.cfg.clone())
967 .await
968 .unwrap();
969 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
970
971 alice_central
972 .transaction
973 .conversation(&id)
974 .await
975 .unwrap()
976 .update_key_material()
977 .await
978 .unwrap();
979 let commit = alice_central.mls_transport.latest_commit().await;
980
981 let sender_client_id = bob_central
982 .transaction
983 .conversation(&id)
984 .await
985 .unwrap()
986 .decrypt_message(commit.to_bytes().unwrap())
987 .await
988 .unwrap()
989 .sender_client_id;
990 assert!(sender_client_id.is_none());
991 })
992 })
993 .await
994 }
995 }
996
997 mod external_proposal {
998 use super::*;
999
1000 #[apply(all_cred_cipher)]
1001 #[wasm_bindgen_test]
1002 async fn can_decrypt_external_proposal(case: TestContext) {
1003 run_test_with_client_ids(
1004 case.clone(),
1005 ["alice", "bob", "alice2"],
1006 move |[alice_central, bob_central, alice2_central]| {
1007 Box::pin(async move {
1008 let id = conversation_id();
1009 alice_central
1010 .transaction
1011 .new_conversation(&id, case.credential_type, case.cfg.clone())
1012 .await
1013 .unwrap();
1014 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1015
1016 let bob_observer = TestEpochObserver::new();
1017 bob_central
1018 .session()
1019 .await
1020 .register_epoch_observer(bob_observer.clone())
1021 .await
1022 .unwrap();
1023
1024 let epoch = alice_central.get_conversation_unchecked(&id).await.group.epoch();
1025 let ext_proposal = alice2_central
1026 .transaction
1027 .new_external_add_proposal(id.clone(), epoch, case.ciphersuite(), case.credential_type)
1028 .await
1029 .unwrap();
1030
1031 let decrypted = alice_central
1032 .transaction
1033 .conversation(&id)
1034 .await
1035 .unwrap()
1036 .decrypt_message(&ext_proposal.to_bytes().unwrap())
1037 .await
1038 .unwrap();
1039 assert!(decrypted.app_msg.is_none());
1040 assert!(decrypted.delay.is_some());
1041
1042 let decrypted = bob_central
1043 .transaction
1044 .conversation(&id)
1045 .await
1046 .unwrap()
1047 .decrypt_message(&ext_proposal.to_bytes().unwrap())
1048 .await
1049 .unwrap();
1050 assert!(decrypted.app_msg.is_none());
1051 assert!(decrypted.delay.is_some());
1052 assert!(!bob_observer.has_changed().await)
1053 })
1054 },
1055 )
1056 .await
1057 }
1058 }
1059
1060 mod proposal {
1061 use super::*;
1062
1063 #[apply(all_cred_cipher)]
1065 #[wasm_bindgen_test]
1066 async fn can_decrypt_proposal(case: TestContext) {
1067 run_test_with_client_ids(
1068 case.clone(),
1069 ["alice", "bob", "charlie"],
1070 move |[alice_central, bob_central, charlie_central]| {
1071 Box::pin(async move {
1072 let id = conversation_id();
1073 alice_central
1074 .transaction
1075 .new_conversation(&id, case.credential_type, case.cfg.clone())
1076 .await
1077 .unwrap();
1078 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1079
1080 let charlie_kp = charlie_central.get_one_key_package(&case).await;
1081 let proposal = alice_central
1082 .transaction
1083 .new_add_proposal(&id, charlie_kp)
1084 .await
1085 .unwrap()
1086 .proposal;
1087
1088 let decrypted = bob_central
1089 .transaction
1090 .conversation(&id)
1091 .await
1092 .unwrap()
1093 .decrypt_message(proposal.to_bytes().unwrap())
1094 .await
1095 .unwrap();
1096
1097 assert_eq!(bob_central.get_conversation_unchecked(&id).await.members().len(), 2);
1098 bob_central
1100 .transaction
1101 .conversation(&id)
1102 .await
1103 .unwrap()
1104 .commit_pending_proposals()
1105 .await
1106 .unwrap();
1107 assert_eq!(bob_central.get_conversation_unchecked(&id).await.members().len(), 3);
1108
1109 alice_central.verify_sender_identity(&case, &decrypted).await;
1110 })
1111 },
1112 )
1113 .await
1114 }
1115
1116 #[apply(all_cred_cipher)]
1117 #[wasm_bindgen_test]
1118 async fn should_not_return_sender_client_id(case: TestContext) {
1119 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1120 Box::pin(async move {
1121 let id = conversation_id();
1122 alice_central
1123 .transaction
1124 .new_conversation(&id, case.credential_type, case.cfg.clone())
1125 .await
1126 .unwrap();
1127 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1128
1129 let proposal = alice_central
1130 .transaction
1131 .new_update_proposal(&id)
1132 .await
1133 .unwrap()
1134 .proposal;
1135
1136 let sender_client_id = bob_central
1137 .transaction
1138 .conversation(&id)
1139 .await
1140 .unwrap()
1141 .decrypt_message(proposal.to_bytes().unwrap())
1142 .await
1143 .unwrap()
1144 .sender_client_id;
1145 assert!(sender_client_id.is_none());
1146 })
1147 })
1148 .await
1149 }
1150 }
1151
1152 mod app_message {
1153 use super::*;
1154
1155 #[apply(all_cred_cipher)]
1156 #[wasm_bindgen_test]
1157 async fn can_decrypt_app_message(case: TestContext) {
1158 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1159 Box::pin(async move {
1160 let id = conversation_id();
1161 alice_central
1162 .transaction
1163 .new_conversation(&id, case.credential_type, case.cfg.clone())
1164 .await
1165 .unwrap();
1166 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1167
1168 let alice_observer = TestEpochObserver::new();
1169 alice_central
1170 .session()
1171 .await
1172 .register_epoch_observer(alice_observer.clone())
1173 .await
1174 .unwrap();
1175 let bob_observer = TestEpochObserver::new();
1176 bob_central
1177 .session()
1178 .await
1179 .register_epoch_observer(bob_observer.clone())
1180 .await
1181 .unwrap();
1182
1183 let msg = b"Hello bob";
1184 let encrypted = alice_central
1185 .transaction
1186 .conversation(&id)
1187 .await
1188 .unwrap()
1189 .encrypt_message(msg)
1190 .await
1191 .unwrap();
1192 assert_ne!(&msg[..], &encrypted[..]);
1193 let decrypted = bob_central
1194 .transaction
1195 .conversation(&id)
1196 .await
1197 .unwrap()
1198 .decrypt_message(encrypted)
1199 .await
1200 .unwrap();
1201 let dec_msg = decrypted.app_msg.as_ref().unwrap().as_slice();
1202 assert_eq!(dec_msg, &msg[..]);
1203 assert!(!bob_observer.has_changed().await);
1204 alice_central.verify_sender_identity(&case, &decrypted).await;
1205
1206 let msg = b"Hello alice";
1207 let encrypted = bob_central
1208 .transaction
1209 .conversation(&id)
1210 .await
1211 .unwrap()
1212 .encrypt_message(msg)
1213 .await
1214 .unwrap();
1215 assert_ne!(&msg[..], &encrypted[..]);
1216 let decrypted = alice_central
1217 .transaction
1218 .conversation(&id)
1219 .await
1220 .unwrap()
1221 .decrypt_message(encrypted)
1222 .await
1223 .unwrap();
1224 let dec_msg = decrypted.app_msg.as_ref().unwrap().as_slice();
1225 assert_eq!(dec_msg, &msg[..]);
1226 assert!(!alice_observer.has_changed().await);
1227 bob_central.verify_sender_identity(&case, &decrypted).await;
1228 })
1229 })
1230 .await
1231 }
1232
1233 #[apply(all_cred_cipher)]
1234 #[wasm_bindgen_test]
1235 async fn cannot_decrypt_app_message_after_rejoining(case: TestContext) {
1236 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1237 Box::pin(async move {
1238 let id = conversation_id();
1239 alice_central
1240 .transaction
1241 .new_conversation(&id, case.credential_type, case.cfg.clone())
1242 .await
1243 .unwrap();
1244 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1245
1246 let msg = b"Hello bob";
1248 let encrypted = alice_central
1249 .transaction
1250 .conversation(&id)
1251 .await
1252 .unwrap()
1253 .encrypt_message(msg)
1254 .await
1255 .unwrap();
1256
1257 let gi = alice_central.get_group_info(&id).await;
1260 bob_central
1261 .transaction
1262 .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
1263 .await
1264 .unwrap();
1265
1266 let decrypt = bob_central
1268 .transaction
1269 .conversation(&id)
1270 .await
1271 .unwrap()
1272 .decrypt_message(&encrypted)
1273 .await;
1274 assert!(matches!(decrypt.unwrap_err(), Error::DecryptionError));
1275 })
1276 })
1277 .await
1278 }
1279
1280 #[apply(all_cred_cipher)]
1281 #[wasm_bindgen_test]
1282 async fn cannot_decrypt_app_message_from_future_epoch(case: TestContext) {
1283 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1284 Box::pin(async move {
1285 let id = conversation_id();
1286 alice_central
1287 .transaction
1288 .new_conversation(&id, case.credential_type, case.cfg.clone())
1289 .await
1290 .unwrap();
1291 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1292
1293 alice_central
1295 .transaction
1296 .conversation(&id)
1297 .await
1298 .unwrap()
1299 .update_key_material()
1300 .await
1301 .unwrap();
1302 let commit = alice_central.mls_transport.latest_commit().await;
1303
1304 let msg = b"Hello bob";
1306 let encrypted = alice_central
1307 .transaction
1308 .conversation(&id)
1309 .await
1310 .unwrap()
1311 .encrypt_message(msg)
1312 .await
1313 .unwrap();
1314
1315 let decrypt = bob_central
1317 .transaction
1318 .conversation(&id)
1319 .await
1320 .unwrap()
1321 .decrypt_message(&encrypted)
1322 .await;
1323 assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));
1324
1325 let decrypted_commit = bob_central
1326 .transaction
1327 .conversation(&id)
1328 .await
1329 .unwrap()
1330 .decrypt_message(commit.to_bytes().unwrap())
1331 .await
1332 .unwrap();
1333 let buffered_msg = decrypted_commit.buffered_messages.unwrap();
1334 let decrypted_msg = buffered_msg.first().unwrap().app_msg.clone().unwrap();
1335 assert_eq!(&decrypted_msg, msg);
1336 })
1337 })
1338 .await
1339 }
1340
1341 #[apply(all_cred_cipher)]
1342 #[wasm_bindgen_test]
1343 async fn can_decrypt_app_message_in_any_order(mut case: TestContext) {
1344 case.cfg.custom.maximum_forward_distance = 0;
1347 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1348 Box::pin(async move {
1349 let id = conversation_id();
1350 alice_central
1351 .transaction
1352 .new_conversation(&id, case.credential_type, case.cfg.clone())
1353 .await
1354 .unwrap();
1355 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1356
1357 let out_of_order_tolerance = case.custom_cfg().out_of_order_tolerance;
1358 let nb_messages = out_of_order_tolerance * 2;
1359 let mut messages = vec![];
1360
1361 for i in 0..nb_messages {
1363 let msg = format!("Hello {i}");
1364 let encrypted = alice_central
1365 .transaction
1366 .conversation(&id)
1367 .await
1368 .unwrap()
1369 .encrypt_message(&msg)
1370 .await
1371 .unwrap();
1372 messages.push((msg, encrypted));
1373 }
1374
1375 messages.reverse();
1377 for (i, (original, encrypted)) in messages.iter().enumerate() {
1378 let decrypt = bob_central
1379 .transaction
1380 .conversation(&id)
1381 .await
1382 .unwrap()
1383 .decrypt_message(encrypted)
1384 .await;
1385 if i > out_of_order_tolerance as usize {
1386 let decrypted = decrypt.unwrap().app_msg.unwrap();
1387 assert_eq!(decrypted, original.as_bytes());
1388 } else {
1389 assert!(matches!(decrypt.unwrap_err(), Error::DuplicateMessage))
1390 }
1391 }
1392 })
1393 })
1394 .await
1395 }
1396
1397 #[apply(all_cred_cipher)]
1398 #[wasm_bindgen_test]
1399 async fn returns_sender_client_id(case: TestContext) {
1400 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1401 Box::pin(async move {
1402 let id = conversation_id();
1403 alice_central
1404 .transaction
1405 .new_conversation(&id, case.credential_type, case.cfg.clone())
1406 .await
1407 .unwrap();
1408 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1409
1410 let msg = b"Hello bob";
1411 let encrypted = alice_central
1412 .transaction
1413 .conversation(&id)
1414 .await
1415 .unwrap()
1416 .encrypt_message(msg)
1417 .await
1418 .unwrap();
1419 assert_ne!(&msg[..], &encrypted[..]);
1420
1421 let sender_client_id = bob_central
1422 .transaction
1423 .conversation(&id)
1424 .await
1425 .unwrap()
1426 .decrypt_message(encrypted)
1427 .await
1428 .unwrap()
1429 .sender_client_id
1430 .unwrap();
1431 assert_eq!(sender_client_id, alice_central.get_client_id().await);
1432 })
1433 })
1434 .await
1435 }
1436 }
1437
1438 mod epoch_sync {
1439 use super::*;
1440
1441 #[apply(all_cred_cipher)]
1442 #[wasm_bindgen_test]
1443 async fn should_throw_specialized_error_when_epoch_too_old(mut case: TestContext) {
1444 case.cfg.custom.out_of_order_tolerance = 0;
1445 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1446 Box::pin(async move {
1447 let id = conversation_id();
1448 alice_central
1449 .transaction
1450 .new_conversation(&id, case.credential_type, case.cfg.clone())
1451 .await
1452 .unwrap();
1453 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1454
1455 let bob_message1 = alice_central
1457 .transaction
1458 .conversation(&id)
1459 .await
1460 .unwrap()
1461 .encrypt_message(b"Hello Bob")
1462 .await
1463 .unwrap();
1464 let bob_message2 = alice_central
1465 .transaction
1466 .conversation(&id)
1467 .await
1468 .unwrap()
1469 .encrypt_message(b"Hello again Bob")
1470 .await
1471 .unwrap();
1472
1473 for _ in 0..MAX_PAST_EPOCHS {
1475 alice_central
1476 .transaction
1477 .conversation(&id)
1478 .await
1479 .unwrap()
1480 .update_key_material()
1481 .await
1482 .unwrap();
1483 let commit = alice_central.mls_transport.latest_commit().await;
1484 bob_central
1485 .transaction
1486 .conversation(&id)
1487 .await
1488 .unwrap()
1489 .decrypt_message(commit.to_bytes().unwrap())
1490 .await
1491 .unwrap();
1492 }
1493 let decrypt = bob_central
1495 .transaction
1496 .conversation(&id)
1497 .await
1498 .unwrap()
1499 .decrypt_message(&bob_message1)
1500 .await
1501 .unwrap();
1502 assert_eq!(decrypt.app_msg.unwrap(), b"Hello Bob");
1503
1504 alice_central
1506 .transaction
1507 .conversation(&id)
1508 .await
1509 .unwrap()
1510 .update_key_material()
1511 .await
1512 .unwrap();
1513 let commit = alice_central.mls_transport.latest_commit().await;
1514 bob_central
1515 .transaction
1516 .conversation(&id)
1517 .await
1518 .unwrap()
1519 .decrypt_message(commit.to_bytes().unwrap())
1520 .await
1521 .unwrap();
1522
1523 let decrypt = bob_central
1524 .transaction
1525 .conversation(&id)
1526 .await
1527 .unwrap()
1528 .decrypt_message(&bob_message2)
1529 .await;
1530 assert!(matches!(decrypt.unwrap_err(), Error::MessageEpochTooOld));
1531 })
1532 })
1533 .await
1534 }
1535
1536 #[apply(all_cred_cipher)]
1537 #[wasm_bindgen_test]
1538 async fn should_throw_specialized_error_when_epoch_desynchronized(mut case: TestContext) {
1539 case.cfg.custom.out_of_order_tolerance = 0;
1540 run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1541 Box::pin(async move {
1542 let id = conversation_id();
1543 alice_central
1544 .transaction
1545 .new_conversation(&id, case.credential_type, case.cfg.clone())
1546 .await
1547 .unwrap();
1548 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1549
1550 let old_proposal = alice_central
1552 .transaction
1553 .new_update_proposal(&id)
1554 .await
1555 .unwrap()
1556 .proposal
1557 .to_bytes()
1558 .unwrap();
1559 alice_central
1560 .get_conversation_unchecked(&id)
1561 .await
1562 .group
1563 .clear_pending_proposals();
1564 let old_commit = alice_central
1565 .create_unmerged_commit(&id)
1566 .await
1567 .commit
1568 .to_bytes()
1569 .unwrap();
1570 alice_central
1571 .transaction
1572 .conversation(&id)
1573 .await
1574 .unwrap()
1575 .clear_pending_commit()
1576 .await
1577 .unwrap();
1578
1579 alice_central
1581 .transaction
1582 .conversation(&id)
1583 .await
1584 .unwrap()
1585 .update_key_material()
1586 .await
1587 .unwrap();
1588 let commit = alice_central.mls_transport.latest_commit().await;
1589 bob_central
1590 .transaction
1591 .conversation(&id)
1592 .await
1593 .unwrap()
1594 .decrypt_message(commit.to_bytes().unwrap())
1595 .await
1596 .unwrap();
1597
1598 let decrypt_err = bob_central
1600 .transaction
1601 .conversation(&id)
1602 .await
1603 .unwrap()
1604 .decrypt_message(&old_proposal)
1605 .await
1606 .unwrap_err();
1607
1608 assert!(matches!(decrypt_err, Error::StaleProposal));
1609
1610 let decrypt_err = bob_central
1611 .transaction
1612 .conversation(&id)
1613 .await
1614 .unwrap()
1615 .decrypt_message(&old_commit)
1616 .await
1617 .unwrap_err();
1618
1619 assert!(matches!(decrypt_err, Error::StaleCommit));
1620 })
1621 })
1622 .await
1623 }
1624 }
1625}