core_crypto/mls/conversation/conversation_guard/decrypt/
mod.rs

1//! MLS defines 3 kind of messages: Proposal, Commits and Application messages. Since they can (should)
2//! be all encrypted we need to first decrypt them before deciding what to do with them.
3//!
4//! This table summarizes when a MLS group can decrypt any message:
5//!
6//! | can decrypt ?     | 0 pend. Commit | 1 pend. Commit |
7//! |-------------------|----------------|----------------|
8//! | 0 pend. Proposal  | ✅              | ✅              |
9//! | 1+ pend. Proposal | ✅              | ✅              |
10
11mod buffer_commit;
12pub(crate) mod buffer_messages;
13
14use super::{ConversationGuard, Result};
15use crate::e2e_identity::NewCrlDistributionPoints;
16use crate::mls::conversation::renew::Renew;
17use crate::mls::conversation::{Conversation, ConversationWithMls, Error};
18use crate::mls::credential::crl::{
19    extract_crl_uris_from_proposals, extract_crl_uris_from_update_path, get_new_crl_distribution_points,
20};
21use crate::mls::credential::ext::CredentialExt as _;
22use crate::obfuscate::Obfuscated;
23use crate::prelude::{ClientId, E2eiConversationState, Session};
24use crate::prelude::{MlsProposalBundle, WireIdentity};
25use crate::{MlsError, RecursiveError};
26use log::{debug, info};
27use openmls::framing::errors::{MessageDecryptionError, SecretTreeError};
28use openmls::framing::{MlsMessageIn, MlsMessageInBody, ProcessedMessage, ProtocolMessage};
29use openmls::prelude::{
30    ContentType, CredentialType, ProcessMessageError, ProcessedMessageContent, Proposal, StageCommitError,
31    StagedCommit, ValidationError,
32};
33use openmls_traits::OpenMlsCryptoProvider as _;
34use tls_codec::Deserialize as _;
35
36/// Represents the potential items a consumer might require after passing us an encrypted message we
37/// have decrypted for him
38#[derive(Debug)]
39pub struct MlsConversationDecryptMessage {
40    /// Decrypted text message
41    pub app_msg: Option<Vec<u8>>,
42    /// Only when decrypted message is a commit, CoreCrypto will renew local proposal which could not make it in the commit.
43    /// This will contain either:
44    /// * local pending proposal not in the accepted commit
45    /// * If there is a pending commit, its proposals which are not in the accepted commit
46    pub proposals: Vec<MlsProposalBundle>,
47    /// Is the conversation still active after receiving this commit aka has the user been removed from the group
48    pub is_active: bool,
49    /// Delay time in seconds to feed caller timer for committing
50    pub delay: Option<u64>,
51    /// [ClientId] of the sender of the message being decrypted. Only present for application messages.
52    pub sender_client_id: Option<ClientId>,
53    /// Is the epoch changed after decrypting this message
54    #[deprecated = "This member will be removed in the future. Prefer using the `EpochObserver` interface."]
55    pub has_epoch_changed: bool,
56    /// Identity claims present in the sender credential
57    /// Present for all messages
58    pub identity: WireIdentity,
59    /// Only set when the decrypted message is a commit.
60    /// Contains buffered messages for next epoch which were received before the commit creating the epoch
61    /// because the DS did not fan them out in order.
62    pub buffered_messages: Option<Vec<MlsBufferedConversationDecryptMessage>>,
63    /// New CRL distribution points that appeared by the introduction of a new credential
64    pub crl_new_distribution_points: NewCrlDistributionPoints,
65}
66
67/// Type safe recursion of [MlsConversationDecryptMessage]
68#[derive(Debug)]
69pub struct MlsBufferedConversationDecryptMessage {
70    /// see [MlsConversationDecryptMessage]
71    pub app_msg: Option<Vec<u8>>,
72    /// see [MlsConversationDecryptMessage]
73    pub proposals: Vec<MlsProposalBundle>,
74    /// see [MlsConversationDecryptMessage]
75    pub is_active: bool,
76    /// see [MlsConversationDecryptMessage]
77    pub delay: Option<u64>,
78    /// see [MlsConversationDecryptMessage]
79    pub sender_client_id: Option<ClientId>,
80    /// see [MlsConversationDecryptMessage]
81    #[deprecated = "This member will be removed in the future. Prefer using the `EpochObserver` interface."]
82    pub has_epoch_changed: bool,
83    /// see [MlsConversationDecryptMessage]
84    pub identity: WireIdentity,
85    /// see [MlsConversationDecryptMessage]
86    pub crl_new_distribution_points: NewCrlDistributionPoints,
87}
88
89impl From<MlsConversationDecryptMessage> for MlsBufferedConversationDecryptMessage {
90    fn from(from: MlsConversationDecryptMessage) -> Self {
91        // we still support the `has_epoch_changed` field, though we'll remove it later
92        #[expect(deprecated)]
93        Self {
94            app_msg: from.app_msg,
95            proposals: from.proposals,
96            is_active: from.is_active,
97            delay: from.delay,
98            sender_client_id: from.sender_client_id,
99            has_epoch_changed: from.has_epoch_changed,
100            identity: from.identity,
101            crl_new_distribution_points: from.crl_new_distribution_points,
102        }
103    }
104}
105
106struct ParsedMessage {
107    is_duplicate: bool,
108    protocol_message: ProtocolMessage,
109    content_type: ContentType,
110}
111
112#[derive(Clone, Copy, PartialEq, Eq)]
113enum RecursionPolicy {
114    AsNecessary,
115    None,
116}
117
118impl ConversationGuard {
119    /// Deserializes a TLS-serialized message, then processes it
120    ///
121    /// # Arguments
122    /// * `message` - the encrypted message as a byte array
123    ///
124    /// # Returns
125    /// An [MlsConversationDecryptMessage]
126    ///
127    /// # Errors
128    /// If a message has been buffered, this will be indicated by an error.
129    /// Other errors are originating from OpenMls and the KeyStore
130    pub async fn decrypt_message(&mut self, message: impl AsRef<[u8]>) -> Result<MlsConversationDecryptMessage> {
131        let mls_message_in =
132            MlsMessageIn::tls_deserialize(&mut message.as_ref()).map_err(Error::tls_deserialize("mls message in"))?;
133
134        let decrypt_message_result = self
135            .decrypt_message_inner(mls_message_in, RecursionPolicy::AsNecessary)
136            .await;
137
138        // In the inner `decrypt_message` above, we raise the `BufferedCommit` or
139        // `BufferedFutureMessage` errors, but we only handle them here.
140        // That's because in the scope they're raised, we don't have access to the raw message
141        // bytes; here, we do.
142        if let Err(Error::BufferedFutureMessage { message_epoch }) = decrypt_message_result {
143            self.buffer_future_message(message.as_ref()).await?;
144            let conversation = self.conversation().await;
145            info!(group_id = Obfuscated::from(conversation.id()); "Buffered future message from epoch {message_epoch}");
146        }
147        if let Err(Error::BufferedCommit) = decrypt_message_result {
148            self.buffer_commit(message).await?;
149        }
150
151        let decrypt_message = decrypt_message_result?;
152
153        if !decrypt_message.is_active {
154            self.wipe().await?;
155        }
156        Ok(decrypt_message)
157    }
158
159    /// We need an inner part, because this may be called recursively.
160    async fn decrypt_message_inner(
161        &mut self,
162        message: MlsMessageIn,
163        recursion_policy: RecursionPolicy,
164    ) -> Result<MlsConversationDecryptMessage> {
165        let client = &self.session().await?;
166        let backend = &self.crypto_provider().await?;
167        let parsed_message = self.parse_message(message.clone()).await?;
168
169        let message_result = self.process_message(parsed_message).await;
170
171        // Handles the case where we receive our own commits.
172        if let Err(Error::Mls(crate::MlsError {
173            source:
174                crate::MlsErrorKind::MlsMessageError(ProcessMessageError::InvalidCommit(StageCommitError::OwnCommit)),
175            ..
176        })) = message_result
177        {
178            let mut conversation = self.conversation_mut().await;
179            let ct = conversation.extract_confirmation_tag_from_own_commit(&message)?;
180            let mut decrypted_message = conversation.handle_own_commit(client, backend, ct).await?;
181            debug_assert!(
182                decrypted_message.buffered_messages.is_none(),
183                "decrypted message should be constructed with empty buffer"
184            );
185            if recursion_policy == RecursionPolicy::AsNecessary {
186                drop(conversation);
187                decrypted_message.buffered_messages = self.restore_and_clear_pending_messages().await?;
188            }
189
190            return Ok(decrypted_message);
191        }
192
193        // In this error case, we have a missing proposal, so we need to buffer the commit.
194        // We can't do that here--we don't have the appropriate data in scope--but we can at least
195        // produce the proper error and return that, so our caller can handle it.
196        if let Err(Error::Mls(crate::MlsError {
197            source:
198                crate::MlsErrorKind::MlsMessageError(ProcessMessageError::InvalidCommit(StageCommitError::MissingProposal)),
199            ..
200        })) = message_result
201        {
202            return Err(Error::BufferedCommit);
203        }
204
205        let message = message_result?;
206
207        let credential = message.credential();
208        let epoch = message.epoch();
209
210        let identity = credential
211            .extract_identity(
212                self.ciphersuite().await,
213                backend.authentication_service().borrow().await.as_ref(),
214            )
215            .map_err(RecursiveError::mls_credential("extracting identity"))?;
216
217        let sender_client_id: ClientId = credential.credential.identity().into();
218
219        let decrypted = match message.into_content() {
220            ProcessedMessageContent::ApplicationMessage(app_msg) => {
221                let conversation = self.conversation().await;
222                debug!(
223                    group_id = Obfuscated::from(&conversation.id),
224                    epoch = epoch.as_u64(),
225                    sender_client_id = Obfuscated::from(&sender_client_id);
226                    "Application message"
227                );
228
229                // we still support the `has_epoch_changed` field, though we'll remove it later
230                #[expect(deprecated)]
231                MlsConversationDecryptMessage {
232                    app_msg: Some(app_msg.into_bytes()),
233                    proposals: vec![],
234                    is_active: true,
235                    delay: None,
236                    sender_client_id: Some(sender_client_id),
237                    has_epoch_changed: false,
238                    identity,
239                    buffered_messages: None,
240                    crl_new_distribution_points: None.into(),
241                }
242            }
243            ProcessedMessageContent::ProposalMessage(proposal) => {
244                let mut conversation = self.conversation_mut().await;
245                let crl_dps = extract_crl_uris_from_proposals(&[proposal.proposal().clone()])
246                    .map_err(RecursiveError::mls_credential("extracting crl urls from proposals"))?;
247                let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
248                    .await
249                    .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
250
251                info!(
252                    group_id = Obfuscated::from(&conversation.id),
253                    sender = Obfuscated::from(proposal.sender()),
254                    proposals = Obfuscated::from(&proposal.proposal);
255                    "Received proposal"
256                );
257
258                conversation.group.store_pending_proposal(*proposal);
259                drop(conversation);
260                if let Some(commit) =
261                    self.retrieve_buffered_commit()
262                        .await
263                        .map_err(RecursiveError::mls_conversation(
264                            "retrieving buffered commit while handling proposal",
265                        ))?
266                {
267                    let process_result = self.try_process_buffered_commit(commit, recursion_policy).await;
268
269                    if process_result.is_ok() {
270                        self.clear_buffered_commit()
271                            .await
272                            .map_err(RecursiveError::mls_conversation(
273                                "clearing buffered commit after successful application",
274                            ))?;
275                    }
276                    // If we got back a buffered commit error, then we still don't have enough proposals.
277                    // In that case, we want to just proceed as normal for this proposal.
278                    //
279                    // In any other case, the result from the commit overrides the result from the proposal.
280                    if !matches!(process_result, Err(Error::BufferedCommit)) {
281                        // either the commit applied successfully, in which case its return value
282                        // should override the return value from the proposal, or it raised some kind
283                        // of error, in which case the caller needs to know about that.
284                        return process_result
285                            .map_err(RecursiveError::mls_conversation("processing buffered commit"))
286                            .map_err(Into::into);
287                    }
288                }
289
290                let conversation = self.conversation().await;
291                let delay = conversation.compute_next_commit_delay();
292
293                // we still support the `has_epoch_changed` field, though we'll remove it later
294                #[expect(deprecated)]
295                MlsConversationDecryptMessage {
296                    app_msg: None,
297                    proposals: vec![],
298                    is_active: true,
299                    delay,
300                    sender_client_id: None,
301                    has_epoch_changed: false,
302                    identity,
303                    buffered_messages: None,
304                    crl_new_distribution_points,
305                }
306            }
307            ProcessedMessageContent::StagedCommitMessage(staged_commit) => {
308                self.validate_commit(&staged_commit).await?;
309                let mut conversation = self.conversation_mut().await;
310
311                let pending_proposals = conversation.self_pending_proposals().cloned().collect::<Vec<_>>();
312
313                let proposal_refs: Vec<Proposal> = pending_proposals
314                    .iter()
315                    .map(|p| p.proposal().clone())
316                    .chain(
317                        staged_commit
318                            .add_proposals()
319                            .map(|p| Proposal::Add(p.add_proposal().clone())),
320                    )
321                    .chain(
322                        staged_commit
323                            .update_proposals()
324                            .map(|p| Proposal::Update(p.update_proposal().clone())),
325                    )
326                    .collect();
327
328                // - This requires a change in OpenMLS to get access to it
329                let mut crl_dps = extract_crl_uris_from_proposals(&proposal_refs)
330                    .map_err(RecursiveError::mls_credential("extracting crl urls from proposals"))?;
331                crl_dps.extend(
332                    extract_crl_uris_from_update_path(&staged_commit)
333                        .map_err(RecursiveError::mls_credential("extracting crl urls from update path"))?,
334                );
335
336                let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
337                    .await
338                    .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
339
340                // getting the pending has to be done before `merge_staged_commit` otherwise it's wiped out
341                let pending_commit = conversation.group.pending_commit().cloned();
342
343                conversation
344                    .group
345                    .merge_staged_commit(backend, *staged_commit.clone())
346                    .await
347                    .map_err(MlsError::wrap("merge staged commit"))?;
348
349                let (proposals_to_renew, needs_update) = Renew::renew(
350                    &conversation.group.own_leaf_index(),
351                    pending_proposals.into_iter(),
352                    pending_commit.as_ref(),
353                    staged_commit.as_ref(),
354                );
355                let proposals = conversation
356                    .renew_proposals_for_current_epoch(client, backend, proposals_to_renew.into_iter(), needs_update)
357                    .await?;
358
359                // can't use `.then` because async
360                let mut buffered_messages = None;
361                // drop conversation to allow borrowing `self` again
362                drop(conversation);
363                if recursion_policy == RecursionPolicy::AsNecessary {
364                    buffered_messages = self.restore_and_clear_pending_messages().await?;
365                }
366
367                let conversation = self.conversation().await;
368                let epoch = staged_commit.staged_context().epoch().as_u64();
369                info!(
370                    group_id = Obfuscated::from(&conversation.id),
371                    epoch,
372                    proposals:? = staged_commit.queued_proposals().map(Obfuscated::from).collect::<Vec<_>>();
373                    "Epoch advanced"
374                );
375                client.notify_epoch_changed(conversation.id.clone(), epoch).await;
376
377                // we still support the `has_epoch_changed` field, though we'll remove it later
378                #[expect(deprecated)]
379                MlsConversationDecryptMessage {
380                    app_msg: None,
381                    proposals,
382                    is_active: conversation.group.is_active(),
383                    delay: conversation.compute_next_commit_delay(),
384                    sender_client_id: None,
385                    has_epoch_changed: true,
386                    identity,
387                    buffered_messages,
388                    crl_new_distribution_points,
389                }
390            }
391            ProcessedMessageContent::ExternalJoinProposalMessage(proposal) => {
392                let mut conversation = self.conversation_mut().await;
393                info!(
394                    group_id = Obfuscated::from(&conversation.id),
395                    sender = Obfuscated::from(proposal.sender());
396                    "Received external join proposal"
397                );
398
399                let crl_dps = extract_crl_uris_from_proposals(&[proposal.proposal().clone()])
400                    .map_err(RecursiveError::mls_credential("extracting crl uris from proposals"))?;
401                let crl_new_distribution_points = get_new_crl_distribution_points(backend, crl_dps)
402                    .await
403                    .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
404                conversation.group.store_pending_proposal(*proposal);
405
406                // we still support the `has_epoch_changed` field, though we'll remove it later
407                #[expect(deprecated)]
408                MlsConversationDecryptMessage {
409                    app_msg: None,
410                    proposals: vec![],
411                    is_active: true,
412                    delay: conversation.compute_next_commit_delay(),
413                    sender_client_id: None,
414                    has_epoch_changed: false,
415                    identity,
416                    buffered_messages: None,
417                    crl_new_distribution_points,
418                }
419            }
420        };
421
422        let mut conversation = self.conversation_mut().await;
423
424        conversation
425            .persist_group_when_changed(&backend.keystore(), false)
426            .await?;
427
428        Ok(decrypted)
429    }
430
431    async fn parse_message(&self, msg_in: MlsMessageIn) -> Result<ParsedMessage> {
432        let mut is_duplicate = false;
433        let conversation = self.conversation().await;
434        let backend = self.crypto_provider().await?;
435        let (protocol_message, content_type) = match msg_in.extract() {
436            MlsMessageInBody::PublicMessage(m) => {
437                is_duplicate = conversation.is_duplicate_message(&backend, &m)?;
438                let ct = m.content_type();
439                (ProtocolMessage::PublicMessage(m), ct)
440            }
441            MlsMessageInBody::PrivateMessage(m) => {
442                let ct = m.content_type();
443                (ProtocolMessage::PrivateMessage(m), ct)
444            }
445            _ => {
446                return Err(
447                    MlsError::wrap("parsing inbound message")(ProcessMessageError::IncompatibleWireFormat).into(),
448                );
449            }
450        };
451        Ok(ParsedMessage {
452            is_duplicate,
453            protocol_message,
454            content_type,
455        })
456    }
457
458    async fn process_message(
459        &mut self,
460        ParsedMessage {
461            is_duplicate,
462            protocol_message,
463            content_type,
464        }: ParsedMessage,
465    ) -> Result<ProcessedMessage> {
466        let msg_epoch = protocol_message.epoch().as_u64();
467        let backend = self.crypto_provider().await?;
468        let mut conversation = self.conversation_mut().await;
469        let group_epoch = conversation.group.epoch().as_u64();
470        let processed_msg = conversation
471            .group
472            .process_message(&backend, protocol_message)
473            .await
474            .map_err(|e| match e {
475                ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
476                    MessageDecryptionError::GenerationOutOfBound,
477                )) => Error::DuplicateMessage,
478                ProcessMessageError::ValidationError(ValidationError::WrongEpoch) => {
479                    if is_duplicate {
480                        Error::DuplicateMessage
481                    } else if msg_epoch == group_epoch + 1 {
482                        // limit to next epoch otherwise if we were buffering a commit for epoch + 2
483                        // we would fail when trying to decrypt it in [MlsCentral::commit_accepted]
484
485                        // We need to buffer the message until the group has advanced to the right
486                        // epoch. We can't do that here--we don't have the appropriate data in scope
487                        // --but we can at least produce the proper error and return that, so our
488                        // caller can handle it. Our caller needs to know about the epoch number, so
489                        // we pass it back inside the error.
490                        Error::BufferedFutureMessage {
491                            message_epoch: msg_epoch,
492                        }
493                    } else if msg_epoch < group_epoch {
494                        match content_type {
495                            ContentType::Application => Error::StaleMessage,
496                            ContentType::Commit => Error::StaleCommit,
497                            ContentType::Proposal => Error::StaleProposal,
498                        }
499                    } else {
500                        Error::UnbufferedFarFutureMessage
501                    }
502                }
503                ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
504                    MessageDecryptionError::AeadError,
505                )) => Error::DecryptionError,
506                ProcessMessageError::ValidationError(ValidationError::UnableToDecrypt(
507                    MessageDecryptionError::SecretTreeError(SecretTreeError::TooDistantInThePast),
508                )) => Error::MessageEpochTooOld,
509                _ => MlsError::wrap("processing message")(e).into(),
510            })?;
511        if is_duplicate {
512            return Err(Error::DuplicateMessage);
513        }
514        Ok(processed_msg)
515    }
516
517    async fn validate_commit(&self, commit: &StagedCommit) -> Result<()> {
518        let backend = self.crypto_provider().await?;
519        if backend.authentication_service().is_env_setup().await {
520            let credentials: Vec<_> = commit
521                .add_proposals()
522                .filter_map(|add_proposal| {
523                    let credential = add_proposal.add_proposal().key_package().leaf_node().credential();
524
525                    matches!(credential.credential_type(), CredentialType::X509).then(|| credential.clone())
526                })
527                .collect();
528            let state = Session::compute_conversation_state(
529                self.ciphersuite().await,
530                credentials.iter(),
531                crate::prelude::MlsCredentialType::X509,
532                backend.authentication_service().borrow().await.as_ref(),
533            )
534            .await;
535            if state != E2eiConversationState::Verified {
536                // FIXME: Uncomment when PKI env can be seeded - the computation is still done to assess performance and impact of the validations. Tracking issue: WPB-9665
537                // return Err(Error::InvalidCertificateChain);
538            }
539        }
540        Ok(())
541    }
542}
543
544#[cfg(test)]
545mod tests {
546    use wasm_bindgen_test::*;
547
548    use crate::{
549        mls::conversation::{config::MAX_PAST_EPOCHS, error::Error},
550        test_utils::*,
551    };
552
553    use super::*;
554
555    wasm_bindgen_test_configure!(run_in_browser);
556
557    mod is_active {
558        use super::*;
559
560        #[apply(all_cred_cipher)]
561        #[wasm_bindgen_test]
562        pub async fn decrypting_a_regular_commit_should_leave_conversation_active(case: TestCase) {
563            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
564                Box::pin(async move {
565                    let id = conversation_id();
566                    alice_central
567                        .context
568                        .new_conversation(&id, case.credential_type, case.cfg.clone())
569                        .await
570                        .unwrap();
571                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
572
573                    bob_central
574                        .context
575                        .conversation(&id)
576                        .await
577                        .unwrap()
578                        .update_key_material()
579                        .await
580                        .unwrap();
581                    let commit = bob_central.mls_transport.latest_commit().await;
582                    let MlsConversationDecryptMessage { is_active, .. } = alice_central
583                        .context
584                        .conversation(&id)
585                        .await
586                        .unwrap()
587                        .decrypt_message(commit.to_bytes().unwrap())
588                        .await
589                        .unwrap();
590                    assert!(is_active)
591                })
592            })
593            .await
594        }
595
596        #[apply(all_cred_cipher)]
597        #[wasm_bindgen_test]
598        pub async fn decrypting_a_commit_removing_self_should_set_conversation_inactive(case: TestCase) {
599            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
600                Box::pin(async move {
601                    let id = conversation_id();
602                    alice_central
603                        .context
604                        .new_conversation(&id, case.credential_type, case.cfg.clone())
605                        .await
606                        .unwrap();
607                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
608
609                    bob_central
610                        .context
611                        .conversation(&id)
612                        .await
613                        .unwrap()
614                        .remove_members(&[alice_central.get_client_id().await])
615                        .await
616                        .unwrap();
617                    let commit = bob_central.mls_transport.latest_commit().await;
618                    let MlsConversationDecryptMessage { is_active, .. } = alice_central
619                        .context
620                        .conversation(&id)
621                        .await
622                        .unwrap()
623                        .decrypt_message(commit.to_bytes().unwrap())
624                        .await
625                        .unwrap();
626                    assert!(!is_active)
627                })
628            })
629            .await
630        }
631    }
632
633    mod commit {
634        use super::*;
635
636        #[apply(all_cred_cipher)]
637        #[wasm_bindgen_test]
638        pub async fn decrypting_a_commit_should_succeed(case: TestCase) {
639            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
640                Box::pin(async move {
641                    let id = conversation_id();
642                    alice_central
643                        .context
644                        .new_conversation(&id, case.credential_type, case.cfg.clone())
645                        .await
646                        .unwrap();
647                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
648
649                    let bob_observer = TestEpochObserver::new();
650                    bob_central
651                        .session()
652                        .await
653                        .register_epoch_observer(bob_observer.clone())
654                        .await
655                        .unwrap();
656
657                    let epoch_before = alice_central.context.conversation(&id).await.unwrap().epoch().await;
658
659                    alice_central
660                        .context
661                        .conversation(&id)
662                        .await
663                        .unwrap()
664                        .update_key_material()
665                        .await
666                        .unwrap();
667                    let commit = alice_central.mls_transport.latest_commit().await;
668
669                    let decrypted = bob_central
670                        .context
671                        .conversation(&id)
672                        .await
673                        .unwrap()
674                        .decrypt_message(commit.to_bytes().unwrap())
675                        .await
676                        .unwrap();
677                    let epoch_after = bob_central.context.conversation(&id).await.unwrap().epoch().await;
678                    assert_eq!(epoch_after, epoch_before + 1);
679                    assert!(bob_observer.has_changed().await);
680                    assert!(decrypted.delay.is_none());
681                    assert!(decrypted.app_msg.is_none());
682
683                    alice_central.verify_sender_identity(&case, &decrypted).await;
684                })
685            })
686            .await
687        }
688
689        #[apply(all_cred_cipher)]
690        #[wasm_bindgen_test]
691        pub async fn decrypting_a_commit_should_not_renew_proposals_in_valid_commit(case: TestCase) {
692            run_test_with_client_ids(
693                case.clone(),
694                ["alice", "bob", "charlie"],
695                move |[mut alice_central, bob_central, charlie_central]| {
696                    Box::pin(async move {
697                        let id = conversation_id();
698                        alice_central
699                            .context
700                            .new_conversation(&id, case.credential_type, case.cfg.clone())
701                            .await
702                            .unwrap();
703                        alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
704
705                        let alice_observer = TestEpochObserver::new();
706                        alice_central
707                            .session()
708                            .await
709                            .register_epoch_observer(alice_observer.clone())
710                            .await
711                            .unwrap();
712
713                        // Bob will create a proposal to add Charlie
714                        // Alice will decrypt this proposal
715                        // Then Bob will create a commit to update
716                        // Alice will decrypt the commit but musn't renew the proposal to add Charlie
717                        let charlie_kp = charlie_central.get_one_key_package(&case).await;
718
719                        let add_charlie_proposal = bob_central.context.new_add_proposal(&id, charlie_kp).await.unwrap();
720                        alice_central
721                            .context
722                            .conversation(&id)
723                            .await
724                            .unwrap()
725                            .decrypt_message(add_charlie_proposal.proposal.to_bytes().unwrap())
726                            .await
727                            .unwrap();
728
729                        alice_observer.reset().await;
730
731                        bob_central
732                            .context
733                            .conversation(&id)
734                            .await
735                            .unwrap()
736                            .update_key_material()
737                            .await
738                            .unwrap();
739                        let commit = bob_central.mls_transport.latest_commit().await;
740                        let MlsConversationDecryptMessage { proposals, delay, .. } = alice_central
741                            .context
742                            .conversation(&id)
743                            .await
744                            .unwrap()
745                            .decrypt_message(commit.to_bytes().unwrap())
746                            .await
747                            .unwrap();
748                        assert!(proposals.is_empty());
749                        assert!(delay.is_none());
750                        assert!(alice_central.pending_proposals(&id).await.is_empty());
751                        assert!(alice_observer.has_changed().await);
752                    })
753                },
754            )
755            .await
756        }
757
758        // orphan proposal = not backed by the pending commit
759        #[apply(all_cred_cipher)]
760        #[wasm_bindgen_test]
761        pub async fn decrypting_a_commit_should_renew_orphan_pending_proposals(case: TestCase) {
762            run_test_with_client_ids(
763                case.clone(),
764                ["alice", "bob", "charlie"],
765                move |[mut alice_central, mut bob_central, charlie_central]| {
766                    Box::pin(async move {
767                        let id = conversation_id();
768                        alice_central
769                            .context
770                            .new_conversation(&id, case.credential_type, case.cfg.clone())
771                            .await
772                            .unwrap();
773                        alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
774
775                        let alice_observer = TestEpochObserver::new();
776                        alice_central
777                            .session()
778                            .await
779                            .register_epoch_observer(alice_observer.clone())
780                            .await
781                            .unwrap();
782
783                        // Alice will create a proposal to add Charlie
784                        // Bob will create a commit which Alice will decrypt
785                        // Then Alice will renew her proposal
786                        bob_central
787                            .context
788                            .conversation(&id)
789                            .await
790                            .unwrap()
791                            .update_key_material()
792                            .await
793                            .unwrap();
794                        let bob_commit = bob_central.mls_transport.latest_commit().await;
795                        let commit_epoch = bob_commit.epoch().unwrap();
796
797                        // Alice propose to add Charlie
798                        let charlie_kp = charlie_central.get_one_key_package(&case).await;
799                        alice_central.context.new_add_proposal(&id, charlie_kp).await.unwrap();
800                        assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
801
802                        // But first she receives Bob commit
803                        let MlsConversationDecryptMessage { proposals, delay, .. } = alice_central
804                            .context
805                            .conversation(&id)
806                            .await
807                            .unwrap()
808                            .decrypt_message(bob_commit.to_bytes().unwrap())
809                            .await
810                            .unwrap();
811                        // So Charlie has not been added to the group
812                        assert!(
813                            !alice_central
814                                .get_conversation_unchecked(&id)
815                                .await
816                                .members()
817                                .contains_key(b"charlie".as_slice())
818                        );
819                        // Make sure we are suggesting a commit delay
820                        assert!(delay.is_some());
821
822                        // But its proposal to add Charlie has been renewed and is also in store
823                        assert!(!proposals.is_empty());
824                        assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
825                        let renewed_proposal = proposals.first().unwrap();
826                        assert_eq!(
827                            commit_epoch.as_u64() + 1,
828                            renewed_proposal.proposal.epoch().unwrap().as_u64()
829                        );
830
831                        // we don't care if there was an epoch change before this,
832                        // but we want to see if the epoch changes for alice now
833                        alice_observer.reset().await;
834
835                        // Let's use this proposal to see if it works
836                        bob_central
837                            .context
838                            .conversation(&id)
839                            .await
840                            .unwrap()
841                            .decrypt_message(renewed_proposal.proposal.to_bytes().unwrap())
842                            .await
843                            .unwrap();
844                        assert_eq!(bob_central.pending_proposals(&id).await.len(), 1);
845                        bob_central
846                            .context
847                            .conversation(&id)
848                            .await
849                            .unwrap()
850                            .commit_pending_proposals()
851                            .await
852                            .unwrap();
853                        let commit = bob_central.mls_transport.latest_commit().await;
854                        let _decrypted = alice_central
855                            .context
856                            .conversation(&id)
857                            .await
858                            .unwrap()
859                            .decrypt_message(commit.to_bytes().unwrap())
860                            .await
861                            .unwrap();
862                        // Charlie is now in the group
863                        assert!(
864                            alice_central
865                                .get_conversation_unchecked(&id)
866                                .await
867                                .members()
868                                .contains_key::<Vec<u8>>(&charlie_central.get_client_id().await.to_vec())
869                        );
870
871                        // Bob also has Charlie in the group
872                        assert!(
873                            bob_central
874                                .get_conversation_unchecked(&id)
875                                .await
876                                .members()
877                                .contains_key::<Vec<u8>>(&charlie_central.get_client_id().await.to_vec())
878                        );
879                        assert!(alice_observer.has_changed().await);
880                    })
881                },
882            )
883            .await
884        }
885
886        #[apply(all_cred_cipher)]
887        #[wasm_bindgen_test]
888        pub async fn decrypting_a_commit_should_discard_pending_external_proposals(case: TestCase) {
889            run_test_with_client_ids(
890                case.clone(),
891                ["alice", "bob", "charlie"],
892                move |[mut alice_central, bob_central, charlie_central]| {
893                    Box::pin(async move {
894                        let id = conversation_id();
895                        alice_central
896                            .context
897                            .new_conversation(&id, case.credential_type, case.cfg.clone())
898                            .await
899                            .unwrap();
900                        alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
901
902                        // DS will create an external proposal to add Charlie
903                        // But meanwhile Bob, before receiving the external proposal,
904                        // will create a commit and send it to Alice.
905                        // Alice will not renew the external proposal
906                        let ext_proposal = charlie_central
907                            .context
908                            .new_external_add_proposal(
909                                id.clone(),
910                                alice_central.get_conversation_unchecked(&id).await.group.epoch(),
911                                case.ciphersuite(),
912                                case.credential_type,
913                            )
914                            .await
915                            .unwrap();
916                        assert!(alice_central.pending_proposals(&id).await.is_empty());
917                        alice_central
918                            .context
919                            .conversation(&id)
920                            .await
921                            .unwrap()
922                            .decrypt_message(ext_proposal.to_bytes().unwrap())
923                            .await
924                            .unwrap();
925                        assert_eq!(alice_central.pending_proposals(&id).await.len(), 1);
926
927                        bob_central
928                            .context
929                            .conversation(&id)
930                            .await
931                            .unwrap()
932                            .update_key_material()
933                            .await
934                            .unwrap();
935                        let commit = bob_central.mls_transport.latest_commit().await;
936                        let alice_renewed_proposals = alice_central
937                            .context
938                            .conversation(&id)
939                            .await
940                            .unwrap()
941                            .decrypt_message(commit.to_bytes().unwrap())
942                            .await
943                            .unwrap()
944                            .proposals;
945                        assert!(alice_renewed_proposals.is_empty());
946                        assert!(alice_central.pending_proposals(&id).await.is_empty());
947                    })
948                },
949            )
950            .await
951        }
952
953        #[apply(all_cred_cipher)]
954        #[wasm_bindgen_test]
955        async fn should_not_return_sender_client_id(case: TestCase) {
956            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
957                Box::pin(async move {
958                    let id = conversation_id();
959                    alice_central
960                        .context
961                        .new_conversation(&id, case.credential_type, case.cfg.clone())
962                        .await
963                        .unwrap();
964                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
965
966                    alice_central
967                        .context
968                        .conversation(&id)
969                        .await
970                        .unwrap()
971                        .update_key_material()
972                        .await
973                        .unwrap();
974                    let commit = alice_central.mls_transport.latest_commit().await;
975
976                    let sender_client_id = bob_central
977                        .context
978                        .conversation(&id)
979                        .await
980                        .unwrap()
981                        .decrypt_message(commit.to_bytes().unwrap())
982                        .await
983                        .unwrap()
984                        .sender_client_id;
985                    assert!(sender_client_id.is_none());
986                })
987            })
988            .await
989        }
990    }
991
992    mod external_proposal {
993        use super::*;
994
995        #[apply(all_cred_cipher)]
996        #[wasm_bindgen_test]
997        async fn can_decrypt_external_proposal(case: TestCase) {
998            run_test_with_client_ids(
999                case.clone(),
1000                ["alice", "bob", "alice2"],
1001                move |[alice_central, bob_central, alice2_central]| {
1002                    Box::pin(async move {
1003                        let id = conversation_id();
1004                        alice_central
1005                            .context
1006                            .new_conversation(&id, case.credential_type, case.cfg.clone())
1007                            .await
1008                            .unwrap();
1009                        alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1010
1011                        let bob_observer = TestEpochObserver::new();
1012                        bob_central
1013                            .session()
1014                            .await
1015                            .register_epoch_observer(bob_observer.clone())
1016                            .await
1017                            .unwrap();
1018
1019                        let epoch = alice_central.get_conversation_unchecked(&id).await.group.epoch();
1020                        let ext_proposal = alice2_central
1021                            .context
1022                            .new_external_add_proposal(id.clone(), epoch, case.ciphersuite(), case.credential_type)
1023                            .await
1024                            .unwrap();
1025
1026                        let decrypted = alice_central
1027                            .context
1028                            .conversation(&id)
1029                            .await
1030                            .unwrap()
1031                            .decrypt_message(&ext_proposal.to_bytes().unwrap())
1032                            .await
1033                            .unwrap();
1034                        assert!(decrypted.app_msg.is_none());
1035                        assert!(decrypted.delay.is_some());
1036
1037                        let decrypted = bob_central
1038                            .context
1039                            .conversation(&id)
1040                            .await
1041                            .unwrap()
1042                            .decrypt_message(&ext_proposal.to_bytes().unwrap())
1043                            .await
1044                            .unwrap();
1045                        assert!(decrypted.app_msg.is_none());
1046                        assert!(decrypted.delay.is_some());
1047                        assert!(!bob_observer.has_changed().await)
1048                    })
1049                },
1050            )
1051            .await
1052        }
1053    }
1054
1055    mod proposal {
1056        use super::*;
1057
1058        // Ensures decrypting an proposal is durable
1059        #[apply(all_cred_cipher)]
1060        #[wasm_bindgen_test]
1061        async fn can_decrypt_proposal(case: TestCase) {
1062            run_test_with_client_ids(
1063                case.clone(),
1064                ["alice", "bob", "charlie"],
1065                move |[alice_central, bob_central, charlie_central]| {
1066                    Box::pin(async move {
1067                        let id = conversation_id();
1068                        alice_central
1069                            .context
1070                            .new_conversation(&id, case.credential_type, case.cfg.clone())
1071                            .await
1072                            .unwrap();
1073                        alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1074
1075                        let charlie_kp = charlie_central.get_one_key_package(&case).await;
1076                        let proposal = alice_central
1077                            .context
1078                            .new_add_proposal(&id, charlie_kp)
1079                            .await
1080                            .unwrap()
1081                            .proposal;
1082
1083                        let decrypted = bob_central
1084                            .context
1085                            .conversation(&id)
1086                            .await
1087                            .unwrap()
1088                            .decrypt_message(proposal.to_bytes().unwrap())
1089                            .await
1090                            .unwrap();
1091
1092                        assert_eq!(bob_central.get_conversation_unchecked(&id).await.members().len(), 2);
1093                        // if 'decrypt_message' is not durable the commit won't contain the add proposal
1094                        bob_central
1095                            .context
1096                            .conversation(&id)
1097                            .await
1098                            .unwrap()
1099                            .commit_pending_proposals()
1100                            .await
1101                            .unwrap();
1102                        assert_eq!(bob_central.get_conversation_unchecked(&id).await.members().len(), 3);
1103
1104                        alice_central.verify_sender_identity(&case, &decrypted).await;
1105                    })
1106                },
1107            )
1108            .await
1109        }
1110
1111        #[apply(all_cred_cipher)]
1112        #[wasm_bindgen_test]
1113        async fn should_not_return_sender_client_id(case: TestCase) {
1114            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1115                Box::pin(async move {
1116                    let id = conversation_id();
1117                    alice_central
1118                        .context
1119                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1120                        .await
1121                        .unwrap();
1122                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1123
1124                    let proposal = alice_central.context.new_update_proposal(&id).await.unwrap().proposal;
1125
1126                    let sender_client_id = bob_central
1127                        .context
1128                        .conversation(&id)
1129                        .await
1130                        .unwrap()
1131                        .decrypt_message(proposal.to_bytes().unwrap())
1132                        .await
1133                        .unwrap()
1134                        .sender_client_id;
1135                    assert!(sender_client_id.is_none());
1136                })
1137            })
1138            .await
1139        }
1140    }
1141
1142    mod app_message {
1143        use super::*;
1144
1145        #[apply(all_cred_cipher)]
1146        #[wasm_bindgen_test]
1147        async fn can_decrypt_app_message(case: TestCase) {
1148            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1149                Box::pin(async move {
1150                    let id = conversation_id();
1151                    alice_central
1152                        .context
1153                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1154                        .await
1155                        .unwrap();
1156                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1157
1158                    let alice_observer = TestEpochObserver::new();
1159                    alice_central
1160                        .session()
1161                        .await
1162                        .register_epoch_observer(alice_observer.clone())
1163                        .await
1164                        .unwrap();
1165                    let bob_observer = TestEpochObserver::new();
1166                    bob_central
1167                        .session()
1168                        .await
1169                        .register_epoch_observer(bob_observer.clone())
1170                        .await
1171                        .unwrap();
1172
1173                    let msg = b"Hello bob";
1174                    let encrypted = alice_central
1175                        .context
1176                        .conversation(&id)
1177                        .await
1178                        .unwrap()
1179                        .encrypt_message(msg)
1180                        .await
1181                        .unwrap();
1182                    assert_ne!(&msg[..], &encrypted[..]);
1183                    let decrypted = bob_central
1184                        .context
1185                        .conversation(&id)
1186                        .await
1187                        .unwrap()
1188                        .decrypt_message(encrypted)
1189                        .await
1190                        .unwrap();
1191                    let dec_msg = decrypted.app_msg.as_ref().unwrap().as_slice();
1192                    assert_eq!(dec_msg, &msg[..]);
1193                    assert!(!bob_observer.has_changed().await);
1194                    alice_central.verify_sender_identity(&case, &decrypted).await;
1195
1196                    let msg = b"Hello alice";
1197                    let encrypted = bob_central
1198                        .context
1199                        .conversation(&id)
1200                        .await
1201                        .unwrap()
1202                        .encrypt_message(msg)
1203                        .await
1204                        .unwrap();
1205                    assert_ne!(&msg[..], &encrypted[..]);
1206                    let decrypted = alice_central
1207                        .context
1208                        .conversation(&id)
1209                        .await
1210                        .unwrap()
1211                        .decrypt_message(encrypted)
1212                        .await
1213                        .unwrap();
1214                    let dec_msg = decrypted.app_msg.as_ref().unwrap().as_slice();
1215                    assert_eq!(dec_msg, &msg[..]);
1216                    assert!(!alice_observer.has_changed().await);
1217                    bob_central.verify_sender_identity(&case, &decrypted).await;
1218                })
1219            })
1220            .await
1221        }
1222
1223        #[apply(all_cred_cipher)]
1224        #[wasm_bindgen_test]
1225        async fn cannot_decrypt_app_message_after_rejoining(case: TestCase) {
1226            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1227                Box::pin(async move {
1228                    let id = conversation_id();
1229                    alice_central
1230                        .context
1231                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1232                        .await
1233                        .unwrap();
1234                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1235
1236                    // encrypt a message in epoch 1
1237                    let msg = b"Hello bob";
1238                    let encrypted = alice_central
1239                        .context
1240                        .conversation(&id)
1241                        .await
1242                        .unwrap()
1243                        .encrypt_message(msg)
1244                        .await
1245                        .unwrap();
1246
1247                    // Now Bob will rejoin the group and try to decrypt Alice's message
1248                    // in epoch 2 which should fail
1249                    let gi = alice_central.get_group_info(&id).await;
1250                    bob_central
1251                        .context
1252                        .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
1253                        .await
1254                        .unwrap();
1255
1256                    // fails because of Forward Secrecy
1257                    let decrypt = bob_central
1258                        .context
1259                        .conversation(&id)
1260                        .await
1261                        .unwrap()
1262                        .decrypt_message(&encrypted)
1263                        .await;
1264                    assert!(matches!(decrypt.unwrap_err(), Error::DecryptionError));
1265                })
1266            })
1267            .await
1268        }
1269
1270        #[apply(all_cred_cipher)]
1271        #[wasm_bindgen_test]
1272        async fn cannot_decrypt_app_message_from_future_epoch(case: TestCase) {
1273            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1274                Box::pin(async move {
1275                    let id = conversation_id();
1276                    alice_central
1277                        .context
1278                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1279                        .await
1280                        .unwrap();
1281                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1282
1283                    // only Alice will change epoch without notifying Bob
1284                    alice_central
1285                        .context
1286                        .conversation(&id)
1287                        .await
1288                        .unwrap()
1289                        .update_key_material()
1290                        .await
1291                        .unwrap();
1292                    let commit = alice_central.mls_transport.latest_commit().await;
1293
1294                    // Now in epoch 2 Alice will encrypt a message
1295                    let msg = b"Hello bob";
1296                    let encrypted = alice_central
1297                        .context
1298                        .conversation(&id)
1299                        .await
1300                        .unwrap()
1301                        .encrypt_message(msg)
1302                        .await
1303                        .unwrap();
1304
1305                    // which Bob cannot decrypt because of Post CompromiseSecurity
1306                    let decrypt = bob_central
1307                        .context
1308                        .conversation(&id)
1309                        .await
1310                        .unwrap()
1311                        .decrypt_message(&encrypted)
1312                        .await;
1313                    assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));
1314
1315                    let decrypted_commit = bob_central
1316                        .context
1317                        .conversation(&id)
1318                        .await
1319                        .unwrap()
1320                        .decrypt_message(commit.to_bytes().unwrap())
1321                        .await
1322                        .unwrap();
1323                    let buffered_msg = decrypted_commit.buffered_messages.unwrap();
1324                    let decrypted_msg = buffered_msg.first().unwrap().app_msg.clone().unwrap();
1325                    assert_eq!(&decrypted_msg, msg);
1326                })
1327            })
1328            .await
1329        }
1330
1331        #[apply(all_cred_cipher)]
1332        #[wasm_bindgen_test]
1333        async fn can_decrypt_app_message_in_any_order(mut case: TestCase) {
1334            // otherwise the test would fail because we decrypt messages in reverse order which is
1335            // kinda dropping them
1336            case.cfg.custom.maximum_forward_distance = 0;
1337            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1338                Box::pin(async move {
1339                    let id = conversation_id();
1340                    alice_central
1341                        .context
1342                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1343                        .await
1344                        .unwrap();
1345                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1346
1347                    let out_of_order_tolerance = case.custom_cfg().out_of_order_tolerance;
1348                    let nb_messages = out_of_order_tolerance * 2;
1349                    let mut messages = vec![];
1350
1351                    // stack up encrypted messages..
1352                    for i in 0..nb_messages {
1353                        let msg = format!("Hello {i}");
1354                        let encrypted = alice_central
1355                            .context
1356                            .conversation(&id)
1357                            .await
1358                            .unwrap()
1359                            .encrypt_message(&msg)
1360                            .await
1361                            .unwrap();
1362                        messages.push((msg, encrypted));
1363                    }
1364
1365                    // ..then unstack them to see out_of_order_tolerance come into play
1366                    messages.reverse();
1367                    for (i, (original, encrypted)) in messages.iter().enumerate() {
1368                        let decrypt = bob_central
1369                            .context
1370                            .conversation(&id)
1371                            .await
1372                            .unwrap()
1373                            .decrypt_message(encrypted)
1374                            .await;
1375                        if i > out_of_order_tolerance as usize {
1376                            let decrypted = decrypt.unwrap().app_msg.unwrap();
1377                            assert_eq!(decrypted, original.as_bytes());
1378                        } else {
1379                            assert!(matches!(decrypt.unwrap_err(), Error::DuplicateMessage))
1380                        }
1381                    }
1382                })
1383            })
1384            .await
1385        }
1386
1387        #[apply(all_cred_cipher)]
1388        #[wasm_bindgen_test]
1389        async fn returns_sender_client_id(case: TestCase) {
1390            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1391                Box::pin(async move {
1392                    let id = conversation_id();
1393                    alice_central
1394                        .context
1395                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1396                        .await
1397                        .unwrap();
1398                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1399
1400                    let msg = b"Hello bob";
1401                    let encrypted = alice_central
1402                        .context
1403                        .conversation(&id)
1404                        .await
1405                        .unwrap()
1406                        .encrypt_message(msg)
1407                        .await
1408                        .unwrap();
1409                    assert_ne!(&msg[..], &encrypted[..]);
1410
1411                    let sender_client_id = bob_central
1412                        .context
1413                        .conversation(&id)
1414                        .await
1415                        .unwrap()
1416                        .decrypt_message(encrypted)
1417                        .await
1418                        .unwrap()
1419                        .sender_client_id
1420                        .unwrap();
1421                    assert_eq!(sender_client_id, alice_central.get_client_id().await);
1422                })
1423            })
1424            .await
1425        }
1426    }
1427
1428    mod epoch_sync {
1429        use super::*;
1430
1431        #[apply(all_cred_cipher)]
1432        #[wasm_bindgen_test]
1433        async fn should_throw_specialized_error_when_epoch_too_old(mut case: TestCase) {
1434            case.cfg.custom.out_of_order_tolerance = 0;
1435            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1436                Box::pin(async move {
1437                    let id = conversation_id();
1438                    alice_central
1439                        .context
1440                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1441                        .await
1442                        .unwrap();
1443                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1444
1445                    // Alice encrypts a message to Bob
1446                    let bob_message1 = alice_central
1447                        .context
1448                        .conversation(&id)
1449                        .await
1450                        .unwrap()
1451                        .encrypt_message(b"Hello Bob")
1452                        .await
1453                        .unwrap();
1454                    let bob_message2 = alice_central
1455                        .context
1456                        .conversation(&id)
1457                        .await
1458                        .unwrap()
1459                        .encrypt_message(b"Hello again Bob")
1460                        .await
1461                        .unwrap();
1462
1463                    // Move group's epoch forward by self updating
1464                    for _ in 0..MAX_PAST_EPOCHS {
1465                        alice_central
1466                            .context
1467                            .conversation(&id)
1468                            .await
1469                            .unwrap()
1470                            .update_key_material()
1471                            .await
1472                            .unwrap();
1473                        let commit = alice_central.mls_transport.latest_commit().await;
1474                        bob_central
1475                            .context
1476                            .conversation(&id)
1477                            .await
1478                            .unwrap()
1479                            .decrypt_message(commit.to_bytes().unwrap())
1480                            .await
1481                            .unwrap();
1482                    }
1483                    // Decrypt should work
1484                    let decrypt = bob_central
1485                        .context
1486                        .conversation(&id)
1487                        .await
1488                        .unwrap()
1489                        .decrypt_message(&bob_message1)
1490                        .await
1491                        .unwrap();
1492                    assert_eq!(decrypt.app_msg.unwrap(), b"Hello Bob");
1493
1494                    // Moving the epochs once more should cause an error
1495                    alice_central
1496                        .context
1497                        .conversation(&id)
1498                        .await
1499                        .unwrap()
1500                        .update_key_material()
1501                        .await
1502                        .unwrap();
1503                    let commit = alice_central.mls_transport.latest_commit().await;
1504                    bob_central
1505                        .context
1506                        .conversation(&id)
1507                        .await
1508                        .unwrap()
1509                        .decrypt_message(commit.to_bytes().unwrap())
1510                        .await
1511                        .unwrap();
1512
1513                    let decrypt = bob_central
1514                        .context
1515                        .conversation(&id)
1516                        .await
1517                        .unwrap()
1518                        .decrypt_message(&bob_message2)
1519                        .await;
1520                    assert!(matches!(decrypt.unwrap_err(), Error::MessageEpochTooOld));
1521                })
1522            })
1523            .await
1524        }
1525
1526        #[apply(all_cred_cipher)]
1527        #[wasm_bindgen_test]
1528        async fn should_throw_specialized_error_when_epoch_desynchronized(mut case: TestCase) {
1529            case.cfg.custom.out_of_order_tolerance = 0;
1530            run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice_central, bob_central]| {
1531                Box::pin(async move {
1532                    let id = conversation_id();
1533                    alice_central
1534                        .context
1535                        .new_conversation(&id, case.credential_type, case.cfg.clone())
1536                        .await
1537                        .unwrap();
1538                    alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
1539
1540                    // Alice generates a bunch of soon to be outdated messages
1541                    let old_proposal = alice_central
1542                        .context
1543                        .new_update_proposal(&id)
1544                        .await
1545                        .unwrap()
1546                        .proposal
1547                        .to_bytes()
1548                        .unwrap();
1549                    alice_central
1550                        .get_conversation_unchecked(&id)
1551                        .await
1552                        .group
1553                        .clear_pending_proposals();
1554                    let old_commit = alice_central
1555                        .create_unmerged_commit(&id)
1556                        .await
1557                        .commit
1558                        .to_bytes()
1559                        .unwrap();
1560                    alice_central
1561                        .context
1562                        .conversation(&id)
1563                        .await
1564                        .unwrap()
1565                        .clear_pending_commit()
1566                        .await
1567                        .unwrap();
1568
1569                    // Now let's jump to next epoch
1570                    alice_central
1571                        .context
1572                        .conversation(&id)
1573                        .await
1574                        .unwrap()
1575                        .update_key_material()
1576                        .await
1577                        .unwrap();
1578                    let commit = alice_central.mls_transport.latest_commit().await;
1579                    bob_central
1580                        .context
1581                        .conversation(&id)
1582                        .await
1583                        .unwrap()
1584                        .decrypt_message(commit.to_bytes().unwrap())
1585                        .await
1586                        .unwrap();
1587
1588                    // trying to consume outdated messages should fail with a dedicated error
1589                    let decrypt_err = bob_central
1590                        .context
1591                        .conversation(&id)
1592                        .await
1593                        .unwrap()
1594                        .decrypt_message(&old_proposal)
1595                        .await
1596                        .unwrap_err();
1597
1598                    assert!(matches!(decrypt_err, Error::StaleProposal));
1599
1600                    let decrypt_err = bob_central
1601                        .context
1602                        .conversation(&id)
1603                        .await
1604                        .unwrap()
1605                        .decrypt_message(&old_commit)
1606                        .await
1607                        .unwrap_err();
1608
1609                    assert!(matches!(decrypt_err, Error::StaleCommit));
1610                })
1611            })
1612            .await
1613        }
1614    }
1615}