core_crypto_keystore/connection/
mod.rs1use std::{fmt, ops::Deref};
2
3use sha2::{Digest as _, Sha256};
4use zeroize::{Zeroize, ZeroizeOnDrop};
5
/// Platform-specific database backends.
///
/// Exactly one backend module is compiled in, chosen by `target_family`. Whichever one is
/// selected is re-exported under the common names `KeystoreDatabaseConnection` and
/// `TransactionWrapper`, so the rest of the crate does not need to know which backend is active.
pub mod platform {
    cfg_if::cfg_if! {
        if #[cfg(target_family = "wasm")] {
            // WASM targets: the `wasm` backend; its `storage` module is re-exported as well.
            mod wasm;
            pub use self::wasm::WasmConnection as KeystoreDatabaseConnection;
            pub use wasm::storage;
            pub use self::wasm::storage::WasmStorageTransaction as TransactionWrapper;
        } else {
            // Every other target: the `generic` backend (`SqlCipherConnection`).
            mod generic;
            pub use self::generic::SqlCipherConnection as KeystoreDatabaseConnection;
            pub use self::generic::TransactionWrapper;
        }
    }
}
20
21use std::{ops::DerefMut, sync::Arc};
22
23use async_lock::{Mutex, MutexGuard, Semaphore};
24
25pub use self::platform::*;
26use crate::{
27 CryptoKeystoreError, CryptoKeystoreResult,
28 entities::{Entity, EntityFindParams, EntityTransactionExt, MlsPendingMessage, StringEntityId, UniqueEntity},
29 transaction::KeystoreTransaction,
30};
31
/// Size limit for a single blob, presumably in bytes.
///
/// `DatabaseConnection::check_buffer_size` rejects any size that is greater than or equal to
/// this value.
pub const MAX_BLOB_LEN: usize = 1_000_000_000;
41
/// Bounds every database connection type has to satisfy (non-WASM flavour).
///
/// Outside of WASM a connection must be `Sized` and `Send`.
#[cfg(not(target_family = "wasm"))]
pub trait DatabaseConnectionRequirements: Sized + Send {}
/// Bounds every database connection type has to satisfy (WASM flavour).
///
/// On WASM only `Sized` is required; the `Send` bound is dropped
/// (presumably because the WASM backend is not `Send` — TODO confirm).
#[cfg(target_family = "wasm")]
pub trait DatabaseConnectionRequirements: Sized {}
49
/// Key for a keystore database: exactly [`DatabaseKey::LEN`] raw bytes.
///
/// The type derives `Zeroize` and `ZeroizeOnDrop`, and `derive_more::From` provides a conversion
/// from `[u8; Self::LEN]`.
///
/// NOTE(review): the derived `PartialEq` is presumably not a constant-time comparison — confirm
/// that keys are never compared on a timing-sensitive path.
#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
pub struct DatabaseKey([u8; Self::LEN]);
53
54impl DatabaseKey {
55 pub const LEN: usize = 32;
56
57 pub fn generate() -> DatabaseKey {
58 DatabaseKey(rand::random::<[u8; Self::LEN]>())
59 }
60}
61
62impl fmt::Debug for DatabaseKey {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
64 f.write_str("DatabaseKey(hash=")?;
65 for x in Sha256::digest(self).as_slice().iter().take(10) {
66 fmt::LowerHex::fmt(x, f)?
67 }
68 f.write_str("...)")
69 }
70}
71
72impl AsRef<[u8]> for DatabaseKey {
73 fn as_ref(&self) -> &[u8] {
74 &self.0
75 }
76}
77
78impl Deref for DatabaseKey {
79 type Target = [u8];
80
81 fn deref(&self) -> &Self::Target {
82 &self.0
83 }
84}
85
86impl TryFrom<&[u8]> for DatabaseKey {
87 type Error = CryptoKeystoreError;
88
89 fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
90 if buf.len() != Self::LEN {
91 Err(CryptoKeystoreError::InvalidDbKeySize {
92 expected: Self::LEN,
93 actual: buf.len(),
94 })
95 } else {
96 Ok(Self(buf.try_into().unwrap()))
97 }
98 }
99}
100
101#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
102#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
103pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
104 type Connection: 'a;
105
106 async fn open(name: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
107
108 async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
109
110 async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
111
112 async fn wipe(self) -> CryptoKeystoreResult<()>;
114
115 fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
116 #[cfg(not(target_family = "wasm"))]
117 if size > i32::MAX as usize {
118 return Err(CryptoKeystoreError::BlobTooBig);
119 }
120
121 if size >= MAX_BLOB_LEN {
122 return Err(CryptoKeystoreError::BlobTooBig);
123 }
124
125 Ok(())
126 }
127}
128
/// Handle to the keystore database.
///
/// Every field sits behind an `Arc`, so the derived `Clone` produces handles that share the same
/// connection slot, transaction slot and semaphore.
#[derive(Debug, Clone)]
pub struct Database {
    /// The open connection, or `None` once it has been taken out (see `close` and `wipe`).
    pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
    /// The transaction currently in progress, if any.
    pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
    /// Semaphore created with `ALLOWED_CONCURRENT_TRANSACTIONS_COUNT` permits; a permit is
    /// acquired when starting a transaction and when taking the connection.
    transaction_semaphore: Arc<Semaphore>,
}
135
/// Number of permits given to the transaction semaphore created in `Database::open`.
const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
137
/// Read-only queries against the keystore.
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
pub trait FetchFromDatabase: Send + Sync {
    /// Looks up the entity of type `E` with the given `id`, if there is one.
    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        id: impl AsRef<[u8]> + Send,
    ) -> CryptoKeystoreResult<Option<E>>;

    /// Fetches the unique entity of type `U`.
    async fn find_unique<U: UniqueEntity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
    ) -> CryptoKeystoreResult<U>;

    /// Fetches the entities of type `E` selected by `params`.
    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        params: EntityFindParams,
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Fetches the entities of type `E` for the given `ids`.
    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        ids: &[Vec<u8>],
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Counts the entities of type `E`.
    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize>;
}
163
// SAFETY — NOTE(review): every field of `Database` is an `Arc` around an `async_lock` primitive
// (`Mutex` or `Semaphore`). These manual impls presumably exist because
// `KeystoreDatabaseConnection` is not `Send`/`Sync` on every target; confirm that the connection
// is never actually used from more than one thread on such targets.
unsafe impl Send for Database {}
// SAFETY — NOTE(review): same reasoning as for `Send`; all access to the wrapped values goes
// through the mutexes held in the fields — confirm this is sufficient for the active backend.
unsafe impl Sync for Database {}
168
/// Selects which kind of database `Database::open` opens.
#[derive(Debug, Clone)]
pub enum ConnectionType<'a> {
    /// A named database; the name is passed on to `KeystoreDatabaseConnection::open`.
    Persistent(&'a str),
    /// An in-memory database, opened through `KeystoreDatabaseConnection::open_in_memory`.
    InMemory,
}
177
/// Locked access to the database connection.
///
/// Wraps the guard of the connection mutex. The `TryFrom` conversion refuses a guard whose slot
/// is empty, which is what allows the `Deref`/`DerefMut` impls to hand out the connection
/// directly.
pub struct ConnectionGuard<'a> {
    /// Guard over the connection slot; contains `Some` whenever built through `TryFrom`.
    guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
}
185
186impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
187 type Error = CryptoKeystoreError;
188
189 fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
190 guard
191 .is_some()
192 .then_some(Self { guard })
193 .ok_or(CryptoKeystoreError::Closed)
194 }
195}
196
197impl Deref for ConnectionGuard<'_> {
198 type Target = KeystoreDatabaseConnection;
199
200 fn deref(&self) -> &Self::Target {
201 self.guard
202 .as_ref()
203 .expect("we have exclusive access and already checked that the connection exists")
204 }
205}
206
207impl DerefMut for ConnectionGuard<'_> {
208 fn deref_mut(&mut self) -> &mut Self::Target {
209 self.guard
210 .as_mut()
211 .expect("we have exclusive access and already checked that the connection exists")
212 }
213}
214
215impl Database {
217 pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
218 let conn = match location {
219 ConnectionType::Persistent(name) => KeystoreDatabaseConnection::open(name, key).await?,
220 ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
221 };
222 let conn = Mutex::new(Some(conn));
223 #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(conn);
225 Ok(Self {
226 conn,
227 transaction: Default::default(),
228 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
229 })
230 }
231
232 pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
234 self.conn.lock().await.try_into()
235 }
236
237 async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
240 let _semaphore = self.transaction_semaphore.acquire_arc().await;
241
242 let mut guard = self.conn.lock().await;
243 guard.take().ok_or(CryptoKeystoreError::Closed)
244 }
245
246 pub async fn close(&self) -> CryptoKeystoreResult<()> {
248 #[cfg(not(target_family = "wasm"))]
249 self.take().await?;
250
251 #[cfg(target_family = "wasm")]
252 {
253 let conn = self.take().await?;
254 conn.close().await?;
255 }
256 Ok(())
257 }
258}
259
260impl Database {
261 pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
263 self.take().await?.wipe().await
264 }
265
266 pub async fn migrate_db_key_type_to_bytes(
267 name: &str,
268 old_key: &str,
269 new_key: &DatabaseKey,
270 ) -> CryptoKeystoreResult<()> {
271 KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
272 }
273
274 pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
275 self.conn().await?.update_key(new_key).await
276 }
277
278 pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
280 let semaphore = self.transaction_semaphore.acquire_arc().await;
281 let mut transaction_guard = self.transaction.lock().await;
282 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
283 Ok(())
284 }
285
286 pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
287 let mut transaction_guard = self.transaction.lock().await;
288 let Some(transaction) = transaction_guard.as_ref() else {
289 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
290 };
291 transaction.commit(self).await?;
292 *transaction_guard = None;
293 Ok(())
294 }
295
296 pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
297 let mut transaction_guard = self.transaction.lock().await;
298 if transaction_guard.is_none() {
299 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
300 };
301 *transaction_guard = None;
302 Ok(())
303 }
304
305 pub async fn child_groups<
306 E: Entity<ConnectionType = KeystoreDatabaseConnection> + crate::entities::PersistedMlsGroupExt + Sync,
307 >(
308 &self,
309 entity: E,
310 ) -> CryptoKeystoreResult<Vec<E>> {
311 let mut conn = self.conn().await?;
312 let persisted_records = entity.child_groups(conn.deref_mut()).await?;
313
314 let transaction_guard = self.transaction.lock().await;
315 let Some(transaction) = transaction_guard.as_ref() else {
316 return Ok(persisted_records);
317 };
318 transaction.child_groups(entity, persisted_records).await
319 }
320
321 pub async fn save<E: Entity<ConnectionType = KeystoreDatabaseConnection> + Sync + EntityTransactionExt>(
322 &self,
323 entity: E,
324 ) -> CryptoKeystoreResult<E> {
325 let transaction_guard = self.transaction.lock().await;
326 let Some(transaction) = transaction_guard.as_ref() else {
327 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
328 };
329 transaction.save_mut(entity).await
330 }
331
332 pub async fn remove<
333 E: Entity<ConnectionType = KeystoreDatabaseConnection> + EntityTransactionExt,
334 S: AsRef<[u8]>,
335 >(
336 &self,
337 id: S,
338 ) -> CryptoKeystoreResult<()> {
339 let transaction_guard = self.transaction.lock().await;
340 let Some(transaction) = transaction_guard.as_ref() else {
341 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
342 };
343 transaction.remove::<E, S>(id).await
344 }
345
346 pub async fn find_pending_messages_by_conversation_id(
347 &self,
348 conversation_id: &[u8],
349 ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
350 let mut conn = self.conn().await?;
351 let persisted_records =
352 MlsPendingMessage::find_all_by_conversation_id(&mut conn, conversation_id, Default::default()).await?;
353
354 let transaction_guard = self.transaction.lock().await;
355 let Some(transaction) = transaction_guard.as_ref() else {
356 return Ok(persisted_records);
357 };
358 transaction
359 .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
360 .await
361 }
362
363 pub async fn remove_pending_messages_by_conversation_id(
364 &self,
365 conversation_id: impl AsRef<[u8]> + Send,
366 ) -> CryptoKeystoreResult<()> {
367 let transaction_guard = self.transaction.lock().await;
368 let Some(transaction) = transaction_guard.as_ref() else {
369 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
370 };
371 transaction
372 .remove_pending_messages_by_conversation_id(conversation_id)
373 .await
374 }
375
376 pub async fn cred_delete_by_credential(&self, cred: Vec<u8>) -> CryptoKeystoreResult<()> {
377 let transaction_guard = self.transaction.lock().await;
378 let Some(transaction) = transaction_guard.as_ref() else {
379 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
380 };
381 transaction.cred_delete_by_credential(cred).await
382 }
383}
384
385#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
386#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
387impl FetchFromDatabase for Database {
388 async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
389 &self,
390 id: impl AsRef<[u8]> + Send,
391 ) -> CryptoKeystoreResult<Option<E>> {
392 if let Some(transaction) = self.transaction.lock().await.as_ref()
394 && let Some(cached_record) = transaction.find::<E>(id.as_ref()).await?
396 {
397 return Ok(cached_record);
399 }
400
401 let mut conn = self.conn().await?;
403 E::find_one(&mut conn, &id.as_ref().into()).await
404 }
405
406 async fn find_unique<U: UniqueEntity>(&self) -> CryptoKeystoreResult<U> {
407 if let Some(transaction) = self.transaction.lock().await.as_ref()
409 && let Some(cached_record) = transaction.find_unique::<U>().await?
411 {
412 return Ok(cached_record);
414 }
415 let mut conn = self.conn().await?;
417 U::find_unique(&mut conn).await
418 }
419
420 async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
421 &self,
422 params: EntityFindParams,
423 ) -> CryptoKeystoreResult<Vec<E>> {
424 let mut conn = self.conn().await?;
425 let persisted_records = E::find_all(&mut conn, params.clone()).await?;
426
427 let transaction_guard = self.transaction.lock().await;
428 let Some(transaction) = transaction_guard.as_ref() else {
429 return Ok(persisted_records);
430 };
431 transaction.find_all(persisted_records, params).await
432 }
433
434 async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
435 &self,
436 ids: &[Vec<u8>],
437 ) -> CryptoKeystoreResult<Vec<E>> {
438 let entity_ids: Vec<StringEntityId> = ids.iter().map(|id| id.as_slice().into()).collect();
439 let mut conn = self.conn().await?;
440 let persisted_records = E::find_many(&mut conn, &entity_ids).await?;
441
442 let transaction_guard = self.transaction.lock().await;
443 let Some(transaction) = transaction_guard.as_ref() else {
444 return Ok(persisted_records);
445 };
446 transaction.find_many(persisted_records, ids).await
447 }
448
449 async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize> {
450 if self.transaction.lock().await.is_some() {
451 return Ok(self.find_all::<E>(Default::default()).await?.len());
454 };
455 let mut conn = self.conn().await?;
456 E::count(&mut conn).await
457 }
458}