fix: Revert "feat: try to remove parallel unit mapping in frontend" (#…
shanicky authored Apr 29, 2024
1 parent 1e55914 commit c13caf6
Showing 23 changed files with 305 additions and 532 deletions.
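At a glance, the revert restores the two-level routing that the reverted change had collapsed: a per-fragment vnode mapping resolves each virtual node to a parallel unit id, and a separate table resolves that parallel unit to the worker node hosting it (see the restored `vnode_mapping: ExpandedParallelUnitMapping` and `pu_to_worker_mapping` fields in the Rust diff below). A minimal sketch of that lookup path, using simplified stand-ins rather than the crate's real types:

    use std::collections::HashMap;

    // Illustrative aliases, not the definitions from risingwave_common.
    type VirtualNode = usize;
    type ParallelUnitId = u32;
    type WorkerId = u32;

    /// Sketch: vnode -> parallel unit -> worker, as restored by this revert.
    /// `vnode_mapping` plays the role of `ExpandedParallelUnitMapping` (one
    /// entry per vnode); `pu_to_worker` plays the role of the map built by
    /// `get_pu_to_worker_mapping`.
    fn worker_for_vnode(
        vnode: VirtualNode,
        vnode_mapping: &[ParallelUnitId],
        pu_to_worker: &HashMap<ParallelUnitId, WorkerId>,
    ) -> Option<WorkerId> {
        let pu = *vnode_mapping.get(vnode)?; // step 1: vnode -> parallel unit
        pu_to_worker.get(&pu).copied()       // step 2: parallel unit -> worker
    }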
proto/common.proto: 6 changes (0 additions, 6 deletions)
@@ -92,12 +92,6 @@ message ParallelUnitMapping {
   repeated uint32 data = 2;
 }

-// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id.
-message WorkerMapping {
-  repeated uint32 original_indices = 1;
-  repeated uint32 data = 2;
-}
-
 message BatchQueryEpoch {
   oneof epoch {
     uint64 committed = 1;
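Note that the surviving `ParallelUnitMapping` and the deleted `WorkerMapping` share the same compressed layout: `original_indices` plus `data`. Assuming a run-length convention in which `original_indices[i]` marks the last vnode index covered by `data[i]` (an assumption inferred from the field names, not something this diff shows), expansion into a per-vnode array looks roughly like:

    /// Sketch: expand the (original_indices, data) run-length encoding into
    /// one entry per vnode. Assumes original_indices[i] is the LAST index
    /// covered by data[i].
    fn decompress_mapping(original_indices: &[u32], data: &[u32]) -> Vec<u32> {
        let mut expanded = Vec::new();
        let mut start = 0u32;
        for (&end, &value) in original_indices.iter().zip(data.iter()) {
            for _ in start..=end {
                expanded.push(value);
            }
            start = end + 1;
        }
        expanded
    }

Under that assumption, `decompress_mapping(&[2, 4], &[5, 9])` yields `[5, 5, 5, 9, 9]`.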
proto/meta.proto: 39 changes (9 additions, 30 deletions)
@@ -111,16 +111,6 @@ message FragmentParallelUnitMappings {
   repeated FragmentParallelUnitMapping mappings = 1;
 }

-/// Worker mapping with fragment id, used for notification.
-message FragmentWorkerMapping {
-  uint32 fragment_id = 1;
-  common.WorkerMapping mapping = 2;
-}
-
-message FragmentWorkerMappings {
-  repeated FragmentWorkerMapping mappings = 1;
-}
-
 // TODO: remove this when dashboard refactored.
 message ActorLocation {
   common.WorkerNode node = 1;
@@ -387,10 +377,8 @@ message SubscribeRequest {
 message MetaSnapshot {
   message SnapshotVersion {
     uint64 catalog_version = 1;
-    reserved 2;
-    reserved "parallel_unit_mapping_version";
+    uint64 parallel_unit_mapping_version = 2;
     uint64 worker_node_version = 3;
-    uint64 streaming_worker_mapping_version = 4;
   }
   repeated catalog.Database databases = 1;
   repeated catalog.Schema schemas = 2;
@@ -403,20 +391,16 @@ message MetaSnapshot {
   repeated catalog.Connection connections = 17;
   repeated catalog.Subscription subscriptions = 19;
   repeated user.UserInfo users = 8;
-  reserved 9;
-  reserved "parallel_unit_mappings";
   GetSessionParamsResponse session_params = 20;
+  // for streaming
+  repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
   repeated common.WorkerNode nodes = 10;
   hummock.HummockSnapshot hummock_snapshot = 11;
   hummock.HummockVersion hummock_version = 12;
   backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
   hummock.WriteLimits hummock_write_limits = 16;
-  reserved 18;
-  reserved "serving_parallel_unit_mappings";
-
-  // for streaming
-  repeated FragmentWorkerMapping streaming_worker_mappings = 21;
-  repeated FragmentWorkerMapping serving_worker_mappings = 22;
+  // for serving
+  repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;

   SnapshotVersion version = 13;
 }
@@ -455,6 +439,8 @@ message SubscribeResponse {
    catalog.Function function = 6;
    user.UserInfo user = 11;
    SetSessionParamRequest session_param = 26;
+    // for streaming
+    FragmentParallelUnitMapping parallel_unit_mapping = 12;
    common.WorkerNode node = 13;
    hummock.HummockSnapshot hummock_snapshot = 14;
    hummock.HummockVersionDeltas hummock_version_deltas = 15;
@@ -464,15 +450,10 @@ message SubscribeResponse {
    hummock.WriteLimits hummock_write_limits = 20;
    RelationGroup relation_group = 21;
    catalog.Connection connection = 22;
+    FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
    hummock.HummockVersionStats hummock_stats = 24;
    Recovery recovery = 25;
-    FragmentWorkerMapping streaming_worker_mapping = 27;
-    FragmentWorkerMappings serving_worker_mappings = 28;
   }
-  reserved 12;
-  reserved "parallel_unit_mapping";
-  reserved 23;
-  reserved "serving_parallel_unit_mappings";
 }

 service NotificationService {
@@ -647,10 +628,8 @@ service SessionParamService {
 message GetServingVnodeMappingsRequest {}

 message GetServingVnodeMappingsResponse {
-  reserved 1;
-  reserved "mappings";
+  repeated FragmentParallelUnitMapping mappings = 1;
   map<uint32, uint32> fragment_to_table = 2;
-  repeated FragmentWorkerMapping worker_mappings = 3;
 }

 service ServingService {
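As an illustration of how the restored response shape is typically consumed (a hedged sketch, not code from this commit): each entry in `mappings` is a compressed per-fragment mapping, and `fragment_to_table` links fragments back to tables. The structs below are hand-rolled stand-ins for the generated prost types, with the nested `common.ParallelUnitMapping` flattened for brevity; `decompress_mapping` is the helper sketched after proto/common.proto above.

    use std::collections::HashMap;

    // Stand-ins for the generated prost types (fields flattened).
    struct FragmentParallelUnitMapping {
        fragment_id: u32,
        original_indices: Vec<u32>,
        data: Vec<u32>,
    }

    struct GetServingVnodeMappingsResponse {
        mappings: Vec<FragmentParallelUnitMapping>,
        fragment_to_table: HashMap<u32, u32>,
    }

    /// Sketch: build table_id -> (fragment_id, expanded per-vnode mapping),
    /// skipping fragments with no table entry.
    fn index_serving_mappings(
        resp: &GetServingVnodeMappingsResponse,
    ) -> HashMap<u32, (u32, Vec<u32>)> {
        resp.mappings
            .iter()
            .filter_map(|m| {
                let table_id = *resp.fragment_to_table.get(&m.fragment_id)?;
                let expanded = decompress_mapping(&m.original_indices, &m.data);
                Some((table_id, (m.fragment_id, expanded)))
            })
            .collect()
    }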
src/batch/src/executor/join/local_lookup_join.rs: 39 changes (16 additions, 23 deletions)
@@ -21,14 +21,15 @@ use risingwave_common::buffer::BitmapBuilder;
 use risingwave_common::catalog::{ColumnDesc, Field, Schema};
 use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::{
-    ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId,
+    ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
 };
 use risingwave_common::memory::MemoryContext;
 use risingwave_common::types::{DataType, Datum};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::scan_range::ScanRange;
 use risingwave_common::util::tracing::TracingContext;
+use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
 use risingwave_expr::expr::{build_from_prost, BoxedExpression};
 use risingwave_pb::batch_plan::exchange_info::DistributionMode;
 use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
@@ -51,7 +52,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
 struct InnerSideExecutorBuilder<C> {
     table_desc: StorageTableDesc,
     table_distribution: TableDistribution,
-    vnode_mapping: ExpandedWorkerMapping,
+    vnode_mapping: ExpandedParallelUnitMapping,
     outer_side_key_types: Vec<DataType>,
     inner_side_schema: Schema,
     inner_side_column_ids: Vec<i32>,
@@ -60,8 +61,8 @@ struct InnerSideExecutorBuilder<C> {
     context: C,
     task_id: TaskId,
     epoch: BatchQueryEpoch,
-    worker_mapping: HashMap<WorkerId, WorkerNode>,
-    worker_to_scan_range_mapping: HashMap<WorkerId, Vec<(ScanRange, VirtualNode)>>,
+    pu_to_worker_mapping: HashMap<ParallelUnitId, WorkerNode>,
+    pu_to_scan_range_mapping: HashMap<ParallelUnitId, Vec<(ScanRange, VirtualNode)>>,
     chunk_size: usize,
     shutdown_rx: ShutdownToken,
     next_stage_id: usize,
@@ -91,7 +92,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
     /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
     /// based on the passed `scan_range` and virtual node.
     fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
-        let list = self.worker_to_scan_range_mapping.get(id).unwrap();
+        let list = self.pu_to_scan_range_mapping.get(id).unwrap();
         let mut scan_ranges = vec![];
         let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

@@ -113,11 +114,11 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
     }

     /// Creates the `PbExchangeSource` using the given `id`.
-    fn build_prost_exchange_source(&self, id: &WorkerId) -> Result<PbExchangeSource> {
+    fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
         let worker = self
-            .worker_mapping
+            .pu_to_worker_mapping
             .get(id)
-            .context("No worker node found for the given worker id.")?;
+            .context("No worker node found for the given parallel unit id.")?;

         let local_execute_plan = LocalExecutePlan {
             plan: Some(PlanFragment {
@@ -159,7 +160,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
 #[async_trait::async_trait]
 impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
     fn reset(&mut self) {
-        self.worker_to_scan_range_mapping = HashMap::new();
+        self.pu_to_scan_range_mapping = HashMap::new();
     }

     /// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id
Expand Down Expand Up @@ -190,11 +191,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let worker_id = self.vnode_mapping[vnode.to_index()];
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];

let list = self
.worker_to_scan_range_mapping
.entry(worker_id)
.pu_to_scan_range_mapping
.entry(parallel_unit_id)
.or_default();
list.push((scan_range, vnode));

@@ -206,7 +207,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
     async fn build_executor(&mut self) -> Result<BoxedExecutor> {
         self.next_stage_id += 1;
         let mut sources = vec![];
-        for id in self.worker_to_scan_range_mapping.keys() {
+        for id in self.pu_to_scan_range_mapping.keys() {
             sources.push(self.build_prost_exchange_source(id)?);
         }


Expand Down Expand Up @@ -372,14 +373,6 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_mapping: HashMap<WorkerId, WorkerNode> = worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.collect();

assert_eq!(worker_mapping.len(), worker_nodes.len());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
Expand All @@ -395,11 +388,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
worker_to_scan_range_mapping: HashMap::new(),
pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()),
pu_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_mapping,
};

let identity = source.plan_node().get_identity().clone();
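The last hunk swaps the hand-rolled `worker_mapping` for a call to `get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes())`. A plausible shape for that helper, assuming each `WorkerNode` lists the parallel units it hosts (the struct definitions below are simplified stand-ins, and the body is a guess at the helper's behavior, not its actual source):

    use std::collections::HashMap;

    type ParallelUnitId = u32;

    // Minimal stand-ins for the proto types used below.
    #[derive(Clone)]
    struct ParallelUnit {
        id: ParallelUnitId,
    }

    #[derive(Clone)]
    struct WorkerNode {
        id: u32,
        parallel_units: Vec<ParallelUnit>,
    }

    /// Sketch of `get_pu_to_worker_mapping`: index each parallel unit by the
    /// worker node that hosts it, so a ParallelUnitId resolves to a WorkerNode.
    fn get_pu_to_worker_mapping(workers: &[WorkerNode]) -> HashMap<ParallelUnitId, WorkerNode> {
        workers
            .iter()
            .flat_map(|w| w.parallel_units.iter().map(move |pu| (pu.id, w.clone())))
            .collect()
    }

Cloning the `WorkerNode` per parallel unit mirrors the `HashMap<ParallelUnitId, WorkerNode>` field type that the diff restores on `InnerSideExecutorBuilder`.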
(The remaining 20 of the 23 changed files are not shown here.)
