Skip to content

Commit

Permalink
cipher
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Jan 17, 2025
1 parent 7912ab1 commit ed53c1e
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 43 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ xmtp_id = { path = "xmtp_id" }
xmtp_mls = { path = "xmtp_mls" }
xmtp_proto = { path = "xmtp_proto" }


[profile.dev]
# Disabling debug info speeds up builds a bunch,
# and we don't rely on it for debugging that much.
Expand Down
15 changes: 12 additions & 3 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ impl FfiXmtpClient {
Ok(())
}

/// Manually trigger a device sync request to sync records from another active device on this account.
pub async fn send_sync_request(&self, kind: FfiDeviceSyncKind) -> Result<(), GenericError> {
let provider = self.inner_client.mls_provider()?;
self.inner_client
Expand Down Expand Up @@ -499,7 +500,7 @@ impl FfiXmtpClient {
Ok(())
}

/// Revokes or removes an identity - really a wallet address - from the existing client
/// Revokes or removes an identity from the existing client
pub async fn revoke_wallet(
&self,
wallet_address: &str,
Expand Down Expand Up @@ -563,20 +564,28 @@ impl FfiXmtpClient {
}))
}

pub async fn backup(&self, path: String, opts: FfiBackupOptions) -> Result<(), GenericError> {
/// Backup your application to file for later restoration.
pub async fn backup_to_file(
&self,
path: String,
opts: FfiBackupOptions,
) -> Result<(), GenericError> {
let provider = self.inner_client.mls_provider()?;
let opts: BackupOptions = opts.into();
opts.export_to_file(provider, path).await?;

Ok(())
}

pub async fn metadata(&self, path: String) -> Result<FfiBackupMetadata, GenericError> {
/// Load the metadata for a backup to see what it contains.
/// Reads only the metadata without loading the entire file, so this function is quick.
pub async fn backup_metadata(&self, path: String) -> Result<FfiBackupMetadata, GenericError> {
let file = tokio::fs::File::open(path).await?;
let importer = BackupImporter::open(file).await?;
Ok(importer.metadata.into())
}
}

#[derive(uniffi::Record)]
pub struct FfiBackupMetadata {
backup_version: u32,
Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub enum DeviceSyncError {
#[error(transparent)]
Backup(#[from] BackupError),
#[error(transparent)]
DecodeError(#[from] prost::DecodeError),
Decode(#[from] prost::DecodeError),
}

impl RetryableError for DeviceSyncError {
Expand Down
12 changes: 8 additions & 4 deletions xmtp_mls/src/groups/device_sync/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ impl BackupOptions {
self,
provider: XmtpOpenMlsProvider,
path: impl AsRef<Path>,
key: &[u8],
) -> Result<(), DeviceSyncError> {
let provider = Arc::new(provider);
let mut exporter = BackupExporter::new(self, &provider);
let mut exporter = BackupExporter::new(self, &provider, key);
exporter.write_to_file(path).await?;

Ok(())
Expand Down Expand Up @@ -88,7 +89,8 @@ mod tests {
elements: vec![BackupElementSelection::Messages],
};

let mut exporter = BackupExporter::new(opts, &alix_provider);
let key = vec![7; 12];
let mut exporter = BackupExporter::new(opts, &alix_provider, &key);
let path = Path::new("archive.zstd");
let _ = std::fs::remove_file(path);
exporter.write_to_file(path).await.unwrap();
Expand All @@ -97,8 +99,10 @@ mod tests {
let alix2 = ClientBuilder::new_test_client(&alix2_wallet).await;
let alix2_provider = Arc::new(alix2.mls_provider().unwrap());

let file = tokio::fs::File::open(path).await.unwrap();
let mut importer = BackupImporter::open(file).await.unwrap();
let mut importer = BackupImporter::from_file(path, &key).await.unwrap();
importer.insert(&alix2_provider).await.unwrap();

// cleanup
let _ = tokio::fs::remove_file(path).await;
}
}
47 changes: 43 additions & 4 deletions xmtp_mls/src/groups/device_sync/backup/backup_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::{export_stream::BatchExportStream, BackupOptions};
use crate::{groups::device_sync::DeviceSyncError, XmtpOpenMlsProvider};
use crate::{groups::device_sync::NONCE_SIZE, XmtpOpenMlsProvider};
use aes_gcm::{aead::Aead, aes::Aes256, Aes256Gcm, AesGcm, KeyInit};
use async_compression::futures::write::ZstdEncoder;
use futures::{pin_mut, task::Context, AsyncRead, AsyncReadExt, AsyncWriteExt, StreamExt};
use futures::{pin_mut, task::Context, AsyncRead, AsyncWriteExt, StreamExt};
use prost::Message;
use std::{future::Future, io, path::Path, pin::Pin, sync::Arc, task::Poll};
use sha2::digest::{generic_array::GenericArray, typenum};
use std::{future::Future, io, pin::Pin, sync::Arc, task::Poll};
use xmtp_proto::xmtp::device_sync::{backup_element::Element, BackupElement, BackupMetadata};

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -16,36 +18,65 @@ pub(super) struct BackupExporter {
position: usize,
zstd_encoder: ZstdEncoder<Vec<u8>>,
encoder_finished: bool,

cipher: AesGcm<Aes256, typenum::U12, typenum::U16>,
nonce: GenericArray<u8, typenum::U12>,

// Used to write the nonce, contains the same data as nonce.
nonce_buffer: Vec<u8>,
}

#[derive(Default)]
pub(super) enum Stage {
#[default]
Nonce,
Metadata,
Elements,
}

impl BackupExporter {
pub(super) fn new(opts: BackupOptions, provider: &Arc<XmtpOpenMlsProvider>) -> Self {
pub(super) fn new(
opts: BackupOptions,
provider: &Arc<XmtpOpenMlsProvider>,
key: &[u8],
) -> Self {
let nonce = xmtp_common::rand_array::<NONCE_SIZE>();
Self {
position: 0,
stage: Stage::default(),
stream: BatchExportStream::new(&opts, provider),
metadata: opts.into(),
zstd_encoder: ZstdEncoder::new(Vec::new()),
encoder_finished: false,

cipher: Aes256Gcm::new(GenericArray::from_slice(key)),
nonce: GenericArray::clone_from_slice(&nonce),
nonce_buffer: nonce.to_vec(),
}
}
}

impl AsyncRead for BackupExporter {
/// This function encrypts first, and compresses second.
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

// Putting this up here becuase we don't want to encrypt or compress the nonce.
if matches!(this.stage, Stage::Nonce) {
let amount = this.nonce_buffer.len().min(buf.len());
let nonce_bytes: Vec<_> = this.nonce_buffer.drain(..amount).collect();
buf[..amount].copy_from_slice(&nonce_bytes);

if this.nonce_buffer.is_empty() {
this.stage = Stage::Metadata;
}
return Poll::Ready(Ok(amount));
}

{
// Read from the buffer while there is data
let buffer_inner = this.zstd_encoder.get_ref();
Expand All @@ -66,6 +97,10 @@ impl AsyncRead for BackupExporter {
// Time to fill the buffer with more data 8kb at a time.
while this.zstd_encoder.get_ref().len() < 8_000 {
let mut element = match this.stage {
Stage::Nonce => {
// Should never get here due to the above logic. Error if it does.
unreachable!()
}
Stage::Metadata => {
this.stage = Stage::Elements;
BackupElement {
Expand All @@ -90,6 +125,10 @@ impl AsyncRead for BackupExporter {

let mut bytes = (element.len() as u32).to_le_bytes().to_vec();
bytes.append(&mut element);
let bytes = this
.cipher
.encrypt(&this.nonce, &*bytes)
.expect("Encryption should always work");

let fut = this.zstd_encoder.write(&bytes);
pin_mut!(fut);
Expand Down
52 changes: 47 additions & 5 deletions xmtp_mls/src/groups/device_sync/backup/backup_importer.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,68 @@
use crate::{
groups::device_sync::{DeviceSyncError, NONCE_SIZE},
storage::{
consent_record::StoredConsentRecord, group::StoredGroup, group_message::StoredGroupMessage,
DbConnection, ProviderTransactions, StorageError,
},
Store, XmtpOpenMlsProvider,
};
use aes_gcm::{aead::Aead, aes::Aes256, Aes256Gcm, AesGcm, KeyInit};
use async_compression::futures::bufread::ZstdDecoder;
use futures::{AsyncBufRead, AsyncReadExt};
use prost::Message;
use sha2::digest::{generic_array::GenericArray, typenum};
use std::pin::Pin;
use xmtp_proto::xmtp::device_sync::{backup_element::Element, BackupElement, BackupMetadata};

use super::BackupError;

#[cfg(not(target_arch = "wasm32"))]
mod file_import;

pub struct BackupImporter {
pub metadata: BackupMetadata,
decoded: Vec<u8>,
decoder: ZstdDecoder<Pin<Box<dyn AsyncBufRead + Send>>>,
pub metadata: BackupMetadata,

cipher: AesGcm<Aes256, typenum::U12, typenum::U16>,
nonce: GenericArray<u8, typenum::U12>,
}

impl BackupImporter {
async fn next_element(&mut self) -> Result<Option<BackupElement>, StorageError> {
pub(super) async fn load(
mut reader: Pin<Box<dyn AsyncBufRead + Send>>,
key: &[u8],
) -> Result<Self, DeviceSyncError> {
let mut nonce = [0; NONCE_SIZE];
reader.read_exact(&mut nonce).await?;

let mut importer = Self {
decoder: ZstdDecoder::new(reader),
decoded: vec![],
metadata: BackupMetadata::default(),

cipher: Aes256Gcm::new(GenericArray::from_slice(key)),
nonce: GenericArray::from(nonce),
};

let Some(BackupElement {
element: Some(Element::Metadata(metadata)),
}) = importer.next_element().await?
else {
return Err(BackupError::MissingMetadata)?;
};

importer.metadata = metadata;
Ok(importer)
}

async fn next_element(&mut self) -> Result<Option<BackupElement>, DeviceSyncError> {
let mut buffer = [0u8; 1024];
let mut element_len = 0;
loop {
let amount = self.decoder.read(&mut buffer).await?;
self.decoded.extend_from_slice(&buffer[..amount]);
let decrypted = self.cipher.decrypt(&self.nonce, &buffer[..amount])?;
self.decoded.extend_from_slice(&decrypted);

if element_len == 0 && self.decoded.len() >= 4 {
let bytes = self.decoded.drain(..4).collect::<Vec<_>>();
Expand All @@ -49,16 +85,22 @@ impl BackupImporter {
Ok(None)
}

pub async fn insert(&mut self, provider: &XmtpOpenMlsProvider) -> Result<(), StorageError> {
pub async fn insert(&mut self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> {
provider
.transaction_async(|provider| async move {
let conn = provider.conn_ref();
while let Some(element) = self.next_element().await? {
while let Some(element) = self
.next_element()
.await
.map_err(|e| StorageError::Generic(e.to_string()))?
{
insert(element, conn)?;
}
Ok(())
})
.await
.map_err(|e| DeviceSyncError::Storage(e))?;
Ok(())
}

pub fn metadata(&self) -> &BackupMetadata {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
use super::BackupImporter;
use crate::groups::device_sync::{backup::BackupError, DeviceSyncError};
use async_compression::futures::bufread::ZstdDecoder;
use crate::groups::device_sync::DeviceSyncError;
use futures::io::BufReader;
use std::pin::Pin;
use tokio::io::AsyncRead;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use xmtp_proto::xmtp::device_sync::{backup_element::Element, BackupElement, BackupMetadata};
use std::{path::Path, pin::Pin};
use tokio_util::compat::TokioAsyncReadCompatExt;

impl BackupImporter {
pub async fn open(reader: impl AsyncRead + Send + 'static) -> Result<Self, DeviceSyncError> {
let reader = reader.compat();
let reader = BufReader::new(reader);
pub async fn from_file(path: impl AsRef<Path>, key: &[u8]) -> Result<Self, DeviceSyncError> {
let reader = tokio::fs::File::open(path.as_ref()).await?;
let reader = BufReader::new(reader.compat());
let reader = Box::pin(reader) as Pin<Box<_>>;
let decoder = ZstdDecoder::new(reader);

let mut importer = Self {
decoder,
decoded: vec![],
metadata: BackupMetadata::default(),
};

let Some(BackupElement {
element: Some(Element::Metadata(metadata)),
}) = importer.next_element().await?
else {
return Err(BackupError::MissingMetadata)?;
};

importer.metadata = metadata;
Ok(importer)
Self::load(reader, key).await
}
}
2 changes: 2 additions & 0 deletions xmtp_mls/src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub enum StorageError {
Duplicate(DuplicateItem),
#[error(transparent)]
OpenMlsStorage(#[from] SqlKeyStoreError),
#[error("generic:{0}")]
Generic(String),
}

#[derive(Error, Debug)]
Expand Down

0 comments on commit ed53c1e

Please sign in to comment.