Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Aug 19, 2024
1 parent b853f20 commit 27fa578
Show file tree
Hide file tree
Showing 19 changed files with 35 additions and 37 deletions.
3 changes: 1 addition & 2 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ message WorkerNode {
bool is_streaming = 1;
bool is_serving = 2;
bool is_unschedulable = 3;
// Secondary host address for the worker node.
// This is used for frontend node to register its rpc address
string secondary_host = 4;
string internal_rpc_host_addr = 4;
}
message Resource {
string rw_version = 1;
Expand Down
3 changes: 1 addition & 2 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,8 @@ message AddWorkerNodeRequest {
bool is_streaming = 2;
bool is_serving = 3;
bool is_unschedulable = 4;
// Secondary host address for the worker node.
// This is used for frontend node to register its rpc address
string secondary_host = 5;
string internal_rpc_host_addr = 5;
}
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -445,7 +445,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(2),
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub async fn compute_node_serve(
is_streaming: opts.role.for_streaming(),
is_serving: opts.role.for_serving(),
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
&config.meta,
)
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
is_serving: Set(pb_property.is_serving),
is_unschedulable: Set(pb_property.is_unschedulable),
parallelism: Set(worker.worker_node.parallelism() as _),
secondary_host: Set(pb_property.secondary_host.clone()),
internal_rpc_host_addr: Set(pb_property.internal_rpc_host_addr.clone()),
};
WorkerProperty::insert(property)
.exec(&meta_store_sql.conn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct RwWorkerNode {
is_streaming: Option<bool>,
is_serving: Option<bool>,
is_unschedulable: Option<bool>,
secondary_host: Option<String>,
internal_rpc_host_addr: Option<String>,
rw_version: Option<String>,
system_total_memory_bytes: Option<i64>,
system_total_cpu_cores: Option<i64>,
Expand Down Expand Up @@ -74,7 +74,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
} else {
None
},
secondary_host: property.map(|p| p.secondary_host.clone()),
internal_rpc_host_addr: property.map(|p| p.internal_rpc_host_addr.clone()),
rw_version: resource.map(|r| r.rw_version.to_owned()),
system_total_memory_bytes: resource.map(|r| r.total_memory_bytes as _),
system_total_cpu_cores: resource.map(|r| r.total_cpu_cores as _),
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ pub(crate) mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(0),
..Default::default()
Expand All @@ -699,7 +699,7 @@ pub(crate) mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -717,7 +717,7 @@ pub(crate) mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(2),
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl FrontendEnv {
WorkerType::Frontend,
&frontend_address,
AddWorkerNodeProperty {
secondary_host: frontend_rpc_addr.to_string(),
internal_rpc_host_addr: frontend_rpc_addr.to_string(),
..Default::default()
},
&config.meta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(WorkerProperty::Table)
.add_column(ColumnDef::new(WorkerProperty::SecondaryHost).string())
.add_column(ColumnDef::new(WorkerProperty::InternalRpcHostAddr).string())
.to_owned(),
)
.await
Expand All @@ -39,7 +39,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(WorkerProperty::Table)
.drop_column(WorkerProperty::SecondaryHost)
.drop_column(WorkerProperty::InternalRpcHostAddr)
.to_owned(),
)
.await
Expand All @@ -55,5 +55,5 @@ enum Table {
#[derive(DeriveIden)]
enum WorkerProperty {
Table,
SecondaryHost,
InternalRpcHostAddr,
}
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/worker_property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Model {
pub is_streaming: bool,
pub is_serving: bool,
pub is_unschedulable: bool,
pub secondary_host: String,
pub internal_rpc_host_addr: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2879,12 +2879,12 @@ impl CatalogController {

pub async fn get_table_by_cdc_table_id(
&self,
cdc_table_name: String,
cdc_table_id: String,
) -> MetaResult<Vec<PbTable>> {
let inner = self.inner.read().await;
let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::CdcTableId.eq(cdc_table_name))
.filter(table::Column::CdcTableId.eq(cdc_table_id))
.all(&inner.db)
.await?;
Ok(table_objs
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl From<WorkerInfo> for PbWorkerNode {
is_streaming: p.is_streaming,
is_serving: p.is_serving,
is_unschedulable: p.is_unschedulable,
secondary_host: p.secondary_host.clone(),
internal_rpc_host_addr: p.internal_rpc_host_addr.clone(),
}),
transactional_id: info.0.transaction_id.map(|id| id as _),
resource: info.2.resource,
Expand Down Expand Up @@ -724,7 +724,7 @@ impl ClusterControllerInner {
is_streaming: Set(add_property.is_streaming),
is_serving: Set(add_property.is_serving),
is_unschedulable: Set(add_property.is_unschedulable),
secondary_host: Set(add_property.secondary_host),
internal_rpc_host_addr: Set(add_property.internal_rpc_host_addr),
};
WorkerProperty::insert(property).exec(&txn).await?;
}
Expand Down Expand Up @@ -971,7 +971,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
};
let hosts = mock_worker_hosts_for_test(worker_count);
let mut worker_ids = vec![];
Expand Down Expand Up @@ -1062,7 +1062,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
};
let worker_id = cluster_ctl
.add_worker(
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ async fn test_release_context_resource() {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down Expand Up @@ -488,7 +488,7 @@ async fn test_hummock_manager_basic() {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ pub async fn setup_compute_env_with_metric(
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down
12 changes: 6 additions & 6 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ impl ClusterManager {
is_streaming: worker_property.is_streaming,
is_serving: worker_property.is_serving,
is_unschedulable: worker_property.is_unschedulable,
secondary_host: worker_property.secondary_host,
internal_rpc_host_addr: worker_property.internal_rpc_host_addr,
})
} else {
None
Expand Down Expand Up @@ -839,7 +839,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down Expand Up @@ -881,7 +881,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand All @@ -904,7 +904,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down Expand Up @@ -954,7 +954,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
secondary_host: "".to_string(),
internal_rpc_host_addr: "".to_string(),
},
Default::default(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where
.property
.as_ref()
.expect("frontend node property is missing");
HostAddr::from_str(prop.secondary_host.as_str())?
HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
} else {
node.get_host().unwrap().into()
};
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ impl StreamActorManager {
} = actor;
let actor = actor.unwrap();
let actor_id = actor.actor_id;
let streaming_config = Arc::new(shared_context.config.clone());
let streaming_config = self.env.config().clone();
let actor_context = ActorContext::create(
&actor,
self.env.total_mem_usage(),
Expand Down

0 comments on commit 27fa578

Please sign in to comment.