// core_crypto/mls/conversation/pending_conversation.rs
use super::Result;
use super::{ConversationWithMls, Error};
use crate::context::CentralContext;
use crate::mls::conversation::conversation_guard::decrypt::buffer_messages::MessageRestorePolicy;
use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
use crate::mls::credential::ext::CredentialExt as _;
use crate::prelude::{
ConversationId, MlsBufferedConversationDecryptMessage, MlsCommitBundle, MlsConversation,
MlsConversationConfiguration, MlsConversationDecryptMessage, MlsCustomConfiguration,
};
use crate::{KeystoreError, LeafError, MlsError, MlsTransportResponse, RecursiveError};
use core_crypto_keystore::CryptoKeystoreMls as _;
use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
use log::trace;
use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
use openmls::credentials::CredentialWithKey;
use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
use openmls_traits::OpenMlsCryptoProvider;
use tls_codec::Deserialize as _;
/// A conversation backed by a persisted *pending* MLS group.
///
/// It pairs the pending-group record with the [`CentralContext`] through which
/// the MLS provider, the keystore, the transport and the regular conversations
/// are reached.
#[derive(Debug)]
pub struct PendingConversation {
/// Pending-group record: group id, serialized group state, serialized custom configuration.
inner: PersistedMlsPendingGroup,
/// Context used to obtain the MLS provider, the transport and the conversations.
context: CentralContext,
}
impl PendingConversation {
/// Builds a pending conversation from a pending-group record and the context it is used with.
pub(crate) fn new(inner: PersistedMlsPendingGroup, context: CentralContext) -> Self {
Self { inner, context }
}
/// Builds a pending conversation from an in-memory [`MlsGroup`].
///
/// The group and `custom_cfg` are serialized into a [`PersistedMlsPendingGroup`]
/// keyed by the group id, with no parent id. Nothing is written to the keystore
/// here; use [`Self::save`] for that.
///
/// # Errors
/// Fails if the custom configuration or the group cannot be serialized.
pub(crate) fn from_mls_group(
    group: MlsGroup,
    custom_cfg: MlsCustomConfiguration,
    context: CentralContext,
) -> Result<Self> {
    let custom_configuration =
        serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
    let state = core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
    let id = group.group_id().to_vec();

    let record = PersistedMlsPendingGroup {
        id,
        state,
        custom_configuration,
        parent_id: None,
    };
    Ok(Self::new(record, context))
}
/// Fetches the MLS crypto provider from the context.
///
/// # Errors
/// Any failure reported by the context is wrapped with the
/// "getting mls provider" context and converted into this module's error type.
async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
    let provider = self
        .context
        .mls_provider()
        .await
        .map_err(RecursiveError::root("getting mls provider"))?;
    Ok(provider)
}
/// Returns the keystore exposed by the MLS crypto provider.
///
/// # Errors
/// Fails when the provider itself cannot be obtained.
async fn keystore(&self) -> Result<CryptoKeystore> {
    self.mls_provider().await.map(|provider| provider.keystore())
}
/// Returns the id of the underlying pending group record.
fn id(&self) -> &ConversationId {
&self.inner.id
}
/// Writes the pending group (id, serialized state and serialized custom
/// configuration) to the keystore.
///
/// The last argument of the keystore call is always `None`.
///
/// # Errors
/// Fails if the keystore cannot be obtained or the save operation fails.
pub(crate) async fn save(&self) -> Result<()> {
    let store = self.keystore().await?;
    store
        .mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
        .await
        .map_err(KeystoreError::wrap("saving mls pending groups"))?;
    Ok(())
}
pub(crate) async fn send_commit(&self, commit: MlsCommitBundle) -> Result<()> {
let transport = self
.context
.mls_transport()
.await
.map_err(RecursiveError::root("getting mls transport"))?;
let transport = transport.as_ref().ok_or::<Error>(
RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
)?;
match transport
.send_commit_bundle(commit.clone())
.await
.map_err(RecursiveError::root("sending commit bundle"))?
{
MlsTransportResponse::Success => Ok(()),
MlsTransportResponse::Abort { reason } => Err(Error::MessageRejected { reason }),
MlsTransportResponse::Retry => Err(Error::CannotRetryWithoutConversation),
}
}
/// Handles an incoming message while the conversation is still pending.
///
/// If `message` is recognised as this client's own join commit, the pending
/// group is merged and the buffered messages are restored; the resulting
/// decrypt message is returned.
///
/// Otherwise the raw message is stored in the keystore as an
/// [`MlsPendingMessage`] for this conversation id and
/// [`Error::BufferedForPendingConversation`] is returned.
///
/// # Errors
/// Besides `BufferedForPendingConversation`, propagates failures from the
/// own-commit check, the merge, or the keystore.
pub async fn try_process_own_join_commit(
    &mut self,
    message: impl AsRef<[u8]>,
) -> Result<MlsConversationDecryptMessage> {
    let bytes = message.as_ref();

    let is_own_commit = self.incoming_message_is_own_join_commit(bytes).await?;
    if is_own_commit {
        return self.merge_and_restore_messages().await;
    }

    // Not our commit: keep the message so it can be handled after the merge.
    let store = self.keystore().await?;
    let buffered = MlsPendingMessage {
        foreign_id: self.id().clone(),
        message: bytes.to_vec(),
    };
    store
        .save::<MlsPendingMessage>(buffered)
        .await
        .map_err(KeystoreError::wrap("saving mls pending message"))?;

    Err(Error::BufferedForPendingConversation)
}
/// Tells whether `message` is the commit with which this client joined the group.
///
/// The pending group is loaded from the keystore and its pending commit is
/// merged on that loaded copy. The confirmation tag computed from the merged
/// copy is then compared with the tag carried by the incoming message.
///
/// Returns `Ok(false)` when the message is not a public message or carries no
/// confirmation tag.
///
/// # Errors
/// Fails if the pending group cannot be loaded or deserialized, if merging the
/// pending commit fails, if the message cannot be deserialized, or if the
/// confirmation tag cannot be computed.
async fn incoming_message_is_own_join_commit(&self, message: impl AsRef<[u8]>) -> Result<bool> {
    let provider = self.mls_provider().await?;
    let store = provider.keystore();

    let (serialized_group, _custom_cfg) = store
        .mls_pending_groups_load(self.id())
        .await
        .map_err(KeystoreError::wrap("loading mls pending groups"))?;
    let mut pending_group = core_crypto_keystore::deser::<MlsGroup>(&serialized_group)
        .map_err(KeystoreError::wrap("deserializing mls pending groups"))?;
    pending_group
        .merge_pending_commit(&provider)
        .await
        .map_err(MlsError::wrap("merging pending commit"))?;

    let parsed = MlsMessageIn::tls_deserialize(&mut message.as_ref())
        .map_err(MlsError::wrap("deserializing mls message"))?;
    let public_message = match parsed.extract() {
        MlsMessageInBody::PublicMessage(public_message) => public_message,
        _ => return Ok(false),
    };

    match public_message.confirmation_tag() {
        None => Ok(false),
        Some(message_tag) => {
            let expected_tag = pending_group
                .compute_confirmation_tag(&provider)
                .map_err(MlsError::wrap("computing confirmation tag"))?;
            Ok(*message_tag == expected_tag)
        }
    }
}
/// Merges the pending group and builds the decrypt message describing the result.
///
/// After [`Self::merge`], the conversation is fetched from the context to
/// collect: this client's identity (from its own leaf), whether the group is
/// active, the next commit delay and any new CRL distribution points.
///
/// The returned message carries no application message, no proposals and no
/// sender client id; `has_epoch_changed` is always `true`.
///
/// # Errors
/// Fails if the merge fails, if the conversation cannot be fetched, if the
/// group has no own leaf, or if identity / CRL extraction fails.
async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
    let buffered_messages = self.merge().await?;

    let provider = self.mls_provider().await?;
    let guard = self.context.conversation(self.id()).await?;
    let conversation = guard.conversation().await;

    let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;
    let identity = CredentialWithKey {
        credential: own_leaf.credential().clone(),
        signature_key: own_leaf.signature_key().clone(),
    }
    .extract_identity(conversation.ciphersuite(), None)
    .map_err(RecursiveError::mls_credential("extracting identity"))?;

    let crl_uris = extract_crl_uris_from_group(&conversation.group)
        .map_err(RecursiveError::mls_credential("extracting crl uris from group"))?;
    let crl_new_distribution_points = get_new_crl_distribution_points(&provider, crl_uris)
        .await
        .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;

    Ok(MlsConversationDecryptMessage {
        app_msg: None,
        proposals: vec![],
        is_active: conversation.group.is_active(),
        delay: conversation.compute_next_commit_delay(),
        sender_client_id: None,
        has_epoch_changed: true,
        identity,
        buffered_messages,
        crl_new_distribution_points,
    })
}
/// Merges the pending commit of this pending group and turns it into a regular conversation.
///
/// Steps, in order:
/// 1. deserialize the stored group state and merge its pending commit;
/// 2. deserialize the stored custom configuration and build the conversation configuration;
/// 3. pick the restore policy: `ClearOnly` when a group with this id already
///    exists in the keystore, `DecryptAndClear` otherwise;
/// 4. build the [`MlsConversation`] and insert it into the context's groups;
/// 5. restore the pending messages with the chosen policy and, when some were
///    returned, remove the [`MlsPendingMessage`] entries for this id;
/// 6. delete the pending group via [`Self::clear`].
///
/// Returns the restored buffered messages, if any.
///
/// # Errors
/// Fails if any deserialization, the commit merge, the conversation
/// construction, the message restoration or a keystore operation fails.
pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
    let mls_provider = self.mls_provider().await?;
    let id = self.id();

    // The serialized state and configuration are only read, so they are borrowed, not cloned.
    let mut mls_group = core_crypto_keystore::deser::<MlsGroup>(&self.inner.state)
        .map_err(KeystoreError::wrap("deserializing mls group"))?;
    mls_group
        .merge_pending_commit(&mls_provider)
        .await
        .map_err(MlsError::wrap("merging pending commit"))?;

    let custom_cfg = serde_json::from_slice(&self.inner.custom_configuration)
        .map_err(MlsError::wrap("deserializing mls custom configuration"))?;
    let configuration = MlsConversationConfiguration {
        ciphersuite: mls_group.ciphersuite().into(),
        custom: custom_cfg,
        ..Default::default()
    };

    // This check must run before the conversation is constructed below.
    let restore_policy = if mls_provider.key_store().mls_group_exists(id.as_slice()).await {
        trace!("External commit trying to rejoin group");
        MessageRestorePolicy::ClearOnly
    } else {
        MessageRestorePolicy::DecryptAndClear
    };

    let conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
        .await
        .map_err(RecursiveError::mls_conversation(
            "constructing conversation from mls group",
        ))?;

    let context = &self.context;
    context
        .mls_groups()
        .await
        .map_err(RecursiveError::root("getting mls groups"))?
        .insert(id.clone(), conversation);

    let mut conversation = context.conversation(id).await?;
    let pending_messages = conversation
        .restore_pending_messages(restore_policy)
        .await
        .map_err(RecursiveError::mls_conversation("restoring pending messages"))?;

    if pending_messages.is_some() {
        mls_provider
            .key_store()
            .remove::<MlsPendingMessage, _>(id)
            .await
            .map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
    }

    // The pending group is no longer needed once the conversation is in place.
    self.clear().await?;

    Ok(pending_messages)
}
/// Deletes this pending group from the keystore.
///
/// # Errors
/// Fails if the keystore cannot be obtained or the deletion fails.
pub(crate) async fn clear(&mut self) -> Result<()> {
    let store = self.keystore().await?;
    let deletion = store.mls_pending_groups_delete(self.id()).await;
    deletion.map_err(KeystoreError::wrap("deleting pending groups by id"))?;
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mls::conversation::Conversation as _;
use crate::prelude::MlsConversationDecryptMessage;
use crate::test_utils::*;
use wasm_bindgen_test::*;
wasm_bindgen_test_configure!(run_in_browser);
// Scenario: Bob joins through an external commit that he has not merged yet.
// Every message he receives in the meantime must be buffered, then restored
// once he processes his own external commit.
#[apply(all_cred_cipher)]
#[wasm_bindgen_test]
async fn should_buffer_and_reapply_messages_after_external_commit_merged(case: TestCase) {
run_test_with_client_ids(
case.clone(),
["alice", "bob", "charlie", "debbie"],
move |[alice_central, bob_central, charlie_central, debbie_central]| {
Box::pin(async move {
let id = conversation_id();
// Alice creates the conversation.
alice_central
.context
.new_conversation(&id, case.credential_type, case.cfg.clone())
.await
.unwrap();
// Bob creates an external commit from Alice's group info, without merging it.
let gi = alice_central.get_group_info(&id).await;
let (external_commit, _) = bob_central
.create_unmerged_external_commit(gi, case.custom_cfg(), case.credential_type)
.await;
// Alice processes Bob's external commit.
alice_central
.context
.conversation(&id)
.await
.unwrap()
.decrypt_message(external_commit.commit.to_bytes().unwrap())
.await
.unwrap();
// Debbie creates an external add proposal for Alice's current epoch.
let epoch = alice_central.context.conversation(&id).await.unwrap().epoch().await;
let external_proposal = debbie_central
.context
.new_external_add_proposal(id.clone(), epoch.into(), case.ciphersuite(), case.credential_type)
.await
.unwrap();
// Alice produces an application message and an update proposal.
let app_msg = alice_central
.context
.conversation(&id)
.await
.unwrap()
.encrypt_message(b"Hello Bob !")
.await
.unwrap();
let proposal = alice_central.context.new_update_proposal(&id).await.unwrap().proposal;
// Alice processes Debbie's external proposal.
alice_central
.context
.conversation(&id)
.await
.unwrap()
.decrypt_message(external_proposal.to_bytes().unwrap())
.await
.unwrap();
// Alice adds Charlie; the resulting commit bundle is read back from her transport.
let charlie = charlie_central.rand_key_package(&case).await;
alice_central
.context
.conversation(&id)
.await
.unwrap()
.add_members(vec![charlie])
.await
.unwrap();
let commit = alice_central.mls_transport.latest_commit_bundle().await;
// Charlie and Debbie both join using the welcome carried by that commit bundle.
charlie_central
.context
.process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
.await
.unwrap();
debbie_central
.context
.process_welcome_message(commit.welcome.clone().unwrap().into(), case.custom_cfg())
.await
.unwrap();
// Bob receives the messages in this order: commit, external proposal, update proposal.
let messages = vec![commit.commit, external_proposal, proposal]
.into_iter()
.map(|m| m.to_bytes().unwrap());
// Looking up the conversation on Bob's side yields the pending conversation as an error.
let Err(Error::PendingConversation(mut pending_conversation)) =
bob_central.context.conversation(&id).await
else {
panic!("Bob should not have the conversation yet")
};
// None of these is Bob's own join commit, so each one is buffered.
for m in messages {
let decrypt = pending_conversation.try_process_own_join_commit(m).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
}
let decrypt = pending_conversation.try_process_own_join_commit(app_msg).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedForPendingConversation));
// Three handshake messages plus one application message are buffered.
assert_eq!(bob_central.context.count_entities().await.pending_messages, 4);
// Processing his own external commit merges the group and returns the restored messages.
let MlsConversationDecryptMessage {
buffered_messages: Some(restored_messages),
..
} = pending_conversation
.try_process_own_join_commit(external_commit.commit.to_bytes().unwrap())
.await
.unwrap()
else {
panic!("Alice's messages should have been restored at this point");
};
// Expected order of the restored messages: the application message first,
// then two messages without epoch change, then one with an epoch change.
for (i, m) in restored_messages.into_iter().enumerate() {
match i {
0 => {
assert_eq!(&m.app_msg.unwrap(), b"Hello Bob !");
assert!(!m.has_epoch_changed);
}
1 | 2 => {
assert!(m.app_msg.is_none());
assert!(!m.has_epoch_changed);
}
3 => {
assert!(m.app_msg.is_none());
assert!(m.has_epoch_changed);
}
_ => unreachable!(),
}
}
// Bob can now talk to every other member, and nothing is left in the buffer.
assert!(bob_central.try_talk_to(&id, &alice_central).await.is_ok());
assert!(bob_central.try_talk_to(&id, &charlie_central).await.is_ok());
assert!(bob_central.try_talk_to(&id, &debbie_central).await.is_ok());
assert_eq!(bob_central.context.count_entities().await.pending_messages, 0);
})
},
)
.await
}
// Scenario: Alice holds buffered messages for a conversation she is already in,
// then rejoins it through an external commit. The buffered messages must be
// discarded rather than re-applied.
// NOTE(review): per the test name, the rejoining external commit presumably
// removes Alice's previous leaf — confirm against the external-commit code.
#[apply(all_cred_cipher)]
#[wasm_bindgen_test]
async fn should_not_reapply_buffered_messages_when_external_commit_contains_remove(case: TestCase) {
use crate::mls;
run_test_with_client_ids(
case.clone(),
["alice", "bob"],
move |[alice_central, mut bob_central]| {
Box::pin(async move {
let id = conversation_id();
// Alice creates the conversation and invites Bob.
alice_central
.context
.new_conversation(&id, case.credential_type, case.cfg.clone())
.await
.unwrap();
alice_central.invite_all(&case, &id, [&mut bob_central]).await.unwrap();
// Bob updates his key material; Alice does not process that commit in this test.
bob_central
.context
.conversation(&id)
.await
.unwrap()
.update_key_material()
.await
.unwrap();
// Bob encrypts two messages after his update.
let msg1 = bob_central
.context
.conversation(&id)
.await
.unwrap()
.encrypt_message("A")
.await
.unwrap();
let msg2 = bob_central
.context
.conversation(&id)
.await
.unwrap()
.encrypt_message("B")
.await
.unwrap();
// On Alice's side both messages are reported as buffered future messages.
let decrypt = alice_central
.context
.conversation(&id)
.await
.unwrap()
.decrypt_message(msg1)
.await;
assert!(matches!(
decrypt.unwrap_err(),
mls::conversation::Error::BufferedFutureMessage { .. }
));
let decrypt = alice_central
.context
.conversation(&id)
.await
.unwrap()
.decrypt_message(msg2)
.await;
assert!(matches!(
decrypt.unwrap_err(),
mls::conversation::Error::BufferedFutureMessage { .. }
));
assert_eq!(alice_central.context.count_entities().await.pending_messages, 2);
// Alice rejoins the same conversation through an external commit built from Bob's group info.
let gi = bob_central.get_group_info(&id).await;
alice_central
.context
.join_by_external_commit(gi, case.custom_cfg(), case.credential_type)
.await
.unwrap();
// Bob processes Alice's external commit.
let ext_commit = alice_central.mls_transport.latest_commit_bundle().await;
bob_central
.context
.conversation(&id)
.await
.unwrap()
.decrypt_message(ext_commit.commit.to_bytes().unwrap())
.await
.unwrap();
// Alice's buffer has been emptied.
assert_eq!(alice_central.context.count_entities().await.pending_messages, 0);
})
},
)
.await
}
}