feat: try to remove parallel unit mapping in frontend (#16205)
Signed-off-by: Shanicky Chen <[email protected]>
shanicky authored Apr 24, 2024
1 parent 38f98f5 commit 4a8b1e5
Showing 23 changed files with 532 additions and 305 deletions.
6 changes: 6 additions & 0 deletions proto/common.proto
@@ -92,6 +92,12 @@ message ParallelUnitMapping {
repeated uint32 data = 2;
}

// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id.
message WorkerMapping {
repeated uint32 original_indices = 1;
repeated uint32 data = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
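For context on the encoding: like ParallelUnitMapping above it, the new WorkerMapping stores a vnode-indexed array in compressed form, with original_indices and data jointly describing runs of equal values. The Rust sketch below is a minimal, hypothetical illustration of such a run-length pair (assuming each entry of original_indices is the last vnode index of a run); it is not the crate's actual implementation.

// Minimal sketch of a run-length (original_indices, data) pair such as the one in
// `WorkerMapping`. Assumption for this example: `original_indices[i]` is the last
// vnode index covered by the run whose value is `data[i]`.
fn compress(expanded: &[u32]) -> (Vec<u32>, Vec<u32>) {
    let mut original_indices = Vec::new();
    let mut data = Vec::new();
    for (i, &worker) in expanded.iter().enumerate() {
        if data.last() != Some(&worker) {
            // A new run starts here.
            data.push(worker);
            original_indices.push(i as u32);
        } else {
            // Extend the current run to cover this vnode.
            *original_indices.last_mut().unwrap() = i as u32;
        }
    }
    (original_indices, data)
}

fn expand(original_indices: &[u32], data: &[u32]) -> Vec<u32> {
    let mut expanded = Vec::new();
    for (&last, &worker) in original_indices.iter().zip(data.iter()) {
        while expanded.len() <= last as usize {
            expanded.push(worker);
        }
    }
    expanded
}

fn main() {
    // vnodes 0..=2 on worker 5, vnodes 3..=4 on worker 7, vnode 5 on worker 9
    let expanded = vec![5, 5, 5, 7, 7, 9];
    let (idx, data) = compress(&expanded);
    assert_eq!(idx, vec![2, 4, 5]);
    assert_eq!(data, vec![5, 7, 9]);
    assert_eq!(expand(&idx, &data), expanded);
}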
39 changes: 30 additions & 9 deletions proto/meta.proto
@@ -111,6 +111,16 @@ message FragmentParallelUnitMappings {
repeated FragmentParallelUnitMapping mappings = 1;
}

// Worker mapping with fragment id, used for notification.
message FragmentWorkerMapping {
uint32 fragment_id = 1;
common.WorkerMapping mapping = 2;
}

message FragmentWorkerMappings {
repeated FragmentWorkerMapping mappings = 1;
}

// TODO: remove this when dashboard refactored.
message ActorLocation {
common.WorkerNode node = 1;
@@ -377,8 +387,10 @@ message SubscribeRequest {
message MetaSnapshot {
message SnapshotVersion {
uint64 catalog_version = 1;
uint64 parallel_unit_mapping_version = 2;
reserved 2;
reserved "parallel_unit_mapping_version";
uint64 worker_node_version = 3;
uint64 streaming_worker_mapping_version = 4;
}
repeated catalog.Database databases = 1;
repeated catalog.Schema schemas = 2;
@@ -391,16 +403,20 @@ message MetaSnapshot {
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
reserved 9;
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
// for serving
repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;
reserved 18;
reserved "serving_parallel_unit_mappings";

// for streaming
repeated FragmentWorkerMapping streaming_worker_mappings = 21;
repeated FragmentWorkerMapping serving_worker_mappings = 22;

SnapshotVersion version = 13;
}
@@ -439,8 +455,6 @@ message SubscribeResponse {
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
@@ -450,10 +464,15 @@ message SubscribeResponse {
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
catalog.Connection connection = 22;
FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
FragmentWorkerMapping streaming_worker_mapping = 27;
FragmentWorkerMappings serving_worker_mappings = 28;
}
reserved 12;
reserved "parallel_unit_mapping";
reserved 23;
reserved "serving_parallel_unit_mappings";
}

service NotificationService {
@@ -628,8 +647,10 @@ service SessionParamService {
message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
repeated FragmentParallelUnitMapping mappings = 1;
reserved 1;
reserved "mappings";
map<uint32, uint32> fragment_to_table = 2;
repeated FragmentWorkerMapping worker_mappings = 3;
}

service ServingService {
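With the proto changes above, meta now notifies subscribers with per-fragment FragmentWorkerMapping updates (versioned via streaming_worker_mapping_version) instead of parallel-unit mappings. Purely as an illustration of how a subscriber could hold such state, here is a small hypothetical cache keyed by fragment id; all names are invented for the example and are not part of this commit.

use std::collections::HashMap;

type FragmentId = u32;
type WorkerId = u32;

// Hypothetical frontend-side cache of fragment -> (vnode -> worker id) mappings,
// replaced wholesale whenever a `FragmentWorkerMapping` notification arrives.
#[derive(Default)]
struct WorkerMappingCache {
    streaming: HashMap<FragmentId, Vec<WorkerId>>, // index in the Vec = vnode
}

impl WorkerMappingCache {
    // Apply one notification: replace the cached mapping for this fragment.
    fn apply_update(&mut self, fragment_id: FragmentId, expanded_mapping: Vec<WorkerId>) {
        self.streaming.insert(fragment_id, expanded_mapping);
    }

    // Look up the worker currently owning a vnode of a fragment, if known.
    fn worker_of(&self, fragment_id: FragmentId, vnode: usize) -> Option<WorkerId> {
        self.streaming.get(&fragment_id)?.get(vnode).copied()
    }
}

fn main() {
    let mut cache = WorkerMappingCache::default();
    cache.apply_update(42, vec![1, 1, 2, 2]); // fragment 42: vnodes 0..=1 -> worker 1, 2..=3 -> worker 2
    assert_eq!(cache.worker_of(42, 3), Some(2));
}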
39 changes: 23 additions & 16 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -21,15 +21,14 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
@@ -52,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
struct InnerSideExecutorBuilder<C> {
table_desc: StorageTableDesc,
table_distribution: TableDistribution,
vnode_mapping: ExpandedParallelUnitMapping,
vnode_mapping: ExpandedWorkerMapping,
outer_side_key_types: Vec<DataType>,
inner_side_schema: Schema,
inner_side_column_ids: Vec<i32>,
@@ -61,8 +60,8 @@ struct InnerSideExecutorBuilder<C> {
context: C,
task_id: TaskId,
epoch: BatchQueryEpoch,
pu_to_worker_mapping: HashMap<ParallelUnitId, WorkerNode>,
pu_to_scan_range_mapping: HashMap<ParallelUnitId, Vec<(ScanRange, VirtualNode)>>,
worker_mapping: HashMap<WorkerId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
@@ -92,7 +91,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
let list = self.pu_to_scan_range_mapping.get(id).unwrap();
let list = self.worker_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

@@ -114,11 +113,11 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
}

/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
fn build_prost_exchange_source(&self, id: &WorkerId) -> Result<PbExchangeSource> {
let worker = self
.pu_to_worker_mapping
.worker_mapping
.get(id)
.context("No worker node found for the given parallel unit id.")?;
.context("No worker node found for the given worker id.")?;

let local_execute_plan = LocalExecutePlan {
plan: Some(PlanFragment {
@@ -160,7 +159,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
#[async_trait::async_trait]
impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
fn reset(&mut self) {
self.pu_to_scan_range_mapping = HashMap::new();
self.worker_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `key_scalar_impls` into the parallel unit id
@@ -191,11 +190,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];
let worker_id = self.vnode_mapping[vnode.to_index()];

let list = self
.pu_to_scan_range_mapping
.entry(parallel_unit_id)
.worker_to_scan_range_mapping
.entry(worker_id)
.or_default();
list.push((scan_range, vnode));

@@ -207,7 +206,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
async fn build_executor(&mut self) -> Result<BoxedExecutor> {
self.next_stage_id += 1;
let mut sources = vec![];
for id in self.pu_to_scan_range_mapping.keys() {
for id in self.worker_to_scan_range_mapping.keys() {
sources.push(self.build_prost_exchange_source(id)?);
}

@@ -373,6 +372,14 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_mapping: HashMap<WorkerId, WorkerNode> = worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.collect();

assert_eq!(worker_mapping.len(), worker_nodes.len());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
@@ -388,11 +395,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()),
pu_to_scan_range_mapping: HashMap::new(),
worker_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_mapping,
};

let identity = source.plan_node().get_identity().clone();
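To summarize the executor-side change: the builder now resolves each scan range's vnode directly through the expanded worker mapping and buckets ranges per worker id (worker_to_scan_range_mapping), then emits one exchange source per worker. The following stand-alone Rust sketch mirrors just that grouping step; the types and function are simplified stand-ins invented for the example, not the executor's real API.

use std::collections::HashMap;

type WorkerId = u32;

// Simplified stand-in for `risingwave_common::util::scan_range::ScanRange`.
#[derive(Debug, PartialEq)]
struct ScanRange(u64);

// Group scan ranges by the worker owning their vnode, analogous to how
// `worker_to_scan_range_mapping` is populated above: vnode -> worker id via the
// expanded mapping, then one bucket of (range, vnode) pairs per worker.
fn group_by_worker(
    ranges_with_vnodes: Vec<(ScanRange, usize)>,
    expanded_worker_mapping: &[WorkerId], // index = vnode, value = worker id
) -> HashMap<WorkerId, Vec<(ScanRange, usize)>> {
    let mut grouped: HashMap<WorkerId, Vec<(ScanRange, usize)>> = HashMap::new();
    for (range, vnode) in ranges_with_vnodes {
        let worker_id = expanded_worker_mapping[vnode];
        grouped.entry(worker_id).or_default().push((range, vnode));
    }
    grouped
}

fn main() {
    let mapping = [1, 1, 2, 2]; // vnodes 0..=1 -> worker 1, 2..=3 -> worker 2
    let grouped = group_by_worker(vec![(ScanRange(10), 0), (ScanRange(20), 3)], &mapping);
    assert_eq!(grouped[&1], vec![(ScanRange(10), 0)]);
    assert_eq!(grouped[&2], vec![(ScanRange(20), 3)]);
}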
(Diffs for the remaining 20 changed files are not shown here.)
