core_crypto/mls/conversation/
pending_conversation.rs1use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
8use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
9use crate::mls::credential::ext::CredentialExt as _;
10use crate::prelude::{
11 ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
12 MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
13};
14use crate::transaction_context::TransactionContext;
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25#[derive(Debug)]
28pub struct PendingConversation {
29 inner: PersistedMlsPendingGroup,
30 context: TransactionContext,
31}
32
33impl PendingConversation {
34 pub(crate) fn new(inner: PersistedMlsPendingGroup, context: TransactionContext) -> Self {
35 Self { inner, context }
36 }
37
38 pub(crate) fn from_mls_group(
39 group: MlsGroup,
40 custom_cfg: MlsCustomConfiguration,
41 context: TransactionContext,
42 ) -> Result<Self> {
43 let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44 let serialized_group =
45 core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46 let group_id = group.group_id().to_vec();
47
48 let inner = PersistedMlsPendingGroup {
49 id: group_id,
50 state: serialized_group,
51 custom_configuration: serialized_cfg,
52 parent_id: None,
53 };
54 Ok(Self::new(inner, context))
55 }
56
57 async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58 self.context
59 .mls_provider()
60 .await
61 .map_err(RecursiveError::transaction("getting mls provider"))
62 .map_err(Into::into)
63 }
64
65 async fn keystore(&self) -> Result<CryptoKeystore> {
66 let backend = self.mls_provider().await?;
67 Ok(backend.keystore())
68 }
69
70 fn id(&self) -> &ConversationId {
71 &self.inner.id
72 }
73
74 pub(crate) async fn save(&self) -> Result<()> {
75 let keystore = self.keystore().await?;
76 keystore
77 .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78 .await
79 .map_err(KeystoreError::wrap("saving mls pending groups"))
80 .map_err(Into::into)
81 }
82
83 pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85 let transport = self
86 .context
87 .mls_transport()
88 .await
89 .map_err(RecursiveError::transaction("getting mls transport"))?;
90 let transport = transport.as_ref().ok_or::<Error>(
91 RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92 )?;
93
94 match transport
95 .send_commit_bundle(commit.clone())
96 .await
97 .map_err(RecursiveError::root("sending commit bundle"))?
98 {
99 MlsTransportResponse::Success => Ok(()),
100 MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101 MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102 }
103 }
104
105 pub async fn try_process_own_join_commit(
110 &mut self,
111 message: impl AsRef<[u8]>,
112 ) -> Result<MlsConversationDecryptMessage> {
113 if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115 return self.merge_and_restore_messages().await;
116 }
117
118 let keystore = self.keystore().await?;
119
120 let pending_msg = MlsPendingMessage {
121 foreign_id: self.id().clone(),
122 message: message.as_ref().to_vec(),
123 };
124 keystore
125 .save::<MlsPendingMessage>(pending_msg)
126 .await
127 .map_err(KeystoreError::wrap("saving mls pending message"))?;
128 Err(Error::BufferedForPendingConversation)
129 }
130
131 async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134 let backend = self.mls_provider().await?;
135 let keystore = backend.keystore();
136 let (group, _cfg) = keystore
138 .mls_pending_groups_load(self.id())
139 .await
140 .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141 let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142 .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144 mls_group
147 .merge_pending_commit(&backend)
148 .await
149 .map_err(MlsError::wrap("merging pending commit"))?;
150 let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151 .map_err(MlsError::wrap("deserializing mls message"))?;
152 let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153 return Ok(false);
154 };
155 let Some(msg_ct) = public_message.confirmation_tag() else {
156 return Ok(false);
157 };
158 let group_ct = mls_group
159 .compute_confirmation_tag(&backend)
160 .map_err(MlsError::wrap("computing confirmation tag"))?;
161 Ok(*msg_ct == group_ct)
162 }
163
164 async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166 let buffered_messages = self.merge().await?;
167 let context = &self.context;
168 let backend = self.mls_provider().await?;
169 let id = self.id();
170
171 let conversation = context
173 .conversation(id)
174 .await
175 .map_err(RecursiveError::transaction("getting conversation by id"))?;
176 let conversation = conversation.conversation().await;
177 let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
178
179 let own_leaf_credential_with_key = CredentialWithKey {
181 credential: own_leaf.credential().clone(),
182 signature_key: own_leaf.signature_key().clone(),
183 };
184 let identity = own_leaf_credential_with_key
185 .extract_identity(conversation.ciphersuite(), None)
186 .map_err(RecursiveError::mls_credential("extracting identity"))?;
187
188 let crl_new_distribution_points = get_new_crl_distribution_points(
189 &backend,
190 extract_crl_uris_from_group(&conversation.group)
191 .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
192 )
193 .await
194 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
195
196 #[expect(deprecated)]
202 Ok(MlsConversationDecryptMessage {
203 app_msg: None,
204 proposals: vec![],
205 is_active: conversation.group.is_active(),
206 delay: conversation.compute_next_commit_delay(),
207 sender_client_id: None,
208 has_epoch_changed: true,
209 identity,
210 buffered_messages,
211 crl_new_distribution_points,
212 })
213 }
214
215 pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
222 let mls_provider = self.mls_provider().await?;
223 let id = self.id();
224 let group = self.inner.state.clone();
225 let cfg = self.inner.custom_configuration.clone();
226
227 let mut mls_group =
228 core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
229
230 mls_group
232 .merge_pending_commit(&mls_provider)
233 .await
234 .map_err(MlsError::wrap("merging pending commit"))?;
235
236 let custom_cfg =
238 serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
239 let configuration = MlsConversationConfiguration {
240 ciphersuite: mls_group.ciphersuite().into(),
241 custom: custom_cfg,
242 ..Default::default()
243 };
244
245 let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
248 trace!("External commit trying to rejoin group");
252 MessageRestorePolicy::ClearOnly
253 } else {
254 MessageRestorePolicy::DecryptAndClear
255 };
256
257 let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
259 .await
260 .map_err(RecursiveError::mls_conversation(
261 "constructing conversation from mls group",
262 ))?;
263
264 let context = &self.context;
265
266 context
267 .mls_groups()
268 .await
269 .map_err(RecursiveError::transaction("getting mls groups"))?
270 .insert(id.clone(), conversation);
271
272 let mut conversation = context
274 .conversation(id)
275 .await
276 .map_err(RecursiveError::transaction("getting conversation by id"))?;
277 let pending_messages = conversation
278 .restore_pending_messages(restore_policy)
279 .await
280 .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
281
282 if pending_messages.is_some() {
283 mls_provider
284 .key_store()
285 .remove::<MlsPendingMessage, _>(id)
286 .await
287 .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
288 }
289
290 self.clear().await?;
292
293 Ok(pending_messages)
294 }
295
296 pub(crate) async fn clear(&mut self) -> Result<()> {
303 self.keystore()
304 .await?
305 .mls_pending_groups_delete(self.id())
306 .await
307 .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
308 Ok(())
309 }
310}
311
312#[cfg(test)]
313mod tests {
314 use super::*;
315 use crate::mls::conversation::Conversation as _;
316 use crate::prelude::MlsConversationDecryptMessage;
317 use crate::test_utils::*;
318 use wasm_bindgen_test::*;
319
320 wasm_bindgen_test_configure!(run_in_browser);
321
322 #[apply(all_cred_cipher)]
323 #[wasm_bindgen_test]
324 async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestContext) {
325 let [alice_central, bob_central, charlie_central, debbie_central] = case.sessions().await;
326 Box::pin(async move {
327 let id = conversation_id();
328 alice_central
329 .transaction
330 .new_conversation(&id, case.credential_type, case.cfg.clone())
331 .await
332 .unwrap();
333 let gi = alice_central.get_group_info(&id).await;
335 let (external_commit, _) = bob_central
336 .create_unmerged_external_commit(gi, case.custom_cfg(), case.credential_type)
337 .await;
338
339 alice_central
341 .transaction
342 .conversation(&id)
343 .await
344 .unwrap()
345 .decrypt_message(external_commit.commit.to_bytes().unwrap())
346 .await
347 .unwrap();
348
349 let epoch = alice_central.transaction.conversation(&id).await.unwrap().epoch().await;
351 let external_proposal = debbie_central
352 .transaction
353 .new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
354 .await
355 .unwrap();
356
357 let app_msg = alice_central
359 .transaction
360 .conversation(&id)
361 .await
362 .unwrap()
363 .encrypt_message(b"Hello Bob !")
364 .await
365 .unwrap();
366 let proposal = alice_central
367 .transaction
368 .new_update_proposal(&id)
369 .await
370 .unwrap()
371 .proposal;
372 alice_central
373 .transaction
374 .conversation(&id)
375 .await
376 .unwrap()
377 .decrypt_message(external_proposal.to_bytes().unwrap())
378 .await
379 .unwrap();
380 let charlie = charlie_central.rand_key_package(&case).await;
381 alice_central
382 .transaction
383 .conversation(&id)
384 .await
385 .unwrap()
386 .add_members(vec![charlie])
387 .await
388 .unwrap();
389 let commit = alice_central.mls_transport().await.latest_commit_bundle().await;
390 charlie_central
391 .transaction
392 .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
393 .await
394 .unwrap();
395 debbie_central
396 .transaction
397 .process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
398 .await
399 .unwrap();
400
401 let messages = vec![commit.commit, external_proposal, proposal]
405 .into_iter()
406 .map(|m| m.to_bytes().unwrap());
407 let Err(crate::transaction_context::Error::PendingConversation(mut pending_conversation)) =
408 bob_central.transaction.conversation(&id).await
409 else {
410 panic!("Bob should not have the conversation yet")
411 };
412 for m in messages {
413 let decrypt = pending_conversation.try_process_own_join_commit(m).await;
414 assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
415 }
416 let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
417 assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
418
419 assert_eq!(bob_central.transaction.count_entities().await.pending_messages, 4);
421
422 let observer = TestEpochObserver::new();
423 bob_central
424 .session()
425 .await
426 .register_epoch_observer(observer.clone())
427 .await
428 .unwrap();
429
430 let MlsConversationDecryptMessage {
432 buffered_messages: Some(restored_messages),
433 ..
434 } = pending_conversation
435 .try_process_own_join_commit(external_commit.commit.to_bytes().unwrap())
436 .await
437 .unwrap()
438 else {
439 panic!("Alice's messages should have been restored at this point");
440 };
441
442 let observed_epochs = observer.observed_epochs().await;
443 assert_eq!(
444 observed_epochs.len(),
445 1,
446 "we should see exactly 1 epoch change in these 4 messages"
447 );
448 assert_eq!(observed_epochs[0].0, id, "conversation id must match");
449
450 for (idx, msg) in restored_messages.iter().enumerate() {
451 if idx == 0 {
452 assert_eq!(msg.app_msg.as_deref(), Some(b"Hello Bob !" as _));
454 } else {
455 assert!(msg.app_msg.is_none());
456 }
457 }
458
459 assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
461 assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
463 assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
465
466 assert_eq!(bob_central.transaction.count_entities().await.pending_messages, 0);
468 })
469 .await
470 }
471
472 #[apply(all_cred_cipher)]
473 #[wasm_bindgen_test]
474 async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestContext) {
475 use crate::mls;
476
477 let [alice_central, bob_central] = case.sessions().await;
478 Box::pin(async move {
479 let id = conversation_id();
480 alice_central
481 .transaction
482 .new_conversation(&id, case.credential_type, case.cfg.clone())
483 .await
484 .unwrap();
485 alice_central.invite_all(&case, &id, [&bob_central]).await.unwrap();
486
487 bob_central
489 .transaction
490 .conversation(&id)
491 .await
492 .unwrap()
493 .update_key_material()
494 .await
495 .unwrap();
496
497 let msg1 = bob_central
498 .transaction
499 .conversation(&id)
500 .await
501 .unwrap()
502 .encrypt_message("A")
503 .await
504 .unwrap();
505 let msg2 = bob_central
506 .transaction
507 .conversation(&id)
508 .await
509 .unwrap()
510 .encrypt_message("B")
511 .await
512 .unwrap();
513
514 let decrypt = alice_central
516 .transaction
517 .conversation(&id)
518 .await
519 .unwrap()
520 .decrypt_message(msg1)
521 .await;
522 assert!(matches!(
523 decrypt.unwrap_err(),
524 mls::conversation::Error::BufferedFutureMessage { .. }
525 ));
526 let decrypt = alice_central
527 .transaction
528 .conversation(&id)
529 .await
530 .unwrap()
531 .decrypt_message(msg2)
532 .await;
533 assert!(matches!(
534 decrypt.unwrap_err(),
535 mls::conversation::Error::BufferedFutureMessage { .. }
536 ));
537 assert_eq!(alice_central.transaction.count_entities().await.pending_messages, 2);
538
539 let gi = bob_central.get_group_info(&id).await;
540 alice_central
541 .transaction
542 .join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
543 .await
544 .unwrap();
545
546 let ext_commit = alice_central.mls_transport().await.latest_commit_bundle().await;
547
548 bob_central
549 .transaction
550 .conversation(&id)
551 .await
552 .unwrap()
553 .decrypt_message(ext_commit.commit.to_bytes().unwrap())
554 .await
555 .unwrap();
556 assert_eq!(alice_central.transaction.count_entities().await.pending_messages, 0);
558 })
559 .await
560 }
561}