core_crypto_keystore/connection/
mod.rs

1use std::{fmt, ops::Deref};
2
3use sha2::{Digest as _, Sha256};
4use zeroize::{Zeroize, ZeroizeOnDrop};
5
6pub mod platform {
7    cfg_if::cfg_if! {
8        if #[cfg(target_family = "wasm")] {
9            mod wasm;
10            pub use self::wasm::WasmConnection as KeystoreDatabaseConnection;
11            pub use wasm::storage;
12            pub use self::wasm::storage::WasmStorageTransaction as TransactionWrapper;
13        } else {
14            mod generic;
15            pub use self::generic::SqlCipherConnection as KeystoreDatabaseConnection;
16            pub use self::generic::TransactionWrapper;
17        }
18    }
19}
20
21use std::{ops::DerefMut, sync::Arc};
22
23use async_lock::{Mutex, MutexGuard, Semaphore};
24
25pub use self::platform::*;
26use crate::{
27    CryptoKeystoreError, CryptoKeystoreResult,
28    entities::{Entity, EntityFindParams, EntityTransactionExt, MlsPendingMessage, StringEntityId, UniqueEntity},
29    transaction::KeystoreTransaction,
30};
31
/// Limit on the length, in bytes, of a blob to be stored in the database.
///
/// This limit applies to both SQLCipher-backed stores and WASM.
/// It is deliberately conservative when targeting WASM: the lowest limit that exists there is
/// Safari's, at 1GB per origin.
///
/// `DatabaseConnection::check_buffer_size` rejects any size greater than or equal to this value.
///
/// See: [SQLite limits](https://www.sqlite.org/limits.html)
/// See: [IndexedDB limits](https://stackoverflow.com/a/63019999/1934177)
pub const MAX_BLOB_LEN: usize = 1_000_000_000;
41
/// Bounds that every database connection type must satisfy (native targets).
///
/// Because of UniFFI async requirements, the keystore needs to be `Send` as well here.
#[cfg(not(target_family = "wasm"))]
pub trait DatabaseConnectionRequirements: Sized + Send {}
/// Bounds that every database connection type must satisfy (WASM targets).
///
/// Things cannot be `Send` on WASM because of platform restrictions (all things are copied
/// across the FFI), so only `Sized` is required.
#[cfg(target_family = "wasm")]
pub trait DatabaseConnectionRequirements: Sized {}
49
50/// The key used to encrypt the database.
51#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
52pub struct DatabaseKey([u8; Self::LEN]);
53
54impl DatabaseKey {
55    pub const LEN: usize = 32;
56
57    pub fn generate() -> DatabaseKey {
58        DatabaseKey(rand::random::<[u8; Self::LEN]>())
59    }
60}
61
62impl fmt::Debug for DatabaseKey {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
64        f.write_str("DatabaseKey(hash=")?;
65        for x in Sha256::digest(self).as_slice().iter().take(10) {
66            fmt::LowerHex::fmt(x, f)?
67        }
68        f.write_str("...)")
69    }
70}
71
72impl AsRef<[u8]> for DatabaseKey {
73    fn as_ref(&self) -> &[u8] {
74        &self.0
75    }
76}
77
78impl Deref for DatabaseKey {
79    type Target = [u8];
80
81    fn deref(&self) -> &Self::Target {
82        &self.0
83    }
84}
85
86impl TryFrom<&[u8]> for DatabaseKey {
87    type Error = CryptoKeystoreError;
88
89    fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
90        if buf.len() != Self::LEN {
91            Err(CryptoKeystoreError::InvalidDbKeySize {
92                expected: Self::LEN,
93                actual: buf.len(),
94            })
95        } else {
96            Ok(Self(buf.try_into().unwrap()))
97        }
98    }
99}
100
101#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
102#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
103pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
104    type Connection: 'a;
105
106    async fn open(name: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
107
108    async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
109
110    async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
111
112    /// Clear all data from the database and close it.
113    async fn wipe(self) -> CryptoKeystoreResult<()>;
114
115    fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
116        #[cfg(not(target_family = "wasm"))]
117        if size > i32::MAX as usize {
118            return Err(CryptoKeystoreError::BlobTooBig);
119        }
120
121        if size >= MAX_BLOB_LEN {
122            return Err(CryptoKeystoreError::BlobTooBig);
123        }
124
125        Ok(())
126    }
127}
128
/// Handle to the keystore database.
///
/// All fields are behind `Arc`, so clones of a `Database` share the same connection,
/// transaction slot and semaphore.
#[derive(Debug, Clone)]
pub struct Database {
    /// The platform connection; the slot becomes `None` once the connection has been taken out
    /// (on close or wipe).
    pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
    /// The transaction currently in progress, if any.
    pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
    /// Acquired before a transaction starts and before the connection is taken out.
    transaction_semaphore: Arc<Semaphore>,
}
135
/// Number of permits the transaction semaphore of a [`Database`] is created with.
const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
137
/// Interface to fetch from the database either from the connection directly or through a
/// transaction
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
pub trait FetchFromDatabase: Send + Sync {
    /// Fetch the entity of type `E` identified by `id`, if any.
    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        id: impl AsRef<[u8]> + Send,
    ) -> CryptoKeystoreResult<Option<E>>;

    /// Fetch the instance of the unique entity `U`.
    async fn find_unique<U: UniqueEntity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
    ) -> CryptoKeystoreResult<U>;

    /// Fetch entities of type `E`, as constrained by `params`.
    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        params: EntityFindParams,
    ) -> CryptoKeystoreResult<Vec<E>>;

    /// Fetch the entities of type `E` that correspond to the given `ids`.
    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
        &self,
        ids: &[Vec<u8>],
    ) -> CryptoKeystoreResult<Vec<E>>;
    /// Count the entities of type `E`.
    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize>;
}
163
// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
// NOTE(review): this also relies on the wrapped platform connection tolerating use from whichever
// thread currently holds the mutex — confirm for every backend.
unsafe impl Send for Database {}
// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
unsafe impl Sync for Database {}
168
/// Where to open a connection
///
/// Passed to [`Database::open`] to choose between a named persistent store and an in-memory one.
#[derive(Debug, Clone)]
pub enum ConnectionType<'a> {
    /// This connection is persistent at the provided path
    Persistent(&'a str),
    /// This connection is transient and lives in memory
    InMemory,
}
177
/// Exclusive access to the database connection
///
/// Note that this is only ever constructed when we already hold exclusive access,
/// and the connection has already been tested to ensure that it is non-empty.
pub struct ConnectionGuard<'a> {
    /// The held lock on the connection slot; the `TryFrom` constructor only accepts it when the
    /// slot is `Some`.
    guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
}
185
186impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
187    type Error = CryptoKeystoreError;
188
189    fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
190        guard
191            .is_some()
192            .then_some(Self { guard })
193            .ok_or(CryptoKeystoreError::Closed)
194    }
195}
196
197impl Deref for ConnectionGuard<'_> {
198    type Target = KeystoreDatabaseConnection;
199
200    fn deref(&self) -> &Self::Target {
201        self.guard
202            .as_ref()
203            .expect("we have exclusive access and already checked that the connection exists")
204    }
205}
206
207impl DerefMut for ConnectionGuard<'_> {
208    fn deref_mut(&mut self) -> &mut Self::Target {
209        self.guard
210            .as_mut()
211            .expect("we have exclusive access and already checked that the connection exists")
212    }
213}
214
215// Only the functions in this impl block directly mess with `self.conn`
216impl Database {
217    pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
218        let conn = match location {
219            ConnectionType::Persistent(name) => KeystoreDatabaseConnection::open(name, key).await?,
220            ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
221        };
222        let conn = Mutex::new(Some(conn));
223        #[allow(clippy::arc_with_non_send_sync)] // see https://github.com/rustwasm/wasm-bindgen/pull/955
224        let conn = Arc::new(conn);
225        Ok(Self {
226            conn,
227            transaction: Default::default(),
228            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
229        })
230    }
231
232    /// Get direct exclusive access to the connection.
233    pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
234        self.conn.lock().await.try_into()
235    }
236
237    /// Wait for any running transaction to finish, then take the connection out of this database,
238    /// preventing this database from being used again.
239    async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
240        let _semaphore = self.transaction_semaphore.acquire_arc().await;
241
242        let mut guard = self.conn.lock().await;
243        guard.take().ok_or(CryptoKeystoreError::Closed)
244    }
245
246    // Close this database connection
247    pub async fn close(&self) -> CryptoKeystoreResult<()> {
248        #[cfg(not(target_family = "wasm"))]
249        self.take().await?;
250
251        #[cfg(target_family = "wasm")]
252        {
253            let conn = self.take().await?;
254            conn.close().await?;
255        }
256        Ok(())
257    }
258}
259
260impl Database {
261    /// Close this database and delete its contents.
262    pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
263        self.take().await?.wipe().await
264    }
265
266    pub async fn migrate_db_key_type_to_bytes(
267        name: &str,
268        old_key: &str,
269        new_key: &DatabaseKey,
270    ) -> CryptoKeystoreResult<()> {
271        KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
272    }
273
274    pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
275        self.conn().await?.update_key(new_key).await
276    }
277
278    /// Waits for the current transaction to be committed or rolled back, then starts a new one.
279    pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
280        let semaphore = self.transaction_semaphore.acquire_arc().await;
281        let mut transaction_guard = self.transaction.lock().await;
282        *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
283        Ok(())
284    }
285
286    pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
287        let mut transaction_guard = self.transaction.lock().await;
288        let Some(transaction) = transaction_guard.as_ref() else {
289            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
290        };
291        transaction.commit(self).await?;
292        *transaction_guard = None;
293        Ok(())
294    }
295
296    pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
297        let mut transaction_guard = self.transaction.lock().await;
298        if transaction_guard.is_none() {
299            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
300        };
301        *transaction_guard = None;
302        Ok(())
303    }
304
305    pub async fn child_groups<
306        E: Entity<ConnectionType = KeystoreDatabaseConnection> + crate::entities::PersistedMlsGroupExt + Sync,
307    >(
308        &self,
309        entity: E,
310    ) -> CryptoKeystoreResult<Vec<E>> {
311        let mut conn = self.conn().await?;
312        let persisted_records = entity.child_groups(conn.deref_mut()).await?;
313
314        let transaction_guard = self.transaction.lock().await;
315        let Some(transaction) = transaction_guard.as_ref() else {
316            return Ok(persisted_records);
317        };
318        transaction.child_groups(entity, persisted_records).await
319    }
320
321    pub async fn save<E: Entity<ConnectionType = KeystoreDatabaseConnection> + Sync + EntityTransactionExt>(
322        &self,
323        entity: E,
324    ) -> CryptoKeystoreResult<E> {
325        let transaction_guard = self.transaction.lock().await;
326        let Some(transaction) = transaction_guard.as_ref() else {
327            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
328        };
329        transaction.save_mut(entity).await
330    }
331
332    pub async fn remove<
333        E: Entity<ConnectionType = KeystoreDatabaseConnection> + EntityTransactionExt,
334        S: AsRef<[u8]>,
335    >(
336        &self,
337        id: S,
338    ) -> CryptoKeystoreResult<()> {
339        let transaction_guard = self.transaction.lock().await;
340        let Some(transaction) = transaction_guard.as_ref() else {
341            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
342        };
343        transaction.remove::<E, S>(id).await
344    }
345
346    pub async fn find_pending_messages_by_conversation_id(
347        &self,
348        conversation_id: &[u8],
349    ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
350        let mut conn = self.conn().await?;
351        let persisted_records =
352            MlsPendingMessage::find_all_by_conversation_id(&mut conn, conversation_id, Default::default()).await?;
353
354        let transaction_guard = self.transaction.lock().await;
355        let Some(transaction) = transaction_guard.as_ref() else {
356            return Ok(persisted_records);
357        };
358        transaction
359            .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
360            .await
361    }
362
363    pub async fn remove_pending_messages_by_conversation_id(
364        &self,
365        conversation_id: impl AsRef<[u8]> + Send,
366    ) -> CryptoKeystoreResult<()> {
367        let transaction_guard = self.transaction.lock().await;
368        let Some(transaction) = transaction_guard.as_ref() else {
369            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
370        };
371        transaction
372            .remove_pending_messages_by_conversation_id(conversation_id)
373            .await
374    }
375
376    pub async fn cred_delete_by_credential(&self, cred: Vec<u8>) -> CryptoKeystoreResult<()> {
377        let transaction_guard = self.transaction.lock().await;
378        let Some(transaction) = transaction_guard.as_ref() else {
379            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
380        };
381        transaction.cred_delete_by_credential(cred).await
382    }
383}
384
385#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
386#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
387impl FetchFromDatabase for Database {
388    async fn find<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
389        &self,
390        id: impl AsRef<[u8]> + Send,
391    ) -> CryptoKeystoreResult<Option<E>> {
392        // If a transaction is in progress...
393        if let Some(transaction) = self.transaction.lock().await.as_ref()
394            //... and it has information about this entity, ...
395            && let Some(cached_record) = transaction.find::<E>(id.as_ref()).await?
396        {
397            // ... return that result
398            return Ok(cached_record);
399        }
400
401        // Otherwise get it from the database
402        let mut conn = self.conn().await?;
403        E::find_one(&mut conn, &id.as_ref().into()).await
404    }
405
406    async fn find_unique<U: UniqueEntity>(&self) -> CryptoKeystoreResult<U> {
407        // If a transaction is in progress...
408        if let Some(transaction) = self.transaction.lock().await.as_ref()
409            //... and it has information about this entity, ...
410            && let Some(cached_record) = transaction.find_unique::<U>().await?
411        {
412            // ... return that result
413            return Ok(cached_record);
414        }
415        // Otherwise get it from the database
416        let mut conn = self.conn().await?;
417        U::find_unique(&mut conn).await
418    }
419
420    async fn find_all<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
421        &self,
422        params: EntityFindParams,
423    ) -> CryptoKeystoreResult<Vec<E>> {
424        let mut conn = self.conn().await?;
425        let persisted_records = E::find_all(&mut conn, params.clone()).await?;
426
427        let transaction_guard = self.transaction.lock().await;
428        let Some(transaction) = transaction_guard.as_ref() else {
429            return Ok(persisted_records);
430        };
431        transaction.find_all(persisted_records, params).await
432    }
433
434    async fn find_many<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(
435        &self,
436        ids: &[Vec<u8>],
437    ) -> CryptoKeystoreResult<Vec<E>> {
438        let entity_ids: Vec<StringEntityId> = ids.iter().map(|id| id.as_slice().into()).collect();
439        let mut conn = self.conn().await?;
440        let persisted_records = E::find_many(&mut conn, &entity_ids).await?;
441
442        let transaction_guard = self.transaction.lock().await;
443        let Some(transaction) = transaction_guard.as_ref() else {
444            return Ok(persisted_records);
445        };
446        transaction.find_many(persisted_records, ids).await
447    }
448
449    async fn count<E: Entity<ConnectionType = KeystoreDatabaseConnection>>(&self) -> CryptoKeystoreResult<usize> {
450        if self.transaction.lock().await.is_some() {
451            // Unfortunately, we have to do this because of possible record id overlap
452            // between cache and db.
453            return Ok(self.find_all::<E>(Default::default()).await?.len());
454        };
455        let mut conn = self.conn().await?;
456        E::count(&mut conn).await
457    }
458}