1mod buffer_commit;
12pub(crate) mod buffer_messages;
13
14use super::{ConversationGuard, Result};
15use crate::e2e_identity::NewCrlDistributionPoints;
16use crate::mls::conversation::renew::Renew;
17use crate::mls::conversation::{Conversation, ConversationWithMls, Error};
18use crate::mls::credential::crl::{
19 extract_crl_uris_from_proposals, extract_crl_uris_from_update_path, get_new_crl_distribution_points,
20};
21use crate::mls::credential::ext::CredentialExt as _;
22use crate::obfuscate::Obfuscated;
23use crate::prelude::{ClientId, E2eiConversationState, Session};
24use crate::prelude::{MlsProposalBundle, WireIdentity};
25use crate::{MlsError, RecursiveError};
26use log::{debug, info};
27use openmls::framing::errors::{MessageDecryptionError, SecretTreeError};
28use openmls::framing::{MlsMessageIn, MlsMessageInBody, ProcessedMessage, ProtocolMessage};
29use openmls::prelude::{
30 ContentType, CredentialType, ProcessMessageError, ProcessedMessageContent, Proposal, StageCommitError,
31 StagedCommit, ValidationError,
32};
33use openmls_traits::OpenMlsCryptoProvider as _;
34use tls_codec::Deserialize as _;
35
/// Result handed back to the caller of `ConversationGuard::decrypt_message` once an incoming MLS
/// message has been processed.
///
/// The field notes below describe how the values are filled in for the message kinds handled in
/// this module (application message, proposal, commit, external join proposal). Results for one's
/// own commit are built by `handle_own_commit`, which is defined elsewhere.
#[derive(Debug)]
pub struct MlsConversationDecryptMessage {
    /// Decrypted payload. `Some` for application messages, `None` for proposals and commits.
    pub app_msg: Option<Vec<u8>>,
    /// Proposals renewed for the current epoch after a commit was merged. Empty for application
    /// messages and proposals.
    pub proposals: Vec<MlsProposalBundle>,
    /// Whether the group is still active. Read from the group after merging a commit and `true`
    /// for the other message kinds; `decrypt_message` wipes the conversation when this is `false`.
    pub is_active: bool,
    /// Value of `compute_next_commit_delay` after a proposal or commit; `None` for application
    /// messages. The unit is not visible in this file — presumably seconds, TODO confirm.
    pub delay: Option<u64>,
    /// Client id of the sender. Only set for application messages.
    pub sender_client_id: Option<ClientId>,
    /// `true` when the processed message was a commit that has been merged.
    #[deprecated = "This member will be removed in the future. Prefer using the `EpochObserver` interface."]
    pub has_epoch_changed: bool,
    /// Identity extracted from the sender's credential.
    pub identity: WireIdentity,
    /// Previously buffered messages that were restored after a commit was processed. Only filled
    /// in when recursion is allowed; `None` otherwise.
    pub buffered_messages: Option<Vec<MlsBufferedConversationDecryptMessage>>,
    /// Distribution points returned by `get_new_crl_distribution_points` for the processed
    /// proposal or commit; built from `None` for application messages.
    pub crl_new_distribution_points: NewCrlDistributionPoints,
}
66
/// Flat counterpart of `MlsConversationDecryptMessage` used for the entries of its
/// `buffered_messages` list.
///
/// It carries the same members except `buffered_messages` itself, so the type does not nest.
#[derive(Debug)]
pub struct MlsBufferedConversationDecryptMessage {
    /// Same meaning as `MlsConversationDecryptMessage::app_msg`.
    pub app_msg: Option<Vec<u8>>,
    /// Same meaning as `MlsConversationDecryptMessage::proposals`.
    pub proposals: Vec<MlsProposalBundle>,
    /// Same meaning as `MlsConversationDecryptMessage::is_active`.
    pub is_active: bool,
    /// Same meaning as `MlsConversationDecryptMessage::delay`.
    pub delay: Option<u64>,
    /// Same meaning as `MlsConversationDecryptMessage::sender_client_id`.
    pub sender_client_id: Option<ClientId>,
    /// Same meaning as `MlsConversationDecryptMessage::has_epoch_changed`.
    #[deprecated = "This member will be removed in the future. Prefer using the `EpochObserver` interface."]
    pub has_epoch_changed: bool,
    /// Same meaning as `MlsConversationDecryptMessage::identity`.
    pub identity: WireIdentity,
    /// Same meaning as `MlsConversationDecryptMessage::crl_new_distribution_points`.
    pub crl_new_distribution_points: NewCrlDistributionPoints,
}
88
89impl From<MlsConversationDecryptMessage> for MlsBufferedConversationDecryptMessage {
90 fn from(from: MlsConversationDecryptMessage) -> Self {
91 #[expect(deprecated)]
93 Self {
94 app_msg: from.app_msg,
95 proposals: from.proposals,
96 is_active: from.is_active,
97 delay: from.delay,
98 sender_client_id: from.sender_client_id,
99 has_epoch_changed: from.has_epoch_changed,
100 identity: from.identity,
101 crl_new_distribution_points: from.crl_new_distribution_points,
102 }
103 }
104}
105
/// Intermediate result of `parse_message`, consumed by `process_message`.
struct ParsedMessage {
    /// Result of the duplicate check performed on public messages; always `false` for private
    /// messages, for which no such check is made at parse time.
    is_duplicate: bool,
    /// The message body, wrapped in the form expected by the group's `process_message`.
    protocol_message: ProtocolMessage,
    /// Content type reported by the message; used to pick the right "stale" error when the
    /// message belongs to a past epoch.
    content_type: ContentType,
}
111
/// Controls whether `decrypt_message_inner` may go on to process further, previously buffered
/// messages after it has handled a commit.
#[derive(Clone, Copy, PartialEq, Eq)]
enum RecursionPolicy {
    /// After a commit (own or foreign) has been handled, restore and clear the pending messages
    /// and attach their results to the returned value.
    AsNecessary,
    /// Never restore pending messages; `buffered_messages` stays `None`.
    None,
}
117
118impl ConversationGuard {
119 pub async fn decrypt_message(&mut self, message: impl AsRef<[u8]>) -> Result<MlsConversationDecryptMessage> {
131 let mls_message_in =
132 MlsMessageIn::tls_deserialize(&mut message.as_ref()).map_err(Error::tls_deserialize("mls message in"))?;
133
134 let decrypt_message_result = self
135 .decrypt_message_inner(mls_message_in, RecursionPolicy::AsNecessary)
136 .await;
137
138 if let Err(Error::BufferedFutureMessage { message_epoch }) = decrypt_message_result {
143 self.buffer_future_message(message.as_ref()).await?;
144 let conversation = self.conversation().await;
145 info!(group_id = Obfuscated::from(conversation.id()); "Buffered future message from epoch {message_epoch}");
146 }
147 if let Err(Error::BufferedCommit) = decrypt_message_result {
148 self.buffer_commit(message).await?;
149 }
150
151 let decrypt_message = decrypt_message_result?;
152
153 if !decrypt_message.is_active {
154 self.wipe().await?;
155 }
156 Ok(decrypt_message)
157 }
158
    /// Processes one deserialized MLS message against the group and builds the value returned to
    /// the caller.
    ///
    /// * `message` - the deserialized incoming message.
    /// * `recursion_policy` - whether previously buffered messages may be restored and processed
    ///   once a commit has been handled.
    ///
    /// Special cases handled before looking at the message content:
    /// * the group reports the message as our own commit: it is handled through
    ///   `handle_own_commit` and the result is returned directly;
    /// * the group reports a commit with a missing proposal: `Error::BufferedCommit` is returned
    ///   so that `decrypt_message` can buffer the commit.
    ///
    /// Otherwise the result depends on the content (application message, proposal, commit or
    /// external join proposal) and the group is persisted before returning.
    async fn decrypt_message_inner(
        &mut self,
        message: MlsMessageIn,
        recursion_policy: RecursionPolicy,
    ) -> Result<MlsConversationDecryptMessage> {
        let client = &self.session().await?;
        let backend = &self.crypto_provider().await?;
        // A clone is parsed because the original message is still needed below to extract the
        // confirmation tag in the own-commit case.
        let parsed_message = self.parse_message(message.clone()).await?;

        let message_result = self.process_message(parsed_message).await;

        // The group rejected the message because it is a commit we created ourselves.
        if let Err(Error::Mls(crate::MlsError {
            source:
                crate::MlsErrorKind::MlsMessageError(ProcessMessageError::InvalidCommit(StageCommitError::OwnCommit)),
            ..
        })) = message_result
        {
            let mut conversation = self.conversation_mut().await;
            let ct = conversation.extract_confirmation_tag_from_own_commit(&message)?;
            let mut decrypted_message = conversation.handle_own_commit(client, backend, ct).await?;
            debug_assert!(
                decrypted_message.buffered_messages.is_none(),
                "decrypted message should be constructed with empty buffer"
            );
            if recursion_policy == RecursionPolicy::AsNecessary {
                // Release the write guard before restoring pending messages.
                // NOTE(review): presumably the restore needs to lock the conversation itself — confirm.
                drop(conversation);
                decrypted_message.buffered_messages = self.restore_and_clear_pending_messages().await?;
            }

            return Ok(decrypted_message);
        }

        // The commit references a proposal the group does not know yet; signal the caller so the
        // commit gets buffered (see `decrypt_message`).
        if let Err(Error::Mls(crate::MlsError {
            source:
                crate::MlsErrorKind::MlsMessageError(ProcessMessageError::InvalidCommit(StageCommitError::MissingProposal)),
            ..
        })) = message_result
        {
            return Err(Error::BufferedCommit);
        }

        // Any other processing error is propagated as is.
        let message = message_result?;

        let credential = message.credential();
        let epoch = message.epoch();

        let identity = credential
            .extract_identity(
                self.ciphersuite().await,
                backend.authentication_service().borrow().await.as_ref(),
            )
            .map_err(RecursiveError::mls_credential("extracting identity"))?;

        let sender_client_id: ClientId = credential.credential.identity().into();

        let decrypted = match message.into_content() {
            ProcessedMessageContent::ApplicationMessage(app_msg) => {
                let conversation = self.conversation().await;
                debug!(
                    group_id = Obfuscated::from(&conversation.id),
                    epoch = epoch.as_u64(),
                    sender_client_id = Obfuscated::from(&sender_client_id);
                    "Application message"
                );

                // The only message kind that reports a payload and the sender's client id.
                #[expect(deprecated)]
                MlsConversationDecryptMessage {
                    app_msg: Some(app_msg.into_bytes()),
                    proposals: vec![],
                    is_active: true,
                    delay: None,
                    sender_client_id: Some(sender_client_id),
                    has_epoch_changed: false,
                    identity,
                    buffered_messages: None,
                    crl_new_distribution_points: None.into(),
                }
            }
            ProcessedMessageContent::ProposalMessage(proposal) => {
                let mut conversation = self.conversation_mut().await;
                let crl_dps = extract_crl_uris_from_proposals(&[proposal.proposal().clone()])
                    .map_err(RecursiveError::mls_credential("extracting crl urls from proposals"))?;
                let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
                    .await
                    .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;

                info!(
                    group_id = Obfuscated::from(&conversation.id),
                    sender = Obfuscated::from(proposal.sender()),
                    proposals = Obfuscated::from(&proposal.proposal);
                    "Received proposal"
                );

                conversation.group.store_pending_proposal(*proposal);
                // Release the write guard before working with the buffered commit.
                drop(conversation);
                // The proposal just stored may be the one a buffered commit was waiting for, so
                // give that commit another try.
                if let Some(commit) =
                    self.retrieve_buffered_commit()
                        .await
                        .map_err(RecursiveError::mls_conversation(
                            "retrieving buffered commit while handling proposal",
                        ))?
                {
                    let process_result = self.try_process_buffered_commit(commit, recursion_policy).await;

                    if process_result.is_ok() {
                        self.clear_buffered_commit()
                            .await
                            .map_err(RecursiveError::mls_conversation(
                                "clearing buffered commit after successful application",
                            ))?;
                    }
                    // `BufferedCommit` means the commit is still missing proposals: keep it
                    // buffered and report the proposal normally. Every other outcome (success or
                    // failure) becomes the result of this call.
                    // NOTE(review): this early return skips the `persist_group_when_changed` call
                    // at the end of this function — presumably the buffered-commit processing
                    // persists on its own; confirm.
                    if !matches!(process_result, Err(Error::BufferedCommit)) {
                        return process_result
                            .map_err(RecursiveError::mls_conversation("processing buffered commit"))
                            .map_err(Into::into);
                    }
                }

                let conversation = self.conversation().await;
                let delay = conversation.compute_next_commit_delay();

                #[expect(deprecated)]
                MlsConversationDecryptMessage {
                    app_msg: None,
                    proposals: vec![],
                    is_active: true,
                    delay,
                    sender_client_id: None,
                    has_epoch_changed: false,
                    identity,
                    buffered_messages: None,
                    crl_new_distribution_points,
                }
            }
            ProcessedMessageContent::StagedCommitMessage(staged_commit) => {
                self.validate_commit(&staged_commit).await?;
                let mut conversation = self.conversation_mut().await;

                // Snapshot our own pending proposals before the merge; they are needed both for
                // the CRL lookup and for the renewal below.
                let pending_proposals = conversation.self_pending_proposals().cloned().collect::<Vec<_>>();

                // Proposals whose CRL distribution points are inspected: our pending ones plus
                // the add and update proposals covered by the commit.
                let proposal_refs: Vec<Proposal> = pending_proposals
                    .iter()
                    .map(|p| p.proposal().clone())
                    .chain(
                        staged_commit
                            .add_proposals()
                            .map(|p| Proposal::Add(p.add_proposal().clone())),
                    )
                    .chain(
                        staged_commit
                            .update_proposals()
                            .map(|p| Proposal::Update(p.update_proposal().clone())),
                    )
                    .collect();

                let mut crl_dps = extract_crl_uris_from_proposals(&proposal_refs)
                    .map_err(RecursiveError::mls_credential("extracting crl urls from proposals"))?;
                crl_dps.extend(
                    extract_crl_uris_from_update_path(&staged_commit)
                        .map_err(RecursiveError::mls_credential("extracting crl urls from update path"))?,
                );

                let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
                    .await
                    .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;

                // Captured before the merge so it can still be handed to `Renew::renew` afterwards.
                let pending_commit = conversation.group.pending_commit().cloned();

                // The staged commit is cloned because it is still read after the merge.
                conversation
                    .group
                    .merge_staged_commit(backend, *staged_commit.clone())
                    .await
                    .map_err(MlsError::wrap("merge staged commit"))?;

                let (proposals_to_renew, needs_update) = Renew::renew(
                    &conversation.group.own_leaf_index(),
                    pending_proposals.into_iter(),
                    pending_commit.as_ref(),
                    staged_commit.as_ref(),
                );
                let proposals = conversation
                    .renew_proposals_for_current_epoch(client, backend, proposals_to_renew.into_iter(), needs_update)
                    .await?;

                let mut buffered_messages = None;
                // Release the write guard before restoring pending messages.
                drop(conversation);
                if recursion_policy == RecursionPolicy::AsNecessary {
                    buffered_messages = self.restore_and_clear_pending_messages().await?;
                }

                let conversation = self.conversation().await;
                let epoch = staged_commit.staged_context().epoch().as_u64();
                info!(
                    group_id = Obfuscated::from(&conversation.id),
                    epoch,
                    proposals:? = staged_commit.queued_proposals().map(Obfuscated::from).collect::<Vec<_>>();
                    "Epoch advanced"
                );
                client.notify_epoch_changed(conversation.id.clone(), epoch).await;

                #[expect(deprecated)]
                MlsConversationDecryptMessage {
                    app_msg: None,
                    proposals,
                    is_active: conversation.group.is_active(),
                    delay: conversation.compute_next_commit_delay(),
                    sender_client_id: None,
                    has_epoch_changed: true,
                    identity,
                    buffered_messages,
                    crl_new_distribution_points,
                }
            }
            ProcessedMessageContent::ExternalJoinProposalMessage(proposal) => {
                let mut conversation = self.conversation_mut().await;
                info!(
                    group_id = Obfuscated::from(&conversation.id),
                    sender = Obfuscated::from(proposal.sender());
                    "Received external join proposal"
                );

                let crl_dps = extract_crl_uris_from_proposals(&[proposal.proposal().clone()])
                    .map_err(RecursiveError::mls_credential("extracting crl uris from proposals"))?;
                let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
                    .await
                    .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
                conversation.group.store_pending_proposal(*proposal);

                #[expect(deprecated)]
                MlsConversationDecryptMessage {
                    app_msg: None,
                    proposals: vec![],
                    is_active: true,
                    delay: conversation.compute_next_commit_delay(),
                    sender_client_id: None,
                    has_epoch_changed: false,
                    identity,
                    buffered_messages: None,
                    crl_new_distribution_points,
                }
            }
        };

        let mut conversation = self.conversation_mut().await;

        // Persist whatever the processing above changed in the group.
        conversation
            .persist_group_when_changed(&backend.keystore(), false)
            .await?;

        Ok(decrypted)
    }
430
431 async fn parse_message(&self, msg_in: MlsMessageIn) -> Result<ParsedMessage> {
432 let mut is_duplicate = false;
433 let conversation = self.conversation().await;
434 let backend = self.crypto_provider().await?;
435 let (protocol_message, content_type) = match msg_in.extract() {
436 MlsMessageInBody::PublicMessage(m) => {
437 is_duplicate = conversation.is_duplicate_message(&backend, &m)?;
438 let ct = m.content_type();
439 (ProtocolMessage::PublicMessage(m), ct)
440 }
441 MlsMessageInBody::PrivateMessage(m) => {
442 let ct = m.content_type();
443 (ProtocolMessage::PrivateMessage(m), ct)
444 }
445 _ => {
446 return Err(
447 MlsError::wrap("parsing inbound message")(ProcessMessageError::IncompatibleWireFormat).into(),
448 );
449 }
450 };
451 Ok(ParsedMessage {
452 is_duplicate,
453 protocol_message,
454 content_type,
455 })
456 }
457
458 async fn process_message(
459 &mut self,
460 ParsedMessage {
461 is_duplicate,
462 protocol_message,
463 content_type,
464 }: ParsedMessage,
465 ) -> Result<ProcessedMessage> {
466 let msg_epoch = protocol_message.epoch().as_u64();
467 let backend = self.crypto_provider().await?;
468 let mut conversation = self.conversation_mut().await;
469 let group_epoch = conversation.group.epoch().as_u64();
470 let processed_msg = conversation
471 .group
472 .process_message(&backend, protocol_message)
473 .await
474 .map_err(|e| match e {
475 ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
476 MessageDecryptionError::GenerationOutOfBound,
477 )) => Error::DuplicateMessage,
478 ProcessMessageError::ValidationError(ValidationError::WrongEpoch) => {
479 if is_duplicate {
480 Error::DuplicateMessage
481 } else if msg_epoch == group_epoch + 1 {
482 Error::BufferedFutureMessage {
491 message_epoch: msg_epoch,
492 }
493 } else if msg_epoch < group_epoch {
494 match content_type {
495 ContentType::Application => Error::StaleMessage,
496 ContentType::Commit => Error::StaleCommit,
497 ContentType::Proposal => Error::StaleProposal,
498 }
499 } else {
500 Error::UnbufferedFarFutureMessage
501 }
502 }
503 ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
504 MessageDecryptionError::AeadError,
505 )) => Error::DecryptionError,
506 ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
507 MessageDecryptionError::SecretTreeError(SecretTreeError::TooDistantInThePast),
508 )) => Error::MessageEpochTooOld,
509 _ => MlsError::wrap("processing message")(e).into(),
510 })?;
511 if is_duplicate {
512 return Err(Error::DuplicateMessage);
513 }
514 Ok(processed_msg)
515 }
516
517 async fn validate_commit(&self, commit: &StagedCommit) -> Result<()> {
518 let backend = self.crypto_provider().await?;
519 if backend.authentication_service().is_env_setup().await {
520 let credentials: Vec<_> = commit
521 .add_proposals()
522 .filter_map(|add_proposal| {
523 let credential = add_proposal.add_proposal().key_package().leaf_node().credential();
524
525 matches!(credential.credential_type(), CredentialType::X509).then(|| credential.clone())
526 })
527 .collect();
528 let state = Session::compute_conversation_state(
529 self.ciphersuite().await,
530 credentials.iter(),
531 crate::prelude::MlsCredentialType::X509,
532 backend.authentication_service().borrow().await.as_ref(),
533 )
534 .await;
535 if state != E2eiConversationState::Verified {
536 }
539 }
540 Ok(())
541 }
542}
543
544#[cfg(test)]
545mod tests {
546 use wasm_bindgen_test::*;
547
548 use crate::{
549 mls::conversation::{config::MAX_PAST_EPOCHS, error::Error},
550 test_utils::*,
551 };
552
553 use super::*;
554
555 wasm_bindgen_test_configure!(run_in_browser);
556
557 mod is_active {
558 use super::*;
559
        // A key-material update committed by Bob must leave Alice's conversation active.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        pub async fn decrypting_a_regular_commit_should_leave_conversation_active(case: TestContext) {
            let [alice_central, bob_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Bob commits a key-material update.
                bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();
                let commit = bob_central.mls_transport().await.latest_commit().await;
                // Alice decrypts Bob's commit and is still an active member.
                let MlsConversationDecryptMessage { is_active, .. } = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(commit.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert!(is_active)
            })
            .await
        }
594
        // A commit in which Bob removes Alice must be reported to Alice as `is_active == false`.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        pub async fn decrypting_a_commit_removing_self_should_set_conversation_inactive(case: TestContext) {
            let [alice_central, bob_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Bob removes Alice from the group.
                bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .remove_members(&[alice_central.get_client_id().await])
                    .await
                    .unwrap();
                let commit = bob_central.mls_transport().await.latest_commit().await;
                // Alice decrypts the commit that removes her.
                let MlsConversationDecryptMessage { is_active, .. } = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(commit.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert!(!is_active)
            })
            .await
        }
629 }
630
631 mod commit {
632 use super::*;
633
        // Decrypting a plain commit advances the receiver's epoch by one, notifies the epoch
        // observer and yields neither a delay nor an application payload.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        pub async fn decrypting_a_commit_should_succeed(case: TestContext) {
            let [alice_central, bob_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Watch Bob's session for epoch changes.
                let bob_observer = TestEpochObserver::new();
                bob_central
                    .session()
                    .await
                    .register_epoch_observer(bob_observer.clone())
                    .await
                    .unwrap();

                // Epoch before Alice commits, used as the reference for Bob's epoch afterwards.
                let epoch_before = alice_central.transaction.conversation(&id).await.unwrap().epoch().await;

                // Alice commits a key-material update.
                alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();
                let commit = alice_central.mls_transport().await.latest_commit().await;

                // Bob decrypts Alice's commit.
                let decrypted = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(commit.to_bytes().unwrap())
                    .await
                    .unwrap();
                let epoch_after = bob_central.transaction.conversation(&id).await.unwrap().epoch().await;
                assert_eq!(epoch_after, epoch_before + 1);
                assert!(bob_observer.has_changed().await);
                assert!(decrypted.delay.is_none());
                assert!(decrypted.app_msg.is_none());

                // The identity reported with the commit must be Alice's.
                alice_central.verify_sender_identity(&case, &decrypted).await;
            })
            .await
        }
685
        // When the pending proposal Alice holds was created by Bob and Bob then commits, nothing
        // is left for Alice to renew after decrypting that commit.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        pub async fn decrypting_a_commit_should_not_renew_proposals_in_valid_commit(case: TestContext) {
            let [mut alice_central, bob_central, charlie_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Watch Alice's session for epoch changes.
                let alice_observer = TestEpochObserver::new();
                alice_central
                    .session()
                    .await
                    .register_epoch_observer(alice_observer.clone())
                    .await
                    .unwrap();

                let charlie_kp = charlie_central.get_one_key_package(&case).await;

                // Bob proposes to add Charlie and Alice receives that proposal.
                let add_charlie_proposal = bob_central.transaction.new_add_proposal(&id, charlie_kp).await.unwrap();
                alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(add_charlie_proposal.proposal.to_bytes().unwrap())
                    .await
                    .unwrap();

                // Forget anything observed so far; only the upcoming commit matters.
                alice_observer.reset().await;

                // Bob commits a key-material update.
                bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();
                let commit = bob_central.mls_transport().await.latest_commit().await;
                // Alice decrypts the commit: no renewed proposals, no delay, nothing pending.
                let MlsConversationDecryptMessage { proposals, delay, .. } = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(commit.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert!(proposals.is_empty());
                assert!(delay.is_none());
                assert!(alice_central.pending_proposals(&id).await.is_empty());
                assert!(alice_observer.has_changed().await);
            })
            .await
        }
749
750 #[apply(all_cred_cipher)]
752 #[wasm_bindgen_test]
753 pub async fn decrypting_a_commit_should_renew_orphan_pending_proposals(case: TestContext) {
754 let [mut alice_central, mut bob_central, charlie_central] = case.sessions().await;
755 Box::pin(async move {
756 let id = conversation_id();
757 alice_central
758 .transaction
759 .new_conversation(&id, case.credential_type, case.cfg.clone())
760 .await
761 .unwrap();
762 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
763
764 let alice_observer = TestEpochObserver::new();
765 alice_central
766 .session()
767 .await
768 .register_epoch_observer(alice_observer.clone())
769 .await
770 .unwrap();
771
772 bob_central
776 .transaction
777 .conversation(&id)
778 .await
779 .unwrap()
780 .update_key_material()
781 .await
782 .unwrap();
783 let bob_commit = bob_central.mls_transport().await.latest_commit().await;
784 let commit_epoch = bob_commit.epoch().unwrap();
785
786 let charlie_kp = charlie_central.get_one_key_package(&case).await;
788 alice_central
789 .transaction
790 .new_add_proposal(&id, charlie_kp)
791 .await
792 .unwrap();
793 assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
794
795 let MlsConversationDecryptMessage { proposals, delay, .. } = alice_central
797 .transaction
798 .conversation(&id)
799 .await
800 .unwrap()
801 .decrypt_message(bob_commit.to_bytes().unwrap())
802 .await
803 .unwrap();
804 assert!(
806 !alice_central
807 .get_conversation_unchecked(&id)
808 .await
809 .members()
810 .contains_key(b"charlie".as_slice())
811 );
812 assert!(delay.is_some());
814
815 assert!(!proposals.is_empty());
817 assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
818 let renewed_proposal = proposals.first().unwrap();
819 assert_eq!(
820 commit_epoch.as_u64() + 1,
821 renewed_proposal.proposal.epoch().unwrap().as_u64()
822 );
823
824 alice_observer.reset().await;
827
828 bob_central
830 .transaction
831 .conversation(&id)
832 .await
833 .unwrap()
834 .decrypt_message(renewed_proposal.proposal.to_bytes().unwrap())
835 .await
836 .unwrap();
837 assert_eq!(bob_central.pending_proposals(&id).await.len(), 1);
838 bob_central
839 .transaction
840 .conversation(&id)
841 .await
842 .unwrap()
843 .commit_pending_proposals()
844 .await
845 .unwrap();
846 let commit = bob_central.mls_transport().await.latest_commit().await;
847 let _decrypted = alice_central
848 .transaction
849 .conversation(&id)
850 .await
851 .unwrap()
852 .decrypt_message(commit.to_bytes().unwrap())
853 .await
854 .unwrap();
855 assert!(
857 alice_central
858 .get_conversation_unchecked(&id)
859 .await
860 .members()
861 .contains_key::<Vec<u8>>(&charlie_central.get_client_id().await.to_vec())
862 );
863
864 assert!(
866 bob_central
867 .get_conversation_unchecked(&id)
868 .await
869 .members()
870 .contains_key::<Vec<u8>>(&charlie_central.get_client_id().await.to_vec())
871 );
872 assert!(alice_observer.has_changed().await);
873 })
874 .await
875 }
876
        // A pending external add proposal is neither renewed nor kept once Alice decrypts a
        // commit from Bob.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        pub async fn decrypting_a_commit_should_discard_pending_external_proposals(case: TestContext) {
            let [mut alice_central, bob_central, charlie_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Charlie creates an external add proposal for the group's current epoch.
                let ext_proposal = charlie_central
                    .transaction
                    .new_external_add_proposal(
                        id.clone(),
                        alice_central.get_conversation_unchecked(&id).await.group.epoch(),
                        case.ciphersuite(),
                        case.credential_type,
                    )
                    .await
                    .unwrap();
                // Alice stores it as a pending proposal once she has decrypted it.
                assert!(alice_central.pending_proposals(&id).await.is_empty());
                alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(ext_proposal.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);

                // Bob commits a key-material update.
                bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();
                let commit = bob_central.mls_transport().await.latest_commit().await;
                // After decrypting it, Alice has nothing renewed and nothing pending.
                let alice_renewed_proposals = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(commit.to_bytes().unwrap())
                    .await
                    .unwrap()
                    .proposals;
                assert!(alice_renewed_proposals.is_empty());
                assert!(alice_central.pending_proposals(&id).await.is_empty());
            })
            .await
        }
938
        // Decrypting a commit never reports a sender client id.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        async fn should_not_return_sender_client_id(case: TestContext) {
            let [alice_central, bob_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Alice commits a key-material update.
                alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();
                let commit = alice_central.mls_transport().await.latest_commit().await;

                // Bob decrypts the commit; the result carries no sender client id.
                let sender_client_id = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(commit.to_bytes().unwrap())
                    .await
                    .unwrap()
                    .sender_client_id;
                assert!(sender_client_id.is_none());
            })
            .await
        }
975 }
976
977 mod external_proposal {
978 use super::*;
979
        // Both members can decrypt an external add proposal; it yields a commit delay, no
        // application payload, and does not change the epoch.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        async fn can_decrypt_external_proposal(case: TestContext) {
            let [alice_central, bob_central, alice2_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Watch Bob's session for epoch changes.
                let bob_observer = TestEpochObserver::new();
                bob_central
                    .session()
                    .await
                    .register_epoch_observer(bob_observer.clone())
                    .await
                    .unwrap();

                // A third session creates an external add proposal for the current epoch.
                let epoch = alice_central.get_conversation_unchecked(&id).await.group.epoch();
                let ext_proposal = alice2_central
                    .transaction
                    .new_external_add_proposal(id.clone(), epoch, case.ciphersuite(), case.credential_type)
                    .await
                    .unwrap();

                // Alice decrypts the external proposal.
                let decrypted = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(&ext_proposal.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert!(decrypted.app_msg.is_none());
                assert!(decrypted.delay.is_some());

                // Bob decrypts the same external proposal.
                let decrypted = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(&ext_proposal.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert!(decrypted.app_msg.is_none());
                assert!(decrypted.delay.is_some());
                assert!(!bob_observer.has_changed().await)
            })
            .await
        }
1033 }
1034
1035 mod proposal {
1036 use super::*;
1037
        // A decrypted add proposal only takes effect once the receiver commits it.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        async fn can_decrypt_proposal(case: TestContext) {
            let [alice_central, bob_central, charlie_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Alice proposes to add Charlie.
                let charlie_kp = charlie_central.get_one_key_package(&case).await;
                let proposal = alice_central
                    .transaction
                    .new_add_proposal(&id, charlie_kp)
                    .await
                    .unwrap()
                    .proposal;

                // Bob decrypts the proposal.
                let decrypted = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(proposal.to_bytes().unwrap())
                    .await
                    .unwrap();

                // Still two members until Bob commits the pending proposal; three afterwards.
                assert_eq!(bob_central.get_conversation_unchecked(&id).await.members().len(), 2);
                bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .commit_pending_proposals()
                    .await
                    .unwrap();
                assert_eq!(bob_central.get_conversation_unchecked(&id).await.members().len(), 3);

                // The identity reported with the proposal must be Alice's.
                alice_central.verify_sender_identity(&case, &decrypted).await;
            })
            .await
        }
1085
        // Decrypting a proposal never reports a sender client id.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        async fn should_not_return_sender_client_id(case: TestContext) {
            let [alice_central, bob_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Alice creates an update proposal.
                let proposal = alice_central
                    .transaction
                    .new_update_proposal(&id)
                    .await
                    .unwrap()
                    .proposal;

                // Bob decrypts it; the result carries no sender client id.
                let sender_client_id = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(proposal.to_bytes().unwrap())
                    .await
                    .unwrap()
                    .sender_client_id;
                assert!(sender_client_id.is_none());
            })
            .await
        }
1119 }
1120
1121 mod app_message {
1122 use super::*;
1123
        // Application messages round-trip in both directions without changing the epoch.
        #[apply(all_cred_cipher)]
        #[wasm_bindgen_test]
        async fn can_decrypt_app_message(case: TestContext) {
            let [alice_central, bob_central] = case.sessions().await;
            Box::pin(async move {
                // Alice creates the conversation and invites Bob.
                let id = conversation_id();
                alice_central
                    .transaction
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();

                // Watch both sessions for epoch changes.
                let alice_observer = TestEpochObserver::new();
                alice_central
                    .session()
                    .await
                    .register_epoch_observer(alice_observer.clone())
                    .await
                    .unwrap();
                let bob_observer = TestEpochObserver::new();
                bob_central
                    .session()
                    .await
                    .register_epoch_observer(bob_observer.clone())
                    .await
                    .unwrap();

                // Alice -> Bob
                let msg = b"Hello bob";
                let encrypted = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .encrypt_message(msg)
                    .await
                    .unwrap();
                // The ciphertext must differ from the plaintext.
                assert_ne!(&msg[..], &encrypted[..]);
                let decrypted = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(encrypted)
                    .await
                    .unwrap();
                let dec_msg = decrypted.app_msg.as_ref().unwrap().as_slice();
                assert_eq!(dec_msg, &msg[..]);
                assert!(!bob_observer.has_changed().await);
                alice_central.verify_sender_identity(&case, &decrypted).await;

                // Bob -> Alice
                let msg = b"Hello alice";
                let encrypted = bob_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .encrypt_message(msg)
                    .await
                    .unwrap();
                assert_ne!(&msg[..], &encrypted[..]);
                let decrypted = alice_central
                    .transaction
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(encrypted)
                    .await
                    .unwrap();
                let dec_msg = decrypted.app_msg.as_ref().unwrap().as_slice();
                assert_eq!(dec_msg, &msg[..]);
                assert!(!alice_observer.has_changed().await);
                bob_central.verify_sender_identity(&case, &decrypted).await;
            })
            .await
        }
1200
1201 #[apply(all_cred_cipher)]
1202 #[wasm_bindgen_test]
1203 async fn cannot_decrypt_app_message_after_rejoining(case: TestContext) {
1204 let [alice_central, bob_central] = case.sessions().await;
1205 Box::pin(async move {
1206 let id = conversation_id();
1207 alice_central
1208 .transaction
1209 .new_conversation(&id, case.credential_type, case.cfg.clone())
1210 .await
1211 .unwrap();
1212 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1213
1214 let msg = b"Hello bob";
1216 let encrypted = alice_central
1217 .transaction
1218 .conversation(&id)
1219 .await
1220 .unwrap()
1221 .encrypt_message(msg)
1222 .await
1223 .unwrap();
1224
1225 let gi = alice_central.get_group_info(&id).await;
1228 bob_central
1229 .transaction
1230 .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
1231 .await
1232 .unwrap();
1233
1234 let decrypt = bob_central
1236 .transaction
1237 .conversation(&id)
1238 .await
1239 .unwrap()
1240 .decrypt_message(&encrypted)
1241 .await;
1242 assert!(matches!(decrypt.unwrap_err(), Error::DecryptionError));
1243 })
1244 .await
1245 }
1246
1247 #[apply(all_cred_cipher)]
1248 #[wasm_bindgen_test]
1249 async fn cannot_decrypt_app_message_from_future_epoch(case: TestContext) {
1250 let [alice_central, bob_central] = case.sessions().await;
1251 Box::pin(async move {
1252 let id = conversation_id();
1253 alice_central
1254 .transaction
1255 .new_conversation(&id, case.credential_type, case.cfg.clone())
1256 .await
1257 .unwrap();
1258 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1259
1260 alice_central
1262 .transaction
1263 .conversation(&id)
1264 .await
1265 .unwrap()
1266 .update_key_material()
1267 .await
1268 .unwrap();
1269 let commit = alice_central.mls_transport().await.latest_commit().await;
1270
1271 let msg = b"Hello bob";
1273 let encrypted = alice_central
1274 .transaction
1275 .conversation(&id)
1276 .await
1277 .unwrap()
1278 .encrypt_message(msg)
1279 .await
1280 .unwrap();
1281
1282 let decrypt = bob_central
1284 .transaction
1285 .conversation(&id)
1286 .await
1287 .unwrap()
1288 .decrypt_message(&encrypted)
1289 .await;
1290 assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));
1291
1292 let decrypted_commit = bob_central
1293 .transaction
1294 .conversation(&id)
1295 .await
1296 .unwrap()
1297 .decrypt_message(commit.to_bytes().unwrap())
1298 .await
1299 .unwrap();
1300 let buffered_msg = decrypted_commit.buffered_messages.unwrap();
1301 let decrypted_msg = buffered_msg.first().unwrap().app_msg.clone().unwrap();
1302 assert_eq!(&decrypted_msg, msg);
1303 })
1304 .await
1305 }
1306
1307 #[apply(all_cred_cipher)]
1308 #[wasm_bindgen_test]
1309 async fn can_decrypt_app_message_in_any_order(mut case: TestContext) {
1310 case.cfg.custom.maximum_forward_distance = 0;
1313
1314 let [alice_central, bob_central] = case.sessions().await;
1315 Box::pin(async move {
1316 let id = conversation_id();
1317 alice_central
1318 .transaction
1319 .new_conversation(&id, case.credential_type, case.cfg.clone())
1320 .await
1321 .unwrap();
1322 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1323
1324 let out_of_order_tolerance = case.custom_cfg().out_of_order_tolerance;
1325 let nb_messages = out_of_order_tolerance * 2;
1326 let mut messages = vec![];
1327
1328 for i in 0..nb_messages {
1330 let msg = format!("Hello {i}");
1331 let encrypted = alice_central
1332 .transaction
1333 .conversation(&id)
1334 .await
1335 .unwrap()
1336 .encrypt_message(&msg)
1337 .await
1338 .unwrap();
1339 messages.push((msg, encrypted));
1340 }
1341
1342 messages.reverse();
1344 for (i, (original, encrypted)) in messages.iter().enumerate() {
1345 let decrypt = bob_central
1346 .transaction
1347 .conversation(&id)
1348 .await
1349 .unwrap()
1350 .decrypt_message(encrypted)
1351 .await;
1352 if i > out_of_order_tolerance as usize {
1353 let decrypted = decrypt.unwrap().app_msg.unwrap();
1354 assert_eq!(decrypted, original.as_bytes());
1355 } else {
1356 assert!(matches!(decrypt.unwrap_err(), Error::DuplicateMessage))
1357 }
1358 }
1359 })
1360 .await
1361 }
1362
1363 #[apply(all_cred_cipher)]
1364 #[wasm_bindgen_test]
1365 async fn returns_sender_client_id(case: TestContext) {
1366 let [alice_central, bob_central] = case.sessions().await;
1367 Box::pin(async move {
1368 let id = conversation_id();
1369 alice_central
1370 .transaction
1371 .new_conversation(&id, case.credential_type, case.cfg.clone())
1372 .await
1373 .unwrap();
1374 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1375
1376 let msg = b"Hello bob";
1377 let encrypted = alice_central
1378 .transaction
1379 .conversation(&id)
1380 .await
1381 .unwrap()
1382 .encrypt_message(msg)
1383 .await
1384 .unwrap();
1385 assert_ne!(&msg[..], &encrypted[..]);
1386
1387 let sender_client_id = bob_central
1388 .transaction
1389 .conversation(&id)
1390 .await
1391 .unwrap()
1392 .decrypt_message(encrypted)
1393 .await
1394 .unwrap()
1395 .sender_client_id
1396 .unwrap();
1397 assert_eq!(sender_client_id, alice_central.get_client_id().await);
1398 })
1399 .await
1400 }
1401 }
1402
1403 mod epoch_sync {
1404 use super::*;
1405
1406 #[apply(all_cred_cipher)]
1407 #[wasm_bindgen_test]
1408 async fn should_throw_specialized_error_when_epoch_too_old(mut case: TestContext) {
1409 case.cfg.custom.out_of_order_tolerance = 0;
1410
1411 let [alice_central, bob_central] = case.sessions().await;
1412 Box::pin(async move {
1413 let id = conversation_id();
1414 alice_central
1415 .transaction
1416 .new_conversation(&id, case.credential_type, case.cfg.clone())
1417 .await
1418 .unwrap();
1419 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1420
1421 let bob_message1 = alice_central
1423 .transaction
1424 .conversation(&id)
1425 .await
1426 .unwrap()
1427 .encrypt_message(b"Hello Bob")
1428 .await
1429 .unwrap();
1430 let bob_message2 = alice_central
1431 .transaction
1432 .conversation(&id)
1433 .await
1434 .unwrap()
1435 .encrypt_message(b"Hello again Bob")
1436 .await
1437 .unwrap();
1438
1439 for _ in 0..MAX_PAST_EPOCHS {
1441 alice_central
1442 .transaction
1443 .conversation(&id)
1444 .await
1445 .unwrap()
1446 .update_key_material()
1447 .await
1448 .unwrap();
1449 let commit = alice_central.mls_transport().await.latest_commit().await;
1450 bob_central
1451 .transaction
1452 .conversation(&id)
1453 .await
1454 .unwrap()
1455 .decrypt_message(commit.to_bytes().unwrap())
1456 .await
1457 .unwrap();
1458 }
1459 let decrypt = bob_central
1461 .transaction
1462 .conversation(&id)
1463 .await
1464 .unwrap()
1465 .decrypt_message(&bob_message1)
1466 .await
1467 .unwrap();
1468 assert_eq!(decrypt.app_msg.unwrap(), b"Hello Bob");
1469
1470 alice_central
1472 .transaction
1473 .conversation(&id)
1474 .await
1475 .unwrap()
1476 .update_key_material()
1477 .await
1478 .unwrap();
1479 let commit = alice_central.mls_transport().await.latest_commit().await;
1480 bob_central
1481 .transaction
1482 .conversation(&id)
1483 .await
1484 .unwrap()
1485 .decrypt_message(commit.to_bytes().unwrap())
1486 .await
1487 .unwrap();
1488
1489 let decrypt = bob_central
1490 .transaction
1491 .conversation(&id)
1492 .await
1493 .unwrap()
1494 .decrypt_message(&bob_message2)
1495 .await;
1496 assert!(matches!(decrypt.unwrap_err(), Error::MessageEpochTooOld));
1497 })
1498 .await
1499 }
1500
1501 #[apply(all_cred_cipher)]
1502 #[wasm_bindgen_test]
1503 async fn should_throw_specialized_error_when_epoch_desynchronized(mut case: TestContext) {
1504 case.cfg.custom.out_of_order_tolerance = 0;
1505
1506 let [alice_central, bob_central] = case.sessions().await;
1507 Box::pin(async move {
1508 let id = conversation_id();
1509 alice_central
1510 .transaction
1511 .new_conversation(&id, case.credential_type, case.cfg.clone())
1512 .await
1513 .unwrap();
1514 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1515
1516 let old_proposal = alice_central
1518 .transaction
1519 .new_update_proposal(&id)
1520 .await
1521 .unwrap()
1522 .proposal
1523 .to_bytes()
1524 .unwrap();
1525 alice_central
1526 .get_conversation_unchecked(&id)
1527 .await
1528 .group
1529 .clear_pending_proposals();
1530 let old_commit = alice_central
1531 .create_unmerged_commit(&id)
1532 .await
1533 .commit
1534 .to_bytes()
1535 .unwrap();
1536 alice_central
1537 .transaction
1538 .conversation(&id)
1539 .await
1540 .unwrap()
1541 .clear_pending_commit()
1542 .await
1543 .unwrap();
1544
1545 alice_central
1547 .transaction
1548 .conversation(&id)
1549 .await
1550 .unwrap()
1551 .update_key_material()
1552 .await
1553 .unwrap();
1554 let commit = alice_central.mls_transport().await.latest_commit().await;
1555 bob_central
1556 .transaction
1557 .conversation(&id)
1558 .await
1559 .unwrap()
1560 .decrypt_message(commit.to_bytes().unwrap())
1561 .await
1562 .unwrap();
1563
1564 let decrypt_err = bob_central
1566 .transaction
1567 .conversation(&id)
1568 .await
1569 .unwrap()
1570 .decrypt_message(&old_proposal)
1571 .await
1572 .unwrap_err();
1573
1574 assert!(matches!(decrypt_err, Error::StaleProposal));
1575
1576 let decrypt_err = bob_central
1577 .transaction
1578 .conversation(&id)
1579 .await
1580 .unwrap()
1581 .decrypt_message(&old_commit)
1582 .await
1583 .unwrap_err();
1584
1585 assert!(matches!(decrypt_err, Error::StaleCommit));
1586 })
1587 .await
1588 }
1589 }
1590}