Skip to content

Commit

Permalink
Add new cluster_id config option
Browse files Browse the repository at this point in the history
Make the cluster_id settable to avoid several clusters merging accidentally chitchat#36

Add the optional cluster_id config param, with a default value of "quickwit-test-cluster"
Add a new unit test to confirm that four nodes with different cluster names settle correctly into 2x2
How was this PR tested?
Only unit tests for now; we should test it more thoroughly by hand before the v0.3 release.

Closes #1283
  • Loading branch information
xvello authored Apr 20, 2022
1 parent 3520a0d commit 7205346
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/reference/quickwit-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ A commented example is accessible here: [quickwit.yaml](https://github.com/quick
| --- | --- | --- |
| version | Config file version. 0 is the only available value. | |
| node_id | Node ID of the instance (searcher or indexer). It must be unique in your cluster. If not set, a random ID is generated at each boot. | |
| cluster_id | Unique ID of the cluster this node will join. It should be set to a unique name to ensure that clusters do not accidentally merge together. | "quickwit-test-cluster" |
| listen_address | The IP address or hostname that the Quickwit service binds to for starting the REST and gRPC servers and connecting this node to other nodes. By default, Quickwit binds itself to 127.0.0.1 (localhost). This default is not valid when trying to form a cluster. | 127.0.0.1 |
| rest_listen_port | The port on which to listen for the HTTP REST API. | 7280 |
| peer_seeds | List of IP addresses used by gossip for bootstrapping new nodes joining a cluster. This list may contain the current node address, and it does not need to be exhaustive on every node. | |
Expand Down
113 changes: 97 additions & 16 deletions quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl Cluster {
pub fn new(
me: Member,
listen_addr: SocketAddr,
cluster_id: String,
grpc_addr: SocketAddr,
seed_nodes: &[String],
failure_detector_config: FailureDetectorConfig,
Expand All @@ -130,8 +131,7 @@ impl Cluster {
NodeId::from(me.clone()),
seed_nodes,
listen_addr.to_string(),
// make `cluster_id` configurable
"quickwit-cluster-id".to_string(),
cluster_id,
vec![(GRPC_ADDRESS_KEY, grpc_addr)],
failure_detector_config,
);
Expand Down Expand Up @@ -297,6 +297,7 @@ pub fn grpc_addr_from_listen_addr_for_test(listen_addr: SocketAddr) -> SocketAdd

pub fn create_cluster_for_test_with_id(
peer_uuid: String,
cluster_id: String,
seeds: &[String],
) -> anyhow::Result<Cluster> {
let listen_addr = SocketAddr::new(
Expand All @@ -307,6 +308,7 @@ pub fn create_cluster_for_test_with_id(
let cluster = Cluster::new(
Member::new(peer_uuid, 1, listen_addr),
listen_addr,
cluster_id,
grpc_addr_from_listen_addr_for_test(listen_addr),
seeds,
failure_detector_config,
Expand All @@ -326,7 +328,7 @@ fn create_failure_detector_config_for_test() -> FailureDetectorConfig {
/// Creates a local cluster listening on a random port.
///
/// The node ID is a fresh string produced by `Uuid::new_v4()`, and `seeds` is
/// forwarded unchanged to `create_cluster_for_test_with_id`. Any error returned
/// by that helper is propagated to the caller.
pub fn create_cluster_for_test(seeds: &[String]) -> anyhow::Result<Cluster> {
let peer_uuid = Uuid::new_v4().to_string();
let cluster = create_cluster_for_test_with_id(peer_uuid, seeds)?;
let cluster = create_cluster_for_test_with_id(peer_uuid, "test-cluster".to_string(), seeds)?;
Ok(cluster)
}

Expand Down Expand Up @@ -400,12 +402,78 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_cluster_id_isolation() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();

let cluster1a =
create_cluster_for_test_with_id("node_1a".to_string(), "cluster1".to_string(), &[])?;
let cluster2a = create_cluster_for_test_with_id(
"node_2a".to_string(),
"cluster2".to_string(),
&[cluster1a.listen_addr.to_string()],
)?;

let cluster1b = create_cluster_for_test_with_id(
"node_1b".to_string(),
"cluster1".to_string(),
&[
cluster1a.listen_addr.to_string(),
cluster2a.listen_addr.to_string(),
],
)?;
let cluster2b = create_cluster_for_test_with_id(
"node_2b".to_string(),
"cluster2".to_string(),
&[
cluster1a.listen_addr.to_string(),
cluster2a.listen_addr.to_string(),
],
)?;

let wait_secs = Duration::from_secs(10);
for cluster in [&cluster1a, &cluster2a, &cluster1b, &cluster2b] {
cluster
.wait_for_members(|members| members.len() == 2, wait_secs)
.await
.unwrap();
}

let members_a: Vec<SocketAddr> = cluster1a
.members()
.iter()
.map(|member| member.gossip_public_address)
.sorted()
.collect();
let mut expected_members_a = vec![cluster1a.listen_addr, cluster1b.listen_addr];
expected_members_a.sort();
assert_eq!(members_a, expected_members_a);

let members_b: Vec<SocketAddr> = cluster2a
.members()
.iter()
.map(|member| member.gossip_public_address)
.sorted()
.collect();
let mut expected_members_b = vec![cluster2a.listen_addr, cluster2b.listen_addr];
expected_members_b.sort();
assert_eq!(members_b, expected_members_b);

Ok(())
}

#[tokio::test]
async fn test_cluster_rejoin_with_different_id_issue_1018() -> anyhow::Result<()> {
let cluster_id = "unified-cluster";
quickwit_common::setup_logging_for_tests();
let cluster1 = create_cluster_for_test_with_id("cluster1".to_string(), &[])?;
let cluster1 =
create_cluster_for_test_with_id("node1".to_string(), cluster_id.to_string(), &[])?;
let node_1 = cluster1.listen_addr.to_string();
let cluster2 = create_cluster_for_test_with_id("cluster2".to_string(), &[node_1.clone()])?;
let cluster2 = create_cluster_for_test_with_id(
"node2".to_string(),
cluster_id.to_string(),
&[node_1.clone()],
)?;

let wait_secs = Duration::from_secs(10);

Expand Down Expand Up @@ -441,6 +509,7 @@ mod tests {
let cluster2 = Cluster::new(
Member::new("newid".to_string(), 1, cluster2_listen_addr),
cluster2_listen_addr,
cluster_id.to_string(),
grpc_addr,
&[node_1],
create_failure_detector_config_for_test(),
Expand All @@ -456,23 +525,33 @@ mod tests {
assert!(!cluster1
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster2"));
.any(|member| (*member).node_unique_id == "node2"));
assert!(!cluster2
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster2"));
.any(|member| (*member).node_unique_id == "node2"));

Ok(())
}

#[tokio::test]
async fn test_cluster_rejoin_with_different_id_3_nodes_issue_1018() -> anyhow::Result<()> {
let cluster_id = "three-nodes-cluster";
quickwit_common::setup_logging_for_tests();
let cluster1 = create_cluster_for_test_with_id("cluster1".to_string(), &[])?;
let cluster1 =
create_cluster_for_test_with_id("node1".to_string(), cluster_id.to_string(), &[])?;
let node_1 = cluster1.listen_addr.to_string();
let cluster2 = create_cluster_for_test_with_id("cluster2".to_string(), &[node_1.clone()])?;
let cluster2 = create_cluster_for_test_with_id(
"node2".to_string(),
cluster_id.to_string(),
&[node_1.clone()],
)?;
let node_2 = cluster2.listen_addr.to_string();
let cluster3 = create_cluster_for_test_with_id("cluster3".to_string(), &[node_2])?;
let cluster3 = create_cluster_for_test_with_id(
"node3".to_string(),
cluster_id.to_string(),
&[node_2],
)?;

let wait_secs = Duration::from_secs(15);

Expand Down Expand Up @@ -514,6 +593,7 @@ mod tests {
let cluster2 = Cluster::new(
Member::new("newid".to_string(), 1, cluster2_listen_addr),
cluster2_listen_addr,
cluster_id.to_string(),
grpc_addr,
&[node_1],
create_failure_detector_config_for_test(),
Expand All @@ -523,6 +603,7 @@ mod tests {
let cluster3 = Cluster::new(
Member::new("newid2".to_string(), 1, cluster3_listen_addr),
cluster3_listen_addr,
cluster_id.to_string(),
grpc_addr,
&[node_2],
create_failure_detector_config_for_test(),
Expand All @@ -532,28 +613,28 @@ mod tests {
assert!(!cluster1
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster2"));
.any(|member| (*member).node_unique_id == "node2"));
assert!(!cluster2
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster2"));
.any(|member| (*member).node_unique_id == "node2"));
assert!(!cluster3
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster2"));
.any(|member| (*member).node_unique_id == "node2"));

assert!(!cluster1
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster3"));
.any(|member| (*member).node_unique_id == "node3"));
assert!(!cluster2
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster3"));
.any(|member| (*member).node_unique_id == "node3"));
assert!(!cluster2
.members()
.iter()
.any(|member| (*member).node_unique_id == "cluster3"));
.any(|member| (*member).node_unique_id == "node3"));

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn start_cluster_service(
let cluster = Arc::new(Cluster::new(
member,
quickwit_config.gossip_socket_addr()?,
quickwit_config.cluster_id.clone(),
quickwit_config.grpc_socket_addr()?,
&seed_nodes,
FailureDetectorConfig::default(),
Expand Down
1 change: 1 addition & 0 deletions quickwit-config/resources/tests/config/quickwit.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Comments are supported.
{
"version": 0,
"cluster_id": "quickwit-cluster",
"node_id": "my-unique-node-id",
"metastore_uri": "postgres://username:password@host:port/db",
"data_dir": "/opt/quickwit/data",
Expand Down
1 change: 1 addition & 0 deletions quickwit-config/resources/tests/config/quickwit.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version = 0
cluster_id = "quickwit-cluster"
node_id = "my-unique-node-id"
listen_address = "0.0.0.0"
rest_listen_port = 1111
Expand Down
1 change: 1 addition & 0 deletions quickwit-config/resources/tests/config/quickwit.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: 0
cluster_id: quickwit-cluster
node_id: my-unique-node-id
listen_address: 0.0.0.0
rest_listen_port: 1111
Expand Down
10 changes: 10 additions & 0 deletions quickwit-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ fn default_metastore_and_index_root_uri(data_dir_path: &Path) -> String {
.to_string()
}

/// Returns the cluster ID used when the configuration does not set `cluster_id`.
fn default_cluster_id() -> String {
    String::from("quickwit-test-cluster")
}

/// Returns the node ID used when the configuration does not set `node_id`.
///
/// The value is whatever `new_coolid("node")` produces; presumably a freshly
/// generated random ID carrying the "node" label — confirm against `new_coolid`.
fn default_node_id() -> String {
new_coolid("node")
}
Expand Down Expand Up @@ -154,6 +158,8 @@ pub struct StorageConfig {
#[serde(deny_unknown_fields)]
pub struct QuickwitConfig {
pub version: usize,
#[serde(default = "default_cluster_id")]
pub cluster_id: String,
#[serde(default = "default_node_id")]
pub node_id: String,
#[serde(default = "default_listen_address")]
Expand Down Expand Up @@ -347,6 +353,7 @@ impl Default for QuickwitConfig {
gossip_listen_port: None,
grpc_listen_port: None,
peer_seeds: Vec::new(),
cluster_id: default_cluster_id(),
node_id: default_node_id(),
metastore_uri: None,
default_index_root_uri: None,
Expand All @@ -364,6 +371,7 @@ impl std::fmt::Debug for QuickwitConfig {
.debug_struct("QuickwitConfig")
.field("version", &self.version)
.field("node_id", &self.node_id)
.field("cluster_id", &self.cluster_id)
.field("listen_address", &self.listen_address)
.field("rest_listen_port", &self.rest_listen_port)
.field("gossip_listen_port", &self.gossip_listen_port())
Expand Down Expand Up @@ -408,6 +416,7 @@ mod tests {
let file = std::fs::read_to_string(&config_filepath).unwrap();
let config = QuickwitConfig::from_uri(&config_uri, file.as_bytes()).await?;
assert_eq!(config.version, 0);
assert_eq!(config.cluster_id, "quickwit-cluster");
assert_eq!(config.listen_address, "0.0.0.0".to_string());
assert_eq!(config.rest_listen_port, 1111);
assert_eq!(
Expand Down Expand Up @@ -477,6 +486,7 @@ mod tests {
"#;
let config = serde_yaml::from_str::<QuickwitConfig>(config_yaml).unwrap();
assert_eq!(config.version, 0);
assert_eq!(config.cluster_id, "quickwit-test-cluster");
assert_eq!(config.node_id, "1");
assert_eq!(
config.metastore_uri(),
Expand Down

0 comments on commit 7205346

Please sign in to comment.