core_crypto_keystore/connection/
mod.rs1pub mod platform;
2
3use std::{
4 borrow::Borrow,
5 fmt,
6 ops::{Deref, DerefMut},
7 sync::Arc,
8};
9
10use async_lock::{Mutex, MutexGuard, Semaphore};
11use async_trait::async_trait;
12use sha2::{Digest as _, Sha256};
13use zeroize::{Zeroize, ZeroizeOnDrop};
14
15pub use self::platform::*;
16use crate::{
17 CryptoKeystoreError, CryptoKeystoreResult,
18 entities::{MlsPendingMessage, PersistedMlsGroup},
19 traits::{
20 BorrowPrimaryKey, Entity, EntityDatabaseMutation, EntityDeleteBorrowed, EntityGetBorrowed, FetchFromDatabase,
21 KeyType, SearchableEntity,
22 },
23 transaction::KeystoreTransaction,
24};
25
26pub const MAX_BLOB_LEN: usize = 1_000_000_000;
35
36#[cfg(not(target_family = "wasm"))]
37pub trait DatabaseConnectionRequirements: Sized + Send {}
39#[cfg(target_family = "wasm")]
40pub trait DatabaseConnectionRequirements: Sized {}
43
44#[derive(Clone, Zeroize, ZeroizeOnDrop, derive_more::From, PartialEq, Eq)]
46pub struct DatabaseKey([u8; Self::LEN]);
47
48impl DatabaseKey {
49 pub const LEN: usize = 32;
50
51 pub fn generate() -> DatabaseKey {
52 DatabaseKey(rand::random::<[u8; Self::LEN]>())
53 }
54}
55
56impl fmt::Debug for DatabaseKey {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
58 f.write_str("DatabaseKey(hash=")?;
59 for x in Sha256::digest(self).as_slice().iter().take(10) {
60 fmt::LowerHex::fmt(x, f)?
61 }
62 f.write_str("...)")
63 }
64}
65
66impl AsRef<[u8]> for DatabaseKey {
67 fn as_ref(&self) -> &[u8] {
68 &self.0
69 }
70}
71
72impl Deref for DatabaseKey {
73 type Target = [u8];
74
75 fn deref(&self) -> &Self::Target {
76 &self.0
77 }
78}
79
80impl TryFrom<&[u8]> for DatabaseKey {
81 type Error = CryptoKeystoreError;
82
83 fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
84 if buf.len() != Self::LEN {
85 Err(CryptoKeystoreError::InvalidDbKeySize {
86 expected: Self::LEN,
87 actual: buf.len(),
88 })
89 } else {
90 Ok(Self(buf.try_into().unwrap()))
91 }
92 }
93}
94
95#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
96#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
97pub trait DatabaseConnection<'a>: DatabaseConnectionRequirements {
98 type Connection: 'a;
99
100 async fn open(location: &str, key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
101
102 async fn open_in_memory(key: &DatabaseKey) -> CryptoKeystoreResult<Self>;
103
104 async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()>;
105
106 async fn wipe(self) -> CryptoKeystoreResult<()>;
108
109 fn check_buffer_size(size: usize) -> CryptoKeystoreResult<()> {
110 #[cfg(not(target_family = "wasm"))]
111 if size > i32::MAX as usize {
112 return Err(CryptoKeystoreError::BlobTooBig);
113 }
114
115 if size >= MAX_BLOB_LEN {
116 return Err(CryptoKeystoreError::BlobTooBig);
117 }
118
119 Ok(())
120 }
121
122 fn location(&self) -> Option<&str>;
124}
125
126#[derive(Debug, Clone)]
127pub struct Database {
128 pub(crate) conn: Arc<Mutex<Option<KeystoreDatabaseConnection>>>,
129 pub(crate) transaction: Arc<Mutex<Option<KeystoreTransaction>>>,
130 transaction_semaphore: Arc<Semaphore>,
131}
132
133const ALLOWED_CONCURRENT_TRANSACTIONS_COUNT: usize = 1;
134
135unsafe impl Send for Database {}
137unsafe impl Sync for Database {}
139
140#[derive(Debug, Clone)]
142pub enum ConnectionType<'a> {
143 Persistent(&'a str),
145 InMemory,
147}
148
149pub struct ConnectionGuard<'a> {
154 guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>,
155}
156
157impl<'a> TryFrom<MutexGuard<'a, Option<KeystoreDatabaseConnection>>> for ConnectionGuard<'a> {
158 type Error = CryptoKeystoreError;
159
160 fn try_from(guard: MutexGuard<'a, Option<KeystoreDatabaseConnection>>) -> Result<Self, Self::Error> {
161 guard
162 .is_some()
163 .then_some(Self { guard })
164 .ok_or(CryptoKeystoreError::Closed)
165 }
166}
167
168impl Deref for ConnectionGuard<'_> {
169 type Target = KeystoreDatabaseConnection;
170
171 fn deref(&self) -> &Self::Target {
172 self.guard
173 .as_ref()
174 .expect("we have exclusive access and already checked that the connection exists")
175 }
176}
177
178impl DerefMut for ConnectionGuard<'_> {
179 fn deref_mut(&mut self) -> &mut Self::Target {
180 self.guard
181 .as_mut()
182 .expect("we have exclusive access and already checked that the connection exists")
183 }
184}
185
186impl Database {
188 pub async fn open(location: ConnectionType<'_>, key: &DatabaseKey) -> CryptoKeystoreResult<Self> {
189 let conn = match location {
190 ConnectionType::Persistent(location) => KeystoreDatabaseConnection::open(location, key).await?,
191 ConnectionType::InMemory => KeystoreDatabaseConnection::open_in_memory(key).await?,
192 };
193 let conn = Mutex::new(Some(conn));
194 #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(conn);
196 Ok(Self {
197 conn,
198 transaction: Default::default(),
199 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
200 })
201 }
202
203 #[cfg(all(test, not(target_family = "wasm")))]
204 pub(crate) async fn open_at_schema_version(
205 name: &str,
206 key: &DatabaseKey,
207 version: MigrationTarget,
208 ) -> CryptoKeystoreResult<Self> {
209 let conn = KeystoreDatabaseConnection::init_with_key_at_schema_version(name, key, version)?;
210 let conn = Mutex::new(Some(conn));
211 let conn = Arc::new(conn);
212 Ok(Self {
213 conn,
214 transaction: Default::default(),
215 transaction_semaphore: Arc::new(Semaphore::new(ALLOWED_CONCURRENT_TRANSACTIONS_COUNT)),
216 })
217 }
218
219 pub async fn location(&self) -> CryptoKeystoreResult<Option<String>> {
220 return Ok(self.conn().await?.location().map(ToString::to_string));
221 }
222
223 pub async fn conn(&self) -> CryptoKeystoreResult<ConnectionGuard<'_>> {
225 self.conn.lock().await.try_into()
226 }
227
228 async fn take(&self) -> CryptoKeystoreResult<KeystoreDatabaseConnection> {
231 let _semaphore = self.transaction_semaphore.acquire_arc().await;
232
233 let mut guard = self.conn.lock().await;
234 guard.take().ok_or(CryptoKeystoreError::Closed)
235 }
236
237 pub async fn close(&self) -> CryptoKeystoreResult<()> {
239 #[cfg(not(target_family = "wasm"))]
240 self.take().await?;
241
242 #[cfg(target_family = "wasm")]
243 {
244 let conn = self.take().await?;
245 conn.close().await?;
246 }
247 Ok(())
248 }
249
250 pub async fn wipe(&self) -> CryptoKeystoreResult<()> {
252 self.take().await?.wipe().await
253 }
254
255 #[cfg(not(target_family = "wasm"))]
271 pub async fn export_copy(&self, destination_path: &str) -> CryptoKeystoreResult<()> {
272 let conn = self.conn().await?;
273 conn.export_copy(destination_path).await
274 }
275
276 pub async fn migrate_db_key_type_to_bytes(
277 name: &str,
278 old_key: &str,
279 new_key: &DatabaseKey,
280 ) -> CryptoKeystoreResult<()> {
281 KeystoreDatabaseConnection::migrate_db_key_type_to_bytes(name, old_key, new_key).await
282 }
283
284 pub async fn update_key(&mut self, new_key: &DatabaseKey) -> CryptoKeystoreResult<()> {
285 self.conn().await?.update_key(new_key).await
286 }
287
288 pub async fn new_transaction(&self) -> CryptoKeystoreResult<()> {
290 let semaphore = self.transaction_semaphore.acquire_arc().await;
291 let mut transaction_guard = self.transaction.lock().await;
292 *transaction_guard = Some(KeystoreTransaction::new(semaphore).await?);
293 Ok(())
294 }
295
296 pub async fn commit_transaction(&self) -> CryptoKeystoreResult<()> {
297 let mut transaction_guard = self.transaction.lock().await;
298 let Some(transaction) = transaction_guard.as_ref() else {
299 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
300 };
301 transaction.commit(self).await?;
302 *transaction_guard = None;
303 Ok(())
304 }
305
306 pub async fn rollback_transaction(&self) -> CryptoKeystoreResult<()> {
307 let mut transaction_guard = self.transaction.lock().await;
308 if transaction_guard.is_none() {
309 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
310 };
311 *transaction_guard = None;
312 Ok(())
313 }
314
315 pub async fn child_groups(&self, entity: PersistedMlsGroup) -> CryptoKeystoreResult<Vec<PersistedMlsGroup>> {
316 let mut conn = self.conn().await?;
317 let persisted_records = entity.child_groups(conn.deref_mut()).await?;
318
319 let transaction_guard = self.transaction.lock().await;
320 let Some(transaction) = transaction_guard.as_ref() else {
321 return Ok(persisted_records);
322 };
323 transaction.child_groups(entity, persisted_records).await
324 }
325
326 pub async fn save<'a, E>(&self, entity: E) -> CryptoKeystoreResult<E::AutoGeneratedFields>
327 where
328 E: Entity + EntityDatabaseMutation<'a> + Send + Sync,
329 {
330 let transaction_guard = self.transaction.lock().await;
331 let Some(transaction) = transaction_guard.as_ref() else {
332 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
333 };
334 transaction.save(entity).await
335 }
336
337 pub async fn remove<'a, E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<()>
338 where
339 E: Entity + EntityDatabaseMutation<'a>,
340 {
341 let transaction_guard = self.transaction.lock().await;
342 let Some(transaction) = transaction_guard.as_ref() else {
343 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
344 };
345 transaction.remove::<E>(id).await
346 }
347
348 pub async fn remove_borrowed<'a, E>(&self, id: &E::BorrowedPrimaryKey) -> CryptoKeystoreResult<()>
349 where
350 E: Entity + EntityDatabaseMutation<'a> + BorrowPrimaryKey + EntityDeleteBorrowed<'a>,
351 {
352 let transaction_guard = self.transaction.lock().await;
353 let Some(transaction) = transaction_guard.as_ref() else {
354 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
355 };
356 transaction.remove_borrowed::<E>(id).await
357 }
358
359 pub async fn find_pending_messages_by_conversation_id(
360 &self,
361 conversation_id: &[u8],
362 ) -> CryptoKeystoreResult<Vec<MlsPendingMessage>> {
363 let mut conn = self.conn().await?;
364 let persisted_records = MlsPendingMessage::find_all_matching(&mut conn, &conversation_id.into()).await?;
365
366 let transaction_guard = self.transaction.lock().await;
367 let Some(transaction) = transaction_guard.as_ref() else {
368 return Ok(persisted_records);
369 };
370 transaction
371 .find_pending_messages_by_conversation_id(conversation_id, persisted_records)
372 .await
373 }
374
375 pub async fn remove_pending_messages_by_conversation_id(
376 &self,
377 conversation_id: impl AsRef<[u8]> + Send,
378 ) -> CryptoKeystoreResult<()> {
379 let transaction_guard = self.transaction.lock().await;
380 let Some(transaction) = transaction_guard.as_ref() else {
381 return Err(CryptoKeystoreError::MutatingOperationWithoutTransaction);
382 };
383 transaction
384 .remove_pending_messages_by_conversation_id(conversation_id)
385 .await;
386 Ok(())
387 }
388}
389
390#[cfg_attr(target_family = "wasm", async_trait(?Send))]
391#[cfg_attr(not(target_family = "wasm"), async_trait)]
392impl FetchFromDatabase for Database {
393 async fn get<E>(&self, id: &E::PrimaryKey) -> CryptoKeystoreResult<Option<E>>
394 where
395 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
396 {
397 if let Some(transaction) = self.transaction.lock().await.as_ref()
399 && let Some(cached_record) = transaction.get(id).await
401 {
402 return Ok(cached_record.map(Arc::unwrap_or_clone));
403 }
404
405 let mut conn = self.conn().await?;
407 E::get(&mut conn, id).await
408 }
409
410 async fn get_borrowed<E>(&self, id: &<E as BorrowPrimaryKey>::BorrowedPrimaryKey) -> CryptoKeystoreResult<Option<E>>
411 where
412 E: EntityGetBorrowed<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
413 E::PrimaryKey: Borrow<E::BorrowedPrimaryKey>,
414 for<'a> &'a E::BorrowedPrimaryKey: KeyType,
415 {
416 if let Some(transaction) = self.transaction.lock().await.as_ref()
418 && let Some(cached_record) = transaction.get_borrowed(id).await
420 {
421 return Ok(cached_record.map(Arc::unwrap_or_clone));
422 }
423
424 let mut conn = self.conn().await?;
426 E::get_borrowed(&mut conn, id).await
427 }
428
429 async fn count<E>(&self) -> CryptoKeystoreResult<u32>
430 where
431 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
432 {
433 if self.transaction.lock().await.is_some() {
434 let count = self.load_all::<E>().await?.len();
437 Ok(count as _)
438 } else {
439 let mut conn = self.conn().await?;
440 E::count(&mut conn).await
441 }
442 }
443
444 async fn load_all<E>(&self) -> CryptoKeystoreResult<Vec<E>>
445 where
446 E: Entity<ConnectionType = KeystoreDatabaseConnection> + Clone + Send + Sync,
447 {
448 let mut conn = self.conn().await?;
449 let persisted_records = E::load_all(&mut conn).await?;
450
451 let transaction_guard = self.transaction.lock().await;
452 let Some(transaction) = transaction_guard.as_ref() else {
453 return Ok(persisted_records);
454 };
455 transaction.find_all(persisted_records).await
456 }
457
458 async fn search<E, SearchKey>(&self, search_key: &SearchKey) -> CryptoKeystoreResult<Vec<E>>
459 where
460 E: Entity<ConnectionType = KeystoreDatabaseConnection> + SearchableEntity<SearchKey> + Clone + Send + Sync,
461 SearchKey: KeyType,
462 {
463 let mut conn = self.conn().await?;
464 let persisted_records = E::find_all_matching(&mut conn, search_key).await?;
465
466 let transaction_guard = self.transaction.lock().await;
467 let Some(transaction) = transaction_guard.as_ref() else {
468 return Ok(persisted_records);
469 };
470
471 transaction.search(persisted_records, search_key).await
472 }
473}