Skip to content

Commit

Permalink
feat(iroh-sync): read only replicas (#1770)
Browse files Browse the repository at this point in the history
## Description

Closes #1767 
Updates sync, the sync db, commands and tickets to enable read only
replicas

## Notes  & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.

---------

Co-authored-by: Franz Heinzmann (Frando) <[email protected]>
  • Loading branch information
divagant-martian and Frando authored Nov 6, 2023
1 parent c36cc6d commit c1ebea8
Show file tree
Hide file tree
Showing 15 changed files with 522 additions and 196 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
iroh-bytes = { version = "0.9.0", path = "../iroh-bytes" }
iroh-metrics = { version = "0.9.0", path = "../iroh-metrics", optional = true }
num_enum = "0.7"
once_cell = "1.18.0"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
Expand Down
54 changes: 34 additions & 20 deletions iroh-sync/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use tracing::{debug, error, error_span, trace, warn};

use crate::{
ranger::Message,
store::{self, GetFilter},
Author, AuthorHeads, AuthorId, ContentStatus, ContentStatusCallback, Event, Namespace,
NamespaceId, PeerIdBytes, Replica, SignedEntry, SyncOutcome,
store::{self, GetFilter, ImportNamespaceOutcome},
Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus,
ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, SignedEntry,
SyncOutcome,
};

#[derive(derive_more::Debug, derive_more::Display)]
Expand All @@ -30,7 +31,7 @@ enum Action {
},
#[display("NewReplica")]
ImportNamespace {
namespace: Namespace,
capability: Capability,
#[debug("reply")]
reply: oneshot::Sender<Result<NamespaceId>>,
},
Expand All @@ -42,7 +43,7 @@ enum Action {
#[display("ListReplicas")]
ListReplicas {
#[debug("reply")]
reply: flume::Sender<Result<NamespaceId>>,
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
},
#[display("Replica({}, {})", _0.fmt_short(), _1)]
Replica(NamespaceId, ReplicaAction),
Expand Down Expand Up @@ -134,7 +135,7 @@ enum ReplicaAction {
reply: oneshot::Sender<Result<()>>,
},
ExportSecretKey {
reply: oneshot::Sender<Result<Namespace>>,
reply: oneshot::Sender<Result<NamespaceSecret>>,
},
HasNewsForUs {
heads: AuthorHeads,
Expand Down Expand Up @@ -396,7 +397,7 @@ impl SyncHandle {
rx.await?
}

pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<Namespace> {
pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
let (reply, rx) = oneshot::channel();
let action = ReplicaAction::ExportSecretKey { reply };
self.send_replica(namespace, action).await?;
Expand All @@ -418,7 +419,10 @@ impl SyncHandle {
self.send(Action::ListAuthors { reply }).await
}

pub async fn list_replicas(&self, reply: flume::Sender<Result<NamespaceId>>) -> Result<()> {
pub async fn list_replicas(
&self,
reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
) -> Result<()> {
self.send(Action::ListReplicas { reply }).await
}

Expand All @@ -428,9 +432,9 @@ impl SyncHandle {
rx.await?
}

pub async fn import_namespace(&self, namespace: Namespace) -> Result<NamespaceId> {
pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
let (reply, rx) = oneshot::channel();
self.send(Action::ImportNamespace { namespace, reply })
self.send(Action::ImportNamespace { capability, reply })
.await?;
rx.await?
}
Expand Down Expand Up @@ -481,10 +485,16 @@ impl<S: store::Store> Actor<S> {
let id = author.id();
send_reply(reply, self.store.import_author(author).map(|_| id))
}
Action::ImportNamespace { namespace, reply } => {
let id = namespace.id();
send_reply(reply, self.store.import_namespace(namespace).map(|_| id))
}
Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
let id = capability.id();
let outcome = this.store.import_namespace(capability.clone())?;
if let ImportNamespaceOutcome::Upgraded = outcome {
if let Ok(replica) = this.states.replica(&id) {
replica.merge_capability(capability)?;
}
}
Ok(id)
}),
Action::ListAuthors { reply } => iter_to_channel(
reply,
self.store
Expand Down Expand Up @@ -603,7 +613,10 @@ impl<S: store::Store> Actor<S> {
this.store.remove_replica(&namespace)
}),
ReplicaAction::ExportSecretKey { reply } => {
let res = self.states.replica(&namespace).map(|r| r.secret_key());
let res = self
.states
.replica(&namespace)
.and_then(|r| Ok(r.secret_key()?.clone()));
send_reply(reply, res)
}
ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
Expand Down Expand Up @@ -787,12 +800,13 @@ mod tests {
async fn open_close() -> anyhow::Result<()> {
let store = store::memory::Store::default();
let sync = SyncHandle::spawn(store, None, "foo".into());
let namespace = Namespace::new(&mut rand::rngs::OsRng {});
sync.import_namespace(namespace.clone()).await?;
sync.open(namespace.id(), Default::default()).await?;
let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
let id = namespace.id();
sync.import_namespace(namespace.into()).await?;
sync.open(id, Default::default()).await?;
let (tx, rx) = flume::bounded(10);
sync.subscribe(namespace.id(), tx).await?;
sync.close(namespace.id()).await?;
sync.subscribe(id, tx).await?;
sync.close(id).await?;
assert!(rx.recv_async().await.is_err());
Ok(())
}
Expand Down
46 changes: 22 additions & 24 deletions iroh-sync/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,26 @@ impl AuthorPublicKey {
/// Namespace key of a [`crate::Replica`].
///
/// Holders of this key can insert new entries into a [`crate::Replica`].
/// Internally, a [`Namespace`] is a [`SigningKey`] which is used to sign entries.
/// Internally, a [`NamespaceSecret`] is a [`SigningKey`] which is used to sign entries.
#[derive(Clone, Serialize, Deserialize)]
pub struct Namespace {
pub struct NamespaceSecret {
signing_key: SigningKey,
}

impl Namespace {
/// Create a new [`Namespace`] with a random key.
impl NamespaceSecret {
/// Create a new [`NamespaceSecret`] with a random key.
pub fn new<R: CryptoRngCore + ?Sized>(rng: &mut R) -> Self {
let signing_key = SigningKey::generate(rng);

Namespace { signing_key }
NamespaceSecret { signing_key }
}

/// Create a [`Namespace`] from a byte array.
/// Create a [`NamespaceSecret`] from a byte array.
pub fn from_bytes(bytes: &[u8; 32]) -> Self {
SigningKey::from_bytes(bytes).into()
}

/// Returns the [`Namespace`] byte representation.
/// Returns the [`NamespaceSecret`] byte representation.
pub fn to_bytes(&self) -> [u8; 32] {
self.signing_key.to_bytes()
}
Expand All @@ -119,26 +119,24 @@ impl Namespace {
NamespaceId::from(self.public_key())
}

/// Sign a message with this [`Namespace`] key.
/// Sign a message with this [`NamespaceSecret`] key.
pub fn sign(&self, msg: &[u8]) -> Signature {
self.signing_key.sign(msg)
}

/// Strictly verify a signature on a message with this [`Namespace`]'s public key.
/// Strictly verify a signature on a message with this [`NamespaceSecret`]'s public key.
pub fn verify(&self, msg: &[u8], signature: &Signature) -> Result<(), SignatureError> {
self.signing_key.verify_strict(msg, signature)
}
}

/// Identifier for a [`Namespace`]
///
/// This is the corresponding [`VerifyingKey`] for a [`Namespace`]. It is used as an identifier, and can
/// be used to verify [`Signature`]s.
/// The corresponding [`VerifyingKey`] for a [`NamespaceSecret`].
/// It is used as an identifier, and can be used to verify [`Signature`]s.
#[derive(Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash, derive_more::From)]
pub struct NamespacePublicKey(VerifyingKey);

impl NamespacePublicKey {
/// Verify that a signature matches the `msg` bytes and was created with the [`Namespace`]
/// Verify that a signature matches the `msg` bytes and was created with the [`NamespaceSecret`]
/// that corresponds to this [`NamespaceId`].
pub fn verify(&self, msg: &[u8], signature: &Signature) -> Result<(), SignatureError> {
self.0.verify_strict(msg, signature)
Expand All @@ -165,7 +163,7 @@ impl fmt::Display for Author {
}
}

impl fmt::Display for Namespace {
impl fmt::Display for NamespaceSecret {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", base32::fmt(self.to_bytes()))
}
Expand Down Expand Up @@ -195,7 +193,7 @@ impl fmt::Display for NamespaceId {
}
}

impl fmt::Debug for Namespace {
impl fmt::Debug for NamespaceSecret {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Namespace({})", self)
}
Expand Down Expand Up @@ -239,7 +237,7 @@ impl FromStr for Author {
}
}

impl FromStr for Namespace {
impl FromStr for NamespaceSecret {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Expand Down Expand Up @@ -269,7 +267,7 @@ impl From<SigningKey> for Author {
}
}

impl From<SigningKey> for Namespace {
impl From<SigningKey> for NamespaceSecret {
fn from(signing_key: SigningKey) -> Self {
Self { signing_key }
}
Expand Down Expand Up @@ -299,8 +297,8 @@ impl Ord for AuthorPublicKey {
}
}

impl From<Namespace> for NamespacePublicKey {
fn from(value: Namespace) -> Self {
impl From<NamespaceSecret> for NamespacePublicKey {
fn from(value: NamespaceSecret) -> Self {
value.public_key()
}
}
Expand All @@ -311,8 +309,8 @@ impl From<Author> for AuthorPublicKey {
}
}

impl From<&Namespace> for NamespacePublicKey {
fn from(value: &Namespace) -> Self {
impl From<&NamespaceSecret> for NamespacePublicKey {
fn from(value: &NamespaceSecret) -> Self {
value.public_key()
}
}
Expand Down Expand Up @@ -506,8 +504,8 @@ impl From<Author> for AuthorId {
value.id()
}
}
impl From<Namespace> for NamespaceId {
fn from(value: Namespace) -> Self {
impl From<NamespaceSecret> for NamespaceId {
fn from(value: NamespaceSecret) -> Self {
value.id()
}
}
Expand Down
4 changes: 2 additions & 2 deletions iroh-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
//!
//! All entries in a replica are signed with two keypairs:
//!
//! * The [Namespace] key, as a token of write capability. The public key is the [NamespaceId], which
//! also serves as the unique identifier for a replica.
//! * The [`NamespaceSecret`] key, as a token of write capability. The public key is the
//! [`NamespaceId`], which also serves as the unique identifier for a replica.
//! * The [Author] key, as a proof of authorship. Any number of authors may be created, and
//! their semantic meaning is application-specific. The public key of an author is the [AuthorId].
//!
Expand Down
16 changes: 8 additions & 8 deletions iroh-sync/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl BobState {
mod tests {
use crate::{
actor::OpenOpts,
keys::{AuthorId, Namespace},
keys::{AuthorId, NamespaceSecret},
store::{self, GetFilter, Store},
};
use anyhow::Result;
Expand All @@ -315,7 +315,7 @@ mod tests {
// For now uses same author on both sides.
let author = alice_store.new_author(&mut rng).unwrap();

let namespace = Namespace::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);

let mut alice_replica = alice_store.new_replica(namespace.clone()).unwrap();
alice_replica
Expand All @@ -330,7 +330,7 @@ mod tests {

assert_eq!(
bob_store
.get_many(bob_replica.namespace(), GetFilter::All)
.get_many(bob_replica.id(), GetFilter::All)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
Expand All @@ -339,7 +339,7 @@ mod tests {
);
assert_eq!(
alice_store
.get_many(alice_replica.namespace(), GetFilter::All)
.get_many(alice_replica.id(), GetFilter::All)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
Expand Down Expand Up @@ -491,7 +491,7 @@ mod tests {

let alice_node_pubkey = SecretKey::generate_with_rng(&mut rng).public();
let bob_node_pubkey = SecretKey::generate_with_rng(&mut rng).public();
let namespace = Namespace::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);

let mut all_messages = vec![];

Expand Down Expand Up @@ -634,7 +634,7 @@ mod tests {
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(99);
let alice_node_pubkey = SecretKey::generate_with_rng(&mut rng).public();
let bob_node_pubkey = SecretKey::generate_with_rng(&mut rng).public();
let namespace = Namespace::new(&mut rng);
let namespace = NamespaceSecret::new(&mut rng);
let mut alice_replica = alice_store.new_replica(namespace.clone()).unwrap();
let mut bob_replica = bob_store.new_replica(namespace.clone()).unwrap();

Expand All @@ -654,12 +654,12 @@ mod tests {
.unwrap();

assert_eq!(
get_messages(&alice_store, alice_replica.namespace()),
get_messages(&alice_store, alice_replica.id()),
vec![(author.id(), key.clone(), hash_alice)]
);

assert_eq!(
get_messages(&bob_store, bob_replica.namespace()),
get_messages(&bob_store, bob_replica.id()),
vec![(author.id(), key.clone(), hash_bob)]
);

Expand Down
Loading

0 comments on commit c1ebea8

Please sign in to comment.