Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move deserialization into PeerMessage to distinct variants correctly #538

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Don't update or announce an update in schema provider if a schema with this id exists already [#472](https://github.com/p2panda/aquadoggo/pull/472)
- Do nothing on document_view insertion conflicts [#474](https://github.com/p2panda/aquadoggo/pull/474)
- Only over-write `http_port` when cli arg is passed [#489](https://github.com/p2panda/aquadoggo/pull/489)
- Move deserialization into PeerMessage to distinct variants correctly [#538](https://github.com/p2panda/aquadoggo/pull/538)

### Open Sauce

Expand Down
251 changes: 249 additions & 2 deletions aquadoggo/src/network/peers/message.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use p2panda_rs::entry::{EncodedEntry, LogId, SeqNum};
use p2panda_rs::identity::PublicKey;
use p2panda_rs::operation::EncodedOperation;
use p2panda_rs::Validate;
use serde::de::Visitor;
use serde::{Deserialize, Serialize};

use crate::replication::{AnnouncementMessage, SyncMessage};
use crate::replication::{
Announcement, AnnouncementMessage, Message, Mode, SchemaIdSet, SessionId, SyncMessage,
ANNOUNCE_TYPE, ENTRY_TYPE, HAVE_TYPE, SYNC_DONE_TYPE, SYNC_REQUEST_TYPE,
};

/// p2panda protocol messages which can be sent over the wire.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum PeerMessage {
/// Announcement of peers about the schema ids they are interest in.
Expand All @@ -14,3 +22,242 @@
/// Replication status and data exchange.
SyncMessage(SyncMessage),
}

impl<'de> Deserialize<'de> for PeerMessage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct MessageVisitor;

impl<'de> Visitor<'de> for MessageVisitor {
type Value = PeerMessage;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("p2panda message")
}

Check warning on line 38 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L37-L38

Added lines #L37 - L38 were not covered by tests

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let message_type: u64 = seq
.next_element()?
.ok_or_else(|| serde::de::Error::custom("invalid message type"))?;

let message = match message_type {
ANNOUNCE_TYPE => {
let protocol_version: u64 = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing protocol version in announce message")
})?;

let timestamp: u64 = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing timestamp in announce message")
})?;

let supported_schema_ids: SchemaIdSet =
seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing target set in announce message")

Check warning on line 60 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L60

Added line #L60 was not covered by tests
})?;
supported_schema_ids.validate().map_err(|_| {
serde::de::Error::custom("invalid target set in announce message")

Check warning on line 63 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L63

Added line #L63 was not covered by tests
})?;

PeerMessage::Announce(AnnouncementMessage(
protocol_version,
Announcement {
supported_schema_ids,
timestamp,
},
))
}
SYNC_REQUEST_TYPE => {
let session_id: SessionId = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing session id in replication message")
})?;

let mode: Mode = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing mode in sync request message")

Check warning on line 80 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L80

Added line #L80 was not covered by tests
})?;

let target_set: SchemaIdSet = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing target set in sync request message")

Check warning on line 84 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L84

Added line #L84 was not covered by tests
})?;

target_set.validate().map_err(|_| {
serde::de::Error::custom("invalid target set in sync request message")

Check warning on line 88 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L88

Added line #L88 was not covered by tests
})?;

if target_set.is_empty() {
return Err(serde::de::Error::custom(
"empty target set in sync request message",
));
}

PeerMessage::SyncMessage(SyncMessage::new(
session_id,
Message::SyncRequest(mode, target_set),
))
}
ENTRY_TYPE => {
let session_id: SessionId = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing session id in replication message")
})?;

Check warning on line 105 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L104-L105

Added lines #L104 - L105 were not covered by tests

let entry_bytes: EncodedEntry = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing entry bytes in entry message")
})?;

Check warning on line 109 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L108-L109

Added lines #L108 - L109 were not covered by tests

let operation_bytes: Option<EncodedOperation> = seq.next_element()?;

PeerMessage::SyncMessage(SyncMessage::new(
session_id,
Message::Entry(entry_bytes, operation_bytes),
))

Check warning on line 116 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L114-L116

Added lines #L114 - L116 were not covered by tests
}
SYNC_DONE_TYPE => {
let session_id: SessionId = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing session id in replication message")
})?;

Check warning on line 121 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L120-L121

Added lines #L120 - L121 were not covered by tests

let live_mode: bool = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing live mode flag in sync done message")
})?;

Check warning on line 125 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L124-L125

Added lines #L124 - L125 were not covered by tests

PeerMessage::SyncMessage(SyncMessage::new(
session_id,
Message::SyncDone(live_mode),
))

Check warning on line 130 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L128-L130

Added lines #L128 - L130 were not covered by tests
}
HAVE_TYPE => {
let session_id: SessionId = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing session id in replication message")

Check warning on line 134 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L134

Added line #L134 was not covered by tests
})?;

let log_heights: Vec<(PublicKey, Vec<(LogId, SeqNum)>)> =
seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing log heights in have message")

Check warning on line 139 in aquadoggo/src/network/peers/message.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/peers/message.rs#L139

Added line #L139 was not covered by tests
})?;

PeerMessage::SyncMessage(SyncMessage::new(
session_id,
Message::Have(log_heights),
))
}
_ => return Err(serde::de::Error::custom("unknown message type")),
};

if let Some(items_left) = seq.size_hint() {
if items_left > 0 {
return Err(serde::de::Error::custom(
"too many fields for p2panda message",
));
}
};

Ok(message)
}
}

deserializer.deserialize_seq(MessageVisitor)
}
}

#[cfg(test)]
mod tests {
use ciborium::cbor;
use ciborium::value::{Error, Value};
use p2panda_rs::entry::{LogId, SeqNum};
use p2panda_rs::identity::PublicKey;
use p2panda_rs::serde::{deserialize_into, serialize_value};
use p2panda_rs::test_utils::fixtures::public_key;
use rstest::rstest;

use crate::replication::{
Announcement, AnnouncementMessage, Message, Mode, SchemaIdSet, SyncMessage,
};
use crate::test_utils::helpers::random_schema_id_set;

use super::PeerMessage;

#[rstest]
fn deserialize(
#[from(random_schema_id_set)] supported_schema_ids: SchemaIdSet,
#[from(random_schema_id_set)] target_set: SchemaIdSet,
public_key: PublicKey,
) {
assert_eq!(
deserialize_into::<PeerMessage>(&serialize_value(cbor!([
0,
1,
12345678,
supported_schema_ids
])))
.unwrap(),
PeerMessage::Announce(AnnouncementMessage::new(Announcement {
timestamp: 12345678,
supported_schema_ids,
}))
);

assert_eq!(
deserialize_into::<PeerMessage>(&serialize_value(cbor!([1, 12, 0, target_set])))
.unwrap(),
PeerMessage::SyncMessage(SyncMessage::new(
12,
Message::SyncRequest(Mode::LogHeight, target_set.clone())
))
);

let log_heights: Vec<(PublicKey, Vec<(LogId, SeqNum)>)> = vec![];
assert_eq!(
deserialize_into::<PeerMessage>(&serialize_value(cbor!([10, 12, log_heights])))
.unwrap(),
PeerMessage::SyncMessage(SyncMessage::new(12, Message::Have(vec![])))
);

assert_eq!(
deserialize_into::<PeerMessage>(&serialize_value(cbor!([
10,
12,
vec![(
// Convert explicitly to bytes as `cbor!` macro doesn't understand somehow that
// `PublicKey` serializes to a byte array
serde_bytes::Bytes::new(&public_key.to_bytes()),
vec![(LogId::default(), SeqNum::default())]
)]
])))
.unwrap(),
PeerMessage::SyncMessage(SyncMessage::new(
12,
Message::Have(vec![(
public_key,
vec![(LogId::default(), SeqNum::default())]
)])
))
);
}

#[rstest]
#[should_panic(expected = "invalid message type")]
#[case::invalid_message_type(cbor!([]))]
#[should_panic(expected = "missing protocol version in announce message")]
#[case::announce_missing_version(cbor!([0]))]
#[should_panic(expected = "missing timestamp in announce message")]
#[case::announce_missing_timestamp(cbor!([0, 122]))]
#[should_panic(expected = "too many fields for p2panda message")]
#[case::announce_too_many_fields(cbor!([0, 1, 0, ["schema_field_definition_v1"], "too much"]))]
#[should_panic(expected = "missing session id in replication message")]
#[case::sync_only_message_type(cbor!([1]))]
#[should_panic(expected = "empty target set in sync request")]
#[case::sync_only_message_type(cbor!([1, 0, 0, []]))]
#[should_panic(expected = "too many fields for p2panda message")]
#[case::sync_too_many_fields(cbor!([1, 0, 0, ["schema_field_definition_v1"], "too much"]))]
fn deserialize_invalid_messages(#[case] cbor: Result<Value, Error>) {
// Check the cbor is valid
assert!(cbor.is_ok());

// We unwrap here to cause a panic and then test for expected error stings
deserialize_into::<PeerMessage>(&serialize_value(cbor)).unwrap();
}
}
Loading