core_crypto/mls/conversation/
pending_conversation.rs1use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::context::CentralContext;
8use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
9use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
10use crate::mls::credential::ext::CredentialExt as _;
11use crate::prelude::{
12 ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
13 MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
14};
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25#[derive(Debug)]
28pub struct PendingConversation {
29 inner: PersistedMlsPendingGroup,
30 context: CentralContext,
31}
32
33impl PendingConversation {
34 pub(crate) fn new(inner: PersistedMlsPendingGroup, context: CentralContext) -> Self {
35 Self { inner, context }
36 }
37
38 pub(crate) fn from_mls_group(
39 group: MlsGroup,
40 custom_cfg: MlsCustomConfiguration,
41 context: CentralContext,
42 ) -> Result<Self> {
43 let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44 let serialized_group =
45 core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46 let group_id = group.group_id().to_vec();
47
48 let inner = PersistedMlsPendingGroup {
49 id: group_id,
50 state: serialized_group,
51 custom_configuration: serialized_cfg,
52 parent_id: None,
53 };
54 Ok(Self::new(inner, context))
55 }
56
57 async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58 self.context
59 .mls_provider()
60 .await
61 .map_err(RecursiveError::root("getting mls provider"))
62 .map_err(Into::into)
63 }
64
65 async fn keystore(&self) -> Result<CryptoKeystore> {
66 let backend = self.mls_provider().await?;
67 Ok(backend.keystore())
68 }
69
70 fn id(&self) -> &ConversationId {
71 &self.inner.id
72 }
73
74 pub(crate) async fn save(&self) -> Result<()> {
75 let keystore = self.keystore().await?;
76 keystore
77 .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78 .await
79 .map_err(KeystoreError::wrap("saving mls pending groups"))
80 .map_err(Into::into)
81 }
82
83 pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85 let transport = self
86 .context
87 .mls_transport()
88 .await
89 .map_err(RecursiveError::root("getting mls transport"))?;
90 let transport = transport.as_ref().ok_or::<Error>(
91 RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92 )?;
93
94 match transport
95 .send_commit_bundle(commit.clone())
96 .await
97 .map_err(RecursiveError::root("sending commit bundle"))?
98 {
99 MlsTransportResponse::Success => Ok(()),
100 MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101 MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102 }
103 }
104
105 pub async fn try_process_own_join_commit(
110 &mut self,
111 message: impl AsRef<[u8]>,
112 ) -> Result<MlsConversationDecryptMessage> {
113 if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115 return self.merge_and_restore_messages().await;
116 }
117
118 let keystore = self.keystore().await?;
119
120 let pending_msg = MlsPendingMessage {
121 foreign_id: self.id().clone(),
122 message: message.as_ref().to_vec(),
123 };
124 keystore
125 .save::<MlsPendingMessage>(pending_msg)
126 .await
127 .map_err(KeystoreError::wrap("saving mls pending message"))?;
128 Err(Error::BufferedForPendingConversation)
129 }
130
131 async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134 let backend = self.mls_provider().await?;
135 let keystore = backend.keystore();
136 let (group, _cfg) = keystore
138 .mls_pending_groups_load(self.id())
139 .await
140 .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141 let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142 .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144 mls_group
147 .merge_pending_commit(&backend)
148 .await
149 .map_err(MlsError::wrap("merging pending commit"))?;
150 let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151 .map_err(MlsError::wrap("deserializing mls message"))?;
152 let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153 return Ok(false);
154 };
155 let Some(msg_ct) = public_message.confirmation_tag() else {
156 return Ok(false);
157 };
158 let group_ct = mls_group
159 .compute_confirmation_tag(&backend)
160 .map_err(MlsError::wrap("computing confirmation tag"))?;
161 Ok(*msg_ct == group_ct)
162 }
163
164 async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166 let buffered_messages = self.merge().await?;
167 let context = &self.context;
168 let backend = self.mls_provider().await?;
169 let id = self.id();
170
171 let conversation = context.conversation(id).await?;
173 let conversation = conversation.conversation().await;
174 let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
175
176 let own_leaf_credential_with_key = CredentialWithKey {
178 credential: own_leaf.credential().clone(),
179 signature_key: own_leaf.signature_key().clone(),
180 };
181 let identity = own_leaf_credential_with_key
182 .extract_identity(conversation.ciphersuite(), None)
183 .map_err(RecursiveError::mls_credential("extracting identity"))?;
184
185 let crl_new_distribution_points = get_new_crl_distribution_points(
186 &backend,
187 extract_crl_uris_from_group(&conversation.group)
188 .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
189 )
190 .await
191 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
192
193 #[expect(deprecated)]
199 Ok(MlsConversationDecryptMessage {
200 app_msg: None,
201 proposals: vec![],
202 is_active: conversation.group.is_active(),
203 delay: conversation.compute_next_commit_delay(),
204 sender_client_id: None,
205 has_epoch_changed: true,
206 identity,
207 buffered_messages,
208 crl_new_distribution_points,
209 })
210 }
211
212 pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
219 let mls_provider = self.mls_provider().await?;
220 let id = self.id();
221 let group = self.inner.state.clone();
222 let cfg = self.inner.custom_configuration.clone();
223
224 let mut mls_group =
225 core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
226
227 mls_group
229 .merge_pending_commit(&mls_provider)
230 .await
231 .map_err(MlsError::wrap("merging pending commit"))?;
232
233 let custom_cfg =
235 serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
236 let configuration = MlsConversationConfiguration {
237 ciphersuite: mls_group.ciphersuite().into(),
238 custom: custom_cfg,
239 ..Default::default()
240 };
241
242 let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
245 trace!("External commit trying to rejoin group");
249 MessageRestorePolicy::ClearOnly
250 } else {
251 MessageRestorePolicy::DecryptAndClear
252 };
253
254 let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
256 .await
257 .map_err(RecursiveError::mls_conversation(
258 "constructing conversation from mls group",
259 ))?;
260
261 let context = &self.context;
262
263 context
264 .mls_groups()
265 .await
266 .map_err(RecursiveError::root("getting mls groups"))?
267 .insert(id.clone(), conversation);
268
269 let mut conversation = context.conversation(id).await?;
271 let pending_messages = conversation
272 .restore_pending_messages(restore_policy)
273 .await
274 .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
275
276 if pending_messages.is_some() {
277 mls_provider
278 .key_store()
279 .remove::<MlsPendingMessage, _>(id)
280 .await
281 .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
282 }
283
284 self.clear().await?;
286
287 Ok(pending_messages)
288 }
289
290 pub(crate) async fn clear(&mut self) -> Result<()> {
297 self.keystore()
298 .await?
299 .mls_pending_groups_delete(self.id())
300 .await
301 .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
302 Ok(())
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mls::conversation::Conversation as _;
    use crate::prelude::MlsConversationDecryptMessage;
    use crate::test_utils::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Bob creates an unmerged external commit. Every message handed to his pending
    /// conversation before that commit is buffered, and the buffered messages are handed
    /// back once the commit itself is processed.
    #[apply(all_cred_cipher)]
    #[wasm_bindgen_test]
    async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestCase) {
        run_test_with_client_ids(
            case.clone(),
            ["alice", "bob", "charlie", "debbie"],
            move |[alice, bob, charlie, debbie]| {
                Box::pin(async move {
                    let id = conversation_id();
                    alice
                        .context
                        .new_conversation(&id, case.credential_type, case.cfg.clone())
                        .await
                        .unwrap();

                    // Bob prepares an external commit but does not merge it yet.
                    let group_info = alice.get_group_info(&id).await;
                    let (bob_join, _) = bob
                        .create_unmerged_external_commit(group_info, case.custom_cfg(), case.credential_type)
                        .await;

                    // Alice accepts Bob's external commit.
                    alice
                        .context
                        .conversation(&id)
                        .await
                        .unwrap()
                        .decrypt_message(bob_join.commit.to_bytes().unwrap())
                        .await
                        .unwrap();

                    // Debbie asks to be added through an external proposal.
                    let epoch = alice.context.conversation(&id).await.unwrap().epoch().await;
                    let debbie_proposal = debbie
                        .context
                        .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
                        .await
                        .unwrap();

                    // Alice produces an application message, an update proposal and a commit
                    // adding Charlie.
                    let hello = alice
                        .context
                        .conversation(&id)
                        .await
                        .unwrap()
                        .encrypt_message(b"Hello Bob !")
                        .await
                        .unwrap();
                    let update_proposal = alice.context.new_update_proposal(&id).await.unwrap().proposal;
                    alice
                        .context
                        .conversation(&id)
                        .await
                        .unwrap()
                        .decrypt_message(debbie_proposal.to_bytes().unwrap())
                        .await
                        .unwrap();
                    let charlie_kp = charlie.rand_key_package(&case).await;
                    alice
                        .context
                        .conversation(&id)
                        .await
                        .unwrap()
                        .add_members(vec![charlie_kp])
                        .await
                        .unwrap();
                    let add_commit = alice.mls_transport.latest_commit_bundle().await;
                    charlie
                        .context
                        .process_welcome_message(add_commit.welcome.clone().unwrap().into(), case.custom_cfg())
                        .await
                        .unwrap();
                    debbie
                        .context
                        .process_welcome_message(add_commit.welcome.clone().unwrap().into(), case.custom_cfg())
                        .await
                        .unwrap();

                    // Bob only has a pending conversation at this point.
                    let Err(Error::PendingConversation(mut pending)) = bob.context.conversation(&id).await else {
                        panic!("Bob should not have the conversation yet")
                    };

                    // Each handshake message gets buffered ...
                    let handshake_messages = [add_commit.commit, debbie_proposal, update_proposal]
                        .into_iter()
                        .map(|m| m.to_bytes().unwrap());
                    for raw in handshake_messages {
                        let outcome = pending.try_process_own_join_commit(raw).await;
                        assert!(matches!(outcome.unwrap_err(), Error::BufferedForPendingConversation));
                    }
                    // ... and so does the application message.
                    let outcome = pending.try_process_own_join_commit(hello).await;
                    assert!(matches!(outcome.unwrap_err(), Error::BufferedForPendingConversation));

                    assert_eq!(bob.context.count_entities().await.pending_messages, 4);

                    let observer = TestEpochObserver::new();
                    bob.client()
                        .await
                        .register_epoch_observer(observer.clone())
                        .await
                        .unwrap();

                    // Processing the own external commit merges the group and returns the
                    // buffered messages.
                    let MlsConversationDecryptMessage {
                        buffered_messages: Some(restored),
                        ..
                    } = pending
                        .try_process_own_join_commit(bob_join.commit.to_bytes().unwrap())
                        .await
                        .unwrap()
                    else {
                        panic!("Alice's messages should have been restored at this point");
                    };

                    let observed_epochs = observer.observed_epochs().await;
                    assert_eq!(
                        observed_epochs.len(),
                        1,
                        "we should see exactly 1 epoch change in these 4 messages"
                    );
                    assert_eq!(observed_epochs[0].0, id, "conversation id must match");

                    // Only the first restored message carries application data.
                    if let Some((first, rest)) = restored.split_first() {
                        assert_eq!(first.app_msg.as_deref(), Some(b"Hello Bob !" as _));
                        for other in rest {
                            assert!(other.app_msg.is_none());
                        }
                    }

                    assert!(bob.try_talk_to(&id, &alice).await.is_ok());
                    assert!(bob.try_talk_to(&id, &charlie).await.is_ok());
                    assert!(bob.try_talk_to(&id, &debbie).await.is_ok());

                    assert_eq!(bob.context.count_entities().await.pending_messages, 0);
                })
            },
        )
        .await
    }

    /// Alice buffers two messages she cannot decrypt yet, then rejoins through an external
    /// commit; afterwards none of her buffered messages remain.
    #[apply(all_cred_cipher)]
    #[wasm_bindgen_test]
    async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestCase) {
        use crate::mls;

        run_test_with_client_ids(case.clone(), ["alice", "bob"], move |[alice, mut bob]| {
            Box::pin(async move {
                let id = conversation_id();
                alice
                    .context
                    .new_conversation(&id, case.credential_type, case.cfg.clone())
                    .await
                    .unwrap();
                alice.invite_all(&case, &id, [&mut bob]).await.unwrap();

                // Bob updates his key material; Alice is not given that commit.
                bob.context
                    .conversation(&id)
                    .await
                    .unwrap()
                    .update_key_material()
                    .await
                    .unwrap();

                let first_msg = bob
                    .context
                    .conversation(&id)
                    .await
                    .unwrap()
                    .encrypt_message("A")
                    .await
                    .unwrap();
                let second_msg = bob
                    .context
                    .conversation(&id)
                    .await
                    .unwrap()
                    .encrypt_message("B")
                    .await
                    .unwrap();

                // Alice reports both messages as buffered future messages.
                for encrypted in [first_msg, second_msg] {
                    let outcome = alice
                        .context
                        .conversation(&id)
                        .await
                        .unwrap()
                        .decrypt_message(encrypted)
                        .await;
                    assert!(matches!(
                        outcome.unwrap_err(),
                        mls::conversation::Error::BufferedFutureMessage { .. }
                    ));
                }
                assert_eq!(alice.context.count_entities().await.pending_messages, 2);

                // Alice rejoins the group through an external commit.
                let group_info = bob.get_group_info(&id).await;
                alice
                    .context
                    .join_by_external_commit(group_info, case.custom_cfg(), case.credential_type)
                    .await
                    .unwrap();

                let rejoin_commit = alice.mls_transport.latest_commit_bundle().await;

                bob.context
                    .conversation(&id)
                    .await
                    .unwrap()
                    .decrypt_message(rejoin_commit.commit.to_bytes().unwrap())
                    .await
                    .unwrap();
                assert_eq!(alice.context.count_entities().await.pending_messages, 0);
            })
        })
        .await
    }
}