
fix: Revert "feat: try to remove parallel unit mapping in frontend" #16519

Merged · 1 commit · Apr 29, 2024
6 changes: 0 additions & 6 deletions proto/common.proto
@@ -92,12 +92,6 @@ message ParallelUnitMapping {
repeated uint32 data = 2;
}

// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id.
message WorkerMapping {
repeated uint32 original_indices = 1;
repeated uint32 data = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
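Both `ParallelUnitMapping` (kept by this revert) and the `WorkerMapping` message it deletes store a vnode-to-id mapping in the same compressed form: a `data` array of run values paired with `original_indices`. Purely as an illustration, the sketch below expands such a pair into a flat per-vnode array, assuming `original_indices[i]` records the last vnode index of the i-th run; the actual codec lives in `risingwave_common` and may differ in detail.

```rust
/// Illustrative only: expand a compressed `(original_indices, data)` mapping into a
/// flat `vnode index -> id` array, under the run-length assumption stated above.
fn expand_mapping(original_indices: &[u32], data: &[u32]) -> Vec<u32> {
    assert_eq!(original_indices.len(), data.len());
    let mut expanded = Vec::new();
    let mut start = 0u32;
    for (&end, &value) in original_indices.iter().zip(data.iter()) {
        // Fill every vnode in the run [start, end] with this run's value
        // (a parallel unit id for `ParallelUnitMapping`).
        for _ in start..=end {
            expanded.push(value);
        }
        start = end + 1;
    }
    expanded
}
```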
39 changes: 9 additions & 30 deletions proto/meta.proto
@@ -111,16 +111,6 @@
repeated FragmentParallelUnitMapping mappings = 1;
}

/// Worker mapping with fragment id, used for notification.
message FragmentWorkerMapping {
uint32 fragment_id = 1;
common.WorkerMapping mapping = 2;
}

message FragmentWorkerMappings {
repeated FragmentWorkerMapping mappings = 1;
}

// TODO: remove this when dashboard refactored.
message ActorLocation {
common.WorkerNode node = 1;
@@ -384,13 +374,11 @@
uint32 worker_id = 3;
}

message MetaSnapshot {

Check failure on line 377 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files):
- Previously present fields "21" ("streaming_worker_mappings") and "22" ("serving_worker_mappings") on message "MetaSnapshot" were deleted without reserving their names or numbers.
- Previously present reserved names "parallel_unit_mappings" and "serving_parallel_unit_mappings" on message "MetaSnapshot" were deleted.
- Previously present reserved ranges "[9]" and "[18]" on message "MetaSnapshot" were deleted.

message SnapshotVersion {

Check failure on line 378 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files):
- Previously present field "4" ("streaming_worker_mapping_version") on message "SnapshotVersion" was deleted without reserving its name or number.

uint64 catalog_version = 1;
reserved 2;
reserved "parallel_unit_mapping_version";
uint64 parallel_unit_mapping_version = 2;
uint64 worker_node_version = 3;
uint64 streaming_worker_mapping_version = 4;
}
repeated catalog.Database databases = 1;
repeated catalog.Schema schemas = 2;
@@ -403,20 +391,16 @@
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
reserved 9;
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
reserved 18;
reserved "serving_parallel_unit_mappings";

// for streaming
repeated FragmentWorkerMapping streaming_worker_mappings = 21;
repeated FragmentWorkerMapping serving_worker_mappings = 22;
// for serving
repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;

SnapshotVersion version = 13;
}
@@ -455,6 +439,8 @@
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
@@ -464,15 +450,10 @@
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
catalog.Connection connection = 22;
FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
FragmentWorkerMapping streaming_worker_mapping = 27;
FragmentWorkerMappings serving_worker_mappings = 28;
}
reserved 12;
reserved "parallel_unit_mapping";
reserved 23;
reserved "serving_parallel_unit_mappings";
}
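
The `SubscribeResponse` changes above restore `parallel_unit_mapping` (field 12) and `serving_parallel_unit_mappings` (field 23) as notification payloads and drop the worker-mapping variants. As a hedged sketch only, the snippet below shows how a subscriber could keep a per-fragment cache of these mappings; `FragmentId`, the expanded-mapping alias, and the method names are illustrative assumptions, not the frontend's real API.

```rust
use std::collections::HashMap;

// Illustrative names: these aliases are assumptions for this sketch.
type FragmentId = u32;
type ExpandedParallelUnitMapping = Vec<u32>; // vnode index -> parallel unit id

/// Hypothetical frontend-side cache of the mappings delivered by the
/// notifications above (fields 12 and 23 of `SubscribeResponse.Info`).
#[derive(Default)]
struct MappingCache {
    streaming: HashMap<FragmentId, ExpandedParallelUnitMapping>,
    serving: HashMap<FragmentId, ExpandedParallelUnitMapping>,
}

impl MappingCache {
    /// Apply a single `parallel_unit_mapping` update for one streaming fragment.
    fn apply_streaming(&mut self, fragment_id: FragmentId, mapping: ExpandedParallelUnitMapping) {
        self.streaming.insert(fragment_id, mapping);
    }

    /// Apply a `serving_parallel_unit_mappings` batch, replacing the serving view.
    fn apply_serving(&mut self, mappings: Vec<(FragmentId, ExpandedParallelUnitMapping)>) {
        self.serving = mappings.into_iter().collect();
    }
}
```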

service NotificationService {
@@ -647,10 +628,8 @@
message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
reserved 1;
reserved "mappings";
repeated FragmentParallelUnitMapping mappings = 1;
map<uint32, uint32> fragment_to_table = 2;
repeated FragmentWorkerMapping worker_mappings = 3;
}

service ServingService {
39 changes: 16 additions & 23 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -21,14 +21,15 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId,
ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
@@ -51,7 +52,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
struct InnerSideExecutorBuilder<C> {
table_desc: StorageTableDesc,
table_distribution: TableDistribution,
vnode_mapping: ExpandedWorkerMapping,
vnode_mapping: ExpandedParallelUnitMapping,
outer_side_key_types: Vec<DataType>,
inner_side_schema: Schema,
inner_side_column_ids: Vec<i32>,
@@ -60,8 +61,8 @@ struct InnerSideExecutorBuilder<C> {
context: C,
task_id: TaskId,
epoch: BatchQueryEpoch,
worker_mapping: HashMap<WorkerId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerId, Vec<(ScanRange, VirtualNode)>>,
pu_to_worker_mapping: HashMap<ParallelUnitId, WorkerNode>,
pu_to_scan_range_mapping: HashMap<ParallelUnitId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
@@ -91,7 +92,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
let list = self.worker_to_scan_range_mapping.get(id).unwrap();
let list = self.pu_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

@@ -113,11 +114,11 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
}

/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &WorkerId) -> Result<PbExchangeSource> {
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
let worker = self
.worker_mapping
.pu_to_worker_mapping
.get(id)
.context("No worker node found for the given worker id.")?;
.context("No worker node found for the given parallel unit id.")?;

let local_execute_plan = LocalExecutePlan {
plan: Some(PlanFragment {
@@ -159,7 +160,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
#[async_trait::async_trait]
impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
fn reset(&mut self) {
self.worker_to_scan_range_mapping = HashMap::new();
self.pu_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `key_scalar_impls` into the parallel unit id
@@ -190,11 +191,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let worker_id = self.vnode_mapping[vnode.to_index()];
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];

let list = self
.worker_to_scan_range_mapping
.entry(worker_id)
.pu_to_scan_range_mapping
.entry(parallel_unit_id)
.or_default();
list.push((scan_range, vnode));

@@ -206,7 +207,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
async fn build_executor(&mut self) -> Result<BoxedExecutor> {
self.next_stage_id += 1;
let mut sources = vec![];
for id in self.worker_to_scan_range_mapping.keys() {
for id in self.pu_to_scan_range_mapping.keys() {
sources.push(self.build_prost_exchange_source(id)?);
}

@@ -372,14 +373,6 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_mapping: HashMap<WorkerId, WorkerNode> = worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.collect();

assert_eq!(worker_mapping.len(), worker_nodes.len());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
@@ -395,11 +388,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
worker_to_scan_range_mapping: HashMap::new(),
pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()),
pu_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_mapping,
};

let identity = source.plan_node().get_identity().clone();
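The builder changes above swap the direct `worker_mapping` back to `pu_to_worker_mapping`, produced by `get_pu_to_worker_mapping` from the plan's worker nodes. The sketch below shows the shape of that reverse index under the assumption that each worker advertises the parallel units it hosts; the struct fields and helper name here are illustrative, not the exact `risingwave_common` API.

```rust
use std::collections::HashMap;

type ParallelUnitId = u32;

/// Shape of a worker node for this sketch only; the real `WorkerNode` comes from
/// the `common` proto and carries more fields.
#[derive(Clone)]
struct WorkerNode {
    id: u32,
    parallel_units: Vec<ParallelUnitId>,
}

/// Illustrative reverse index in the spirit of `get_pu_to_worker_mapping`: each
/// parallel unit is hosted by exactly one worker, so the map resolves the id that
/// `vnode_mapping` yields into the worker node to contact.
fn pu_to_worker(workers: &[WorkerNode]) -> HashMap<ParallelUnitId, WorkerNode> {
    let mut mapping = HashMap::new();
    for worker in workers {
        for &pu in &worker.parallel_units {
            mapping.insert(pu, worker.clone());
        }
    }
    mapping
}
```

With that map in place, the lookup join routes each scan range in two hops: vnode to parallel unit via `vnode_mapping`, then parallel unit to worker, which is the indirection the reverted change had collapsed into a single vnode-to-worker step.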