core_crypto_keystore/connection/
mod.rs

1pub mod platform;
2
3use std::{
4    borrow::Borrow,
5    fmt,
6    ops::{Deref, DerefMut},
7    sync::Arc,
8};
9
10use async_lock::{Mutex, MutexGuard, Semaphore};
11use async_trait::async_trait;
12use sha2::{Digest as _, Sha256};
13use zeroize::{Zeroize, ZeroizeOnDrop};
14
15pub use self::platform::*;
16use crate::{
17    CryptoKeystoreError, CryptoKeystoreResult,
18    entities::{MlsPendingMessage, PersistedMlsGroup},
19    traits::{
20        BorrowPrimaryKey, Entity, EntityDatabaseMutation, EntityDeleteBorrowed, EntityGetBorrowed, FetchFromDatabase,
21        KeyType, SearchableEntity,
22    },
23    transaction::KeystoreTransaction,
24};
25
26/// Limit on the length of a blob to be stored in the database.
27///
28/// This limit applies to both SQLCipher-backed stores and WASM.
29/// This limit is conservative on purpose when targeting WASM, as the lower bound that exists is Safari with a limit of
30/// 1GB per origin.
31///
32/// See: [SQLite limits](https://www.sqlite.org/limits.html)
33/// See: [IndexedDB limits](https://stackoverflow.com/a/63019999/1934177)
34pub const MAX_BLOB_LEN: usize = 1_000_000_000;
35
36#[cfg(not(target_family = "wasm"))]
37// ? Because of UniFFI async requirements, we need our keystore to be Send as well now
38pub trait DatabaseConnectionRequirements: Sized + Send {}
39#[cfg(target_family = "wasm")]
40// ? On the other hand, things cannot be Send on WASM because of platform restrictions (all things are copied across the
41// FFI)
42pub trait DatabaseConnectionRequirements: Sized {}
43
44/// The key used to encrypt the database.
45#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
46pub struct DatabaseKey([u8; Self::LEN]);
47
48impl DatabaseKey {
49    pub const LEN: usize = 32;
50
51    pub fn generate() -> DatabaseKey {
52        DatabaseKey(rand::random::<[u8; Self::LEN]>())
53    }
54}
55
56impl fmt::Debug for DatabaseKey {
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
58        f.write_str("DatabaseKey(hash=")?;
59        for x in Sha256::digest(self).as_slice().iter().take(10) {
60            fmt::LowerHex::fmt(x, f)?
61        }
62        f.write_str("...)")
63    }
64}
65
66impl AsRef<[u8]> for DatabaseKey {
67    fn as_ref(&self) -> &[u8] {
68        &self.0
69    }
70}
71
72impl Deref for DatabaseKey {
73    type Target = [u8];
74
75    fn deref(&self) -> &Self::Target {
76        &self.0
77    }
78}
79
80impl TryFrom<&[u8]> for DatabaseKey {
81    type Error = CryptoKeystoreError;
82
83    fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
84        if buf.len() != Self::LEN {
85            Err(CryptoKeystoreError::InvalidDbKeySize {
86                expected: Self::LEN,
87                actual: buf.len(),
88            })
89        } else {
90            Ok(Self(buf.try_into().unwrap()))
91        }
92    }
93}
94
95#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
96#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
97pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
98    type Connection: 'a;
99
100    async fn open(location: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
101
102    async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
103
104    async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
105
106    /// Clear all data from the database and close it.
107    async fn wipe(self) -> CryptoKeystoreResult<()>;
108
109    fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
110        #[cfg(not(target_family = "wasm"))]
111        if size > i32::MAX as usize {
112            return Err(CryptoKeystoreError::BlobTooBig);
113        }
114
115        if size >= MAX_BLOB_LEN {
116            return Err(CryptoKeystoreError::BlobTooBig);
117        }
118
119        Ok(())
120    }
121
122    /// Returns the database location if persistent or None if in-memory
123    fn location(&self) -> Option<&str>;
124}
125
126#[derive(Debug, Clone)]
127pub struct Database {
128    pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
129    pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
130    transaction_semaphore: Arc<Semaphore>,
131}
132
133const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
134
135// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
136unsafe impl Send for Database {}
137// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
138unsafe impl Sync for Database {}
139
140/// Where to open a connection
141#[derive(Debug, Clone)]
142pub enum ConnectionType<'a> {
143    /// This connection is persistent at the provided path
144    Persistent(&'a str),
145    /// This connection is transient and lives in memory
146    InMemory,
147}
148
149/// Exclusive access to the database connection
150///
151/// Note that this is only ever constructed when we already hold exclusive access,
152/// and the connection has already been tested to ensure that it is non-empty.
153pub struct ConnectionGuard<'a> {
154    guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
155}
156
157impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
158    type Error = CryptoKeystoreError;
159
160    fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
161        guard
162            .is_some()
163            .then_some(Self { guard })
164            .ok_or(CryptoKeystoreError::Closed)
165    }
166}
167
168impl Deref for ConnectionGuard<'_> {
169    type Target = KeystoreDatabaseConnection;
170
171    fn deref(&self) -> &Self::Target {
172        self.guard
173            .as_ref()
174            .expect("we have exclusive access and already checked that the connection exists")
175    }
176}
177
178impl DerefMut for ConnectionGuard<'_> {
179    fn deref_mut(&mut self) -> &mut Self::Target {
180        self.guard
181            .as_mut()
182            .expect("we have exclusive access and already checked that the connection exists")
183    }
184}
185
186// Only the functions in this impl block directly mess with `self.conn`
187impl Database {
188    pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
189        let conn = match location {
190            ConnectionType::Persistent(location) => KeystoreDatabaseConnection::open(location, key).await?,
191            ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
192        };
193        let conn = Mutex::new(Some(conn));
194        #[allow(clippy::arc_with_non_send_sync)] // see https://github.com/rustwasm/wasm-bindgen/pull/955
195        let conn = Arc::new(conn);
196        Ok(Self {
197            conn,
198            transaction: Default::default(),
199            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
200        })
201    }
202
203    #[cfg(all(test, not(target_family = "wasm")))]
204    pub(crate) async fn open_at_schema_version(
205        name: &str,
206        key: &DatabaseKey,
207        version: MigrationTarget,
208    ) -> CryptoKeystoreResult<Self> {
209        let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
210        let conn = Mutex::new(Some(conn));
211        let conn = Arc::new(conn);
212        Ok(Self {
213            conn,
214            transaction: Default::default(),
215            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
216        })
217    }
218
219    pub async fn location(&self) -> CryptoKeystoreResult<Option<String>> {
220        return Ok(self.conn().await?.location().map(ToString::to_string));
221    }
222
223    /// Get direct exclusive access to the connection.
224    pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
225        self.conn.lock().await.try_into()
226    }
227
228    /// Wait for any running transaction to finish, then take the connection out of this database,
229    /// preventing this database from being used again.
230    async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
231        let _semaphore = self.transaction_semaphore.acquire_arc().await;
232
233        let mut guard = self.conn.lock().await;
234        guard.take().ok_or(CryptoKeystoreError::Closed)
235    }
236
237    // Close this database connection
238    pub async fn close(&self) -> CryptoKeystoreResult<()> {
239        #[cfg(not(target_family = "wasm"))]
240        self.take().await?;
241
242        #[cfg(target_family = "wasm")]
243        {
244            let conn = self.take().await?;
245            conn.close().await?;
246        }
247        Ok(())
248    }
249
250    /// Close this database and delete its contents.
251    pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
252        self.take().await?.wipe().await
253    }
254
255    /// Export a copy of the database to the specified path.
256    /// This creates a fully vacuumed and optimized copy of the database.
257    /// The copy will be encrypted with the same key as the source database.
258    ///
259    /// # Platform Support
260    /// This method is only available on platforms using SQLCipher (not WASM).
261    ///
262    /// # Arguments
263    /// * `destination_path` - The file path where the database copy should be created
264    ///
265    /// # Errors
266    /// Returns an error if:
267    /// - The database is in-memory (cannot export in-memory databases)
268    /// - The destination path is invalid
269    /// - The export operation fails
270    #[cfg(not(target_family = "wasm"))]
271    pub async fn export_copy(&self, destination_path: &str) -> CryptoKeystoreResult<()> {
272        let conn = self.conn().await?;
273        conn.export_copy(destination_path).await
274    }
275
276    pub async fn migrate_db_key_type_to_bytes(
277        name: &str,
278        old_key: &str,
279        new_key: &DatabaseKey,
280    ) -> CryptoKeystoreResult<()> {
281        KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
282    }
283
284    pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
285        self.conn().await?.update_key(new_key).await
286    }
287
288    /// Waits for the current transaction to be committed or rolled back, then starts a new one.
289    pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
290        let semaphore = self.transaction_semaphore.acquire_arc().await;
291        let mut transaction_guard = self.transaction.lock().await;
292        *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
293        Ok(())
294    }
295
296    pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
297        let mut transaction_guard = self.transaction.lock().await;
298        let Some(transaction) = transaction_guard.as_ref() else {
299            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
300        };
301        transaction.commit(self).await?;
302        *transaction_guard = None;
303        Ok(())
304    }
305
306    pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
307        let mut transaction_guard = self.transaction.lock().await;
308        if transaction_guard.is_none() {
309            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
310        };
311        *transaction_guard = None;
312        Ok(())
313    }
314
315    pub async fn child_groups(&self, entity: PersistedMlsGroup) -> CryptoKeystoreResult<Vec<PersistedMlsGroup>> {
316        let mut conn = self.conn().await?;
317        let persisted_records = entity.child_groups(conn.deref_mut()).await?;
318
319        let transaction_guard = self.transaction.lock().await;
320        let Some(transaction) = transaction_guard.as_ref() else {
321            return Ok(persisted_records);
322        };
323        transaction.child_groups(entity, persisted_records).await
324    }
325
326    pub async fn save<'a, E>(&self, entity: E) -> CryptoKeystoreResult<E::AutoGeneratedFields>
327    where
328        E: Entity + EntityDatabaseMutation<'a> + Send + Sync,
329    {
330        let transaction_guard = self.transaction.lock().await;
331        let Some(transaction) = transaction_guard.as_ref() else {
332            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
333        };
334        transaction.save(entity).await
335    }
336
337    pub async fn remove<'a, E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<()>
338    where
339        E: Entity + EntityDatabaseMutation<'a>,
340    {
341        let transaction_guard = self.transaction.lock().await;
342        let Some(transaction) = transaction_guard.as_ref() else {
343            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
344        };
345        transaction.remove::<E>(id).await
346    }
347
348    pub async fn remove_borrowed<'a, E>(&self, id: &E::BorrowedPrimaryKey) -> CryptoKeystoreResult<()>
349    where
350        E: Entity + EntityDatabaseMutation<'a> + BorrowPrimaryKey + EntityDeleteBorrowed<'a>,
351    {
352        let transaction_guard = self.transaction.lock().await;
353        let Some(transaction) = transaction_guard.as_ref() else {
354            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
355        };
356        transaction.remove_borrowed::<E>(id).await
357    }
358
359    pub async fn find_pending_messages_by_conversation_id(
360        &self,
361        conversation_id: &[u8],
362    ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
363        let mut conn = self.conn().await?;
364        let persisted_records = MlsPendingMessage::find_all_matching(&mut conn, &conversation_id.into()).await?;
365
366        let transaction_guard = self.transaction.lock().await;
367        let Some(transaction) = transaction_guard.as_ref() else {
368            return Ok(persisted_records);
369        };
370        transaction
371            .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
372            .await
373    }
374
375    pub async fn remove_pending_messages_by_conversation_id(
376        &self,
377        conversation_id: impl AsRef<[u8]> + Send,
378    ) -> CryptoKeystoreResult<()> {
379        let transaction_guard = self.transaction.lock().await;
380        let Some(transaction) = transaction_guard.as_ref() else {
381            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
382        };
383        transaction
384            .remove_pending_messages_by_conversation_id(conversation_id)
385            .await;
386        Ok(())
387    }
388}
389
390#[cfg_attr(target_family = "wasm", async_trait(?Send))]
391#[cfg_attr(not(target_family = "wasm"), async_trait)]
392impl FetchFromDatabase for Database {
393    async fn get<E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<Option<E>>
394    where
395        E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
396    {
397        // If a transaction is in progress...
398        if let Some(transaction) = self.transaction.lock().await.as_ref()
399            //... and it has information about this entity, ...
400            && let Some(cached_record) = transaction.get(id).await
401        {
402            return Ok(cached_record.map(Arc::unwrap_or_clone));
403        }
404
405        // Otherwise get it from the database
406        let mut conn = self.conn().await?;
407        E::get(&mut conn, id).await
408    }
409
410    async fn get_borrowed<E>(&self, id: &<E as BorrowPrimaryKey>::BorrowedPrimaryKey) -> CryptoKeystoreResult<Option<E>>
411    where
412        E: EntityGetBorrowed<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
413        E::PrimaryKey: Borrow<E::BorrowedPrimaryKey>,
414        for<'a> &'a E::BorrowedPrimaryKey: KeyType,
415    {
416        // If a transaction is in progress...
417        if let Some(transaction) = self.transaction.lock().await.as_ref()
418            //... and it has information about this entity, ...
419            && let Some(cached_record) = transaction.get_borrowed(id).await
420        {
421            return Ok(cached_record.map(Arc::unwrap_or_clone));
422        }
423
424        // Otherwise get it from the database
425        let mut conn = self.conn().await?;
426        E::get_borrowed(&mut conn, id).await
427    }
428
429    async fn count<E>(&self) -> CryptoKeystoreResult<u32>
430    where
431        E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
432    {
433        if self.transaction.lock().await.is_some() {
434            // Unfortunately, we have to do this because of possible record id overlap
435            // between cache and db.
436            let count = self.load_all::<E>().await?.len();
437            Ok(count as _)
438        } else {
439            let mut conn = self.conn().await?;
440            E::count(&mut conn).await
441        }
442    }
443
444    async fn load_all<E>(&self) -> CryptoKeystoreResult<Vec<E>>
445    where
446        E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
447    {
448        let mut conn = self.conn().await?;
449        let persisted_records = E::load_all(&mut conn).await?;
450
451        let transaction_guard = self.transaction.lock().await;
452        let Some(transaction) = transaction_guard.as_ref() else {
453            return Ok(persisted_records);
454        };
455        transaction.find_all(persisted_records).await
456    }
457
458    async fn search<E, SearchKey>(&self, search_key: &SearchKey) -> CryptoKeystoreResult<Vec<E>>
459    where
460        E: Entity<ConnectionType = KeystoreDatabaseConnection> + SearchableEntity<SearchKey> + Clone + Send + Sync,
461        SearchKey: KeyType,
462    {
463        let mut conn = self.conn().await?;
464        let persisted_records = E::find_all_matching(&mut conn, search_key).await?;
465
466        let transaction_guard = self.transaction.lock().await;
467        let Some(transaction) = transaction_guard.as_ref() else {
468            return Ok(persisted_records);
469        };
470
471        transaction.search(persisted_records, search_key).await
472    }
473}