Skip to content

Commit

Permalink
test(gossipsub): CI testing with nodes subscribing to gossipsub topic…
Browse files Browse the repository at this point in the history
…s and publishing messages
  • Loading branch information
bochaco committed Sep 21, 2023
1 parent a0d4f29 commit 915c623
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 19 deletions.
57 changes: 46 additions & 11 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,51 @@ jobs:
log_file_prefix: safe_test_logs_e2e
platform: ${{ matrix.os }}

gossipsub:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: Gossipsub E2E tests
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
steps:
- uses: actions/checkout@v3

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable
- uses: Swatinem/rust-cache@v2

- name: Build node
run: cargo build --release --bin safenode
timeout-minutes: 30

- name: Build gossipsub testing executable
run: cargo test --release -p sn_node --features=local-discovery --test msgs_over_gossipsub --no-run
timeout-minutes: 30

- name: Start a local network
uses: maidsafe/sn-local-testnet-action@main
with:
action: start
interval: 2000
node-path: target/release/safenode
faucet-path: target/release/faucet
platform: ${{ matrix.os }}

- name: Gossipsub - nodes to subscribe to topics, and publish messages
run: cargo test --release -p sn_node --features local-discovery msgs_over_gossipsub -- --nocapture
timeout-minutes: 20

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
with:
action: stop
log_file_prefix: safe_test_logs_e2e
platform: ${{ matrix.os }}

spend_test:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: spend tests against network
Expand Down Expand Up @@ -337,17 +382,7 @@ jobs:
echo "SAFE_PEERS has been set to $SAFE_PEERS"
fi
- name: Chunks data integrity during nodes churn - Linux/MacOS
if: matrix.os != 'windows-latest'
run: cargo test --release -p sn_node --features="local-discovery" --test data_with_churn -- --nocapture
env:
TEST_DURATION_MINS: 5
TEST_TOTAL_CHURN_CYCLES: 15
SN_LOG: "all"
timeout-minutes: 30

- name: Chunks data integrity during nodes churn - Windows
if: matrix.os == 'windows-latest'
- name: Chunks data integrity during nodes churn
run: cargo test --release -p sn_node --features="local-discovery" --test data_with_churn -- --nocapture
env:
TEST_DURATION_MINS: 5
Expand Down
45 changes: 45 additions & 0 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,51 @@ jobs:
SLACK_MESSAGE: "Please check the logs for the run at ${{ env.WORKFLOW_URL }}/${{ github.run_id }}"
SLACK_TITLE: "Nightly Unit Test Run Failed"

gossipsub:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: Gossipsub E2E tests
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
steps:
- uses: actions/checkout@v3

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable
- uses: Swatinem/rust-cache@v2

- name: Build node
run: cargo build --release --bin safenode
timeout-minutes: 30

- name: Build gossipsub testing executable
run: cargo test --release -p sn_node --features=local-discovery --test msgs_over_gossipsub --no-run
timeout-minutes: 30

- name: Start a local network
uses: maidsafe/sn-local-testnet-action@main
with:
action: start
interval: 2000
node-path: target/release/safenode
faucet-path: target/release/faucet
platform: ${{ matrix.os }}

- name: Gossipsub - nodes to subscribe to topics, and publish messages
run: cargo test --release -p sn_node --features local-discovery msgs_over_gossipsub -- --nocapture
timeout-minutes: 20

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
with:
action: stop
log_file_prefix: safe_test_logs_e2e
platform: ${{ matrix.os }}

spend_test:
name: spend tests against network
runs-on: ${{ matrix.os }}
Expand Down
123 changes: 123 additions & 0 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

mod common;

use common::safenode_proto::{
safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest,
NodeEventsRequest,
};
use sn_node::NodeEvent;

use eyre::Result;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use tokio::time::timeout;
use tokio_stream::StreamExt;
use tonic::Request;

const NODE_COUNT: u8 = 25;

#[tokio::test(flavor = "multi_thread")]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);

for node_index in 1..NODE_COUNT + 1 {
// request current node to subscribe to a fresh new topic
addr.set_port(12000 + node_index as u16);
let topic = format!("TestTopic-{node_index}");
node_subscribe_to_topic(addr, topic.clone()).await?;

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(10000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
}
})
.await;

Ok::<u8, eyre::Error>(count)
});

tokio::time::sleep(Duration::from_millis(1000)).await;

// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(addr, topic).await?;

let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert!(
count > 0,
"No message received by node at index {}",
node_index
);
}

Ok(())
}

async fn node_subscribe_to_topic(addr: SocketAddr, topic: String) -> Result<()> {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;

// subscribe to given topic
let _response = rpc_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest { topic }))
.await?;

Ok(())
}

async fn other_nodes_to_publish_on_topic(filter_addr: SocketAddr, topic: String) -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
for node_index in 1..NODE_COUNT + 1 {
addr.set_port(12000 + node_index as u16);
if addr != filter_addr {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
println!("Node {node_index} to publish on {topic} message: {msg}");

let _response = rpc_client
.publish_on_topic(Request::new(GossipsubPublishRequest {
topic: topic.clone(),
msg: msg.into(),
}))
.await?;
}
}

Ok(())
}
39 changes: 31 additions & 8 deletions sn_protocol/src/safenode_proto/req_resp_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,43 @@ message NodeInfoResponse {
uint64 uptime_secs = 5;
}

// Information about how this node's connections to the network and peers
message NetworkInfoRequest {}

message NetworkInfoResponse {
repeated bytes connected_peers = 1;
repeated string listeners = 2;
}

// Stream of node events
message NodeEventsRequest {}

message NodeEvent {
string event = 1;
bytes event = 1;
}

// Addresses of all the Records stored by the node
message RecordAddressesRequest {}

message RecordAddressesResponse {
repeated bytes addresses = 1;
}

// Subsribe to a gossipsub topic
message GossipsubSubscribeRequest {
string topic = 1;
}

message GossipsubSubscribeResponse {}

// Publish a msg on a gossipsub topic
message GossipsubPublishRequest {
string topic = 1;
bytes msg = 2;
}

message GossipsubPublishResponse {}

// Stop the safenode app
message StopRequest {
uint64 delay_millis = 1;
Expand All @@ -51,10 +81,3 @@ message UpdateRequest {

message UpdateResponse {}

// Information about how this node's connections to the network and peers
message NetworkInfoRequest {}

message NetworkInfoResponse {
repeated bytes connected_peers = 1;
repeated string listeners = 2;
}
9 changes: 9 additions & 0 deletions sn_protocol/src/safenode_proto/safenode.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ service SafeNode {
// Returns a stream of events as triggered by this node
rpc NodeEvents (NodeEventsRequest) returns (stream NodeEvent);

// Returns the Addresses of all the Records stored by this node
rpc RecordAddresses (RecordAddressesRequest) returns (RecordAddressesResponse);

// Subscribe to a Gossipsub topic
rpc SubscribeToTopic (GossipsubSubscribeRequest) returns (GossipsubSubscribeResponse);

// Publish a msg on a Gossipsub topic
rpc PublishOnTopic (GossipsubPublishRequest) returns (GossipsubPublishResponse);

// Stop the execution of this node
rpc Stop (StopRequest) returns (StopResponse);

Expand Down

0 comments on commit 915c623

Please sign in to comment.