core_crypto/mls/conversation/
pending_conversation.rs1use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
8use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
9use crate::mls::credential::ext::CredentialExt as _;
10use crate::prelude::{
11 ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
12 MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
13};
14use crate::transaction_context::TransactionContext;
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25#[derive(Debug)]
28pub struct PendingConversation {
29 inner: PersistedMlsPendingGroup,
30 context: TransactionContext,
31}
32
33impl PendingConversation {
34 pub(crate) fn new(inner: PersistedMlsPendingGroup, context: TransactionContext) -> Self {
35 Self { inner, context }
36 }
37
38 pub(crate) fn from_mls_group(
39 group: MlsGroup,
40 custom_cfg: MlsCustomConfiguration,
41 context: TransactionContext,
42 ) -> Result<Self> {
43 let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44 let serialized_group =
45 core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46 let group_id = group.group_id().to_vec();
47
48 let inner = PersistedMlsPendingGroup {
49 id: group_id,
50 state: serialized_group,
51 custom_configuration: serialized_cfg,
52 parent_id: None,
53 };
54 Ok(Self::new(inner, context))
55 }
56
57 async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58 self.context
59 .mls_provider()
60 .await
61 .map_err(RecursiveError::transaction("getting mls provider"))
62 .map_err(Into::into)
63 }
64
65 async fn keystore(&self) -> Result<CryptoKeystore> {
66 let backend = self.mls_provider().await?;
67 Ok(backend.keystore())
68 }
69
70 fn id(&self) -> &ConversationId {
71 &self.inner.id
72 }
73
74 pub(crate) async fn save(&self) -> Result<()> {
75 let keystore = self.keystore().await?;
76 keystore
77 .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78 .await
79 .map_err(KeystoreError::wrap("saving mls pending groups"))
80 .map_err(Into::into)
81 }
82
83 pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85 let transport = self
86 .context
87 .mls_transport()
88 .await
89 .map_err(RecursiveError::transaction("getting mls transport"))?;
90 let transport = transport.as_ref().ok_or::<Error>(
91 RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92 )?;
93
94 match transport
95 .send_commit_bundle(commit.clone())
96 .await
97 .map_err(RecursiveError::root("sending commit bundle"))?
98 {
99 MlsTransportResponse::Success => Ok(()),
100 MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101 MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102 }
103 }
104
105 pub async fn try_process_own_join_commit(
110 &mut self,
111 message: impl AsRef<[u8]>,
112 ) -> Result<MlsConversationDecryptMessage> {
113 if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115 return self.merge_and_restore_messages().await;
116 }
117
118 let keystore = self.keystore().await?;
119
120 let pending_msg = MlsPendingMessage {
121 foreign_id: self.id().clone(),
122 message: message.as_ref().to_vec(),
123 };
124 keystore
125 .save::<MlsPendingMessage>(pending_msg)
126 .await
127 .map_err(KeystoreError::wrap("saving mls pending message"))?;
128 Err(Error::BufferedForPendingConversation)
129 }
130
131 async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134 let backend = self.mls_provider().await?;
135 let keystore = backend.keystore();
136 let (group, _cfg) = keystore
138 .mls_pending_groups_load(self.id())
139 .await
140 .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141 let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142 .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144 mls_group
147 .merge_pending_commit(&backend)
148 .await
149 .map_err(MlsError::wrap("merging pending commit"))?;
150 let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151 .map_err(MlsError::wrap("deserializing mls message"))?;
152 let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153 return Ok(false);
154 };
155 let Some(msg_ct) = public_message.confirmation_tag() else {
156 return Ok(false);
157 };
158 let group_ct = mls_group
159 .compute_confirmation_tag(&backend)
160 .map_err(MlsError::wrap("computing confirmation tag"))?;
161 Ok(*msg_ct == group_ct)
162 }
163
164 async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166 let buffered_messages = self.merge().await?;
167 let context = &self.context;
168 let backend = self.mls_provider().await?;
169 let id = self.id();
170
171 let conversation = context
173 .conversation(id)
174 .await
175 .map_err(RecursiveError::transaction("getting conversation by id"))?;
176 let conversation = conversation.conversation().await;
177 let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
178
179 let own_leaf_credential_with_key = CredentialWithKey {
181 credential: own_leaf.credential().clone(),
182 signature_key: own_leaf.signature_key().clone(),
183 };
184 let identity = own_leaf_credential_with_key
185 .extract_identity(conversation.ciphersuite(), None)
186 .map_err(RecursiveError::mls_credential("extracting identity"))?;
187
188 let crl_new_distribution_points = get_new_crl_distribution_points(
189 &backend,
190 extract_crl_uris_from_group(&conversation.group)
191 .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
192 )
193 .await
194 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
195
196 #[expect(deprecated)]
202 Ok(MlsConversationDecryptMessage {
203 app_msg: None,
204 proposals: vec![],
205 is_active: conversation.group.is_active(),
206 delay: conversation.compute_next_commit_delay(),
207 sender_client_id: None,
208 has_epoch_changed: true,
209 identity,
210 buffered_messages,
211 crl_new_distribution_points,
212 })
213 }
214
215 pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
222 let mls_provider = self.mls_provider().await?;
223 let id = self.id();
224 let group = self.inner.state.clone();
225 let cfg = self.inner.custom_configuration.clone();
226
227 let mut mls_group =
228 core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
229
230 mls_group
232 .merge_pending_commit(&mls_provider)
233 .await
234 .map_err(MlsError::wrap("merging pending commit"))?;
235
236 let custom_cfg =
238 serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
239 let configuration = MlsConversationConfiguration {
240 ciphersuite: mls_group.ciphersuite().into(),
241 custom: custom_cfg,
242 ..Default::default()
243 };
244
245 let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
248 trace!("External commit trying to rejoin group");
252 MessageRestorePolicy::ClearOnly
253 } else {
254 MessageRestorePolicy::DecryptAndClear
255 };
256
257 let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
259 .await
260 .map_err(RecursiveError::mls_conversation(
261 "constructing conversation from mls group",
262 ))?;
263
264 let context = &self.context;
265
266 context
267 .mls_groups()
268 .await
269 .map_err(RecursiveError::transaction("getting mls groups"))?
270 .insert(id.clone(), conversation);
271
272 let mut conversation = context
274 .conversation(id)
275 .await
276 .map_err(RecursiveError::transaction("getting conversation by id"))?;
277 let pending_messages = conversation
278 .restore_pending_messages(restore_policy)
279 .await
280 .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
281
282 if pending_messages.is_some() {
283 mls_provider
284 .key_store()
285 .remove::<MlsPendingMessage, _>(id)
286 .await
287 .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
288 }
289
290 self.clear().await?;
292
293 Ok(pending_messages)
294 }
295
296 pub(crate) async fn clear(&mut self) -> Result<()> {
303 self.keystore()
304 .await?
305 .mls_pending_groups_delete(self.id())
306 .await
307 .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
308 Ok(())
309 }
310}
311
312#[cfg(test)]
313mod tests {
314 use super::*;
315 use crate::mls::conversation::Conversation as _;
316 use crate::prelude::MlsConversationDecryptMessage;
317 use crate::test_utils::*;
318 use wasm_bindgen_test::*;
319
320 wasm_bindgen_test_configure!(run_in_browser);
321
322 #[apply(all_cred_cipher)]
323 #[wasm_bindgen_test]
324 async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestContext) {
325 run_test_with_client_ids(
326 case.clone(),
327 ["alice", "bob", "charlie", "debbie"],
328 move |[alice_central, bob_central, charlie_central, debbie_central]| {
329 Box::pin(async move {
330 let id = conversation_id();
331 alice_central
332 .transaction
333 .new_conversation(&id, case.credential_type, case.cfg.clone())
334 .await
335 .unwrap();
336 let gi = alice_central.get_group_info(&id).await;
338 let (external_commit, _) = bob_central
339 .create_unmerged_external_commit(gi, case.custom_cfg(), case.credential_type)
340 .await;
341
342 alice_central
344 .transaction
345 .conversation(&id)
346 .await
347 .unwrap()
348 .decrypt_message(external_commit.commit.to_bytes().unwrap())
349 .await
350 .unwrap();
351
352 let epoch = alice_central.transaction.conversation(&id).await.unwrap().epoch().await;
354 let external_proposal = debbie_central
355 .transaction
356 .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
357 .await
358 .unwrap();
359
360 let app_msg = alice_central
362 .transaction
363 .conversation(&id)
364 .await
365 .unwrap()
366 .encrypt_message(b"Hello Bob !")
367 .await
368 .unwrap();
369 let proposal = alice_central
370 .transaction
371 .new_update_proposal(&id)
372 .await
373 .unwrap()
374 .proposal;
375 alice_central
376 .transaction
377 .conversation(&id)
378 .await
379 .unwrap()
380 .decrypt_message(external_proposal.to_bytes().unwrap())
381 .await
382 .unwrap();
383 let charlie = charlie_central.rand_key_package(&case).await;
384 alice_central
385 .transaction
386 .conversation(&id)
387 .await
388 .unwrap()
389 .add_members(vec![charlie])
390 .await
391 .unwrap();
392 let commit = alice_central.mls_transport.latest_commit_bundle().await;
393 charlie_central
394 .transaction
395 .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
396 .await
397 .unwrap();
398 debbie_central
399 .transaction
400 .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
401 .await
402 .unwrap();
403
404 let messages = vec![commit.commit, external_proposal, proposal]
408 .into_iter()
409 .map(|m| m.to_bytes().unwrap());
410 let Err(crate::transaction_context::Error::PendingConversation(mut pending_conversation)) =
411 bob_central.transaction.conversation(&id).await
412 else {
413 panic!("Bob should not have the conversation yet")
414 };
415 for m in messages {
416 let decrypt = pending_conversation.try_process_own_join_commit(m).await;
417 assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
418 }
419 let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
420 assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
421
422 assert_eq!(bob_central.transaction.count_entities().await.pending_messages, 4);
424
425 let observer = TestEpochObserver::new();
426 bob_central
427 .session()
428 .await
429 .register_epoch_observer(observer.clone())
430 .await
431 .unwrap();
432
433 let MlsConversationDecryptMessage {
435 buffered_messages: Some(restored_messages),
436 ..
437 } = pending_conversation
438 .try_process_own_join_commit(external_commit.commit.to_bytes().unwrap())
439 .await
440 .unwrap()
441 else {
442 panic!("Alice's messages should have been restored at this point");
443 };
444
445 let observed_epochs = observer.observed_epochs().await;
446 assert_eq!(
447 observed_epochs.len(),
448 1,
449 "we should see exactly 1 epoch change in these 4 messages"
450 );
451 assert_eq!(observed_epochs[0].0, id, "conversation id must match");
452
453 for (idx, msg) in restored_messages.iter().enumerate() {
454 if idx == 0 {
455 assert_eq!(msg.app_msg.as_deref(), Some(b"Hello Bob !" as _));
457 } else {
458 assert!(msg.app_msg.is_none());
459 }
460 }
461
462 assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
464 assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
466 assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
468
469 assert_eq!(bob_central.transaction.count_entities().await.pending_messages, 0);
471 })
472 },
473 )
474 .await
475 }
476
477 #[apply(all_cred_cipher)]
478 #[wasm_bindgen_test]
479 async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestContext) {
480 use crate::mls;
481
482 run_test_with_client_ids(
483 case.clone(),
484 ["alice", "bob"],
485 move |[alice_central, mut bob_central]| {
486 Box::pin(async move {
487 let id = conversation_id();
488 alice_central
489 .transaction
490 .new_conversation(&id, case.credential_type, case.cfg.clone())
491 .await
492 .unwrap();
493 alice_central.invite_all(&case, &id, [&mut bob_central]).await.unwrap();
494
495 bob_central
497 .transaction
498 .conversation(&id)
499 .await
500 .unwrap()
501 .update_key_material()
502 .await
503 .unwrap();
504
505 let msg1 = bob_central
506 .transaction
507 .conversation(&id)
508 .await
509 .unwrap()
510 .encrypt_message("A")
511 .await
512 .unwrap();
513 let msg2 = bob_central
514 .transaction
515 .conversation(&id)
516 .await
517 .unwrap()
518 .encrypt_message("B")
519 .await
520 .unwrap();
521
522 let decrypt = alice_central
524 .transaction
525 .conversation(&id)
526 .await
527 .unwrap()
528 .decrypt_message(msg1)
529 .await;
530 assert!(matches!(
531 decrypt.unwrap_err(),
532 mls::conversation::Error::BufferedFutureMessage { .. }
533 ));
534 let decrypt = alice_central
535 .transaction
536 .conversation(&id)
537 .await
538 .unwrap()
539 .decrypt_message(msg2)
540 .await;
541 assert!(matches!(
542 decrypt.unwrap_err(),
543 mls::conversation::Error::BufferedFutureMessage { .. }
544 ));
545 assert_eq!(alice_central.transaction.count_entities().await.pending_messages, 2);
546
547 let gi = bob_central.get_group_info(&id).await;
548 alice_central
549 .transaction
550 .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
551 .await
552 .unwrap();
553
554 let ext_commit = alice_central.mls_transport.latest_commit_bundle().await;
555
556 bob_central
557 .transaction
558 .conversation(&id)
559 .await
560 .unwrap()
561 .decrypt_message(ext_commit.commit.to_bytes().unwrap())
562 .await
563 .unwrap();
564 assert_eq!(alice_central.transaction.count_entities().await.pending_messages, 0);
566 })
567 },
568 )
569 .await
570 }
571}