core_crypto_keystore/connection/
mod.rs1pub mod platform;
2
3use std::{
4 borrow::Borrow,
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use async_lock::{Mutex, MutexGuard, Semaphore};
10use async_trait::async_trait;
11
12pub use self::platform::*;
13use crate::{
14 CryptoKeystoreError, CryptoKeystoreResult, DatabaseKey,
15 entities::{MlsPendingMessage, PersistedMlsGroup},
16 traits::{
17 BorrowPrimaryKey, Entity, EntityDatabaseMutation, EntityDeleteBorrowed, EntityGetBorrowed, FetchFromDatabase,
18 KeyType, SearchableEntity,
19 },
20 transaction::KeystoreTransaction,
21};
22
23pub const MAX_BLOB_LEN: usize = 1_000_000_000;
32
33#[cfg(not(target_os = "unknown"))]
34pub trait DatabaseConnectionRequirements: Sized + Send {}
36#[cfg(target_os = "unknown")]
37pub trait DatabaseConnectionRequirements: Sized {}
40
41#[cfg_attr(target_os = "unknown", async_trait::async_trait(?Send))]
42#[cfg_attr(not(target_os = "unknown"), async_trait::async_trait)]
43pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
44 type Connection: 'a;
45
46 async fn open(location: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
47
48 async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
49
50 async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
51
52 async fn wipe(self) -> CryptoKeystoreResult<()>;
54
55 fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
56 #[cfg(not(target_os = "unknown"))]
57 if size > i32::MAX as usize {
58 return Err(CryptoKeystoreError::BlobTooBig);
59 }
60
61 if size >= MAX_BLOB_LEN {
62 return Err(CryptoKeystoreError::BlobTooBig);
63 }
64
65 Ok(())
66 }
67
68 fn location(&self) -> Option<&str>;
70}
71
72#[derive(Debug)]
73pub struct Database {
74 pub(crate) conn: Mutex<Option<KeystoreDatabaseConnection>>,
75 pub(crate) transaction: Mutex<Option<KeystoreTransaction>>,
76 transaction_semaphore: Arc<Semaphore>,
79}
80
81const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
82
83unsafe impl Send for Database {}
85unsafe impl Sync for Database {}
87
88#[derive(Debug, Clone)]
90pub enum ConnectionType<'a> {
91 Persistent(&'a str),
93 InMemory,
95}
96
97pub struct ConnectionGuard<'a> {
102 guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
103}
104
105impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
106 type Error = CryptoKeystoreError;
107
108 fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
109 guard
110 .is_some()
111 .then_some(Self { guard })
112 .ok_or(CryptoKeystoreError::Closed)
113 }
114}
115
116impl Deref for ConnectionGuard<'_> {
117 type Target = KeystoreDatabaseConnection;
118
119 fn deref(&self) -> &Self::Target {
120 self.guard
121 .as_ref()
122 .expect("we have exclusive access and already checked that the connection exists")
123 }
124}
125
126impl DerefMut for ConnectionGuard<'_> {
127 fn deref_mut(&mut self) -> &mut Self::Target {
128 self.guard
129 .as_mut()
130 .expect("we have exclusive access and already checked that the connection exists")
131 }
132}
133
134impl Database {
136 pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Arc<Self>> {
137 let conn = match location {
138 ConnectionType::Persistent(location) => KeystoreDatabaseConnection::open(location, key).await?,
139 ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
140 };
141 let conn = Mutex::new(Some(conn));
142 Ok(Self {
143 conn,
144 transaction: Default::default(),
145 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
146 }
147 .into())
148 }
149
150 #[cfg(all(test, not(target_os = "unknown")))]
151 pub(crate) async fn open_at_schema_version(
152 name: &str,
153 key: &DatabaseKey,
154 version: MigrationTarget,
155 ) -> CryptoKeystoreResult<Self> {
156 let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
157 let conn = Mutex::new(Some(conn));
158 Ok(Self {
159 conn,
160 transaction: Default::default(),
161 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
162 })
163 }
164
165 #[cfg(all(test, target_os = "unknown"))]
166 pub(crate) async fn open_at_schema_version(
167 name: &str,
168 key: &DatabaseKey,
169 version: Option<u32>,
170 ) -> CryptoKeystoreResult<Arc<Self>> {
171 use crate::connection::{
172 storage::{WasmEncryptedStorage, WasmStorageWrapper},
173 wasm::migrations::{TARGET_VERSION, open_at},
174 };
175
176 let version = version.unwrap_or(TARGET_VERSION);
177 let idb_database = open_at(name, key, version).await;
178 let wasm_connection = KeystoreDatabaseConnection::from_inner(WasmEncryptedStorage::new(
179 key,
180 WasmStorageWrapper::Persistent(idb_database),
181 ));
182 let conn = Mutex::new(Some(wasm_connection));
183 Ok(Self {
184 conn,
185 transaction: Default::default(),
186 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
187 }
188 .into())
189 }
190
191 pub async fn location(&self) -> CryptoKeystoreResult<Option<String>> {
192 return Ok(self.conn().await?.location().map(ToString::to_string));
193 }
194
195 pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
197 self.conn.lock().await.try_into()
198 }
199}
200
201impl Database {
204 async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
207 let _semaphore = self.transaction_semaphore.acquire_arc().await;
208
209 let mut guard = self.conn.lock().await;
210 guard.take().ok_or(CryptoKeystoreError::Closed)
211 }
212
213 pub async fn close(&self) -> CryptoKeystoreResult<()> {
215 #[cfg(not(target_os = "unknown"))]
216 self.take().await?;
217
218 #[cfg(target_os = "unknown")]
219 {
220 let conn = self.take().await?;
221 conn.close().await?;
222 }
223 Ok(())
224 }
225
226 pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
228 self.take().await?.wipe().await
229 }
230
231 #[cfg(not(target_os = "unknown"))]
247 pub async fn export_copy(&self, destination_path: &str) -> CryptoKeystoreResult<()> {
248 let conn = self.conn().await?;
249 conn.export_copy(destination_path).await
250 }
251
252 pub async fn migrate_db_key_type_to_bytes(
253 name: &str,
254 old_key: &str,
255 new_key: &DatabaseKey,
256 ) -> CryptoKeystoreResult<()> {
257 KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
258 }
259
260 pub async fn update_key(&self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
261 self.conn().await?.update_key(new_key).await
262 }
263
264 pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
266 let semaphore = self.transaction_semaphore.acquire_arc().await;
267 let mut transaction_guard = self.transaction.lock().await;
268 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
269 Ok(())
270 }
271
272 pub async fn try_new_immediate_transaction(&self) -> CryptoKeystoreResult<()> {
276 let semaphore = self
277 .transaction_semaphore
278 .try_acquire_arc()
279 .ok_or(CryptoKeystoreError::TransactionInProgress)?;
280 let mut transaction_guard = self.transaction.lock().await;
281 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
282 Ok(())
283 }
284
285 pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
286 let mut transaction_guard = self.transaction.lock().await;
287 let Some(transaction) = transaction_guard.as_ref() else {
288 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
289 };
290 transaction.commit(self).await?;
291 *transaction_guard = None;
292 Ok(())
293 }
294
295 pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
296 let mut transaction_guard = self.transaction.lock().await;
297 if transaction_guard.is_none() {
298 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
299 };
300 *transaction_guard = None;
301 Ok(())
302 }
303
304 pub async fn child_groups(&self, entity: PersistedMlsGroup) -> CryptoKeystoreResult<Vec<PersistedMlsGroup>> {
305 let mut conn = self.conn().await?;
306 let persisted_records = entity.child_groups(conn.deref_mut()).await?;
307
308 let transaction_guard = self.transaction.lock().await;
309 let Some(transaction) = transaction_guard.as_ref() else {
310 return Ok(persisted_records);
311 };
312 transaction.child_groups(entity, persisted_records).await
313 }
314
315 pub async fn save<'a, E>(&self, entity: E) -> CryptoKeystoreResult<E::AutoGeneratedFields>
316 where
317 E: Entity + EntityDatabaseMutation<'a> + Send + Sync,
318 {
319 let transaction_guard = self.transaction.lock().await;
320 let Some(transaction) = transaction_guard.as_ref() else {
321 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
322 };
323 transaction.save(entity).await
324 }
325
326 pub async fn remove<'a, E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<()>
327 where
328 E: Entity + EntityDatabaseMutation<'a>,
329 {
330 let transaction_guard = self.transaction.lock().await;
331 let Some(transaction) = transaction_guard.as_ref() else {
332 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
333 };
334 transaction.remove::<E>(id).await
335 }
336
337 pub async fn remove_borrowed<'a, E>(&self, id: &E::BorrowedPrimaryKey) -> CryptoKeystoreResult<()>
338 where
339 E: Entity + EntityDatabaseMutation<'a> + BorrowPrimaryKey + EntityDeleteBorrowed<'a>,
340 {
341 let transaction_guard = self.transaction.lock().await;
342 let Some(transaction) = transaction_guard.as_ref() else {
343 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
344 };
345 transaction.remove_borrowed::<E>(id).await
346 }
347
348 pub async fn find_pending_messages_by_conversation_id(
349 &self,
350 conversation_id: &[u8],
351 ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
352 let mut conn = self.conn().await?;
353 let persisted_records = MlsPendingMessage::find_all_matching(&mut conn, &conversation_id.into()).await?;
354
355 let transaction_guard = self.transaction.lock().await;
356 let Some(transaction) = transaction_guard.as_ref() else {
357 return Ok(persisted_records);
358 };
359 transaction
360 .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
361 .await
362 }
363
364 pub async fn remove_pending_messages_by_conversation_id(
365 &self,
366 conversation_id: impl AsRef<[u8]> + Send,
367 ) -> CryptoKeystoreResult<()> {
368 let transaction_guard = self.transaction.lock().await;
369 let Some(transaction) = transaction_guard.as_ref() else {
370 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
371 };
372 transaction
373 .remove_pending_messages_by_conversation_id(conversation_id)
374 .await;
375 Ok(())
376 }
377}
378
379#[cfg_attr(target_os = "unknown", async_trait(?Send))]
380#[cfg_attr(not(target_os = "unknown"), async_trait)]
381impl FetchFromDatabase for Database {
382 async fn get<E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<Option<E>>
383 where
384 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
385 {
386 if let Some(transaction) = self.transaction.lock().await.as_ref()
388 && let Some(cached_record) = transaction.get(id).await
390 {
391 return Ok(cached_record.map(Arc::unwrap_or_clone));
392 }
393
394 let mut conn = self.conn().await?;
396 E::get(&mut conn, id).await
397 }
398
399 async fn get_borrowed<E>(&self, id: &<E as BorrowPrimaryKey>::BorrowedPrimaryKey) -> CryptoKeystoreResult<Option<E>>
400 where
401 E: EntityGetBorrowed<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
402 E::PrimaryKey: Borrow<E::BorrowedPrimaryKey>,
403 for<'a> &'a E::BorrowedPrimaryKey: KeyType,
404 {
405 if let Some(transaction) = self.transaction.lock().await.as_ref()
407 && let Some(cached_record) = transaction.get_borrowed(id).await
409 {
410 return Ok(cached_record.map(Arc::unwrap_or_clone));
411 }
412
413 let mut conn = self.conn().await?;
415 E::get_borrowed(&mut conn, id).await
416 }
417
418 async fn count<E>(&self) -> CryptoKeystoreResult<u32>
419 where
420 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
421 {
422 if self.transaction.lock().await.is_some() {
423 let count = self.load_all::<E>().await?.len();
426 Ok(count as _)
427 } else {
428 let mut conn = self.conn().await?;
429 E::count(&mut conn).await
430 }
431 }
432
433 async fn load_all<E>(&self) -> CryptoKeystoreResult<Vec<E>>
434 where
435 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
436 {
437 let mut conn = self.conn().await?;
438 let persisted_records = E::load_all(&mut conn).await?;
439
440 let transaction_guard = self.transaction.lock().await;
441 let Some(transaction) = transaction_guard.as_ref() else {
442 return Ok(persisted_records);
443 };
444 transaction.find_all(persisted_records).await
445 }
446
447 async fn search<E, SearchKey>(&self, search_key: &SearchKey) -> CryptoKeystoreResult<Vec<E>>
448 where
449 E: Entity<ConnectionType = KeystoreDatabaseConnection> + SearchableEntity<SearchKey> + Clone + Send + Sync,
450 SearchKey: KeyType,
451 {
452 let mut conn = self.conn().await?;
453 let persisted_records = E::find_all_matching(&mut conn, search_key).await?;
454
455 let transaction_guard = self.transaction.lock().await;
456 let Some(transaction) = transaction_guard.as_ref() else {
457 return Ok(persisted_records);
458 };
459
460 transaction.search(persisted_records, search_key).await
461 }
462}