diff --git a/docs/reference/quickwit-config.md b/docs/reference/quickwit-config.md index 28ca4741e85..e25adf06311 100644 --- a/docs/reference/quickwit-config.md +++ b/docs/reference/quickwit-config.md @@ -17,6 +17,7 @@ A commented example is accessible here: [quickwit.yaml](https://github.com/quick | --- | --- | --- | | version | Config file version. 0 is the only available value. | | | node_id | Node ID of the instance (searcher or indexer). It must be unique in your cluster. If not set, a random ID is generated at each boot. | | +| cluster_id | Unique Id for the cluster this node will be joining. Should be set to a unique name to ensure clusters do not accidentally merge together. | "quickwit-test-cluster" | | listen_address | The IP address or hostname that Quickwit service binds to for starting REST and GRPC server and connecting this node to other nodes. By default, Quickwit binds itself to 127.0.0.1 (localhost). This default is not valid when trying to form a cluster. | 127.0.0.1 | | rest_listen_port | The port which to listen for HTTP REST API. | 7280 | | peer_seeds | List of IP addresses used by gossip for bootstrapping new nodes joining a cluster. This list may contain the current node address, and it does not need to be exhaustive on every node. | | diff --git a/quickwit-cluster/src/cluster.rs b/quickwit-cluster/src/cluster.rs index 6f844eb4371..ea73b8cae3a 100644 --- a/quickwit-cluster/src/cluster.rs +++ b/quickwit-cluster/src/cluster.rs @@ -121,6 +121,7 @@ impl Cluster { pub fn new( me: Member, listen_addr: SocketAddr, + cluster_id: String, grpc_addr: SocketAddr, seed_nodes: &[String], failure_detector_config: FailureDetectorConfig, @@ -130,8 +131,7 @@ impl Cluster { NodeId::from(me.clone()), seed_nodes, listen_addr.to_string(), - // make `cluster_id` configurable - "quickwit-cluster-id".to_string(), + cluster_id, vec![(GRPC_ADDRESS_KEY, grpc_addr)], failure_detector_config, ); @@ -297,6 +297,7 @@ pub fn grpc_addr_from_listen_addr_for_test(listen_addr: SocketAddr) -> SocketAdd pub fn create_cluster_for_test_with_id( peer_uuid: String, + cluster_id: String, seeds: &[String], ) -> anyhow::Result { let listen_addr = SocketAddr::new( @@ -307,6 +308,7 @@ pub fn create_cluster_for_test_with_id( let cluster = Cluster::new( Member::new(peer_uuid, 1, listen_addr), listen_addr, + cluster_id, grpc_addr_from_listen_addr_for_test(listen_addr), seeds, failure_detector_config, @@ -326,7 +328,7 @@ fn create_failure_detector_config_for_test() -> FailureDetectorConfig { /// Creates a local cluster listening on a random port. pub fn create_cluster_for_test(seeds: &[String]) -> anyhow::Result { let peer_uuid = Uuid::new_v4().to_string(); - let cluster = create_cluster_for_test_with_id(peer_uuid, seeds)?; + let cluster = create_cluster_for_test_with_id(peer_uuid, "test-cluster".to_string(), seeds)?; Ok(cluster) } @@ -400,12 +402,78 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_cluster_id_isolation() -> anyhow::Result<()> { + quickwit_common::setup_logging_for_tests(); + + let cluster1a = + create_cluster_for_test_with_id("node_1a".to_string(), "cluster1".to_string(), &[])?; + let cluster2a = create_cluster_for_test_with_id( + "node_2a".to_string(), + "cluster2".to_string(), + &[cluster1a.listen_addr.to_string()], + )?; + + let cluster1b = create_cluster_for_test_with_id( + "node_1b".to_string(), + "cluster1".to_string(), + &[ + cluster1a.listen_addr.to_string(), + cluster2a.listen_addr.to_string(), + ], + )?; + let cluster2b = create_cluster_for_test_with_id( + "node_2b".to_string(), + "cluster2".to_string(), + &[ + cluster1a.listen_addr.to_string(), + cluster2a.listen_addr.to_string(), + ], + )?; + + let wait_secs = Duration::from_secs(10); + for cluster in [&cluster1a, &cluster2a, &cluster1b, &cluster2b] { + cluster + .wait_for_members(|members| members.len() == 2, wait_secs) + .await + .unwrap(); + } + + let members_a: Vec = cluster1a + .members() + .iter() + .map(|member| member.gossip_public_address) + .sorted() + .collect(); + let mut expected_members_a = vec![cluster1a.listen_addr, cluster1b.listen_addr]; + expected_members_a.sort(); + assert_eq!(members_a, expected_members_a); + + let members_b: Vec = cluster2a + .members() + .iter() + .map(|member| member.gossip_public_address) + .sorted() + .collect(); + let mut expected_members_b = vec![cluster2a.listen_addr, cluster2b.listen_addr]; + expected_members_b.sort(); + assert_eq!(members_b, expected_members_b); + + Ok(()) + } + #[tokio::test] async fn test_cluster_rejoin_with_different_id_issue_1018() -> anyhow::Result<()> { + let cluster_id = "unified-cluster"; quickwit_common::setup_logging_for_tests(); - let cluster1 = create_cluster_for_test_with_id("cluster1".to_string(), &[])?; + let cluster1 = + create_cluster_for_test_with_id("node1".to_string(), cluster_id.to_string(), &[])?; let node_1 = cluster1.listen_addr.to_string(); - let cluster2 = create_cluster_for_test_with_id("cluster2".to_string(), &[node_1.clone()])?; + let cluster2 = create_cluster_for_test_with_id( + "node2".to_string(), + cluster_id.to_string(), + &[node_1.clone()], + )?; let wait_secs = Duration::from_secs(10); @@ -441,6 +509,7 @@ mod tests { let cluster2 = Cluster::new( Member::new("newid".to_string(), 1, cluster2_listen_addr), cluster2_listen_addr, + cluster_id.to_string(), grpc_addr, &[node_1], create_failure_detector_config_for_test(), @@ -456,23 +525,33 @@ mod tests { assert!(!cluster1 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster2")); + .any(|member| (*member).node_unique_id == "node2")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster2")); + .any(|member| (*member).node_unique_id == "node2")); Ok(()) } #[tokio::test] async fn test_cluster_rejoin_with_different_id_3_nodes_issue_1018() -> anyhow::Result<()> { + let cluster_id = "three-nodes-cluster"; quickwit_common::setup_logging_for_tests(); - let cluster1 = create_cluster_for_test_with_id("cluster1".to_string(), &[])?; + let cluster1 = + create_cluster_for_test_with_id("node1".to_string(), cluster_id.to_string(), &[])?; let node_1 = cluster1.listen_addr.to_string(); - let cluster2 = create_cluster_for_test_with_id("cluster2".to_string(), &[node_1.clone()])?; + let cluster2 = create_cluster_for_test_with_id( + "node2".to_string(), + cluster_id.to_string(), + &[node_1.clone()], + )?; let node_2 = cluster2.listen_addr.to_string(); - let cluster3 = create_cluster_for_test_with_id("cluster3".to_string(), &[node_2])?; + let cluster3 = create_cluster_for_test_with_id( + "node3".to_string(), + cluster_id.to_string(), + &[node_2], + )?; let wait_secs = Duration::from_secs(15); @@ -514,6 +593,7 @@ mod tests { let cluster2 = Cluster::new( Member::new("newid".to_string(), 1, cluster2_listen_addr), cluster2_listen_addr, + cluster_id.to_string(), grpc_addr, &[node_1], create_failure_detector_config_for_test(), @@ -523,6 +603,7 @@ mod tests { let cluster3 = Cluster::new( Member::new("newid2".to_string(), 1, cluster3_listen_addr), cluster3_listen_addr, + cluster_id.to_string(), grpc_addr, &[node_2], create_failure_detector_config_for_test(), @@ -532,28 +613,28 @@ mod tests { assert!(!cluster1 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster2")); + .any(|member| (*member).node_unique_id == "node2")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster2")); + .any(|member| (*member).node_unique_id == "node2")); assert!(!cluster3 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster2")); + .any(|member| (*member).node_unique_id == "node2")); assert!(!cluster1 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster3")); + .any(|member| (*member).node_unique_id == "node3")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster3")); + .any(|member| (*member).node_unique_id == "node3")); assert!(!cluster2 .members() .iter() - .any(|member| (*member).node_unique_id == "cluster3")); + .any(|member| (*member).node_unique_id == "node3")); Ok(()) } diff --git a/quickwit-cluster/src/lib.rs b/quickwit-cluster/src/lib.rs index e17f7367bb4..4dd02eebd18 100644 --- a/quickwit-cluster/src/lib.rs +++ b/quickwit-cluster/src/lib.rs @@ -57,6 +57,7 @@ pub async fn start_cluster_service( let cluster = Arc::new(Cluster::new( member, quickwit_config.gossip_socket_addr()?, + quickwit_config.cluster_id.clone(), quickwit_config.grpc_socket_addr()?, &seed_nodes, FailureDetectorConfig::default(), diff --git a/quickwit-config/resources/tests/config/quickwit.json b/quickwit-config/resources/tests/config/quickwit.json index 09790636d7f..4a432eb1640 100644 --- a/quickwit-config/resources/tests/config/quickwit.json +++ b/quickwit-config/resources/tests/config/quickwit.json @@ -1,6 +1,7 @@ # Comments are supported. { "version": 0, + "cluster_id": "quickwit-cluster", "node_id": "my-unique-node-id", "metastore_uri": "postgres://username:password@host:port/db", "data_dir": "/opt/quickwit/data", diff --git a/quickwit-config/resources/tests/config/quickwit.toml b/quickwit-config/resources/tests/config/quickwit.toml index 35ad20ad72c..65138e5fcb9 100644 --- a/quickwit-config/resources/tests/config/quickwit.toml +++ b/quickwit-config/resources/tests/config/quickwit.toml @@ -1,4 +1,5 @@ version = 0 +cluster_id = "quickwit-cluster" node_id = "my-unique-node-id" listen_address = "0.0.0.0" rest_listen_port = 1111 diff --git a/quickwit-config/resources/tests/config/quickwit.yaml b/quickwit-config/resources/tests/config/quickwit.yaml index 92e1b06b58b..1c7970b6269 100644 --- a/quickwit-config/resources/tests/config/quickwit.yaml +++ b/quickwit-config/resources/tests/config/quickwit.yaml @@ -1,4 +1,5 @@ version: 0 +cluster_id: quickwit-cluster node_id: my-unique-node-id listen_address: 0.0.0.0 rest_listen_port: 1111 diff --git a/quickwit-config/src/config.rs b/quickwit-config/src/config.rs index f4a4dff3731..e337b5f08db 100644 --- a/quickwit-config/src/config.rs +++ b/quickwit-config/src/config.rs @@ -50,6 +50,10 @@ fn default_metastore_and_index_root_uri(data_dir_path: &Path) -> String { .to_string() } +fn default_cluster_id() -> String { + "quickwit-test-cluster".to_string() +} + fn default_node_id() -> String { new_coolid("node") } @@ -154,6 +158,8 @@ pub struct StorageConfig { #[serde(deny_unknown_fields)] pub struct QuickwitConfig { pub version: usize, + #[serde(default = "default_cluster_id")] + pub cluster_id: String, #[serde(default = "default_node_id")] pub node_id: String, #[serde(default = "default_listen_address")] @@ -347,6 +353,7 @@ impl Default for QuickwitConfig { gossip_listen_port: None, grpc_listen_port: None, peer_seeds: Vec::new(), + cluster_id: default_cluster_id(), node_id: default_node_id(), metastore_uri: None, default_index_root_uri: None, @@ -364,6 +371,7 @@ impl std::fmt::Debug for QuickwitConfig { .debug_struct("QuickwitConfig") .field("version", &self.version) .field("node_id", &self.node_id) + .field("cluster_id", &self.cluster_id) .field("listen_address", &self.listen_address) .field("rest_listen_port", &self.rest_listen_port) .field("gossip_listen_port", &self.gossip_listen_port()) @@ -408,6 +416,7 @@ mod tests { let file = std::fs::read_to_string(&config_filepath).unwrap(); let config = QuickwitConfig::from_uri(&config_uri, file.as_bytes()).await?; assert_eq!(config.version, 0); + assert_eq!(config.cluster_id, "quickwit-cluster"); assert_eq!(config.listen_address, "0.0.0.0".to_string()); assert_eq!(config.rest_listen_port, 1111); assert_eq!( @@ -477,6 +486,7 @@ mod tests { "#; let config = serde_yaml::from_str::(config_yaml).unwrap(); assert_eq!(config.version, 0); + assert_eq!(config.cluster_id, "quickwit-test-cluster"); assert_eq!(config.node_id, "1"); assert_eq!( config.metastore_uri(),