core_crypto_keystore/connection/
mod.rs1pub mod platform;
2
3use std::{
4 borrow::Borrow,
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use async_lock::{Mutex, MutexGuard, Semaphore};
10use async_trait::async_trait;
11
12pub use self::platform::*;
13use crate::{
14 CryptoKeystoreError, CryptoKeystoreResult, DatabaseKey,
15 entities::{MlsPendingMessage, PersistedMlsGroup},
16 traits::{
17 BorrowPrimaryKey, Entity, EntityDatabaseMutation, EntityDeleteBorrowed, EntityGetBorrowed, FetchFromDatabase,
18 KeyType, SearchableEntity,
19 },
20 transaction::KeystoreTransaction,
21};
22
23pub const MAX_BLOB_LEN: usize = 1_000_000_000;
32
33#[cfg(not(target_os = "unknown"))]
34pub trait DatabaseConnectionRequirements: Sized + Send {}
36#[cfg(target_os = "unknown")]
37pub trait DatabaseConnectionRequirements: Sized {}
40
41#[cfg_attr(target_os = "unknown", async_trait::async_trait(?Send))]
42#[cfg_attr(not(target_os = "unknown"), async_trait::async_trait)]
43pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
44 type Connection: 'a;
45
46 async fn open(location: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
47
48 async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
49
50 async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
51
52 async fn wipe(self) -> CryptoKeystoreResult<()>;
54
55 fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
56 #[cfg(not(target_os = "unknown"))]
57 if size > i32::MAX as usize {
58 return Err(CryptoKeystoreError::BlobTooBig);
59 }
60
61 if size >= MAX_BLOB_LEN {
62 return Err(CryptoKeystoreError::BlobTooBig);
63 }
64
65 Ok(())
66 }
67
68 fn location(&self) -> Option<&str>;
70}
71
72#[derive(Debug, Clone)]
73pub struct Database {
74 pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
75 pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
76 transaction_semaphore: Arc<Semaphore>,
77}
78
79const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
80
81unsafe impl Send for Database {}
83unsafe impl Sync for Database {}
85
86#[derive(Debug, Clone)]
88pub enum ConnectionType<'a> {
89 Persistent(&'a str),
91 InMemory,
93}
94
95pub struct ConnectionGuard<'a> {
100 guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
101}
102
103impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
104 type Error = CryptoKeystoreError;
105
106 fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
107 guard
108 .is_some()
109 .then_some(Self { guard })
110 .ok_or(CryptoKeystoreError::Closed)
111 }
112}
113
114impl Deref for ConnectionGuard<'_> {
115 type Target = KeystoreDatabaseConnection;
116
117 fn deref(&self) -> &Self::Target {
118 self.guard
119 .as_ref()
120 .expect("we have exclusive access and already checked that the connection exists")
121 }
122}
123
124impl DerefMut for ConnectionGuard<'_> {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 self.guard
127 .as_mut()
128 .expect("we have exclusive access and already checked that the connection exists")
129 }
130}
131
132impl Database {
134 pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
135 let conn = match location {
136 ConnectionType::Persistent(location) => KeystoreDatabaseConnection::open(location, key).await?,
137 ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
138 };
139 let conn = Mutex::new(Some(conn));
140 #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(conn);
142 Ok(Self {
143 conn,
144 transaction: Default::default(),
145 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
146 })
147 }
148
149 #[cfg(all(test, not(target_os = "unknown")))]
150 pub(crate) async fn open_at_schema_version(
151 name: &str,
152 key: &DatabaseKey,
153 version: MigrationTarget,
154 ) -> CryptoKeystoreResult<Self> {
155 let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
156 let conn = Mutex::new(Some(conn));
157 let conn = Arc::new(conn);
158 Ok(Self {
159 conn,
160 transaction: Default::default(),
161 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
162 })
163 }
164
165 #[cfg(all(test, target_os = "unknown"))]
166 pub(crate) async fn open_at_schema_version(
167 name: &str,
168 key: &DatabaseKey,
169 version: Option<u32>,
170 ) -> CryptoKeystoreResult<Self> {
171 use crate::connection::{
172 storage::{WasmEncryptedStorage, WasmStorageWrapper},
173 wasm::migrations::{TARGET_VERSION, open_at},
174 };
175
176 let version = version.unwrap_or(TARGET_VERSION);
177 let idb_database = open_at(name, key, version).await;
178 let wasm_connection = KeystoreDatabaseConnection::from_inner(WasmEncryptedStorage::new(
179 key,
180 WasmStorageWrapper::Persistent(idb_database),
181 ));
182 let conn = Arc::new(Mutex::new(Some(wasm_connection)));
183 Ok(Self {
184 conn,
185 transaction: Default::default(),
186 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
187 })
188 }
189
190 pub async fn location(&self) -> CryptoKeystoreResult<Option<String>> {
191 return Ok(self.conn().await?.location().map(ToString::to_string));
192 }
193
194 pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
196 self.conn.lock().await.try_into()
197 }
198
199 async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
202 let _semaphore = self.transaction_semaphore.acquire_arc().await;
203
204 let mut guard = self.conn.lock().await;
205 guard.take().ok_or(CryptoKeystoreError::Closed)
206 }
207
208 pub async fn close(&self) -> CryptoKeystoreResult<()> {
210 #[cfg(not(target_os = "unknown"))]
211 self.take().await?;
212
213 #[cfg(target_os = "unknown")]
214 {
215 let conn = self.take().await?;
216 conn.close().await?;
217 }
218 Ok(())
219 }
220
221 pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
223 self.take().await?.wipe().await
224 }
225
226 #[cfg(not(target_os = "unknown"))]
242 pub async fn export_copy(&self, destination_path: &str) -> CryptoKeystoreResult<()> {
243 let conn = self.conn().await?;
244 conn.export_copy(destination_path).await
245 }
246
247 pub async fn migrate_db_key_type_to_bytes(
248 name: &str,
249 old_key: &str,
250 new_key: &DatabaseKey,
251 ) -> CryptoKeystoreResult<()> {
252 KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
253 }
254
255 pub async fn update_key(&self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
256 self.conn().await?.update_key(new_key).await
257 }
258
259 pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
261 let semaphore = self.transaction_semaphore.acquire_arc().await;
262 let mut transaction_guard = self.transaction.lock().await;
263 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
264 Ok(())
265 }
266
267 pub async fn try_new_immediate_transaction(&self) -> CryptoKeystoreResult<()> {
271 let semaphore = self
272 .transaction_semaphore
273 .try_acquire_arc()
274 .ok_or(CryptoKeystoreError::TransactionInProgress)?;
275 let mut transaction_guard = self.transaction.lock().await;
276 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
277 Ok(())
278 }
279
280 pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
281 let mut transaction_guard = self.transaction.lock().await;
282 let Some(transaction) = transaction_guard.as_ref() else {
283 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
284 };
285 transaction.commit(self).await?;
286 *transaction_guard = None;
287 Ok(())
288 }
289
290 pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
291 let mut transaction_guard = self.transaction.lock().await;
292 if transaction_guard.is_none() {
293 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
294 };
295 *transaction_guard = None;
296 Ok(())
297 }
298
299 pub async fn child_groups(&self, entity: PersistedMlsGroup) -> CryptoKeystoreResult<Vec<PersistedMlsGroup>> {
300 let mut conn = self.conn().await?;
301 let persisted_records = entity.child_groups(conn.deref_mut()).await?;
302
303 let transaction_guard = self.transaction.lock().await;
304 let Some(transaction) = transaction_guard.as_ref() else {
305 return Ok(persisted_records);
306 };
307 transaction.child_groups(entity, persisted_records).await
308 }
309
310 pub async fn save<'a, E>(&self, entity: E) -> CryptoKeystoreResult<E::AutoGeneratedFields>
311 where
312 E: Entity + EntityDatabaseMutation<'a> + Send + Sync,
313 {
314 let transaction_guard = self.transaction.lock().await;
315 let Some(transaction) = transaction_guard.as_ref() else {
316 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
317 };
318 transaction.save(entity).await
319 }
320
321 pub async fn remove<'a, E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<()>
322 where
323 E: Entity + EntityDatabaseMutation<'a>,
324 {
325 let transaction_guard = self.transaction.lock().await;
326 let Some(transaction) = transaction_guard.as_ref() else {
327 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
328 };
329 transaction.remove::<E>(id).await
330 }
331
332 pub async fn remove_borrowed<'a, E>(&self, id: &E::BorrowedPrimaryKey) -> CryptoKeystoreResult<()>
333 where
334 E: Entity + EntityDatabaseMutation<'a> + BorrowPrimaryKey + EntityDeleteBorrowed<'a>,
335 {
336 let transaction_guard = self.transaction.lock().await;
337 let Some(transaction) = transaction_guard.as_ref() else {
338 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
339 };
340 transaction.remove_borrowed::<E>(id).await
341 }
342
343 pub async fn find_pending_messages_by_conversation_id(
344 &self,
345 conversation_id: &[u8],
346 ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
347 let mut conn = self.conn().await?;
348 let persisted_records = MlsPendingMessage::find_all_matching(&mut conn, &conversation_id.into()).await?;
349
350 let transaction_guard = self.transaction.lock().await;
351 let Some(transaction) = transaction_guard.as_ref() else {
352 return Ok(persisted_records);
353 };
354 transaction
355 .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
356 .await
357 }
358
359 pub async fn remove_pending_messages_by_conversation_id(
360 &self,
361 conversation_id: impl AsRef<[u8]> + Send,
362 ) -> CryptoKeystoreResult<()> {
363 let transaction_guard = self.transaction.lock().await;
364 let Some(transaction) = transaction_guard.as_ref() else {
365 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
366 };
367 transaction
368 .remove_pending_messages_by_conversation_id(conversation_id)
369 .await;
370 Ok(())
371 }
372}
373
374#[cfg_attr(target_os = "unknown", async_trait(?Send))]
375#[cfg_attr(not(target_os = "unknown"), async_trait)]
376impl FetchFromDatabase for Database {
377 async fn get<E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<Option<E>>
378 where
379 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
380 {
381 if let Some(transaction) = self.transaction.lock().await.as_ref()
383 && let Some(cached_record) = transaction.get(id).await
385 {
386 return Ok(cached_record.map(Arc::unwrap_or_clone));
387 }
388
389 let mut conn = self.conn().await?;
391 E::get(&mut conn, id).await
392 }
393
394 async fn get_borrowed<E>(&self, id: &<E as BorrowPrimaryKey>::BorrowedPrimaryKey) -> CryptoKeystoreResult<Option<E>>
395 where
396 E: EntityGetBorrowed<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
397 E::PrimaryKey: Borrow<E::BorrowedPrimaryKey>,
398 for<'a> &'a E::BorrowedPrimaryKey: KeyType,
399 {
400 if let Some(transaction) = self.transaction.lock().await.as_ref()
402 && let Some(cached_record) = transaction.get_borrowed(id).await
404 {
405 return Ok(cached_record.map(Arc::unwrap_or_clone));
406 }
407
408 let mut conn = self.conn().await?;
410 E::get_borrowed(&mut conn, id).await
411 }
412
413 async fn count<E>(&self) -> CryptoKeystoreResult<u32>
414 where
415 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
416 {
417 if self.transaction.lock().await.is_some() {
418 let count = self.load_all::<E>().await?.len();
421 Ok(count as _)
422 } else {
423 let mut conn = self.conn().await?;
424 E::count(&mut conn).await
425 }
426 }
427
428 async fn load_all<E>(&self) -> CryptoKeystoreResult<Vec<E>>
429 where
430 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
431 {
432 let mut conn = self.conn().await?;
433 let persisted_records = E::load_all(&mut conn).await?;
434
435 let transaction_guard = self.transaction.lock().await;
436 let Some(transaction) = transaction_guard.as_ref() else {
437 return Ok(persisted_records);
438 };
439 transaction.find_all(persisted_records).await
440 }
441
442 async fn search<E, SearchKey>(&self, search_key: &SearchKey) -> CryptoKeystoreResult<Vec<E>>
443 where
444 E: Entity<ConnectionType = KeystoreDatabaseConnection> + SearchableEntity<SearchKey> + Clone + Send + Sync,
445 SearchKey: KeyType,
446 {
447 let mut conn = self.conn().await?;
448 let persisted_records = E::find_all_matching(&mut conn, search_key).await?;
449
450 let transaction_guard = self.transaction.lock().await;
451 let Some(transaction) = transaction_guard.as_ref() else {
452 return Ok(persisted_records);
453 };
454
455 transaction.search(persisted_records, search_key).await
456 }
457}