core_crypto_keystore/connection/
mod.rs

1use std::{fmt, ops::Deref};
2
3use sha2::{Digest as _, Sha256};
4use zeroize::{Zeroize, ZeroizeOnDrop};
5
/// Platform-specific database backends.
///
/// Exactly one backend is compiled in, selected by `target_family`. Both branches re-export
/// their connection type as `KeystoreDatabaseConnection` and their transaction type as
/// `TransactionWrapper`, so callers can use the same names on every platform.
pub mod platform {
    cfg_if::cfg_if! {
        if #[cfg(target_family = "wasm")] {
            // WASM targets: the backend lives in the `wasm` submodule.
            mod wasm;
            pub use self::wasm::WasmConnection as KeystoreDatabaseConnection;
            pub use wasm::storage;
            pub use self::wasm::storage::WasmStorageTransaction as TransactionWrapper;
        } else {
            // Every other target: the backend lives in the `generic` submodule.
            mod generic;
            pub use self::generic::SqlCipherConnection as KeystoreDatabaseConnection;
            pub use self::generic::TransactionWrapper;
            // Only re-exported for tests (used by `Database::open_at_schema_version`).
            #[cfg(test)]
            pub(crate) use generic::MigrationTarget;


        }
    }
}
24
25use std::{ops::DerefMut, sync::Arc};
26
27use async_lock::{Mutex, MutexGuard, Semaphore};
28
29pub use self::platform::*;
30use crate::{
31    CryptoKeystoreError, CryptoKeystoreResult,
32    entities::{Entity, EntityFindParams, EntityTransactionExt, MlsPendingMessage, StringEntityId, UniqueEntity},
33    transaction::KeystoreTransaction,
34};
35
/// Limit on the length of a blob to be stored in the database.
///
/// This limit applies to both SQLCipher-backed stores and WASM.
/// This limit is conservative on purpose when targeting WASM, as the lower bound that exists is Safari with a limit of
/// 1 GB per origin.
///
/// The bound is exclusive: [`DatabaseConnection::check_buffer_size`] rejects every size that is
/// greater than *or equal to* this value.
///
/// See: [SQLite limits](https://www.sqlite.org/limits.html)
/// See: [IndexedDB limits](https://stackoverflow.com/a/63019999/1934177)
pub const MAX_BLOB_LEN: usize = 1_000_000_000;
45
/// Marker trait collecting the bounds every database connection type must satisfy.
///
/// Non-WASM variant: because of UniFFI async requirements, the keystore needs to be `Send`,
/// so connections must be `Send` in addition to `Sized`.
#[cfg(not(target_family = "wasm"))]
pub trait DatabaseConnectionRequirements: Sized + Send {}
/// Marker trait collecting the bounds every database connection type must satisfy.
///
/// WASM variant: things cannot be `Send` on WASM because of platform restrictions (all things
/// are copied across the FFI), so only `Sized` is required.
#[cfg(target_family = "wasm")]
pub trait DatabaseConnectionRequirements: Sized {}
53
/// The key used to encrypt the database.
///
/// A fixed-size buffer of [`DatabaseKey::LEN`] bytes. The `Zeroize`/`ZeroizeOnDrop` derives are
/// there so the key material is wiped when the value is dropped. `Debug` is implemented by hand
/// and prints a prefix of the key's SHA-256 digest rather than the key bytes themselves.
#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
pub struct DatabaseKey([u8; Self::LEN]);
57
58impl DatabaseKey {
59    pub const LEN: usize = 32;
60
61    pub fn generate() -> DatabaseKey {
62        DatabaseKey(rand::random::<[u8; Self::LEN]>())
63    }
64}
65
66impl fmt::Debug for DatabaseKey {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
68        f.write_str("DatabaseKey(hash=")?;
69        for x in Sha256::digest(self).as_slice().iter().take(10) {
70            fmt::LowerHex::fmt(x, f)?
71        }
72        f.write_str("...)")
73    }
74}
75
76impl AsRef<[u8]> for DatabaseKey {
77    fn as_ref(&self) -> &[u8] {
78        &self.0
79    }
80}
81
82impl Deref for DatabaseKey {
83    type Target = [u8];
84
85    fn deref(&self) -> &Self::Target {
86        &self.0
87    }
88}
89
90impl TryFrom<&[u8]> for DatabaseKey {
91    type Error = CryptoKeystoreError;
92
93    fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
94        if buf.len() != Self::LEN {
95            Err(CryptoKeystoreError::InvalidDbKeySize {
96                expected: Self::LEN,
97                actual: buf.len(),
98            })
99        } else {
100            Ok(Self(buf.try_into().unwrap()))
101        }
102    }
103}
104
/// Operations every platform database connection has to provide.
///
/// The `async_trait` expansion is `?Send` on WASM and the default (`Send`) variant elsewhere.
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
    type Connection: 'a;

    /// Opens the database called `name` using `key`.
    ///
    /// NOTE(review): what `name` denotes (file path vs. store name) is backend-defined —
    /// confirm against the implementations.
    async fn open(name: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;

    /// Opens an in-memory database using `key`.
    async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;

    /// Switches the database over to `new_key`.
    async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;

    /// Clear all data from the database and close it.
    async fn wipe(self) -> CryptoKeystoreResult<()>;

    /// Checks whether a buffer of `size` bytes is small enough to be stored.
    ///
    /// # Errors
    ///
    /// Returns [`CryptoKeystoreError::BlobTooBig`] when `size` is greater than or equal to
    /// [`MAX_BLOB_LEN`], or (non-WASM targets only) when it exceeds `i32::MAX`.
    fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
        // With the current value of `MAX_BLOB_LEN` (below `i32::MAX`) this check is subsumed by
        // the one after it; it only becomes relevant if that constant is ever raised.
        #[cfg(not(target_family = "wasm"))]
        if size > i32::MAX as usize {
            return Err(CryptoKeystoreError::BlobTooBig);
        }

        // Note the `>=`: a buffer of exactly `MAX_BLOB_LEN` bytes is rejected as well.
        if size >= MAX_BLOB_LEN {
            return Err(CryptoKeystoreError::BlobTooBig);
        }

        Ok(())
    }
}
132
/// Handle to the keystore database.
///
/// All fields are `Arc`s, so cloning a `Database` yields another handle that shares the same
/// connection, transaction slot and transaction semaphore.
#[derive(Debug, Clone)]
pub struct Database {
    /// The underlying connection; the slot becomes `None` once the connection has been taken
    /// out (see `close` / `wipe`), after which operations fail with `CryptoKeystoreError::Closed`.
    pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
    /// The currently open transaction, if any.
    pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
    /// Semaphore handed to each new transaction; created with
    /// `ALLOWED_CONCURRENT_TRANSACTIONS_COUNT` permits.
    transaction_semaphore: Arc<Semaphore>,
}
139
/// Number of permits the transaction semaphore of a [`Database`] is created with.
const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
141
/// Interface to fetch from the database either from the connection directly or through a
/// transaction.
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
pub trait FetchFromDatabase: Send + Sync {
    /// Looks up the entity of type `E` with the given `id`, if there is one.
    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        id: impl AsRef<[u8]> + Send,
    ) -> CryptoKeystoreResult<Option<E>>;

    /// Fetches the single instance of the unique entity type `U`.
    async fn find_unique<U: UniqueEntity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
    ) -> CryptoKeystoreResult<U>;

    /// Fetches the entities of type `E` selected by `params`.
    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        params: EntityFindParams,
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Fetches the entities of type `E` for the given list of `ids`.
    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        ids: &[Vec<u8>],
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Counts the entities of type `E`.
    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize>;
}
167
// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
// NOTE(review): these manual impls only matter when the auto traits are not derived, i.e. when
// the guarded connection/transaction types are not `Send` themselves (presumably the WASM
// backend) — confirm that moving those values across threads cannot actually happen there.
unsafe impl Send for Database {}
// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
unsafe impl Sync for Database {}
172
/// Where to open a connection.
///
/// Passed to [`Database::open`], which dispatches to the platform connection's `open` or
/// `open_in_memory` accordingly.
#[derive(Debug, Clone)]
pub enum ConnectionType<'a> {
    /// This connection is persistent at the provided path
    Persistent(&'a str),
    /// This connection is transient and lives in memory
    InMemory,
}
181
/// Exclusive access to the database connection
///
/// Note that this is only ever constructed when we already hold exclusive access,
/// and the connection has already been tested to ensure that it is non-empty.
///
/// Dereferences (mutably) to the connection itself, so it can be used wherever a
/// `KeystoreDatabaseConnection` reference is expected.
pub struct ConnectionGuard<'a> {
    /// The held lock on the connection slot; the slot contains `Some` connection.
    guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
}
189
190impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
191    type Error = CryptoKeystoreError;
192
193    fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
194        guard
195            .is_some()
196            .then_some(Self { guard })
197            .ok_or(CryptoKeystoreError::Closed)
198    }
199}
200
201impl Deref for ConnectionGuard<'_> {
202    type Target = KeystoreDatabaseConnection;
203
204    fn deref(&self) -> &Self::Target {
205        self.guard
206            .as_ref()
207            .expect("we have exclusive access and already checked that the connection exists")
208    }
209}
210
211impl DerefMut for ConnectionGuard<'_> {
212    fn deref_mut(&mut self) -> &mut Self::Target {
213        self.guard
214            .as_mut()
215            .expect("we have exclusive access and already checked that the connection exists")
216    }
217}
218
219// Only the functions in this impl block directly mess with `self.conn`
220impl Database {
221    pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
222        let conn = match location {
223            ConnectionType::Persistent(name) => KeystoreDatabaseConnection::open(name, key).await?,
224            ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
225        };
226        let conn = Mutex::new(Some(conn));
227        #[allow(clippy::arc_with_non_send_sync)] // see https://github.com/rustwasm/wasm-bindgen/pull/955
228        let conn = Arc::new(conn);
229        Ok(Self {
230            conn,
231            transaction: Default::default(),
232            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
233        })
234    }
235
236    #[cfg(all(test, not(target_family = "wasm")))]
237    pub(crate) async fn open_at_schema_version(
238        name: &str,
239        key: &DatabaseKey,
240        version: MigrationTarget,
241    ) -> CryptoKeystoreResult<Self> {
242        let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
243        let conn = Mutex::new(Some(conn));
244        let conn = Arc::new(conn);
245        Ok(Self {
246            conn,
247            transaction: Default::default(),
248            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
249        })
250    }
251
252    /// Get direct exclusive access to the connection.
253    pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
254        self.conn.lock().await.try_into()
255    }
256
257    /// Wait for any running transaction to finish, then take the connection out of this database,
258    /// preventing this database from being used again.
259    async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
260        let _semaphore = self.transaction_semaphore.acquire_arc().await;
261
262        let mut guard = self.conn.lock().await;
263        guard.take().ok_or(CryptoKeystoreError::Closed)
264    }
265
266    // Close this database connection
267    pub async fn close(&self) -> CryptoKeystoreResult<()> {
268        #[cfg(not(target_family = "wasm"))]
269        self.take().await?;
270
271        #[cfg(target_family = "wasm")]
272        {
273            let conn = self.take().await?;
274            conn.close().await?;
275        }
276        Ok(())
277    }
278}
279
280impl Database {
281    /// Close this database and delete its contents.
282    pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
283        self.take().await?.wipe().await
284    }
285
286    pub async fn migrate_db_key_type_to_bytes(
287        name: &str,
288        old_key: &str,
289        new_key: &DatabaseKey,
290    ) -> CryptoKeystoreResult<()> {
291        KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
292    }
293
294    pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
295        self.conn().await?.update_key(new_key).await
296    }
297
298    /// Waits for the current transaction to be committed or rolled back, then starts a new one.
299    pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
300        let semaphore = self.transaction_semaphore.acquire_arc().await;
301        let mut transaction_guard = self.transaction.lock().await;
302        *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
303        Ok(())
304    }
305
306    pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
307        let mut transaction_guard = self.transaction.lock().await;
308        let Some(transaction) = transaction_guard.as_ref() else {
309            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
310        };
311        transaction.commit(self).await?;
312        *transaction_guard = None;
313        Ok(())
314    }
315
316    pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
317        let mut transaction_guard = self.transaction.lock().await;
318        if transaction_guard.is_none() {
319            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
320        };
321        *transaction_guard = None;
322        Ok(())
323    }
324
325    pub async fn child_groups<
326        E: Entity<ConnectionType = KeystoreDatabaseConnection> + crate::entities::PersistedMlsGroupExt + Sync,
327    >(
328        &self,
329        entity: E,
330    ) -> CryptoKeystoreResult<Vec<E>> {
331        let mut conn = self.conn().await?;
332        let persisted_records = entity.child_groups(conn.deref_mut()).await?;
333
334        let transaction_guard = self.transaction.lock().await;
335        let Some(transaction) = transaction_guard.as_ref() else {
336            return Ok(persisted_records);
337        };
338        transaction.child_groups(entity, persisted_records).await
339    }
340
341    pub async fn save<E: Entity<ConnectionType = KeystoreDatabaseConnection> + Sync + EntityTransactionExt>(
342        &self,
343        entity: E,
344    ) -> CryptoKeystoreResult<E> {
345        let transaction_guard = self.transaction.lock().await;
346        let Some(transaction) = transaction_guard.as_ref() else {
347            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
348        };
349        transaction.save_mut(entity).await
350    }
351
352    pub async fn remove<
353        E: Entity<ConnectionType = KeystoreDatabaseConnection> + EntityTransactionExt,
354        S: AsRef<[u8]>,
355    >(
356        &self,
357        id: S,
358    ) -> CryptoKeystoreResult<()> {
359        let transaction_guard = self.transaction.lock().await;
360        let Some(transaction) = transaction_guard.as_ref() else {
361            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
362        };
363        transaction.remove::<E, S>(id).await
364    }
365
366    pub async fn find_pending_messages_by_conversation_id(
367        &self,
368        conversation_id: &[u8],
369    ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
370        let mut conn = self.conn().await?;
371        let persisted_records =
372            MlsPendingMessage::find_all_by_conversation_id(&mut conn, conversation_id, Default::default()).await?;
373
374        let transaction_guard = self.transaction.lock().await;
375        let Some(transaction) = transaction_guard.as_ref() else {
376            return Ok(persisted_records);
377        };
378        transaction
379            .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
380            .await
381    }
382
383    pub async fn remove_pending_messages_by_conversation_id(
384        &self,
385        conversation_id: impl AsRef<[u8]> + Send,
386    ) -> CryptoKeystoreResult<()> {
387        let transaction_guard = self.transaction.lock().await;
388        let Some(transaction) = transaction_guard.as_ref() else {
389            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
390        };
391        transaction
392            .remove_pending_messages_by_conversation_id(conversation_id)
393            .await
394    }
395}
396
397#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
398#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
399impl FetchFromDatabase for Database {
400    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
401        &self,
402        id: impl AsRef<[u8]> + Send,
403    ) -> CryptoKeystoreResult<Option<E>> {
404        // If a transaction is in progress...
405        if let Some(transaction) = self.transaction.lock().await.as_ref()
406            //... and it has information about this entity, ...
407            && let Some(cached_record) = transaction.find::<E>(id.as_ref()).await?
408        {
409            // ... return that result
410            return Ok(cached_record);
411        }
412
413        // Otherwise get it from the database
414        let mut conn = self.conn().await?;
415        E::find_one(&mut conn, &id.as_ref().into()).await
416    }
417
418    async fn find_unique<U: UniqueEntity>(&self) -> CryptoKeystoreResult<U> {
419        // If a transaction is in progress...
420        if let Some(transaction) = self.transaction.lock().await.as_ref()
421            //... and it has information about this entity, ...
422            && let Some(cached_record) = transaction.find_unique::<U>().await?
423        {
424            // ... return that result
425            return Ok(cached_record);
426        }
427        // Otherwise get it from the database
428        let mut conn = self.conn().await?;
429        U::find_unique(&mut conn).await
430    }
431
432    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
433        &self,
434        params: EntityFindParams,
435    ) -> CryptoKeystoreResult<Vec<E>> {
436        let mut conn = self.conn().await?;
437        let persisted_records = E::find_all(&mut conn, params.clone()).await?;
438
439        let transaction_guard = self.transaction.lock().await;
440        let Some(transaction) = transaction_guard.as_ref() else {
441            return Ok(persisted_records);
442        };
443        transaction.find_all(persisted_records, params).await
444    }
445
446    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
447        &self,
448        ids: &[Vec<u8>],
449    ) -> CryptoKeystoreResult<Vec<E>> {
450        let entity_ids: Vec<StringEntityId> = ids.iter().map(|id| id.as_slice().into()).collect();
451        let mut conn = self.conn().await?;
452        let persisted_records = E::find_many(&mut conn, &entity_ids).await?;
453
454        let transaction_guard = self.transaction.lock().await;
455        let Some(transaction) = transaction_guard.as_ref() else {
456            return Ok(persisted_records);
457        };
458        transaction.find_many(persisted_records, ids).await
459    }
460
461    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize> {
462        if self.transaction.lock().await.is_some() {
463            // Unfortunately, we have to do this because of possible record id overlap
464            // between cache and db.
465            return Ok(self.find_all::<E>(Default::default()).await?.len());
466        };
467        let mut conn = self.conn().await?;
468        E::count(&mut conn).await
469    }
470}