Skip to content

Commit

Permalink
feat: update sync procedure post-CRDTs (#883)
Browse files Browse the repository at this point in the history
  • Loading branch information
miraclx authored Nov 4, 2024
1 parent ec9f521 commit 4634284
Show file tree
Hide file tree
Showing 51 changed files with 1,612 additions and 836 deletions.
6 changes: 0 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions apps/kv-store/src/__private.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use calimero_sdk::borsh::{from_slice, to_vec};
use calimero_sdk::{app, env};
use calimero_storage::collections::unordered_map::{Entry, UnorderedMap};
use calimero_storage::entities::Data;
use calimero_storage::integration::Comparison;
use calimero_storage::interface::{Action, Interface, StorageError};
use calimero_storage::sync::{self, SyncArtifact};

use crate::KvStore;

#[app::logic]
impl KvStore {
pub fn __calimero_sync_next() -> Result<(), StorageError> {
let args = env::input().expect("fatal: missing input");

let artifact =
from_slice::<SyncArtifact>(&args).map_err(StorageError::DeserializationError)?;

let this = Interface::root::<Self>()?;

match artifact {
SyncArtifact::Actions(actions) => {
for action in actions {
let _ignored = match action {
Action::Add { type_id, .. } | Action::Update { type_id, .. } => {
match type_id {
1 => Interface::apply_action::<KvStore>(action)?,
254 => Interface::apply_action::<Entry<String, String>>(action)?,
255 => {
Interface::apply_action::<UnorderedMap<String, String>>(action)?
}
_ => return Err(StorageError::UnknownType(type_id)),
}
}
Action::Delete { .. } => {
todo!("how are we supposed to identify the entity to delete???????")
}
Action::Compare { .. } => {
todo!("how are we supposed to compare when `Comparison` needs `type_id`???????")
}
};
}

if let Some(this) = this {
return Interface::commit_root(this);
}
}
SyncArtifact::Comparisons(comparisons) => {
if comparisons.is_empty() {
sync::push_comparison(Comparison {
type_id: <Self as Data>::type_id(),
data: this
.as_ref()
.map(to_vec)
.transpose()
.map_err(StorageError::SerializationError)?,
comparison_data: Interface::generate_comparison_data(this.as_ref())?,
});
}

for Comparison {
type_id,
data,
comparison_data,
} in comparisons
{
match type_id {
1 => Interface::compare_affective::<KvStore>(data, comparison_data)?,
254 => Interface::compare_affective::<Entry<String, String>>(
data,
comparison_data,
)?,
255 => Interface::compare_affective::<UnorderedMap<String, String>>(
data,
comparison_data,
)?,
_ => return Err(StorageError::UnknownType(type_id)),
};
}
}
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions apps/kv-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use calimero_storage::collections::UnorderedMap;
use calimero_storage::entities::Element;
use calimero_storage::AtomicUnit;

mod __private;

#[app::state(emits = for<'a> Event<'a>)]
#[derive(AtomicUnit, Clone, Debug, PartialEq, PartialOrd)]
#[root]
Expand Down
41 changes: 35 additions & 6 deletions crates/config/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use core::time::Duration;
use std::fs::{read_to_string, write};

use calimero_context::config::ContextConfig;
use calimero_network::config::{BootstrapConfig, CatchupConfig, DiscoveryConfig, SwarmConfig};
use calimero_network::config::{BootstrapConfig, DiscoveryConfig, SwarmConfig};
use calimero_server::admin::service::AdminConfig;
use calimero_server::jsonrpc::JsonRpcConfig;
use calimero_server::ws::WsConfig;
Expand All @@ -24,13 +25,23 @@ pub struct ConfigFile {
#[serde(flatten)]
pub network: NetworkConfig,

pub sync: SyncConfig,

pub datastore: DataStoreConfig,

pub blobstore: BlobStoreConfig,

pub context: ContextConfig,
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SyncConfig {
#[serde(rename = "timeout_ms", with = "serde_duration")]
pub timeout: Duration,
#[serde(rename = "interval_ms", with = "serde_duration")]
pub interval: Duration,
}

#[derive(Debug, Deserialize, Serialize)]
#[non_exhaustive]
pub struct NetworkConfig {
Expand All @@ -43,8 +54,6 @@ pub struct NetworkConfig {

#[serde(default)]
pub discovery: DiscoveryConfig,

pub catchup: CatchupConfig,
}

impl NetworkConfig {
Expand All @@ -54,14 +63,12 @@ impl NetworkConfig {
bootstrap: BootstrapConfig,
discovery: DiscoveryConfig,
server: ServerConfig,
catchup: CatchupConfig,
) -> Self {
Self {
swarm,
server,
bootstrap,
discovery,
catchup,
}
}
}
Expand Down Expand Up @@ -128,14 +135,16 @@ impl ConfigFile {
#[must_use]
pub const fn new(
identity: libp2p_identity::Keypair,
network: NetworkConfig,
sync: SyncConfig,
datastore: DataStoreConfig,
blobstore: BlobStoreConfig,
context: ContextConfig,
network: NetworkConfig,
) -> Self {
Self {
identity,
network,
sync,
datastore,
blobstore,
context,
Expand Down Expand Up @@ -174,6 +183,26 @@ impl ConfigFile {
}
}

mod serde_duration {
use core::time::Duration;

use serde::{Deserialize, Deserializer, Serializer};

pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(duration.as_millis() as u64)
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
u64::deserialize(deserializer).map(Duration::from_millis)
}
}

pub mod serde_identity {
use core::fmt::{self, Formatter};

Expand Down
6 changes: 5 additions & 1 deletion crates/context/config/src/client/env/config/mutate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ impl<'a> Method<Near> for Mutate<'a> {
}

fn decode(response: Vec<u8>) -> eyre::Result<Self::Returns> {
serde_json::from_slice(&response).map_err(Into::into)
if !response.is_empty() {
eyre::bail!("unexpected response {:?}", response);
}

Ok(())
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/context/config/src/client/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ impl Transport for RelayerTransport {
.send()
.await?;

// todo! check response.status code

response.bytes().await.map(Into::into)
}
}
Loading

0 comments on commit 4634284

Please sign in to comment.