diff --git a/proto/common.proto b/proto/common.proto index 1bc65b8c03ff2..05d938cc26523 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -54,9 +54,8 @@ message WorkerNode { bool is_streaming = 1; bool is_serving = 2; bool is_unschedulable = 3; - // Secondary host address for the worker node. // This is used for frontend node to register its rpc address - string secondary_host = 4; + string internal_rpc_host_addr = 4; } message Resource { string rw_version = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 6fe5bd6b391ee..ef00c1ef663e8 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -312,9 +312,8 @@ message AddWorkerNodeRequest { bool is_streaming = 2; bool is_serving = 3; bool is_unschedulable = 4; - // Secondary host address for the worker node. // This is used for frontend node to register its rpc address - string secondary_host = 5; + string internal_rpc_host_addr = 5; } common.WorkerType worker_type = 1; common.HostAddress host = 2; diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index ee9eed558ad19..80cd2806f2b64 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -430,7 +430,7 @@ mod tests { is_unschedulable: false, is_serving: true, is_streaming: true, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(1), ..Default::default() @@ -445,7 +445,7 @@ mod tests { is_unschedulable: false, is_serving: true, is_streaming: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }), transactional_id: Some(2), ..Default::default() diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 020d3e43d7525..5619ffc6e0f96 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -213,7 +213,7 @@ mod tests { is_unschedulable: false, is_serving: true, is_streaming: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }; let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index ac9beca5184fd..3577c334d42a0 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -127,7 +127,7 @@ pub async fn compute_node_serve( is_streaming: opts.role.for_streaming(), is_serving: opts.role.for_serving(), is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, &config.meta, ) diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index a73d2b1a27453..79b687c9ceed4 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -162,7 +162,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an is_serving: Set(pb_property.is_serving), is_unschedulable: Set(pb_property.is_unschedulable), parallelism: Set(worker.worker_node.parallelism() as _), - secondary_host: Set(pb_property.secondary_host.clone()), + internal_rpc_host_addr: Set(pb_property.internal_rpc_host_addr.clone()), }; WorkerProperty::insert(property) .exec(&meta_store_sql.conn) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs index 1391bfa9148a6..b50c7e4cfd07b 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs @@ -33,7 +33,7 @@ struct RwWorkerNode { is_streaming: Option, is_serving: Option, is_unschedulable: Option, - secondary_host: Option, + internal_rpc_host_addr: Option, rw_version: Option, system_total_memory_bytes: Option, system_total_cpu_cores: Option, @@ -74,7 +74,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result MetaResult> { let inner = self.inner.read().await; let table_objs = Table::find() .find_also_related(Object) - .filter(table::Column::CdcTableId.eq(cdc_table_name)) + .filter(table::Column::CdcTableId.eq(cdc_table_id)) .all(&inner.db) .await?; Ok(table_objs diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index f5e47b11d15a4..3a417394d2465 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -85,7 +85,7 @@ impl From for PbWorkerNode { is_streaming: p.is_streaming, is_serving: p.is_serving, is_unschedulable: p.is_unschedulable, - secondary_host: p.secondary_host.clone(), + internal_rpc_host_addr: p.internal_rpc_host_addr.clone(), }), transactional_id: info.0.transaction_id.map(|id| id as _), resource: info.2.resource, @@ -724,7 +724,7 @@ impl ClusterControllerInner { is_streaming: Set(add_property.is_streaming), is_serving: Set(add_property.is_serving), is_unschedulable: Set(add_property.is_unschedulable), - secondary_host: Set(add_property.secondary_host), + internal_rpc_host_addr: Set(add_property.internal_rpc_host_addr), }; WorkerProperty::insert(property).exec(&txn).await?; } @@ -971,7 +971,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }; let hosts = mock_worker_hosts_for_test(worker_count); let mut worker_ids = vec![]; @@ -1062,7 +1062,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }; let worker_id = cluster_ctl .add_worker( diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index bc2479c9a2cdc..2d25f196a60bf 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -405,7 +405,7 @@ async fn test_release_context_resource() { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -488,7 +488,7 @@ async fn test_hummock_manager_basic() { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index c12a8de165143..886af7bddc622 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -352,7 +352,7 @@ pub async fn setup_compute_env_with_metric( is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 1392de0afe312..b2efeb529785b 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -504,7 +504,7 @@ impl ClusterManager { is_streaming: worker_property.is_streaming, is_serving: worker_property.is_serving, is_unschedulable: worker_property.is_unschedulable, - secondary_host: worker_property.secondary_host, + internal_rpc_host_addr: worker_property.internal_rpc_host_addr, }) } else { None @@ -839,7 +839,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -881,7 +881,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -904,7 +904,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -954,7 +954,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) @@ -1013,7 +1013,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index dd006a9fca772..220f69bc0a58d 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -1038,7 +1038,7 @@ mod tests { is_streaming: true, is_serving: true, is_unschedulable: false, - secondary_host: "".to_string(), + internal_rpc_host_addr: "".to_string(), }, Default::default(), ) diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 02336d5ee1b2e..60a4ca537d21c 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -137,7 +137,7 @@ where .property .as_ref() .expect("frontend node property is missing"); - HostAddr::from_str(prop.secondary_host.as_str())? + HostAddr::from_str(prop.internal_rpc_host_addr.as_str())? } else { node.get_host().unwrap().into() }; diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 129c49b795822..dac104156f158 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -577,7 +577,7 @@ impl StreamActorManager { } = actor; let actor = actor.unwrap(); let actor_id = actor.actor_id; - let streaming_config = Arc::new(shared_context.config.clone()); + let streaming_config = self.env.config().clone(); let actor_context = ActorContext::create( &actor, self.env.total_mem_usage(),