feat: Replace the parallel unit mapping with worker slot mapping in frontend. #16801

Merged: 32 commits, May 28, 2024

Commits (32)
decfccf
Add WorkerMapping to protos; refactor to worker ID usage.
shanicky Apr 7, 2024
5947afb
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-2
shanicky Apr 16, 2024
724d037
Refactor observer, notification, & catalog files for better structure…
shanicky Apr 16, 2024
0916ac8
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 17, 2024
8f7f9b3
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 18, 2024
e40d0c8
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 22, 2024
b4673e6
Refactor parallel unit-worker map method to utils module.
shanicky Apr 22, 2024
b50031c
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 22, 2024
3b48ac9
Deprecate fields in MetaSnapshot & reserve tags.
shanicky Apr 22, 2024
36937ac
Merge branch 'main' into peng/fe-remove-pu
shanicky Apr 24, 2024
6125cab
update proto
shanicky Apr 24, 2024
0c8b540
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 6, 2024
ff4e3fa
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 9, 2024
ca47e5b
Fixed typos, clean up imports and whitespace in Rust files
shanicky May 11, 2024
f107855
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 11, 2024
b6e2700
Refactor serving.rs for efficient worker-to-vnode mapping, update imp…
shanicky May 11, 2024
e3de9ff
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 13, 2024
78622ad
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
May 14, 2024
bc2b8c5
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
May 16, 2024
40c8a7e
Refactor protobufs, Rust types, rename worker mappings
May 17, 2024
ee15f5d
Update vnode placement tests and unique counts
May 17, 2024
3f4ac77
Remove assert from LocalLookupJoinExecBuilder
May 17, 2024
93cb568
Refactor WorkerMapping to WorkerSlotMapping in protobuf and Rust code
May 17, 2024
5a0c927
Update proto defs, error msg, rename vars, type changes
May 17, 2024
f54bb6c
Remove typecast for ROOT_TASK_* constants
May 17, 2024
95ded68
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
May 17, 2024
b85ec7a
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
May 20, 2024
56eba87
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
May 21, 2024
366020b
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
May 27, 2024
b79c38b
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
May 27, 2024
22edf32
Refactor `catalog.rs` imports, drop `visit_stream_node_cont`
May 27, 2024
6eda8c6
Refactor: Simplify `visit_stream_git_node_cont_mut` import in `catalo…
May 27, 2024
6 changes: 3 additions & 3 deletions proto/batch_plan.proto
@@ -225,15 +225,15 @@
message TaskId {
string query_id = 1;
uint32 stage_id = 2;
uint32 task_id = 3;
uint64 task_id = 3;

Check failure on line 228 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "3" with name "task_id" on message "TaskId" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

// Every task will create N buffers (channels) for parent operators to fetch results from,
// where N is the parallelism of parent stage.
message TaskOutputId {
TaskId task_id = 1;
// The id of output channel to fetch from
uint32 output_id = 2;
uint64 output_id = 2;

Check failure on line 236 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "2" with name "output_id" on message "TaskOutputId" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message LocalExecutePlan {
@@ -270,7 +270,7 @@
repeated uint32 inner_side_key = 4;
uint32 lookup_prefix_len = 5;
plan_common.StorageTableDesc inner_side_table_desc = 6;
repeated uint32 inner_side_vnode_mapping = 7;
repeated uint64 inner_side_vnode_mapping = 7;

Check failure on line 273 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "7" with name "inner_side_vnode_mapping" on message "LocalLookupJoinNode" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
repeated int32 inner_side_column_ids = 8;
repeated uint32 output_indices = 9;
repeated common.WorkerNode worker_nodes = 10;
6 changes: 6 additions & 0 deletions proto/common.proto
@@ -92,6 +92,12 @@ message ParallelUnitMapping
repeated uint32 data = 2;
}

// Vnode mapping for stream fragments. Stores mapping from virtual node to (worker id, slot index).
message WorkerSlotMapping {
repeated uint32 original_indices = 1;
repeated uint64 data = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
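Note: the new `WorkerSlotMapping` keeps the same compressed shape as `ParallelUnitMapping` (parallel `original_indices` / `data` arrays), except that each `data` entry is now a `uint64` worker slot rather than a `uint32` parallel unit id. Below is a minimal sketch of expanding such a mapping back to one entry per virtual node, assuming `original_indices[i]` records the last vnode index covered by `data[i]`; the function name and the run-length assumption are illustrative, not taken from the crate.

```rust
/// Expand a compressed vnode mapping into one worker-slot entry per vnode.
///
/// Assumption: `original_indices[i]` is the last vnode index whose value is
/// `data[i]` (run-length encoding), mirroring the existing ParallelUnitMapping.
fn expand_worker_slot_mapping(original_indices: &[u32], data: &[u64]) -> Vec<u64> {
    assert_eq!(original_indices.len(), data.len());
    let mut expanded = Vec::new();
    for (&last_idx, &slot) in original_indices.iter().zip(data.iter()) {
        // Fill every vnode up to and including `last_idx` with this slot.
        expanded.resize(last_idx as usize + 1, slot);
    }
    expanded
}
```

For example, 256 vnodes all owned by a single worker slot would compress to `original_indices = [255]` and `data = [slot]`.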
39 changes: 30 additions & 9 deletions proto/meta.proto
@@ -111,6 +111,16 @@ message FragmentParallelUnitMappings
repeated FragmentParallelUnitMapping mappings = 1;
}

/// Worker slot mapping with fragment id, used for notification.
message FragmentWorkerSlotMapping {
uint32 fragment_id = 1;
common.WorkerSlotMapping mapping = 2;
}

message FragmentWorkerSlotMappings {
repeated FragmentWorkerSlotMapping mappings = 1;
}

// TODO: remove this when dashboard refactored.
message ActorLocation {
common.WorkerNode node = 1;
@@ -378,8 +388,10 @@ message SubscribeRequest
message MetaSnapshot {
message SnapshotVersion {
uint64 catalog_version = 1;
uint64 parallel_unit_mapping_version = 2;
reserved 2;
reserved "parallel_unit_mapping_version";
uint64 worker_node_version = 3;
uint64 streaming_worker_slot_mapping_version = 4;
}
repeated catalog.Database databases = 1;
repeated catalog.Schema schemas = 2;
@@ -392,16 +404,20 @@ message MetaSnapshot
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
reserved 9;
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
// for serving
repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;
reserved 18;
reserved "serving_parallel_unit_mappings";

// for streaming
repeated FragmentWorkerSlotMapping streaming_worker_slot_mappings = 21;
repeated FragmentWorkerSlotMapping serving_worker_slot_mappings = 22;

SnapshotVersion version = 13;
}
@@ -440,8 +456,6 @@ message SubscribeResponse
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
@@ -451,10 +465,15 @@
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
catalog.Connection connection = 22;
FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27;
FragmentWorkerSlotMappings serving_worker_slot_mappings = 28;
}
reserved 12;
reserved "parallel_unit_mapping";
reserved 23;
reserved "serving_parallel_unit_mappings";
}

service NotificationService {
@@ -629,8 +648,10 @@ service SessionParamService
message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
repeated FragmentParallelUnitMapping mappings = 1;
reserved 1;
reserved "mappings";
map<uint32, uint32> fragment_to_table = 2;
repeated FragmentWorkerSlotMapping worker_slot_mappings = 3;
}

service ServingService {
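Note: the snapshot now ships two `FragmentWorkerSlotMapping` lists, one for streaming and one for serving, and the per-fragment notifications reuse the same message. Below is a minimal sketch of folding such a list into a fragment-keyed cache on the frontend side; the struct definitions are hypothetical stand-ins for the prost-generated types (field names follow the proto, everything else is illustrative).

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the prost-generated message types above.
struct PbWorkerSlotMapping {
    original_indices: Vec<u32>,
    data: Vec<u64>,
}

struct PbFragmentWorkerSlotMapping {
    fragment_id: u32,
    mapping: Option<PbWorkerSlotMapping>,
}

/// Build a fragment-id keyed lookup table from a snapshot's mappings,
/// skipping entries whose `mapping` field is unset.
fn build_fragment_mapping_cache(
    mappings: Vec<PbFragmentWorkerSlotMapping>,
) -> HashMap<u32, PbWorkerSlotMapping> {
    mappings
        .into_iter()
        .filter_map(|m| m.mapping.map(|mapping| (m.fragment_id, mapping)))
        .collect()
}
```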
52 changes: 33 additions & 19 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -21,15 +21,14 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
@@ -52,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
struct InnerSideExecutorBuilder<C> {
table_desc: StorageTableDesc,
table_distribution: TableDistribution,
vnode_mapping: ExpandedParallelUnitMapping,
vnode_mapping: ExpandedWorkerSlotMapping,
outer_side_key_types: Vec<DataType>,
inner_side_schema: Schema,
inner_side_column_ids: Vec<i32>,
@@ -61,8 +60,8 @@ struct InnerSideExecutorBuilder<C> {
context: C,
task_id: TaskId,
epoch: BatchQueryEpoch,
pu_to_worker_mapping: HashMap<ParallelUnitId, WorkerNode>,
pu_to_scan_range_mapping: HashMap<ParallelUnitId, Vec<(ScanRange, VirtualNode)>>,
worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
@@ -91,8 +90,8 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {

/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
let list = self.pu_to_scan_range_mapping.get(id).unwrap();
fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

@@ -114,11 +113,11 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
}

/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
let worker = self
.pu_to_worker_mapping
.worker_slot_mapping
.get(id)
.context("No worker node found for the given parallel unit id.")?;
.context("No worker node found for the given worker slot id.")?;

let local_execute_plan = LocalExecutePlan {
plan: Some(PlanFragment {
@@ -145,7 +144,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
// conflict.
query_id: self.task_id.query_id.clone(),
stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
task_id: *id,
task_id: (*id).into(),
}),
output_id: 0,
}),
@@ -160,7 +159,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
#[async_trait::async_trait]
impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
fn reset(&mut self) {
self.pu_to_scan_range_mapping = HashMap::new();
self.worker_slot_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id
@@ -191,11 +190,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];
let worker_slot_id = self.vnode_mapping[vnode.to_index()];

let list = self
.pu_to_scan_range_mapping
.entry(parallel_unit_id)
.worker_slot_to_scan_range_mapping
.entry(worker_slot_id)
.or_default();
list.push((scan_range, vnode));

@@ -207,7 +206,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
async fn build_executor(&mut self) -> Result<BoxedExecutor> {
self.next_stage_id += 1;
let mut sources = vec![];
for id in self.pu_to_scan_range_mapping.keys() {
for id in self.worker_slot_to_scan_range_mapping.keys() {
sources.push(self.build_prost_exchange_source(id)?);
}

@@ -368,11 +367,26 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let null_safe = lookup_join_node.get_null_safe().to_vec();

let vnode_mapping = lookup_join_node.get_inner_side_vnode_mapping().to_vec();
let vnode_mapping = lookup_join_node
.get_inner_side_vnode_mapping()
.iter()
.copied()
.map(WorkerSlotId::from)
.collect_vec();

assert!(!vnode_mapping.is_empty());

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
.iter()
.flat_map(|worker| {
(0..(worker.parallel_units.len()))
.map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
})
.collect();

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
@@ -388,11 +402,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()),
pu_to_scan_range_mapping: HashMap::new(),
worker_slot_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_slot_mapping,
};

let identity = source.plan_node().get_identity().clone();
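Note: the builder above converts the `repeated uint64 inner_side_vnode_mapping` entries with `WorkerSlotId::from`, constructs slot ids with `WorkerSlotId::new(worker.id, i)`, and turns a slot id back into a `u64` task id with `(*id).into()`. A plausible way to support those conversions is to pack the worker id into the high 32 bits and the slot index into the low 32 bits of a single `u64`. The sketch below illustrates that layout; it is an assumption, not necessarily the exact encoding used by `risingwave_common`.

```rust
/// Illustrative worker-slot identifier: worker id in the high 32 bits,
/// slot index within that worker in the low 32 bits.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }
    fn worker_id(self) -> u32 {
        (self.0 >> 32) as u32
    }
    fn slot_idx(self) -> u32 {
        self.0 as u32
    }
}

impl From<u64> for WorkerSlotId {
    fn from(v: u64) -> Self {
        Self(v)
    }
}

impl From<WorkerSlotId> for u64 {
    fn from(id: WorkerSlotId) -> u64 {
        id.0
    }
}
```

Widening `TaskId::task_id` and `TaskOutputId::output_id` to `u64` in the `task_execution.rs` change below is what lets a packed worker-slot id be reused directly as a batch task id.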
4 changes: 2 additions & 2 deletions src/batch/src/task/task_execution.rs
@@ -79,15 +79,15 @@ impl StateReporter {

#[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
pub struct TaskId {
pub task_id: u32,
pub task_id: u64,
pub stage_id: u32,
pub query_id: String,
}

#[derive(PartialEq, Eq, Hash, Clone, Default)]
pub struct TaskOutputId {
pub task_id: TaskId,
pub output_id: u32,
pub output_id: u64,
}

/// More compact formatter compared to derived `fmt::Debug`.