feat: try to remove parallel unit mapping in frontend #16205

Merged · 11 commits · Apr 24, 2024
6 changes: 6 additions & 0 deletions proto/common.proto
@@ -92,6 +92,12 @@ message ParallelUnitMapping {
repeated uint32 data = 2;
}

// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id.
message WorkerMapping {
repeated uint32 original_indices = 1;
repeated uint32 data = 2;
}
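
The new `WorkerMapping` keeps the same compressed layout as `ParallelUnitMapping` (`original_indices` plus `data`), only the values are now worker ids rather than parallel unit ids. As a rough, hedged illustration of how such a compressed mapping can be expanded into a full vnode-to-worker table, assuming `original_indices[i]` marks the last vnode index covered by `data[i]` (the real encode/decode helpers live in `risingwave_common` and may differ):

```rust
// Hedged sketch: expanding a run-length-encoded WorkerMapping into a full
// vnode -> worker_id table. Assumes `original_indices[i]` is the last vnode
// index of the i-th run and `data[i]` is the worker id for that run.
fn expand_worker_mapping(original_indices: &[u32], data: &[u32]) -> Vec<u32> {
    assert_eq!(original_indices.len(), data.len());
    let mut expanded = Vec::new();
    let mut start = 0u32;
    for (&end, &worker_id) in original_indices.iter().zip(data.iter()) {
        // Fill every vnode in [start, end] with this run's worker id.
        for _vnode in start..=end {
            expanded.push(worker_id);
        }
        start = end + 1;
    }
    expanded
}

fn main() {
    // Two runs: vnodes 0..=127 -> worker 1, vnodes 128..=255 -> worker 2.
    let mapping = expand_worker_mapping(&[127, 255], &[1, 2]);
    assert_eq!(mapping.len(), 256);
    assert_eq!(mapping[0], 1);
    assert_eq!(mapping[200], 2);
}
```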

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
30 changes: 21 additions & 9 deletions proto/meta.proto
@@ -111,6 +111,16 @@
repeated FragmentParallelUnitMapping mappings = 1;
}

/// Worker mapping with fragment id, used for notification.
message FragmentWorkerMapping {
uint32 fragment_id = 1;
common.WorkerMapping mapping = 2;
}

message FragmentWorkerMappings {
repeated FragmentWorkerMapping mappings = 1;
}

// TODO: remove this when dashboard refactored.
message ActorLocation {
common.WorkerNode node = 1;
@@ -374,11 +384,12 @@
uint32 worker_id = 3;
}

message MetaSnapshot {

Check failure on line 387 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "18" with name "serving_parallel_unit_mappings" on message "MetaSnapshot" was deleted without reserving the name "serving_parallel_unit_mappings".
Check failure on line 387 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "9" with name "parallel_unit_mappings" on message "MetaSnapshot" was deleted without reserving the name "parallel_unit_mappings".
message SnapshotVersion {

Check failure on line 388 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "2" with name "parallel_unit_mapping_version" on message "SnapshotVersion" was deleted without reserving the name "parallel_unit_mapping_version".
uint64 catalog_version = 1;
uint64 parallel_unit_mapping_version = 2;
reserved 2; // for old parallel_unit_mapping_version
uint64 worker_node_version = 3;
uint64 streaming_worker_mapping_version = 4;
}
repeated catalog.Database databases = 1;
repeated catalog.Schema schemas = 2;
@@ -391,16 +402,18 @@
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
reserved 9; // for old parallel_unit_mapping
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
// for serving
repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;
reserved 18; // for old serving_parallel_unit_mappings

// for streaming
repeated FragmentWorkerMapping streaming_worker_mappings = 21;
repeated FragmentWorkerMapping serving_worker_mappings = 22;

SnapshotVersion version = 13;
}
@@ -422,7 +435,7 @@

message Recovery {}

message SubscribeResponse {

Check failure on line 438 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "12" with name "parallel_unit_mapping" on message "SubscribeResponse" was deleted without reserving the name "parallel_unit_mapping".
Check failure on line 438 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "23" with name "serving_parallel_unit_mappings" on message "SubscribeResponse" was deleted without reserving the name "serving_parallel_unit_mappings".
Check failure on line 438 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "12" with name "parallel_unit_mapping" on message "SubscribeResponse" was deleted without reserving the number "12".
Check failure on line 438 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "23" with name "serving_parallel_unit_mappings" on message "SubscribeResponse" was deleted without reserving the number "23".
enum Operation {
UNSPECIFIED = 0;
ADD = 1;
@@ -439,8 +452,6 @@
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
@@ -450,9 +461,10 @@
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
catalog.Connection connection = 22;
FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
FragmentWorkerMapping streaming_worker_mapping = 27;
FragmentWorkerMappings serving_worker_mappings = 28;
}
}
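
With the oneof changes above, subscribers now receive per-fragment worker mappings (variants 27 and 28) instead of parallel-unit mappings. The sketch below shows, under assumed simplified types (`Info`, `LocalMappings`, and `handle_notification` are hypothetical stand-ins, not the frontend's actual observer code), how such a notification could be folded into a local cache:

```rust
// Hedged sketch: reacting to a streaming worker-mapping notification.
// The single-variant `Info` enum stands in for the prost-generated oneof.
use std::collections::HashMap;

type FragmentId = u32;

enum Info {
    // fragment id plus an already-expanded vnode -> worker id table
    StreamingWorkerMapping { fragment_id: FragmentId, expanded: Vec<u32> },
}

#[derive(Default)]
struct LocalMappings {
    streaming: HashMap<FragmentId, Vec<u32>>,
}

impl LocalMappings {
    fn handle_notification(&mut self, info: Info) {
        match info {
            Info::StreamingWorkerMapping { fragment_id, expanded } => {
                // Replace the cached mapping for this fragment.
                self.streaming.insert(fragment_id, expanded);
            }
        }
    }
}

fn main() {
    let mut local = LocalMappings::default();
    local.handle_notification(Info::StreamingWorkerMapping {
        fragment_id: 1,
        expanded: vec![0; 256], // all vnodes on worker 0
    });
    assert_eq!(local.streaming[&1].len(), 256);
}
```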

@@ -627,9 +639,9 @@

message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {

Check failure on line 642 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "1" with name "mappings" on message "GetServingVnodeMappingsResponse" was deleted without reserving the name "mappings".
Check failure on line 642 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files): Previously present field "1" with name "mappings" on message "GetServingVnodeMappingsResponse" was deleted without reserving the number "1".
repeated FragmentParallelUnitMapping mappings = 1;
map<uint32, uint32> fragment_to_table = 2;
repeated FragmentWorkerMapping worker_mappings = 3;
}
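
On the serving side, the response now carries `worker_mappings` alongside the existing `fragment_to_table` map, replacing the removed parallel-unit `mappings` field. A consumer could join the two roughly as follows (the struct here is a simplified stand-in for the prost-generated type, and `group_by_table` is a hypothetical helper):

```rust
// Hedged sketch: grouping per-fragment worker mappings by their table id,
// given the fragment_to_table map from a GetServingVnodeMappingsResponse-like payload.
use std::collections::HashMap;

struct FragmentWorkerMapping {
    fragment_id: u32,
    expanded: Vec<u32>, // vnode index -> worker id, already decompressed
}

fn group_by_table(
    worker_mappings: Vec<FragmentWorkerMapping>,
    fragment_to_table: &HashMap<u32, u32>,
) -> HashMap<u32, Vec<(u32, Vec<u32>)>> {
    let mut by_table: HashMap<u32, Vec<(u32, Vec<u32>)>> = HashMap::new();
    for m in worker_mappings {
        // Fragments without a table entry are simply skipped in this sketch.
        if let Some(&table_id) = fragment_to_table.get(&m.fragment_id) {
            by_table
                .entry(table_id)
                .or_default()
                .push((m.fragment_id, m.expanded));
        }
    }
    by_table
}

fn main() {
    let fragment_to_table = HashMap::from([(7u32, 42u32)]);
    let mappings = vec![FragmentWorkerMapping { fragment_id: 7, expanded: vec![1, 1, 2, 2] }];
    let by_table = group_by_table(mappings, &fragment_to_table);
    assert_eq!(by_table[&42].len(), 1);
}
```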

service ServingService {
39 changes: 23 additions & 16 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -21,15 +21,14 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
@@ -52,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
struct InnerSideExecutorBuilder<C> {
table_desc: StorageTableDesc,
table_distribution: TableDistribution,
vnode_mapping: ExpandedParallelUnitMapping,
vnode_mapping: ExpandedWorkerMapping,
outer_side_key_types: Vec<DataType>,
inner_side_schema: Schema,
inner_side_column_ids: Vec<i32>,
@@ -61,8 +60,8 @@ struct InnerSideExecutorBuilder<C> {
context: C,
task_id: TaskId,
epoch: BatchQueryEpoch,
pu_to_worker_mapping: HashMap<ParallelUnitId, WorkerNode>,
pu_to_scan_range_mapping: HashMap<ParallelUnitId, Vec<(ScanRange, VirtualNode)>>,
worker_mapping: HashMap<WorkerId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
@@ -92,7 +91,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
let list = self.pu_to_scan_range_mapping.get(id).unwrap();
let list = self.worker_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

@@ -114,11 +113,11 @@
}

/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
fn build_prost_exchange_source(&self, id: &WorkerId) -> Result<PbExchangeSource> {
let worker = self
.pu_to_worker_mapping
.worker_mapping
.get(id)
.context("No worker node found for the given parallel unit id.")?;
.context("No worker node found for the given worker id.")?;

let local_execute_plan = LocalExecutePlan {
plan: Some(PlanFragment {
@@ -160,7 +159,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
#[async_trait::async_trait]
impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
fn reset(&mut self) {
self.pu_to_scan_range_mapping = HashMap::new();
self.worker_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `key_scalar_impls` into the parallel unit id
@@ -191,11 +190,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];
let worker_id = self.vnode_mapping[vnode.to_index()];

let list = self
.pu_to_scan_range_mapping
.entry(parallel_unit_id)
.worker_to_scan_range_mapping
.entry(worker_id)
.or_default();
list.push((scan_range, vnode));

@@ -207,7 +206,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
async fn build_executor(&mut self) -> Result<BoxedExecutor> {
self.next_stage_id += 1;
let mut sources = vec![];
for id in self.pu_to_scan_range_mapping.keys() {
for id in self.worker_to_scan_range_mapping.keys() {
sources.push(self.build_prost_exchange_source(id)?);
}

@@ -373,6 +372,14 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_mapping: HashMap<WorkerId, WorkerNode> = worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.collect();

assert_eq!(worker_mapping.len(), worker_nodes.len());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
@@ -388,11 +395,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()),
pu_to_scan_range_mapping: HashMap::new(),
worker_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_mapping,
};

let identity = source.plan_node().get_identity().clone();