core_crypto_keystore/connection/
mod.rs1use std::fmt;
2use std::ops::Deref;
3
4use sha2::{Digest as _, Sha256};
5use zeroize::{Zeroize, ZeroizeOnDrop};
6
/// Platform-specific connection backends.
///
/// Exactly one backend module is compiled in, chosen by `target_family`.
/// Both branches export the names `KeystoreDatabaseConnection` and
/// `TransactionWrapper`, so code outside this module can use them without
/// its own `cfg` switches. The wasm branch additionally re-exports `storage`.
pub mod platform {
    cfg_if::cfg_if! {
        if #[cfg(target_family = "wasm")] {
            // wasm targets: `WasmConnection` is the connection type and
            // `WasmStorageTransaction` the transaction wrapper.
            mod wasm;
            pub use self::wasm::WasmConnection as KeystoreDatabaseConnection;
            pub use wasm::storage;
            pub use self::wasm::storage::WasmStorageTransaction as TransactionWrapper;
        } else {
            // Every other target: `SqlCipherConnection` is the connection type.
            mod generic;
            pub use self::generic::SqlCipherConnection as KeystoreDatabaseConnection;
            pub use self::generic::TransactionWrapper;
        }
    }
}
21
22pub use self::platform::*;
23use crate::entities::{Entity, EntityFindParams, MlsPendingMessage, StringEntityId};
24use std::ops::DerefMut;
25
26use crate::entities::{EntityTransactionExt, UniqueEntity};
27use crate::transaction::KeystoreTransaction;
28use crate::{CryptoKeystoreError, CryptoKeystoreResult};
29use async_lock::{Mutex, MutexGuard, Semaphore};
30use std::sync::Arc;
31
/// Size limit, in bytes, enforced by `DatabaseConnection::check_buffer_size`:
/// any size greater than or equal to this value is rejected with `BlobTooBig`.
pub const MAX_BLOB_LEN: usize = 1_000_000_000;
40
/// Marker supertrait of `DatabaseConnection` on non-wasm targets: the
/// connection must be `Sized` and `Send`.
#[cfg(not(target_family = "wasm"))]
pub trait DatabaseConnectionRequirements: Sized + Send {}
/// Marker supertrait of `DatabaseConnection` on wasm targets: only `Sized`
/// is required, there is no `Send` bound.
#[cfg(target_family = "wasm")]
pub trait DatabaseConnectionRequirements: Sized {}
47
48#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
50pub struct DatabaseKey([u8; Self::LEN]);
51
52impl DatabaseKey {
53 pub const LEN: usize = 32;
54
55 pub fn generate() -> DatabaseKey {
56 DatabaseKey(rand::random::<[u8; Self::LEN]>())
57 }
58}
59
60impl fmt::Debug for DatabaseKey {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
62 f.write_str("DatabaseKey(hash=")?;
63 for x in Sha256::digest(self).as_slice().iter().take(10) {
64 fmt::LowerHex::fmt(x, f)?
65 }
66 f.write_str("...)")
67 }
68}
69
70impl AsRef<[u8]> for DatabaseKey {
71 fn as_ref(&self) -> &[u8] {
72 &self.0
73 }
74}
75
76impl Deref for DatabaseKey {
77 type Target = [u8];
78
79 fn deref(&self) -> &Self::Target {
80 &self.0
81 }
82}
83
84impl TryFrom<&[u8]> for DatabaseKey {
85 type Error = CryptoKeystoreError;
86
87 fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
88 if buf.len() != Self::LEN {
89 Err(CryptoKeystoreError::InvalidDbKeySize {
90 expected: Self::LEN,
91 actual: buf.len(),
92 })
93 } else {
94 Ok(Self(buf.try_into().unwrap()))
95 }
96 }
97}
98
99#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
100#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
101pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
102 type Connection: 'a;
103
104 async fn open(name: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
105
106 async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
107
108 async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
109
110 async fn close(self) -> CryptoKeystoreResult<()>;
111
112 async fn wipe(self) -> CryptoKeystoreResult<()> {
114 self.close().await
115 }
116
117 fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
118 #[cfg(not(target_family = "wasm"))]
119 if size > i32::MAX as usize {
120 return Err(CryptoKeystoreError::BlobTooBig);
121 }
122
123 if size >= MAX_BLOB_LEN {
124 return Err(CryptoKeystoreError::BlobTooBig);
125 }
126
127 Ok(())
128 }
129}
130
/// Handle to the keystore database.
///
/// Every field sits behind an `Arc`, so cloning a `Database` produces another
/// handle to the same connection, transaction slot and semaphore.
#[derive(Debug, Clone)]
pub struct Database {
    /// The platform connection, guarded by an async mutex.
    pub(crate) conn: Arc<Mutex<KeystoreDatabaseConnection>>,
    /// The currently open transaction; `None` when no transaction is in progress.
    pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
    /// Permits for opening transactions; created with
    /// `ALLOWED_CONCURRENT_TRANSACTIONS_COUNT` permits in `Database::open`.
    transaction_semaphore: Arc<Semaphore>,
}
137
/// Number of permits the transaction semaphore is created with, i.e. how many
/// transactions may hold a permit at the same time.
const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
139
/// Read-only query interface over keystore entities.
///
/// Implementors must be `Send + Sync`. The `async_trait` attribute is applied
/// with `?Send` on wasm and with the default (`Send`) futures elsewhere.
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
pub trait FetchFromDatabase: Send + Sync {
    /// Looks up a single entity of type `E` by `id`; `Ok(None)` when there is none.
    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        id: &[u8],
    ) -> CryptoKeystoreResult<Option<E>>;

    /// Fetches the entity of type `U`, which is looked up without an id.
    async fn find_unique<U: UniqueEntity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
    ) -> CryptoKeystoreResult<U>;

    /// Fetches the entities of type `E` selected by `params`.
    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        params: EntityFindParams,
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Fetches the entities of type `E` for the given `ids`.
    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        ids: &[Vec<u8>],
    ) -> CryptoKeystoreResult<Vec<E>>;
    /// Returns the number of entities of type `E`.
    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize>;
}
165
// SAFETY: NOTE(review) — no justification is visible in this file. `Database`
// only holds `Arc`-wrapped async mutexes and a semaphore; these manual impls
// presumably exist for targets where the connection type is not `Send` (the
// wasm `DatabaseConnectionRequirements` has no `Send` bound). Confirm the
// intended invariant before relying on cross-thread use.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
170
/// Where `Database::open` should open the database.
#[derive(Debug, Clone)]
pub enum ConnectionType<'a> {
    /// A named database; the string is passed as the `name` argument of the
    /// connection's `open`.
    Persistent(&'a str),
    /// An in-memory database, opened through the connection's `open_in_memory`.
    InMemory,
}
179
180impl Database {
181 pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
182 let conn = match location {
183 ConnectionType::Persistent(name) => KeystoreDatabaseConnection::open(name, key).await?.into(),
184 ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?.into(),
185 };
186 #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(conn);
188 Ok(Self {
189 conn,
190 transaction: Default::default(),
191 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
192 })
193 }
194
195 pub async fn borrow_conn(&self) -> CryptoKeystoreResult<MutexGuard<'_, KeystoreDatabaseConnection>> {
196 Ok(self.conn.lock().await)
197 }
198
199 #[cfg(not(target_family = "wasm"))]
215 pub async fn export_copy(&self, destination_path: &str) -> CryptoKeystoreResult<()> {
216 let conn = self.conn.lock().await;
217 conn.export_copy(destination_path).await
218 }
219
220 pub async fn migrate_db_key_type_to_bytes(
221 name: &str,
222 old_key: &str,
223 new_key: &DatabaseKey,
224 ) -> CryptoKeystoreResult<()> {
225 KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
226 }
227
228 pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
229 self.conn.lock().await.update_key(new_key).await
230 }
231
232 pub async fn wipe(self) -> CryptoKeystoreResult<()> {
233 if self.transaction.lock().await.is_some() {
234 return Err(CryptoKeystoreError::TransactionInProgress {
235 attempted_operation: "wipe()".to_string(),
236 });
237 }
238 let conn: KeystoreDatabaseConnection = Arc::into_inner(self.conn).unwrap().into_inner();
239 conn.wipe().await?;
240 Ok(())
241 }
242
243 pub async fn close(self) -> CryptoKeystoreResult<()> {
245 let _semaphore = self.transaction_semaphore.acquire_arc().await;
247 let Some(conn) = Arc::into_inner(self.conn) else {
249 return Err(CryptoKeystoreError::CannotClose);
250 };
251 let conn = conn.into_inner();
252 conn.close().await?;
253 Ok(())
254 }
255
256 pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
258 let semaphore = self.transaction_semaphore.acquire_arc().await;
259 let mut transaction_guard = self.transaction.lock().await;
260 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
261 Ok(())
262 }
263
264 pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
265 let mut transaction_guard = self.transaction.lock().await;
266 let Some(transaction) = transaction_guard.as_ref() else {
267 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
268 };
269 transaction.commit(self).await?;
270 *transaction_guard = None;
271 Ok(())
272 }
273
274 pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
275 let mut transaction_guard = self.transaction.lock().await;
276 if transaction_guard.is_none() {
277 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
278 };
279 *transaction_guard = None;
280 Ok(())
281 }
282
283 pub async fn child_groups<
284 E: Entity<ConnectionType = KeystoreDatabaseConnection> + crate::entities::PersistedMlsGroupExt + Sync,
285 >(
286 &self,
287 entity: E,
288 ) -> CryptoKeystoreResult<Vec<E>> {
289 let mut conn = self.conn.lock().await;
290 let persisted_records = entity.child_groups(conn.deref_mut()).await?;
291
292 let transaction_guard = self.transaction.lock().await;
293 let Some(transaction) = transaction_guard.as_ref() else {
294 return Ok(persisted_records);
295 };
296 transaction.child_groups(entity, persisted_records).await
297 }
298
299 pub async fn save<E: Entity<ConnectionType = KeystoreDatabaseConnection> + Sync + EntityTransactionExt>(
300 &self,
301 entity: E,
302 ) -> CryptoKeystoreResult<E> {
303 let transaction_guard = self.transaction.lock().await;
304 let Some(transaction) = transaction_guard.as_ref() else {
305 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
306 };
307 transaction.save_mut(entity).await
308 }
309
310 pub async fn remove<
311 E: Entity<ConnectionType = KeystoreDatabaseConnection> + EntityTransactionExt,
312 S: AsRef<[u8]>,
313 >(
314 &self,
315 id: S,
316 ) -> CryptoKeystoreResult<()> {
317 let transaction_guard = self.transaction.lock().await;
318 let Some(transaction) = transaction_guard.as_ref() else {
319 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
320 };
321 transaction.remove::<E, S>(id).await
322 }
323
324 pub async fn find_pending_messages_by_conversation_id(
325 &self,
326 conversation_id: &[u8],
327 ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
328 let mut conn = self.conn.lock().await;
329 let persisted_records =
330 MlsPendingMessage::find_all_by_conversation_id(&mut conn, conversation_id, Default::default()).await?;
331
332 let transaction_guard = self.transaction.lock().await;
333 let Some(transaction) = transaction_guard.as_ref() else {
334 return Ok(persisted_records);
335 };
336 transaction
337 .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
338 .await
339 }
340
341 pub async fn remove_pending_messages_by_conversation_id(&self, conversation_id: &[u8]) -> CryptoKeystoreResult<()> {
342 let transaction_guard = self.transaction.lock().await;
343 let Some(transaction) = transaction_guard.as_ref() else {
344 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
345 };
346 transaction
347 .remove_pending_messages_by_conversation_id(conversation_id)
348 .await
349 }
350
351 pub async fn cred_delete_by_credential(&self, cred: Vec<u8>) -> CryptoKeystoreResult<()> {
352 let transaction_guard = self.transaction.lock().await;
353 let Some(transaction) = transaction_guard.as_ref() else {
354 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
355 };
356 transaction.cred_delete_by_credential(cred).await
357 }
358}
359
360#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
361#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
362impl FetchFromDatabase for Database {
363 async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
364 &self,
365 id: &[u8],
366 ) -> CryptoKeystoreResult<Option<E>> {
367 if let Some(transaction) = self.transaction.lock().await.as_ref()
369 && let Some(cached_record) = transaction.find::<E>(id).await?
371 {
372 return Ok(cached_record);
374 }
375
376 let mut conn = self.conn.lock().await;
378 E::find_one(&mut conn, &id.into()).await
379 }
380
381 async fn find_unique<U: UniqueEntity>(&self) -> CryptoKeystoreResult<U> {
382 if let Some(transaction) = self.transaction.lock().await.as_ref()
384 && let Some(cached_record) = transaction.find_unique::<U>().await?
386 {
387 return Ok(cached_record);
389 }
390 let mut conn = self.conn.lock().await;
392 U::find_unique(&mut conn).await
393 }
394
395 async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
396 &self,
397 params: EntityFindParams,
398 ) -> CryptoKeystoreResult<Vec<E>> {
399 let mut conn = self.conn.lock().await;
400 let persisted_records = E::find_all(&mut conn, params.clone()).await?;
401
402 let transaction_guard = self.transaction.lock().await;
403 let Some(transaction) = transaction_guard.as_ref() else {
404 return Ok(persisted_records);
405 };
406 transaction.find_all(persisted_records, params).await
407 }
408
409 async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
410 &self,
411 ids: &[Vec<u8>],
412 ) -> CryptoKeystoreResult<Vec<E>> {
413 let entity_ids: Vec<StringEntityId> = ids.iter().map(|id| id.as_slice().into()).collect();
414 let mut conn = self.conn.lock().await;
415 let persisted_records = E::find_many(&mut conn, &entity_ids).await?;
416
417 let transaction_guard = self.transaction.lock().await;
418 let Some(transaction) = transaction_guard.as_ref() else {
419 return Ok(persisted_records);
420 };
421 transaction.find_many(persisted_records, ids).await
422 }
423
424 async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize> {
425 if self.transaction.lock().await.is_some() {
426 return Ok(self.find_all::<E>(Default::default()).await?.len());
429 };
430 let mut conn = self.conn.lock().await;
431 E::count(&mut conn).await
432 }
433}