// core_crypto/mls/conversation/conversation_guard/commit.rs
use openmls::prelude::KeyPackageIn;
use crate::mls::conversation::{ConversationWithMls as _, Error};
use crate::mls::credential::CredentialBundle;
use crate::prelude::MlsCredentialType;
use crate::{
LeafError, MlsError, MlsTransportResponse, RecursiveError,
e2e_identity::init_certificates::NewCrlDistributionPoint,
mls::{
conversation::{ConversationGuard, Result, commit::MlsCommitBundle},
credential::crl::{extract_crl_uris_from_credentials, get_new_crl_distribution_points},
},
prelude::ClientId,
};
/// What to do with the local pending commit after a commit bundle was handed to the transport.
///
/// Produced by `send_commit` and consumed by `send_and_merge_commit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TransportedCommitPolicy {
    /// The transport reported success; the commit is to be merged into the local group state.
    Merge,
    /// There is nothing to merge: after a retry no pending proposals were left to commit.
    None,
}
impl ConversationGuard {
async fn send_and_merge_commit(&mut self, commit: MlsCommitBundle) -> Result<()> {
match self.send_commit(commit).await {
Ok(TransportedCommitPolicy::None) => Ok(()),
Ok(TransportedCommitPolicy::Merge) => {
let backend = self.mls_provider().await?;
let mut conversation = self.inner.write().await;
conversation.commit_accepted(&backend).await
}
Err(e @ Error::MessageRejected { .. }) => {
self.clear_pending_commit().await?;
Err(e)
}
Err(e) => Err(e),
}
}
async fn send_commit(&mut self, mut commit: MlsCommitBundle) -> Result<TransportedCommitPolicy> {
let transport = self
.central()
.await?
.mls_transport()
.await
.map_err(RecursiveError::root("getting mls transport"))?;
let transport = transport.as_ref().ok_or::<Error>(
RecursiveError::root("getting mls transport")(crate::Error::MlsTransportNotProvided).into(),
)?;
let client = self.mls_client().await?;
let backend = self.mls_provider().await?;
let inner = self.conversation().await;
let epoch_before_sending = inner.group().epoch().as_u64();
drop(inner);
loop {
match transport
.send_commit_bundle(commit.clone())
.await
.map_err(RecursiveError::root("sending commit bundle"))?
{
MlsTransportResponse::Success => {
return Ok(TransportedCommitPolicy::Merge);
}
MlsTransportResponse::Abort { reason } => {
return Err(Error::MessageRejected { reason });
}
MlsTransportResponse::Retry => {
let mut inner = self.conversation_mut().await;
let epoch_after_sending = inner.group().epoch().as_u64();
if epoch_before_sending == epoch_after_sending {
continue;
}
let Some(commit_to_retry) = inner.commit_pending_proposals(&client, &backend).await? else {
return Ok(TransportedCommitPolicy::None);
};
commit = commit_to_retry;
}
}
}
}
/// Adds the clients behind `key_packages` to the conversation, then sends the resulting commit
/// (with its welcome and group info) and merges it.
///
/// Before the members are added, CRL URIs are extracted from the X.509 credentials of the key
/// packages; the distribution points reported as new for those URIs are the return value.
///
/// # Errors
///
/// Fails if CRL URI extraction or lookup fails, if the group rejects the addition, if the
/// group info cannot be converted, if persisting the group fails, or if sending/merging the
/// commit fails.
pub async fn add_members(&mut self, key_packages: Vec<KeyPackageIn>) -> Result<NewCrlDistributionPoint> {
    let provider = self.mls_provider().await?;
    let credential_bundle = self.credential_bundle().await?;
    let mut guard = self.conversation_mut().await;

    // Only X.509 credentials are handed to the CRL URI extraction.
    let x509_credentials = key_packages.iter().filter_map(|key_package| {
        let candidate = key_package.credential().mls_credential();
        match candidate {
            openmls::prelude::MlsCredentialType::X509(_) => Some(candidate),
            _ => None,
        }
    });
    let crl_uris = extract_crl_uris_from_credentials(x509_credentials)
        .map_err(RecursiveError::mls_credential("extracting crl uris from credentials"))?;
    let new_distribution_points = get_new_crl_distribution_points(&provider, crl_uris)
        .await
        .map_err(RecursiveError::mls_credential("getting new crl distribution points"))?;

    let (commit, welcome, raw_group_info) = guard
        .group
        .add_members(&provider, credential_bundle.signature_key(), key_packages)
        .await
        .map_err(MlsError::wrap("group add members"))?;
    let bundle = MlsCommitBundle {
        commit,
        // The bundle stores the welcome as an `Option`.
        welcome: Some(welcome),
        group_info: Self::group_info(raw_group_info)?,
    };
    guard.persist_group_when_changed(&provider.keystore(), false).await?;
    // Release the conversation guard before `self` is borrowed mutably again.
    drop(guard);

    self.send_and_merge_commit(bundle).await?;
    Ok(new_distribution_points)
}
/// Removes every group member whose credential identity equals one of `clients`, then sends
/// the resulting commit and merges it.
///
/// # Errors
///
/// Fails if the group rejects the removal, if the group info cannot be converted, if
/// persisting the group fails, or if sending/merging the commit fails.
pub async fn remove_members(&mut self, clients: &[ClientId]) -> Result<()> {
    let provider = self.mls_provider().await?;
    let credential_bundle = self.credential_bundle().await?;
    let mut guard = self.inner.write().await;

    // Translate client ids into the leaf indices of the matching group members.
    let leaf_indices: Vec<_> = guard
        .group
        .members()
        .filter(|member| {
            let identity = member.credential.identity();
            clients.iter().any(|client_id| client_id.as_slice() == identity)
        })
        .map(|member| member.index)
        .collect();

    let (commit, welcome, raw_group_info) = guard
        .group
        .remove_members(&provider, credential_bundle.signature_key(), &leaf_indices)
        .await
        .map_err(MlsError::wrap("group remove members"))?;
    let group_info = Self::group_info(raw_group_info)?;
    guard.persist_group_when_changed(&provider.keystore(), false).await?;
    // Release the conversation guard before `self` is borrowed mutably again.
    drop(guard);

    let bundle = MlsCommitBundle {
        commit,
        welcome,
        group_info,
    };
    self.send_and_merge_commit(bundle).await
}
/// Creates a keying-material update commit for this conversation (no credential bundle and no
/// leaf node override are supplied), then sends and merges it.
///
/// # Errors
///
/// Fails if the update commit cannot be created or if sending/merging it fails.
pub async fn update_key_material(&mut self) -> Result<()> {
    let client = self.mls_client().await?;
    let provider = self.mls_provider().await?;

    // The write guard lives only inside this block, so `self` is free again afterwards.
    let bundle = {
        let mut guard = self.inner.write().await;
        guard.update_keying_material(&client, &provider, None, None).await?
    };

    self.send_and_merge_commit(bundle).await
}
/// Replaces the credential on this client's own leaf node and commits the change, then sends
/// and merges that commit.
///
/// When `cb` is `None`, the client's most recent X.509 credential bundle for the
/// conversation's signature algorithm is looked up and used instead.
///
/// # Errors
///
/// Fails if no credential bundle can be found, if the group has no own leaf
/// (`LeafError::InternalMlsError`), if the update commit cannot be created, or if
/// sending/merging it fails.
pub async fn e2ei_rotate(&mut self, cb: Option<&CredentialBundle>) -> Result<()> {
    let client = self.mls_client().await?;
    let provider = self.mls_provider().await?;
    let mut guard = self.inner.write().await;

    let bundle = match cb {
        Some(provided) => provided,
        // The looked-up bundle is borrowed straight from the temporary, which stays alive for
        // the rest of the function.
        None => &client
            .find_most_recent_credential_bundle(
                guard.ciphersuite().signature_algorithm(),
                MlsCredentialType::X509,
            )
            .await
            .map_err(RecursiveError::mls_client("finding most recent x509 credential bundle"))?,
    };

    // Work on a copy of our own leaf that carries the new credential.
    let mut updated_leaf = guard
        .group
        .own_leaf()
        .ok_or(LeafError::InternalMlsError)?
        .clone();
    updated_leaf.set_credential_with_key(bundle.to_mls_credential_with_key());

    let commit = guard
        .update_keying_material(&client, &provider, Some(bundle), Some(updated_leaf))
        .await?;
    // Release the conversation guard before `self` is borrowed mutably again.
    drop(guard);

    self.send_and_merge_commit(commit).await
}
/// Commits the conversation's pending proposals, then sends and merges the resulting commit.
///
/// Returns `Ok(())` without sending anything when no commit was produced.
///
/// # Errors
///
/// Fails if the commit cannot be created or if sending/merging it fails.
pub async fn commit_pending_proposals(&mut self) -> Result<()> {
    let client = self.mls_client().await?;
    let provider = self.mls_provider().await?;

    // The write guard lives only inside this block, so `self` is free again afterwards.
    let maybe_bundle = {
        let mut guard = self.inner.write().await;
        guard.commit_pending_proposals(&client, &provider).await?
    };

    match maybe_bundle {
        Some(bundle) => self.send_and_merge_commit(bundle).await,
        None => Ok(()),
    }
}
}