core_crypto/mls/conversation/
pending_conversation.rs1use super::Result;
6use super::{ConversationWithMls, Error};
7use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
8use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
9use crate::mls::credential::ext::CredentialExt as _;
10use crate::prelude::{
11 ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
12 MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
13};
14use crate::transaction_context::TransactionContext;
15use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
16use core_crypto_keystore::CryptoKeystoreMls as _;
17use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
18use log::trace;
19use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
20use openmls::credentials::CredentialWithKey;
21use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
22use openmls_traits::OpenMlsCryptoProvider;
23use tls_codec::Deserialize as _;
24
25#[derive(Debug)]
28pub struct PendingConversation {
29 inner: PersistedMlsPendingGroup,
30 context: TransactionContext,
31}
32
33impl PendingConversation {
34 pub(crate) fn new(inner: PersistedMlsPendingGroup, context: TransactionContext) -> Self {
35 Self { inner, context }
36 }
37
38 pub(crate) fn from_mls_group(
39 group: MlsGroup,
40 custom_cfg: MlsCustomConfiguration,
41 context: TransactionContext,
42 ) -> Result<Self> {
43 let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
44 let serialized_group =
45 core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
46 let group_id = group.group_id().to_vec();
47
48 let inner = PersistedMlsPendingGroup {
49 id: group_id,
50 state: serialized_group,
51 custom_configuration: serialized_cfg,
52 parent_id: None,
53 };
54 Ok(Self::new(inner, context))
55 }
56
57 async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
58 self.context
59 .mls_provider()
60 .await
61 .map_err(RecursiveError::transaction("getting mls provider"))
62 .map_err(Into::into)
63 }
64
65 async fn keystore(&self) -> Result<CryptoKeystore> {
66 let backend = self.mls_provider().await?;
67 Ok(backend.keystore())
68 }
69
70 fn id(&self) -> &ConversationId {
71 &self.inner.id
72 }
73
74 pub(crate) async fn save(&self) -> Result<()> {
75 let keystore = self.keystore().await?;
76 keystore
77 .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
78 .await
79 .map_err(KeystoreError::wrap("saving mls pending groups"))
80 .map_err(Into::into)
81 }
82
83 pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
85 let transport = self
86 .context
87 .mls_transport()
88 .await
89 .map_err(RecursiveError::transaction("getting mls transport"))?;
90 let transport = transport.as_ref().ok_or::<Error>(
91 RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
92 )?;
93
94 match transport
95 .send_commit_bundle(commit.clone())
96 .await
97 .map_err(RecursiveError::root("sending commit bundle"))?
98 {
99 MlsTransportResponse::Success => Ok(()),
100 MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
101 MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
102 }
103 }
104
105 pub async fn try_process_own_join_commit(
110 &mut self,
111 message: impl AsRef<[u8]>,
112 ) -> Result<MlsConversationDecryptMessage> {
113 if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
115 return self.merge_and_restore_messages().await;
116 }
117
118 let keystore = self.keystore().await?;
119
120 let pending_msg = MlsPendingMessage {
121 foreign_id: self.id().clone(),
122 message: message.as_ref().to_vec(),
123 };
124 keystore
125 .save::<MlsPendingMessage>(pending_msg)
126 .await
127 .map_err(KeystoreError::wrap("saving mls pending message"))?;
128 Err(Error::BufferedForPendingConversation)
129 }
130
131 async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
134 let backend = self.mls_provider().await?;
135 let keystore = backend.keystore();
136 let (group, _cfg) = keystore
138 .mls_pending_groups_load(self.id())
139 .await
140 .map_err(KeystoreError::wrap("loading mls pending groups"))?;
141 let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&group)
142 .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
143
144 mls_group
147 .merge_pending_commit(&backend)
148 .await
149 .map_err(MlsError::wrap("merging pending commit"))?;
150 let message_in = MlsMessageIn::tls_deserialize(&mut message.as_ref())
151 .map_err(MlsError::wrap("deserializing mls message"))?;
152 let MlsMessageInBody::PublicMessage(public_message) = message_in.extract() else {
153 return Ok(false);
154 };
155 let Some(msg_ct) = public_message.confirmation_tag() else {
156 return Ok(false);
157 };
158 let group_ct = mls_group
159 .compute_confirmation_tag(&backend)
160 .map_err(MlsError::wrap("computing confirmation tag"))?;
161 Ok(*msg_ct == group_ct)
162 }
163
164 async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
166 let buffered_messages = self.merge().await?;
167 let context = &self.context;
168 let backend = self.mls_provider().await?;
169 let id = self.id();
170
171 let conversation = context
173 .conversation(id)
174 .await
175 .map_err(RecursiveError::transaction("getting conversation by id"))?;
176 let conversation = conversation.conversation().await;
177 let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
178
179 let own_leaf_credential_with_key = CredentialWithKey {
181 credential: own_leaf.credential().clone(),
182 signature_key: own_leaf.signature_key().clone(),
183 };
184 let identity = own_leaf_credential_with_key
185 .extract_identity(conversation.ciphersuite(), None)
186 .map_err(RecursiveError::mls_credential("extracting identity"))?;
187
188 let crl_new_distribution_points = get_new_crl_distribution_points(
189 &backend,
190 extract_crl_uris_from_group(&conversation.group)
191 .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?,
192 )
193 .await
194 .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;
195
196 #[expect(deprecated)]
202 Ok(MlsConversationDecryptMessage {
203 app_msg: None,
204 proposals: vec![],
205 is_active: conversation.group.is_active(),
206 delay: conversation.compute_next_commit_delay(),
207 sender_client_id: None,
208 has_epoch_changed: true,
209 identity,
210 buffered_messages,
211 crl_new_distribution_points,
212 })
213 }
214
215 pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
222 let mls_provider = self.mls_provider().await?;
223 let id = self.id();
224 let group = self.inner.state.clone();
225 let cfg = self.inner.custom_configuration.clone();
226
227 let mut mls_group =
228 core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;
229
230 mls_group
232 .merge_pending_commit(&mls_provider)
233 .await
234 .map_err(MlsError::wrap("merging pending commit"))?;
235
236 let custom_cfg =
238 serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
239 let configuration = MlsConversationConfiguration {
240 ciphersuite: mls_group.ciphersuite().into(),
241 custom: custom_cfg,
242 ..Default::default()
243 };
244
245 let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
248 trace!("External commit trying to rejoin group");
252 MessageRestorePolicy::ClearOnly
253 } else {
254 MessageRestorePolicy::DecryptAndClear
255 };
256
257 let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
259 .await
260 .map_err(RecursiveError::mls_conversation(
261 "constructing conversation from mls group",
262 ))?;
263
264 let context = &self.context;
265
266 context
267 .mls_groups()
268 .await
269 .map_err(RecursiveError::transaction("getting mls groups"))?
270 .insert(id.clone(), conversation);
271
272 let mut conversation = context
274 .conversation(id)
275 .await
276 .map_err(RecursiveError::transaction("getting conversation by id"))?;
277 let pending_messages = conversation
278 .restore_pending_messages(restore_policy)
279 .await
280 .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;
281
282 if pending_messages.is_some() {
283 mls_provider
284 .key_store()
285 .remove::<MlsPendingMessage, _>(id)
286 .await
287 .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
288 }
289
290 self.clear().await?;
292
293 Ok(pending_messages)
294 }
295
296 pub(crate) async fn clear(&mut self) -> Result<()> {
303 self.keystore()
304 .await?
305 .mls_pending_groups_delete(self.id())
306 .await
307 .map_err(KeystoreError::wrap("deleting pending groups by id"))?;
308 Ok(())
309 }
310}
311
312#[cfg(test)]
313mod tests {
314 use super::*;
315 use crate::prelude::MlsConversationDecryptMessage;
316 use crate::test_utils::*;
317 use wasm_bindgen_test::*;
318
319 wasm_bindgen_test_configure!(run_in_browser);
320
321 #[apply(all_cred_cipher)]
322 #[wasm_bindgen_test]
323 async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestContext) {
324 let [alice, bob, charlie, debbie] = case.sessions().await;
325 Box::pin(async move {
326 let conversation = case.create_conversation([&alice]).await;
327 let (commit_guard, _pending_conversation) = conversation.external_join_unmerged(&bob).await;
329 let external_commit = commit_guard.message();
330
331 let conversation = commit_guard
333 .notify_member(&alice)
334 .await
335 .process_member_changes()
336 .await
337 .finish();
338
339 let proposal_guard = conversation.external_join_proposal(&debbie).await;
341 let external_proposal = proposal_guard.message();
342
343 let app_msg = proposal_guard
345 .conversation()
346 .guard()
347 .await
348 .encrypt_message(b"Hello Bob !")
349 .await
350 .unwrap();
351
352 let conversation = proposal_guard.notify_member(&alice).await.finish();
353 let proposal_guard = conversation.update_proposal().await;
354 let proposal = proposal_guard.message();
355 let conversation = proposal_guard.finish();
356
357 let commit_guard = conversation.invite([&charlie]).await;
358 let commit = commit_guard.message();
359 let conversation = commit_guard.process_member_changes().await.finish();
360
361 let messages = vec![commit, external_proposal, proposal]
365 .into_iter()
366 .map(|m| m.to_bytes().unwrap());
367 let Err(crate::transaction_context::Error::PendingConversation(mut pending_conversation)) =
368 bob.transaction.conversation(conversation.id()).await
369 else {
370 panic!("Bob should not have the conversation yet")
371 };
372 for m in messages {
373 let decrypt = pending_conversation.try_process_own_join_commit(m).await;
374 assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
375 }
376 let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
377 assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
378
379 assert_eq!(bob.transaction.count_entities().await.pending_messages, 4);
381
382 let observer = TestEpochObserver::new();
383 bob.session()
384 .await
385 .register_epoch_observer(observer.clone())
386 .await
387 .unwrap();
388
389 let MlsConversationDecryptMessage {
391 buffered_messages: Some(restored_messages),
392 ..
393 } = pending_conversation
394 .try_process_own_join_commit(external_commit.to_bytes().unwrap())
395 .await
396 .unwrap()
397 else {
398 panic!("Alice's messages should have been restored at this point");
399 };
400
401 let observed_epochs = observer.observed_epochs().await;
402 assert_eq!(
403 observed_epochs.len(),
404 1,
405 "we should see exactly 1 epoch change in these 4 messages"
406 );
407 assert_eq!(observed_epochs[0].0, *conversation.id(), "conversation id must match");
408
409 for (idx, msg) in restored_messages.iter().enumerate() {
410 if idx == 0 {
411 assert_eq!(msg.app_msg.as_deref(), Some(b"Hello Bob !" as _));
413 } else {
414 assert!(msg.app_msg.is_none());
415 }
416 }
417
418 assert_eq!(conversation.member_count().await, 4);
419 assert!(
420 conversation
421 .is_functional_and_contains([&alice, &bob, &charlie, &debbie])
422 .await
423 );
424
425 assert_eq!(bob.transaction.count_entities().await.pending_messages, 0);
427 })
428 .await
429 }
430
431 #[apply(all_cred_cipher)]
432 #[wasm_bindgen_test]
433 async fn should_not_reapply_buffered_messages_when_rejoining(case: TestContext) {
434 use crate::mls;
435
436 let [alice, bob] = case.sessions().await;
437 Box::pin(async move {
438 let conversation = case.create_conversation([&alice, &bob]).await;
439
440 let conversation = conversation.acting_as(&bob).await;
442 let commit_guard = conversation.update().await;
443 let conversation = commit_guard.conversation();
444
445 let msg1 = conversation.guard().await.encrypt_message("A").await.unwrap();
446 let msg2 = conversation.guard().await.encrypt_message("B").await.unwrap();
447
448 let conversation = commit_guard.finish();
449 let decrypt = conversation.guard().await.decrypt_message(msg1).await;
451 assert!(matches!(
452 decrypt.unwrap_err(),
453 mls::conversation::Error::BufferedFutureMessage { .. }
454 ));
455 let decrypt = conversation.guard().await.decrypt_message(msg2).await;
456 assert!(matches!(
457 decrypt.unwrap_err(),
458 mls::conversation::Error::BufferedFutureMessage { .. }
459 ));
460 assert_eq!(alice.transaction.count_entities().await.pending_messages, 2);
461
462 let conversation = conversation.acting_as(&bob).await.external_join_notify(&alice).await;
463 assert_eq!(alice.transaction.count_entities().await.pending_messages, 0);
465 assert!(conversation.is_functional_and_contains([&alice, &bob]).await);
466 })
467 .await
468 }
469}