core_crypto/mls/
buffer_external_commit.rs1use crate::prelude::{ConversationId, CryptoError, CryptoResult, MlsConversationDecryptMessage};
9use core_crypto_keystore::{
10 connection::FetchFromDatabase,
11 entities::{MlsPendingMessage, PersistedMlsPendingGroup},
12};
13
14use crate::context::CentralContext;
15
16impl CentralContext {
17 pub(crate) async fn handle_when_group_is_pending(
18 &self,
19 id: &ConversationId,
20 message: impl AsRef<[u8]>,
21 ) -> CryptoResult<MlsConversationDecryptMessage> {
22 let keystore = self.keystore().await?;
23 let Some(pending_group) = keystore.find::<PersistedMlsPendingGroup>(id).await? else {
24 return Err(CryptoError::ConversationNotFound(id.clone()));
25 };
26
27 let pending_msg = MlsPendingMessage {
28 foreign_id: pending_group.id.clone(),
29 message: message.as_ref().to_vec(),
30 };
31 keystore.save::<MlsPendingMessage>(pending_msg).await?;
32 Err(CryptoError::UnmergedPendingGroup)
33 }
34}
35
#[cfg(test)]
mod tests {
    use crate::{test_utils::*, CryptoError};
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    // Bob joins through an external commit but does not merge it right away. Every message he
    // receives in the meantime must be rejected with `UnmergedPendingGroup` while being counted as
    // a pending message, and must be handed back once the pending group is merged.
    #[apply(all_cred_cipher)]
    #[wasm_bindgen_test]
    async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestCase) {
        run_test_with_client_ids(
            case.clone(),
            ["alice", "bob", "charlie", "debbie"],
            move |[alice_central, bob_central, charlie_central, debbie_central]| {
                Box::pin(async move {
                    let id = conversation_id();
                    alice_central
                        .context
                        .new_conversation(&id, case.credential_type, case.cfg.clone())
                        .await
                        .unwrap();

                    // Bob creates an external commit from Alice's group info; he merges it only later.
                    let group_info = alice_central.get_group_info(&id).await;
                    let bob_join = bob_central
                        .context
                        .join_by_external_commit(group_info, case.custom_cfg(), case.credential_type)
                        .await
                        .unwrap();

                    // Alice processes Bob's external commit.
                    let bob_join_bytes = bob_join.commit.to_bytes().unwrap();
                    alice_central
                        .context
                        .decrypt_message(&id, bob_join_bytes)
                        .await
                        .unwrap();

                    // Debbie issues an external add proposal for the epoch Alice is currently in.
                    let epoch = alice_central.context.conversation_epoch(&id).await.unwrap();
                    let external_proposal = debbie_central
                        .context
                        .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
                        .await
                        .unwrap();

                    // Alice produces an application message, an update proposal and finally a commit
                    // adding Charlie, after having processed Debbie's external proposal.
                    let app_msg = alice_central
                        .context
                        .encrypt_message(&id, b"Hello Bob !")
                        .await
                        .unwrap();
                    let update_proposal = alice_central.context.new_update_proposal(&id).await.unwrap().proposal;
                    let external_proposal_bytes = external_proposal.to_bytes().unwrap();
                    alice_central
                        .context
                        .decrypt_message(&id, external_proposal_bytes)
                        .await
                        .unwrap();
                    let charlie_kp = charlie_central.rand_key_package(&case).await;
                    let add_commit = alice_central
                        .context
                        .add_members_to_conversation(&id, vec![charlie_kp])
                        .await
                        .unwrap();
                    alice_central.context.commit_accepted(&id).await.unwrap();

                    // Charlie and Debbie both join from the same welcome (presumably the commit also
                    // carries Debbie's pending add proposal).
                    charlie_central
                        .context
                        .process_welcome_message(add_commit.welcome.clone().into(), case.custom_cfg())
                        .await
                        .unwrap();
                    debbie_central
                        .context
                        .process_welcome_message(add_commit.welcome.clone().into(), case.custom_cfg())
                        .await
                        .unwrap();

                    // Bob receives the handshake messages, commit first, while his group is still pending.
                    for handshake in [add_commit.commit, external_proposal, update_proposal] {
                        let handshake_bytes = handshake.to_bytes().unwrap();
                        let outcome = bob_central.context.decrypt_message(&id, handshake_bytes).await;
                        assert!(matches!(outcome.unwrap_err(), CryptoError::UnmergedPendingGroup));
                    }
                    // The application message is rejected the same way.
                    let outcome = bob_central.context.decrypt_message(&id, app_msg).await;
                    assert!(matches!(outcome.unwrap_err(), CryptoError::UnmergedPendingGroup));

                    // All 4 rejected messages are counted as pending.
                    let pending_before_merge = bob_central.context.count_entities().await.pending_messages;
                    assert_eq!(pending_before_merge, 4);

                    // Merging the pending group must hand back the buffered messages.
                    let restored_messages = bob_central
                        .context
                        .merge_pending_group_from_external_commit(&id)
                        .await
                        .unwrap()
                        .expect("Alice's messages should have been restored at this point");

                    // Expected order: the application message, then 2 messages without payload nor
                    // epoch change, then a last one which changes the epoch.
                    for (position, restored) in restored_messages.into_iter().enumerate() {
                        let has_epoch_changed = restored.has_epoch_changed;
                        match position {
                            0 => assert_eq!(&restored.app_msg.unwrap(), b"Hello Bob !"),
                            1..=3 => assert!(restored.app_msg.is_none()),
                            _ => unreachable!(),
                        }
                        assert_eq!(has_epoch_changed, position == 3);
                    }

                    // Bob can now exchange messages with every other member.
                    assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
                    assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
                    assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());

                    // No pending message is left once the merge is done.
                    let pending_after_merge = bob_central.context.count_entities().await.pending_messages;
                    assert_eq!(pending_after_merge, 0);
                })
            },
        )
        .await
    }

    // Alice buffers 2 messages she cannot decrypt yet, then joins the conversation again through an
    // external commit. After merging it, her pending messages must be gone.
    #[apply(all_cred_cipher)]
    #[wasm_bindgen_test]
    async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestCase) {
        run_test_with_client_ids(
            case.clone(),
            ["alice", "bob"],
            move |[alice_central, mut bob_central]| {
                Box::pin(async move {
                    let id = conversation_id();
                    alice_central
                        .context
                        .new_conversation(&id, case.credential_type, case.cfg.clone())
                        .await
                        .unwrap();
                    alice_central.invite_all(&case, &id, [&mut bob_central]).await.unwrap();

                    // Bob commits an update which is never handed to Alice in this test.
                    bob_central.context.update_keying_material(&id).await.unwrap();
                    bob_central.context.commit_accepted(&id).await.unwrap();

                    let first_msg = bob_central.context.encrypt_message(&id, "A").await.unwrap();
                    let second_msg = bob_central.context.encrypt_message(&id, "B").await.unwrap();

                    // Alice reports both messages as buffered future messages.
                    for future_msg in [first_msg, second_msg] {
                        let outcome = alice_central.context.decrypt_message(&id, future_msg).await;
                        assert!(matches!(
                            outcome.unwrap_err(),
                            CryptoError::BufferedFutureMessage { .. }
                        ));
                    }
                    let pending_before_rejoin = alice_central.context.count_entities().await.pending_messages;
                    assert_eq!(pending_before_rejoin, 2);

                    // Alice joins again with an external commit built from Bob's group info and merges it.
                    let group_info = bob_central.get_group_info(&id).await;
                    let rejoin = alice_central
                        .context
                        .join_by_external_commit(group_info, case.custom_cfg(), case.credential_type)
                        .await
                        .unwrap();
                    alice_central
                        .context
                        .merge_pending_group_from_external_commit(&id)
                        .await
                        .unwrap();

                    // Bob processes Alice's external commit.
                    let rejoin_bytes = rejoin.commit.to_bytes().unwrap();
                    bob_central
                        .context
                        .decrypt_message(&id, rejoin_bytes)
                        .await
                        .unwrap();

                    // Alice no longer holds any pending message.
                    let pending_after_rejoin = alice_central.context.count_entities().await.pending_messages;
                    assert_eq!(pending_after_rejoin, 0);
                })
            },
        )
        .await
    }
}