Skip to main content

core_crypto_keystore/connection/
mod.rs

1pub mod platform;
2
3use std::{
4    borrow::Borrow,
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use async_lock::{Mutex, MutexGuard, Semaphore};
10use async_trait::async_trait;
11
12pub use self::platform::*;
13use crate::{
14    CryptoKeystoreError, CryptoKeystoreResult, DatabaseKey,
15    entities::{MlsPendingMessage, PersistedMlsGroup},
16    traits::{
17        BorrowPrimaryKey, Entity, EntityDatabaseMutation, EntityDeleteBorrowed, EntityGetBorrowed, FetchFromDatabase,
18        KeyType, SearchableEntity,
19    },
20    transaction::KeystoreTransaction,
21};
22
/// Maximum length of a blob that may be stored in the database.
///
/// The same limit is enforced for SQLCipher-backed stores and for WASM.
/// It is deliberately conservative for WASM, where the tightest known bound is Safari's limit of
/// 1GB per origin.
///
/// See: [SQLite limits](https://www.sqlite.org/limits.html)
/// See: [IndexedDB limits](https://stackoverflow.com/a/63019999/1934177)
pub const MAX_BLOB_LEN: usize = 1_000_000_000;
32
/// Marker trait collecting the platform-dependent bounds every database connection must satisfy.
///
/// Because of UniFFI async requirements, the keystore has to be `Send` on non-WASM targets.
#[cfg(not(target_os = "unknown"))]
pub trait DatabaseConnectionRequirements: Send + Sized {}

/// Marker trait collecting the platform-dependent bounds every database connection must satisfy.
///
/// On WASM things cannot be `Send` because of platform restrictions (all things are copied across
/// the FFI), so only `Sized` is required there.
#[cfg(target_os = "unknown")]
pub trait DatabaseConnectionRequirements: Sized {}
40
41#[cfg_attr(target_os = "unknown", async_trait::async_trait(?Send))]
42#[cfg_attr(not(target_os = "unknown"), async_trait::async_trait)]
43pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
44    type Connection: 'a;
45
46    async fn open(location: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
47
48    async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
49
50    async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
51
52    /// Clear all data from the database and close it.
53    async fn wipe(self) -> CryptoKeystoreResult<()>;
54
55    fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
56        #[cfg(not(target_os = "unknown"))]
57        if size > i32::MAX as usize {
58            return Err(CryptoKeystoreError::BlobTooBig);
59        }
60
61        if size >= MAX_BLOB_LEN {
62            return Err(CryptoKeystoreError::BlobTooBig);
63        }
64
65        Ok(())
66    }
67
68    /// Returns the database location if persistent or None if in-memory
69    fn location(&self) -> Option<&str>;
70}
71
/// Keystore database handle: a connection slot, a transaction slot and the semaphore that
/// limits how many transactions may be open at once.
#[derive(Debug)]
pub struct Database {
    /// The database connection; `None` once it has been taken out (see `take`, used by `close` and `wipe`).
    pub(crate) conn: Mutex<Option<KeystoreDatabaseConnection>>,
    /// The currently open transaction, or `None` when no transaction is in progress.
    pub(crate) transaction: Mutex<Option<KeystoreTransaction>>,
    // we need an internal Arc here so we can hand out `SemaphoreGuardArc`
    // instances without keeping references with lifetimes to the semaphore
    transaction_semaphore: Arc<Semaphore>,
}
80
/// Number of permits the transaction semaphore of a `Database` is created with.
const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
82
// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
// NOTE(review): the fields visible on `Database` are two async `Mutex`es and an `Arc<Semaphore>`. This manual
// impl bypasses the compiler's auto-trait check, so it also applies on targets where the wrapped connection is
// not `Send` (see `DatabaseConnectionRequirements`) — confirm a `Database` is never moved across threads there.
unsafe impl Send for Database {}
// SAFETY: this has mutexes and atomics protecting underlying data so this is safe to share between threads
// NOTE(review): same caveat as for `Send` above — the justification rests on the mutexes serializing all access
// to the connection and the transaction; confirm for targets whose connection type is not `Send`.
unsafe impl Sync for Database {}
87
/// Selects where a database connection should be opened.
#[derive(Debug, Clone)]
pub enum ConnectionType<'a> {
    /// A persistent connection, stored at the given path
    Persistent(&'a str),
    /// A transient connection that only lives in memory
    InMemory,
}
96
/// Exclusive access to the database connection
///
/// Note that this is only ever constructed when we already hold exclusive access,
/// and the connection has already been tested to ensure that it is non-empty.
///
/// Dereferences to the wrapped `KeystoreDatabaseConnection`.
pub struct ConnectionGuard<'a> {
    /// Lock guard over the connection slot; checked to be `Some` when the guard is built via `TryFrom`.
    guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
}
104
105impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
106    type Error = CryptoKeystoreError;
107
108    fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
109        guard
110            .is_some()
111            .then_some(Self { guard })
112            .ok_or(CryptoKeystoreError::Closed)
113    }
114}
115
116impl Deref for ConnectionGuard<'_> {
117    type Target = KeystoreDatabaseConnection;
118
119    fn deref(&self) -> &Self::Target {
120        self.guard
121            .as_ref()
122            .expect("we have exclusive access and already checked that the connection exists")
123    }
124}
125
126impl DerefMut for ConnectionGuard<'_> {
127    fn deref_mut(&mut self) -> &mut Self::Target {
128        self.guard
129            .as_mut()
130            .expect("we have exclusive access and already checked that the connection exists")
131    }
132}
133
134// Only the functions in this impl block directly mess with `self.conn`
135impl Database {
136    pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Arc<Self>> {
137        let conn = match location {
138            ConnectionType::Persistent(location) => KeystoreDatabaseConnection::open(location, key).await?,
139            ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
140        };
141        let conn = Mutex::new(Some(conn));
142        Ok(Self {
143            conn,
144            transaction: Default::default(),
145            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
146        }
147        .into())
148    }
149
150    #[cfg(all(test, not(target_os = "unknown")))]
151    pub(crate) async fn open_at_schema_version(
152        name: &str,
153        key: &DatabaseKey,
154        version: MigrationTarget,
155    ) -> CryptoKeystoreResult<Self> {
156        let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
157        let conn = Mutex::new(Some(conn));
158        Ok(Self {
159            conn,
160            transaction: Default::default(),
161            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
162        })
163    }
164
165    #[cfg(all(test, target_os = "unknown"))]
166    pub(crate) async fn open_at_schema_version(
167        name: &str,
168        key: &DatabaseKey,
169        version: Option<u32>,
170    ) -> CryptoKeystoreResult<Arc<Self>> {
171        use crate::connection::{
172            storage::{WasmEncryptedStorage, WasmStorageWrapper},
173            wasm::migrations::{TARGET_VERSION, open_at},
174        };
175
176        let version = version.unwrap_or(TARGET_VERSION);
177        let idb_database = open_at(name, key, version).await;
178        let wasm_connection = KeystoreDatabaseConnection::from_inner(WasmEncryptedStorage::new(
179            key,
180            WasmStorageWrapper::Persistent(idb_database),
181        ));
182        let conn = Mutex::new(Some(wasm_connection));
183        Ok(Self {
184            conn,
185            transaction: Default::default(),
186            transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
187        }
188        .into())
189    }
190
191    pub async fn location(&self) -> CryptoKeystoreResult<Option<String>> {
192        return Ok(self.conn().await?.location().map(ToString::to_string));
193    }
194
195    /// Get direct exclusive access to the connection.
196    pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
197        self.conn.lock().await.try_into()
198    }
199}
200
// These and all other database impls should not refer directly to `self.conn` but should go through the `self.conn()`
// wrapper (`take` is the one exception: it has to empty the connection slot itself)
203impl Database {
204    /// Wait for any running transaction to finish, then take the connection out of this database,
205    /// preventing this database from being used again.
206    async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
207        let _semaphore = self.transaction_semaphore.acquire_arc().await;
208
209        let mut guard = self.conn.lock().await;
210        guard.take().ok_or(CryptoKeystoreError::Closed)
211    }
212
213    // Close this database connection
214    pub async fn close(&self) -> CryptoKeystoreResult<()> {
215        #[cfg(not(target_os = "unknown"))]
216        self.take().await?;
217
218        #[cfg(target_os = "unknown")]
219        {
220            let conn = self.take().await?;
221            conn.close().await?;
222        }
223        Ok(())
224    }
225
226    /// Close this database and delete its contents.
227    pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
228        self.take().await?.wipe().await
229    }
230
231    /// Export a copy of the database to the specified path.
232    /// This creates a fully vacuumed and optimized copy of the database.
233    /// The copy will be encrypted with the same key as the source database.
234    ///
235    /// # Platform Support
236    /// This method is only available on platforms using SQLCipher (not WASM).
237    ///
238    /// # Arguments
239    /// * `destination_path` - The file path where the database copy should be created
240    ///
241    /// # Errors
242    /// Returns an error if:
243    /// - The database is in-memory (cannot export in-memory databases)
244    /// - The destination path is invalid
245    /// - The export operation fails
246    #[cfg(not(target_os = "unknown"))]
247    pub async fn export_copy(&self, destination_path: &str) -> CryptoKeystoreResult<()> {
248        let conn = self.conn().await?;
249        conn.export_copy(destination_path).await
250    }
251
    /// Delegates to `KeystoreDatabaseConnection::migrate_db_key_type_to_bytes` without needing an
    /// open `Database`.
    ///
    /// NOTE(review): presumably re-keys the database `name` from the legacy string key `old_key` to
    /// the byte-based `new_key` — confirm against the platform implementation.
    pub async fn migrate_db_key_type_to_bytes(
        name: &str,
        old_key: &str,
        new_key: &DatabaseKey,
    ) -> CryptoKeystoreResult<()> {
        KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
    }
259
260    pub async fn update_key(&self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
261        self.conn().await?.update_key(new_key).await
262    }
263
264    /// Waits for the current transaction to be committed or rolled back, then starts a new one.
265    pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
266        let semaphore = self.transaction_semaphore.acquire_arc().await;
267        let mut transaction_guard = self.transaction.lock().await;
268        *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
269        Ok(())
270    }
271
272    /// Start a new transaction if no other transaction is currently in progress.
273    ///
274    /// If a transaction is currently in progress, this will produce a `TransactionInProgress` error.
275    pub async fn try_new_immediate_transaction(&self) -> CryptoKeystoreResult<()> {
276        let semaphore = self
277            .transaction_semaphore
278            .try_acquire_arc()
279            .ok_or(CryptoKeystoreError::TransactionInProgress)?;
280        let mut transaction_guard = self.transaction.lock().await;
281        *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
282        Ok(())
283    }
284
285    pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
286        let mut transaction_guard = self.transaction.lock().await;
287        let Some(transaction) = transaction_guard.as_ref() else {
288            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
289        };
290        transaction.commit(self).await?;
291        *transaction_guard = None;
292        Ok(())
293    }
294
295    pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
296        let mut transaction_guard = self.transaction.lock().await;
297        if transaction_guard.is_none() {
298            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
299        };
300        *transaction_guard = None;
301        Ok(())
302    }
303
304    pub async fn child_groups(&self, entity: PersistedMlsGroup) -> CryptoKeystoreResult<Vec<PersistedMlsGroup>> {
305        let mut conn = self.conn().await?;
306        let persisted_records = entity.child_groups(conn.deref_mut()).await?;
307
308        let transaction_guard = self.transaction.lock().await;
309        let Some(transaction) = transaction_guard.as_ref() else {
310            return Ok(persisted_records);
311        };
312        transaction.child_groups(entity, persisted_records).await
313    }
314
315    pub async fn save<'a, E>(&self, entity: E) -> CryptoKeystoreResult<E::AutoGeneratedFields>
316    where
317        E: Entity + EntityDatabaseMutation<'a> + Send + Sync,
318    {
319        let transaction_guard = self.transaction.lock().await;
320        let Some(transaction) = transaction_guard.as_ref() else {
321            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
322        };
323        transaction.save(entity).await
324    }
325
326    pub async fn remove<'a, E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<()>
327    where
328        E: Entity + EntityDatabaseMutation<'a>,
329    {
330        let transaction_guard = self.transaction.lock().await;
331        let Some(transaction) = transaction_guard.as_ref() else {
332            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
333        };
334        transaction.remove::<E>(id).await
335    }
336
337    pub async fn remove_borrowed<'a, E>(&self, id: &E::BorrowedPrimaryKey) -> CryptoKeystoreResult<()>
338    where
339        E: Entity + EntityDatabaseMutation<'a> + BorrowPrimaryKey + EntityDeleteBorrowed<'a>,
340    {
341        let transaction_guard = self.transaction.lock().await;
342        let Some(transaction) = transaction_guard.as_ref() else {
343            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
344        };
345        transaction.remove_borrowed::<E>(id).await
346    }
347
348    pub async fn find_pending_messages_by_conversation_id(
349        &self,
350        conversation_id: &[u8],
351    ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
352        let mut conn = self.conn().await?;
353        let persisted_records = MlsPendingMessage::find_all_matching(&mut conn, &conversation_id.into()).await?;
354
355        let transaction_guard = self.transaction.lock().await;
356        let Some(transaction) = transaction_guard.as_ref() else {
357            return Ok(persisted_records);
358        };
359        transaction
360            .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
361            .await
362    }
363
364    pub async fn remove_pending_messages_by_conversation_id(
365        &self,
366        conversation_id: impl AsRef<[u8]> + Send,
367    ) -> CryptoKeystoreResult<()> {
368        let transaction_guard = self.transaction.lock().await;
369        let Some(transaction) = transaction_guard.as_ref() else {
370            return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
371        };
372        transaction
373            .remove_pending_messages_by_conversation_id(conversation_id)
374            .await;
375        Ok(())
376    }
377}
378
379#[cfg_attr(target_os = "unknown", async_trait(?Send))]
380#[cfg_attr(not(target_os = "unknown"), async_trait)]
381impl FetchFromDatabase for Database {
382    async fn get<E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<Option<E>>
383    where
384        E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
385    {
386        // If a transaction is in progress...
387        if let Some(transaction) = self.transaction.lock().await.as_ref()
388            //... and it has information about this entity, ...
389            && let Some(cached_record) = transaction.get(id).await
390        {
391            return Ok(cached_record.map(Arc::unwrap_or_clone));
392        }
393
394        // Otherwise get it from the database
395        let mut conn = self.conn().await?;
396        E::get(&mut conn, id).await
397    }
398
399    async fn get_borrowed<E>(&self, id: &<E as BorrowPrimaryKey>::BorrowedPrimaryKey) -> CryptoKeystoreResult<Option<E>>
400    where
401        E: EntityGetBorrowed<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
402        E::PrimaryKey: Borrow<E::BorrowedPrimaryKey>,
403        for<'a> &'a E::BorrowedPrimaryKey: KeyType,
404    {
405        // If a transaction is in progress...
406        if let Some(transaction) = self.transaction.lock().await.as_ref()
407            //... and it has information about this entity, ...
408            && let Some(cached_record) = transaction.get_borrowed(id).await
409        {
410            return Ok(cached_record.map(Arc::unwrap_or_clone));
411        }
412
413        // Otherwise get it from the database
414        let mut conn = self.conn().await?;
415        E::get_borrowed(&mut conn, id).await
416    }
417
418    async fn count<E>(&self) -> CryptoKeystoreResult<u32>
419    where
420        E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
421    {
422        if self.transaction.lock().await.is_some() {
423            // Unfortunately, we have to do this because of possible record id overlap
424            // between cache and db.
425            let count = self.load_all::<E>().await?.len();
426            Ok(count as _)
427        } else {
428            let mut conn = self.conn().await?;
429            E::count(&mut conn).await
430        }
431    }
432
433    async fn load_all<E>(&self) -> CryptoKeystoreResult<Vec<E>>
434    where
435        E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
436    {
437        let mut conn = self.conn().await?;
438        let persisted_records = E::load_all(&mut conn).await?;
439
440        let transaction_guard = self.transaction.lock().await;
441        let Some(transaction) = transaction_guard.as_ref() else {
442            return Ok(persisted_records);
443        };
444        transaction.find_all(persisted_records).await
445    }
446
447    async fn search<E, SearchKey>(&self, search_key: &SearchKey) -> CryptoKeystoreResult<Vec<E>>
448    where
449        E: Entity<ConnectionType = KeystoreDatabaseConnection> + SearchableEntity<SearchKey> + Clone + Send + Sync,
450        SearchKey: KeyType,
451    {
452        let mut conn = self.conn().await?;
453        let persisted_records = E::find_all_matching(&mut conn, search_key).await?;
454
455        let transaction_guard = self.transaction.lock().await;
456        let Some(transaction) = transaction_guard.as_ref() else {
457            return Ok(persisted_records);
458        };
459
460        transaction.search(persisted_records, search_key).await
461    }
462}