core_crypto_keystore/connection/
mod.rs1use std::{fmt, ops::Deref};
2
3use sha2::{Digest as _, Sha256};
4use zeroize::{Zeroize, ZeroizeOnDrop};
5
/// Platform-specific backends, selected at compile time by target family.
///
/// Both branches export the names `KeystoreDatabaseConnection` and `TransactionWrapper`,
/// so the rest of this module can refer to them without knowing which backend is compiled in.
pub mod platform {
    cfg_if::cfg_if! {
        if #[cfg(target_family = "wasm")] {
            // WASM targets: the connection and transaction types come from the `wasm` module.
            mod wasm;
            pub use self::wasm::WasmConnection as KeystoreDatabaseConnection;
            pub use wasm::storage;
            pub use self::wasm::storage::WasmStorageTransaction as TransactionWrapper;
        } else {
            // All other targets: the connection and transaction types come from the `generic` module.
            mod generic;
            pub use self::generic::SqlCipherConnection as KeystoreDatabaseConnection;
            pub use self::generic::TransactionWrapper;
            // Only re-exported for test builds (used by `Database::open_at_schema_version`).
            #[cfg(test)]
            pub(crate) use generic::MigrationTarget;


        }
    }
}
24
25use std::{ops::DerefMut, sync::Arc};
26
27use async_lock::{Mutex, MutexGuard, Semaphore};
28
29pub use self::platform::*;
30use crate::{
31 CryptoKeystoreError, CryptoKeystoreResult,
32 entities::{Entity, EntityFindParams, EntityTransactionExt, MlsPendingMessage, StringEntityId, UniqueEntity},
33 transaction::KeystoreTransaction,
34};
35
/// Exclusive upper bound, in bytes, on the size of a single buffer accepted by
/// [`DatabaseConnection::check_buffer_size`]: any size greater than or equal to this
/// value is rejected with `CryptoKeystoreError::BlobTooBig`.
pub const MAX_BLOB_LEN: usize = 1_000_000_000;
45
/// Marker trait listing the bounds every database connection type must satisfy.
///
/// On non-WASM targets a connection must be `Sized + Send`.
#[cfg(not(target_family = "wasm"))]
pub trait DatabaseConnectionRequirements: Sized + Send {}
/// Marker trait listing the bounds every database connection type must satisfy.
///
/// On WASM targets only `Sized` is required; the `Send` bound is dropped.
#[cfg(target_family = "wasm")]
pub trait DatabaseConnectionRequirements: Sized {}
53
/// Fixed-length key ([`DatabaseKey::LEN`] bytes) handed to the database connection
/// when opening it or changing its key.
///
/// The `Zeroize` / `ZeroizeOnDrop` derives wipe the key bytes when the value is dropped.
/// `derive_more::From` allows building a key directly from a `[u8; Self::LEN]` array.
#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
pub struct DatabaseKey([u8; Self::LEN]);
57
58impl DatabaseKey {
59 pub const LEN: usize = 32;
60
61 pub fn generate() -> DatabaseKey {
62 DatabaseKey(rand::random::<[u8; Self::LEN]>())
63 }
64}
65
66impl fmt::Debug for DatabaseKey {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
68 f.write_str("DatabaseKey(hash=")?;
69 for x in Sha256::digest(self).as_slice().iter().take(10) {
70 fmt::LowerHex::fmt(x, f)?
71 }
72 f.write_str("...)")
73 }
74}
75
76impl AsRef<[u8]> for DatabaseKey {
77 fn as_ref(&self) -> &[u8] {
78 &self.0
79 }
80}
81
82impl Deref for DatabaseKey {
83 type Target = [u8];
84
85 fn deref(&self) -> &Self::Target {
86 &self.0
87 }
88}
89
90impl TryFrom<&[u8]> for DatabaseKey {
91 type Error = CryptoKeystoreError;
92
93 fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
94 if buf.len() != Self::LEN {
95 Err(CryptoKeystoreError::InvalidDbKeySize {
96 expected: Self::LEN,
97 actual: buf.len(),
98 })
99 } else {
100 Ok(Self(buf.try_into().unwrap()))
101 }
102 }
103}
104
105#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
106#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
107pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
108 type Connection: 'a;
109
110 async fn open(name: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
111
112 async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
113
114 async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
115
116 async fn wipe(self) -> CryptoKeystoreResult<()>;
118
119 fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
120 #[cfg(not(target_family = "wasm"))]
121 if size > i32::MAX as usize {
122 return Err(CryptoKeystoreError::BlobTooBig);
123 }
124
125 if size >= MAX_BLOB_LEN {
126 return Err(CryptoKeystoreError::BlobTooBig);
127 }
128
129 Ok(())
130 }
131}
132
/// Handle to the keystore database.
///
/// Every field sits behind an `Arc`, so clones of a `Database` share the same connection
/// slot, transaction slot and semaphore.
#[derive(Debug, Clone)]
pub struct Database {
    /// The platform connection. Becomes `None` once the connection has been taken out
    /// by `close` or `wipe`; later accesses then fail with `CryptoKeystoreError::Closed`.
    pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
    /// The currently open transaction, or `None` when no transaction is in progress.
    pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
    /// Semaphore with `ALLOWED_CONCURRENT_TRANSACTIONS_COUNT` permits; a permit is acquired
    /// before creating a transaction and before taking the connection out.
    transaction_semaphore: Arc<Semaphore>,
}
139
/// Number of permits of `Database::transaction_semaphore`, i.e. how many transactions
/// may be open at the same time.
const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
141
/// Read-only lookups of keystore entities.
///
/// The `async_trait` expansion is `?Send` on WASM and `Send` everywhere else.
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
pub trait FetchFromDatabase: Send + Sync {
    /// Looks up a single entity of type `E` by `id`; `Ok(None)` means there is no such entity.
    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        id: impl AsRef<[u8]> + Send,
    ) -> CryptoKeystoreResult<Option<E>>;

    /// Fetches the unique entity of type `U`.
    async fn find_unique<U: UniqueEntity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
    ) -> CryptoKeystoreResult<U>;

    /// Fetches the entities of type `E` selected by `params`.
    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        params: EntityFindParams,
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Fetches the entities of type `E` for the given `ids`.
    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        ids: &[Vec<u8>],
    ) -> CryptoKeystoreResult<Vec<E>>;
    /// Counts the entities of type `E`.
    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize>;
}
167
// SAFETY / NOTE(review): every field of `Database` is an `Arc` around an `async_lock`
// `Mutex` or `Semaphore`, so all access to the wrapped connection and transaction goes
// through a lock. These manual impls are presumably needed because the wrapped connection
// type is not `Send`/`Sync` on every target — confirm that sharing it across threads
// behind the mutex is sound for each backend.
unsafe impl Send for Database {}
// SAFETY / NOTE(review): same reasoning as for `Send` — shared access is mediated by the
// mutexes and the semaphore held in the struct's fields; confirm for each backend.
unsafe impl Sync for Database {}
172
/// Where the database opened by [`Database::open`] lives.
#[derive(Debug, Clone)]
pub enum ConnectionType<'a> {
    /// A named database; the string is passed as `name` to `KeystoreDatabaseConnection::open`.
    Persistent(&'a str),
    /// An in-memory database, opened through `KeystoreDatabaseConnection::open_in_memory`.
    InMemory,
}
181
/// Exclusive access to an open database connection.
///
/// Wraps the mutex guard over the connection slot. It can only be built (via `TryFrom`)
/// when the slot holds a connection, which is what lets `Deref`/`DerefMut` hand out the
/// connection directly instead of an `Option`.
pub struct ConnectionGuard<'a> {
    /// Guard over the connection slot; checked to be `Some` on construction.
    guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
}
189
190impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
191 type Error = CryptoKeystoreError;
192
193 fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
194 guard
195 .is_some()
196 .then_some(Self { guard })
197 .ok_or(CryptoKeystoreError::Closed)
198 }
199}
200
201impl Deref for ConnectionGuard<'_> {
202 type Target = KeystoreDatabaseConnection;
203
204 fn deref(&self) -> &Self::Target {
205 self.guard
206 .as_ref()
207 .expect("we have exclusive access and already checked that the connection exists")
208 }
209}
210
211impl DerefMut for ConnectionGuard<'_> {
212 fn deref_mut(&mut self) -> &mut Self::Target {
213 self.guard
214 .as_mut()
215 .expect("we have exclusive access and already checked that the connection exists")
216 }
217}
218
219impl Database {
221 pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
222 let conn = match location {
223 ConnectionType::Persistent(name) => KeystoreDatabaseConnection::open(name, key).await?,
224 ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
225 };
226 let conn = Mutex::new(Some(conn));
227 #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(conn);
229 Ok(Self {
230 conn,
231 transaction: Default::default(),
232 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
233 })
234 }
235
236 #[cfg(all(test, not(target_family = "wasm")))]
237 pub(crate) async fn open_at_schema_version(
238 name: &str,
239 key: &DatabaseKey,
240 version: MigrationTarget,
241 ) -> CryptoKeystoreResult<Self> {
242 let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
243 let conn = Mutex::new(Some(conn));
244 let conn = Arc::new(conn);
245 Ok(Self {
246 conn,
247 transaction: Default::default(),
248 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
249 })
250 }
251
252 pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
254 self.conn.lock().await.try_into()
255 }
256
257 async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
260 let _semaphore = self.transaction_semaphore.acquire_arc().await;
261
262 let mut guard = self.conn.lock().await;
263 guard.take().ok_or(CryptoKeystoreError::Closed)
264 }
265
266 pub async fn close(&self) -> CryptoKeystoreResult<()> {
268 #[cfg(not(target_family = "wasm"))]
269 self.take().await?;
270
271 #[cfg(target_family = "wasm")]
272 {
273 let conn = self.take().await?;
274 conn.close().await?;
275 }
276 Ok(())
277 }
278}
279
280impl Database {
281 pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
283 self.take().await?.wipe().await
284 }
285
286 pub async fn migrate_db_key_type_to_bytes(
287 name: &str,
288 old_key: &str,
289 new_key: &DatabaseKey,
290 ) -> CryptoKeystoreResult<()> {
291 KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
292 }
293
294 pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
295 self.conn().await?.update_key(new_key).await
296 }
297
298 pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
300 let semaphore = self.transaction_semaphore.acquire_arc().await;
301 let mut transaction_guard = self.transaction.lock().await;
302 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
303 Ok(())
304 }
305
306 pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
307 let mut transaction_guard = self.transaction.lock().await;
308 let Some(transaction) = transaction_guard.as_ref() else {
309 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
310 };
311 transaction.commit(self).await?;
312 *transaction_guard = None;
313 Ok(())
314 }
315
316 pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
317 let mut transaction_guard = self.transaction.lock().await;
318 if transaction_guard.is_none() {
319 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
320 };
321 *transaction_guard = None;
322 Ok(())
323 }
324
325 pub async fn child_groups<
326 E: Entity<ConnectionType = KeystoreDatabaseConnection> + crate::entities::PersistedMlsGroupExt + Sync,
327 >(
328 &self,
329 entity: E,
330 ) -> CryptoKeystoreResult<Vec<E>> {
331 let mut conn = self.conn().await?;
332 let persisted_records = entity.child_groups(conn.deref_mut()).await?;
333
334 let transaction_guard = self.transaction.lock().await;
335 let Some(transaction) = transaction_guard.as_ref() else {
336 return Ok(persisted_records);
337 };
338 transaction.child_groups(entity, persisted_records).await
339 }
340
341 pub async fn save<E: Entity<ConnectionType = KeystoreDatabaseConnection> + Sync + EntityTransactionExt>(
342 &self,
343 entity: E,
344 ) -> CryptoKeystoreResult<E> {
345 let transaction_guard = self.transaction.lock().await;
346 let Some(transaction) = transaction_guard.as_ref() else {
347 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
348 };
349 transaction.save_mut(entity).await
350 }
351
352 pub async fn remove<
353 E: Entity<ConnectionType = KeystoreDatabaseConnection> + EntityTransactionExt,
354 S: AsRef<[u8]>,
355 >(
356 &self,
357 id: S,
358 ) -> CryptoKeystoreResult<()> {
359 let transaction_guard = self.transaction.lock().await;
360 let Some(transaction) = transaction_guard.as_ref() else {
361 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
362 };
363 transaction.remove::<E, S>(id).await
364 }
365
366 pub async fn find_pending_messages_by_conversation_id(
367 &self,
368 conversation_id: &[u8],
369 ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
370 let mut conn = self.conn().await?;
371 let persisted_records =
372 MlsPendingMessage::find_all_by_conversation_id(&mut conn, conversation_id, Default::default()).await?;
373
374 let transaction_guard = self.transaction.lock().await;
375 let Some(transaction) = transaction_guard.as_ref() else {
376 return Ok(persisted_records);
377 };
378 transaction
379 .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
380 .await
381 }
382
383 pub async fn remove_pending_messages_by_conversation_id(
384 &self,
385 conversation_id: impl AsRef<[u8]> + Send,
386 ) -> CryptoKeystoreResult<()> {
387 let transaction_guard = self.transaction.lock().await;
388 let Some(transaction) = transaction_guard.as_ref() else {
389 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
390 };
391 transaction
392 .remove_pending_messages_by_conversation_id(conversation_id)
393 .await
394 }
395}
396
397#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
398#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
399impl FetchFromDatabase for Database {
400 async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
401 &self,
402 id: impl AsRef<[u8]> + Send,
403 ) -> CryptoKeystoreResult<Option<E>> {
404 if let Some(transaction) = self.transaction.lock().await.as_ref()
406 && let Some(cached_record) = transaction.find::<E>(id.as_ref()).await?
408 {
409 return Ok(cached_record);
411 }
412
413 let mut conn = self.conn().await?;
415 E::find_one(&mut conn, &id.as_ref().into()).await
416 }
417
418 async fn find_unique<U: UniqueEntity>(&self) -> CryptoKeystoreResult<U> {
419 if let Some(transaction) = self.transaction.lock().await.as_ref()
421 && let Some(cached_record) = transaction.find_unique::<U>().await?
423 {
424 return Ok(cached_record);
426 }
427 let mut conn = self.conn().await?;
429 U::find_unique(&mut conn).await
430 }
431
432 async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
433 &self,
434 params: EntityFindParams,
435 ) -> CryptoKeystoreResult<Vec<E>> {
436 let mut conn = self.conn().await?;
437 let persisted_records = E::find_all(&mut conn, params.clone()).await?;
438
439 let transaction_guard = self.transaction.lock().await;
440 let Some(transaction) = transaction_guard.as_ref() else {
441 return Ok(persisted_records);
442 };
443 transaction.find_all(persisted_records, params).await
444 }
445
446 async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
447 &self,
448 ids: &[Vec<u8>],
449 ) -> CryptoKeystoreResult<Vec<E>> {
450 let entity_ids: Vec<StringEntityId> = ids.iter().map(|id| id.as_slice().into()).collect();
451 let mut conn = self.conn().await?;
452 let persisted_records = E::find_many(&mut conn, &entity_ids).await?;
453
454 let transaction_guard = self.transaction.lock().await;
455 let Some(transaction) = transaction_guard.as_ref() else {
456 return Ok(persisted_records);
457 };
458 transaction.find_many(persisted_records, ids).await
459 }
460
461 async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize> {
462 if self.transaction.lock().await.is_some() {
463 return Ok(self.find_all::<E>(Default::default()).await?.len());
466 };
467 let mut conn = self.conn().await?;
468 E::count(&mut conn).await
469 }
470}