Refactor protobufs, Rust types, rename worker mappings
Shanicky Chen committed May 17, 2024
1 parent bc2b8c5 commit 40c8a7e
Showing 23 changed files with 235 additions and 150 deletions.
Empty file removed metadata.sqlite
6 changes: 3 additions & 3 deletions proto/batch_plan.proto
@@ -225,15 +225,15 @@ message TableFunctionNode {
message TaskId {
string query_id = 1;
uint32 stage_id = 2;
uint32 task_id = 3;
uint64 task_id = 3;

Check failure on line 228 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "3" with name "task_id" on message "TaskId" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

// Every task will create N buffers (channels) for parent operators to fetch results from,
// where N is the parallelism of parent stage.
message TaskOutputId {
TaskId task_id = 1;
// The id of output channel to fetch from
uint32 output_id = 2;
uint64 output_id = 2;

Check failure on line 236 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "2" with name "output_id" on message "TaskOutputId" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message LocalExecutePlan {
@@ -270,7 +270,7 @@ message LocalLookupJoinNode {
repeated uint32 inner_side_key = 4;
uint32 lookup_prefix_len = 5;
plan_common.StorageTableDesc inner_side_table_desc = 6;
repeated uint32 inner_side_vnode_mapping = 7;
repeated uint64 inner_side_vnode_mapping = 7;

Check failure on line 273 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "7" with name "inner_side_vnode_mapping" on message "LocalLookupJoinNode" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
repeated int32 inner_side_column_ids = 8;
repeated uint32 output_indices = 9;
repeated common.WorkerNode worker_nodes = 10;
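Note: the uint32 to uint64 widenings above line up with the worker-slot identifiers introduced by this commit. Elsewhere in the diff, `WorkerSlotId::new(worker.id, i)` and `WorkerSlotId::from(u64)` suggest a single 64-bit value carrying both a worker id and a slot index. The following is a minimal sketch of that idea, assuming an upper/lower 32-bit split; it is an illustration, not the actual `risingwave_common::hash::WorkerSlotId` implementation.

```rust
// Hypothetical sketch of a worker-slot id packed into one u64 (NOT the real
// risingwave_common type): worker id in the upper 32 bits, slot index in the
// lower 32 bits, which is why a single uint64 proto field can carry it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }

    fn worker_id(self) -> u32 {
        (self.0 >> 32) as u32
    }

    fn slot_idx(self) -> u32 {
        self.0 as u32
    }
}

impl From<u64> for WorkerSlotId {
    fn from(v: u64) -> Self {
        Self(v)
    }
}

impl From<WorkerSlotId> for u64 {
    fn from(id: WorkerSlotId) -> Self {
        id.0
    }
}

fn main() {
    let id = WorkerSlotId::new(3, 1);
    assert_eq!(id.worker_id(), 3);
    assert_eq!(id.slot_idx(), 1);
    // Round-trips through the plain u64 representation used in the protos.
    assert_eq!(WorkerSlotId::from(u64::from(id)), id);
}
```

One reading of the change is that packing both parts into a single integer lets the existing scalar proto fields simply widen to uint64 instead of growing a second field.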
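Note: the breaking-change annotations above concern generated APIs and JSON, not the binary wire format. Protobuf encodes both uint32 and uint64 as varints, so bytes written by an old uint32 writer decode unchanged through a uint64 field. A small hand-rolled varint round trip (plain Rust, no protobuf library) illustrates the point:

```rust
// Hand-rolled LEB128-style varint, as used by protobuf scalar fields: the same
// bytes that encoded a value through a uint32 field decode unchanged through a
// uint64 field, which is why the uint32 -> uint64 change is wire-compatible.
fn encode_varint(mut v: u64, out: &mut Vec<u8>) {
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
}

fn decode_varint(buf: &[u8]) -> (u64, usize) {
    let mut value = 0u64;
    let mut shift = 0;
    for (i, &b) in buf.iter().enumerate() {
        value |= ((b & 0x7f) as u64) << shift;
        if b & 0x80 == 0 {
            return (value, i + 1);
        }
        shift += 7;
    }
    panic!("truncated varint");
}

fn main() {
    // Encode a value that fits in u32, then decode it as u64: identical result.
    let mut buf = Vec::new();
    encode_varint(300u32 as u64, &mut buf);
    let (decoded, _len) = decode_varint(&buf);
    assert_eq!(decoded, 300u64);
}
```

JSON output does change, since proto3 serializes uint64 values as strings while uint32 values are numbers, which is what the linked JSON compatibility note is about.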
4 changes: 2 additions & 2 deletions proto/common.proto
@@ -93,9 +93,9 @@ message ParallelUnitMapping {
}

// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id.
message WorkerMapping {
message WorkerSlotMapping {
repeated uint32 original_indices = 1;
repeated uint32 data = 2;
repeated uint64 data = 2;
}

message BatchQueryEpoch {
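Note: `WorkerSlotMapping` keeps the compressed `original_indices` / `data` layout of the old `WorkerMapping`, only widening `data` to uint64 so each entry can hold a worker-slot id. As a rough illustration of how such a compressed mapping expands into a per-vnode table, here is a minimal sketch; it assumes `original_indices[i]` marks the vnode index where the run of `data[i]` starts, which may not match the exact encoding used by risingwave_common.

```rust
// Minimal sketch of expanding a run-length compressed mapping like
// WorkerSlotMapping into a dense vnode -> worker-slot table.
fn expand_mapping(original_indices: &[u32], data: &[u64], len: usize) -> Vec<u64> {
    assert_eq!(original_indices.len(), data.len());
    let mut expanded = Vec::with_capacity(len);
    for (i, &value) in data.iter().enumerate() {
        // Each run covers [start, end): from its own start index up to the
        // next run's start index (or the end of the vnode range).
        let start = original_indices[i] as usize;
        let end = original_indices
            .get(i + 1)
            .map(|&next| next as usize)
            .unwrap_or(len);
        expanded.extend(std::iter::repeat(value).take(end - start));
    }
    expanded
}

fn main() {
    // vnodes 0..=2 map to slot 7, vnodes 3..=4 map to slot 9.
    assert_eq!(expand_mapping(&[0, 3], &[7, 9], 5), vec![7, 7, 7, 9, 9]);
}
```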
2 changes: 1 addition & 1 deletion proto/meta.proto
@@ -114,7 +114,7 @@ message FragmentParallelUnitMappings {
/// Worker mapping with fragment id, used for notification.
message FragmentWorkerMapping {
uint32 fragment_id = 1;
common.WorkerMapping mapping = 2;
common.WorkerSlotMapping mapping = 2;
}

message FragmentWorkerMappings {
35 changes: 22 additions & 13 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -21,7 +21,7 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId,
ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
@@ -51,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
struct InnerSideExecutorBuilder<C> {
table_desc: StorageTableDesc,
table_distribution: TableDistribution,
vnode_mapping: ExpandedWorkerMapping,
vnode_mapping: ExpandedWorkerSlotMapping,
outer_side_key_types: Vec<DataType>,
inner_side_schema: Schema,
inner_side_column_ids: Vec<i32>,
@@ -60,8 +60,8 @@ struct InnerSideExecutorBuilder<C> {
context: C,
task_id: TaskId,
epoch: BatchQueryEpoch,
worker_mapping: HashMap<WorkerId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerId, Vec<(ScanRange, VirtualNode)>>,
worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
@@ -90,7 +90,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {

/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
let list = self.worker_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
@@ -113,9 +113,9 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
}

/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &WorkerId) -> Result<PbExchangeSource> {
fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
let worker = self
.worker_mapping
.worker_slot_mapping
.get(id)
.context("No worker node found for the given worker id.")?;

@@ -144,7 +144,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
// conflict.
query_id: self.task_id.query_id.clone(),
stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
task_id: *id,
task_id: (*id).into(),
}),
output_id: 0,
}),
@@ -367,18 +367,27 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let null_safe = lookup_join_node.get_null_safe().to_vec();

let vnode_mapping = lookup_join_node.get_inner_side_vnode_mapping().to_vec();
let vnode_mapping = lookup_join_node
.get_inner_side_vnode_mapping()
.iter()
.copied()
.map(WorkerSlotId::from)
.collect_vec();

assert!(!vnode_mapping.is_empty());

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_mapping: HashMap<WorkerId, WorkerNode> = worker_nodes
let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.flat_map(|worker| {
(0..(worker.parallel_units.len()))
.map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
})
.collect();

assert_eq!(worker_mapping.len(), worker_nodes.len());
assert_eq!(worker_slot_mapping.len(), worker_nodes.len());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
@@ -399,7 +408,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_mapping,
worker_slot_mapping,
};

let identity = source.plan_node().get_identity().clone();
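Note: the builder changes above replace per-worker keys with per-slot keys. The expanded vnode mapping now yields a `WorkerSlotId` per virtual node, and `worker_slot_mapping` resolves that slot to the `WorkerNode` that should serve the inner-side scan. The sketch below mirrors that flow with simplified stand-in types; `WorkerSlotId`, `WorkerNode`, and the `parallelism` field here are illustrative, not the real risingwave definitions.

```rust
use std::collections::HashMap;

// Stand-in for risingwave_common::hash::WorkerSlotId.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerSlotId {
    worker_id: u32,
    slot_idx: usize,
}

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: usize) -> Self {
        Self { worker_id, slot_idx }
    }
}

// Stand-in for the proto WorkerNode; `parallelism` plays the role of
// `worker.parallel_units.len()` in the diff.
#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
    parallelism: usize,
}

/// One slot per parallel unit on each worker, mirroring the `flat_map` above.
fn build_worker_slot_mapping(workers: &[WorkerNode]) -> HashMap<WorkerSlotId, WorkerNode> {
    workers
        .iter()
        .flat_map(|w| (0..w.parallelism).map(move |i| (WorkerSlotId::new(w.id, i), w.clone())))
        .collect()
}

fn main() {
    let workers = vec![
        WorkerNode { id: 1, parallelism: 2 },
        WorkerNode { id: 2, parallelism: 1 },
    ];
    let slots = build_worker_slot_mapping(&workers);
    assert_eq!(slots.len(), 3);

    // vnode -> worker slot, standing in for ExpandedWorkerSlotMapping.
    let vnode_mapping = vec![
        WorkerSlotId::new(1, 0),
        WorkerSlotId::new(1, 1),
        WorkerSlotId::new(2, 0),
    ];
    // Route a row whose key hashes to vnode 2: slot first, then its worker.
    let vnode = 2usize;
    let target = &slots[&vnode_mapping[vnode]];
    assert_eq!(target.id, 2);
}
```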
4 changes: 2 additions & 2 deletions src/batch/src/task/task_execution.rs
@@ -79,15 +79,15 @@ impl StateReporter {

#[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
pub struct TaskId {
pub task_id: u32,
pub task_id: u64,
pub stage_id: u32,
pub query_id: String,
}

#[derive(PartialEq, Eq, Hash, Clone, Default)]
pub struct TaskOutputId {
pub task_id: TaskId,
pub output_id: u32,
pub output_id: u64,
}

/// More compact formatter compared to derived `fmt::Debug`.
53 changes: 31 additions & 22 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -18,7 +18,7 @@ use std::time::Duration;

use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::hash::{WorkerId, WorkerMapping};
use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping};
use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};

@@ -36,9 +36,9 @@ struct WorkerNodeManagerInner {
struct WorkerNodeManagerInner {
worker_nodes: Vec<WorkerNode>,
/// fragment vnode mapping info for streaming
streaming_fragment_vnode_mapping: HashMap<FragmentId, WorkerMapping>,
streaming_fragment_vnode_mapping: HashMap<FragmentId, WorkerSlotMapping>,
/// fragment vnode mapping info for serving
serving_fragment_vnode_mapping: HashMap<FragmentId, WorkerMapping>,
serving_fragment_vnode_mapping: HashMap<FragmentId, WorkerSlotMapping>,
}

pub type WorkerNodeManagerRef = Arc<WorkerNodeManager>;
@@ -125,8 +125,8 @@ impl WorkerNodeManager {
pub fn refresh(
&self,
nodes: Vec<WorkerNode>,
streaming_mapping: HashMap<FragmentId, WorkerMapping>,
serving_mapping: HashMap<FragmentId, WorkerMapping>,
streaming_mapping: HashMap<FragmentId, WorkerSlotMapping>,
serving_mapping: HashMap<FragmentId, WorkerSlotMapping>,
) {
let mut write_guard = self.inner.write().unwrap();
tracing::debug!("Refresh worker nodes {:?}.", nodes);
@@ -146,26 +146,32 @@ impl WorkerNodeManager {
/// If worker ids is empty, the scheduler may fail to schedule any task and stuck at
/// schedule next stage. If we do not return error in this case, needs more complex control
/// logic above. Report in this function makes the schedule root fail reason more clear.
pub fn get_workers_by_worker_ids(&self, worker_ids: &[WorkerId]) -> Result<Vec<WorkerNode>> {
if worker_ids.is_empty() {
pub fn get_workers_by_worker_ids(
&self,
worker_slot_ids: &[WorkerSlotId],
) -> Result<Vec<WorkerNode>> {
if worker_slot_ids.is_empty() {
return Err(BatchError::EmptyWorkerNodes);
}

let guard = self.inner.read().unwrap();

// TODO: Does the return order of this function need to match the order of the parameters?
let worker_index: HashMap<_, _> = guard
let worker_slot_index: HashMap<_, _> = guard
.worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.flat_map(|worker| {
(0..worker.parallel_units.len())
.map(move |i| (WorkerSlotId::new(worker.id, i), worker))
})
.collect();

let mut workers = Vec::with_capacity(worker_ids.len());
let mut workers = Vec::with_capacity(worker_slot_ids.len());

for worker_id in worker_ids {
match worker_index.get(worker_id) {
Some(worker) => workers.push(worker.clone()),
None => bail!("No worker node found for worker id: {}", worker_id),
for worker_slot_id in worker_slot_ids {
match worker_slot_index.get(worker_slot_id) {
Some(worker) => workers.push((*worker).clone()),
None => bail!("No worker node found for worker id: {}", worker_slot_id),
}
}

@@ -175,7 +181,7 @@ impl WorkerNodeManager {
pub fn get_streaming_fragment_mapping(
&self,
fragment_id: &FragmentId,
) -> Result<WorkerMapping> {
) -> Result<WorkerSlotMapping> {
self.inner
.read()
.unwrap()
@@ -188,7 +194,7 @@ impl WorkerNodeManager {
pub fn insert_streaming_fragment_mapping(
&self,
fragment_id: FragmentId,
vnode_mapping: WorkerMapping,
vnode_mapping: WorkerSlotMapping,
) {
self.inner
.write()
@@ -201,7 +207,7 @@ impl WorkerNodeManager {
pub fn update_streaming_fragment_mapping(
&self,
fragment_id: FragmentId,
vnode_mapping: WorkerMapping,
vnode_mapping: WorkerSlotMapping,
) {
let mut guard = self.inner.write().unwrap();
guard
@@ -219,15 +225,15 @@ impl WorkerNodeManager {
}

/// Returns fragment's vnode mapping for serving.
fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerMapping> {
fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerSlotMapping> {
self.inner
.read()
.unwrap()
.get_serving_fragment_mapping(fragment_id)
.ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id))
}

pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, WorkerMapping>) {
pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, WorkerSlotMapping>) {
let mut guard = self.inner.write().unwrap();
tracing::debug!(
"Set serving vnode mapping for fragments {:?}",
@@ -236,7 +242,10 @@ impl WorkerNodeManager {
guard.serving_fragment_vnode_mapping = mappings;
}

pub fn upsert_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, WorkerMapping>) {
pub fn upsert_serving_fragment_mapping(
&self,
mappings: HashMap<FragmentId, WorkerSlotMapping>,
) {
let mut guard = self.inner.write().unwrap();
tracing::debug!(
"Upsert serving vnode mapping for fragments {:?}",
@@ -287,7 +296,7 @@ impl WorkerNodeManager {
}

impl WorkerNodeManagerInner {
fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option<WorkerMapping> {
fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option<WorkerSlotMapping> {
self.serving_fragment_vnode_mapping
.get(&fragment_id)
.cloned()
@@ -330,7 +339,7 @@ impl WorkerNodeSelector {
.sum()
}

pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerMapping> {
pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerSlotMapping> {
if self.enable_barrier_read {
self.manager.get_streaming_fragment_mapping(&fragment_id)
} else {
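Note: the doc comment on `get_workers_by_worker_ids` explains why an empty id list is rejected eagerly: surfacing the error there gives the scheduler a clear failure at the root instead of a stall. Below is a self-contained sketch of that behavior; the error enum and structs are simplified stand-ins for the real `BatchError` and `WorkerNode`, and the slot id is just a bare u64 here.

```rust
use std::collections::HashMap;

// Stand-in: the real WorkerSlotId wraps a u64.
type WorkerSlotId = u64;

#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
}

#[derive(Debug)]
enum LookupError {
    EmptyWorkerNodes,
    SlotNotFound(WorkerSlotId),
}

fn get_workers_by_worker_slot_ids(
    index: &HashMap<WorkerSlotId, WorkerNode>,
    worker_slot_ids: &[WorkerSlotId],
) -> Result<Vec<WorkerNode>, LookupError> {
    if worker_slot_ids.is_empty() {
        // Reporting here keeps the failure visible at the schedule root.
        return Err(LookupError::EmptyWorkerNodes);
    }
    worker_slot_ids
        .iter()
        .map(|id| index.get(id).cloned().ok_or(LookupError::SlotNotFound(*id)))
        .collect()
}

fn main() {
    let index: HashMap<WorkerSlotId, WorkerNode> =
        HashMap::from([(1, WorkerNode { id: 1 }), (2, WorkerNode { id: 2 })]);
    assert!(get_workers_by_worker_slot_ids(&index, &[]).is_err());
    assert_eq!(get_workers_by_worker_slot_ids(&index, &[2]).unwrap()[0].id, 2);
    assert!(get_workers_by_worker_slot_ids(&index, &[3]).is_err());
}
```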
(Diffs for the remaining changed files were not loaded.)
