From 34bcdbc0e18794b343dfe278d7937e96c0d82f82 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 21 Nov 2024 19:48:28 +0800
Subject: [PATCH] sync multiple graphs at once

---
 ci/workflows/pull-request.yml | 2 +-
 src/meta/src/barrier/command.rs | 38 +++--
 src/meta/src/barrier/complete_task.rs | 141 ++++++++----------
 src/meta/src/barrier/rpc.rs | 58 ++++---
 src/meta/src/barrier/utils.rs | 77 +++-------
 .../src/hummock/mock_hummock_meta_client.rs | 5 +-
 src/storage/hummock_sdk/src/change_log.rs | 2 +-
 7 files changed, 137 insertions(+), 186 deletions(-)

diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index e10ffb2d0091f..2da49436db05b 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -565,7 +565,7 @@ steps:
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 20
+    timeout_in_minutes: 25
     retry: *auto-retry

   - label: "end-to-end test (deterministic simulation)"
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index ddf0c88da8a9f..3317d3b8180b6 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -23,7 +23,9 @@ use risingwave_common::must_match;
 use risingwave_common::types::Timestamptz;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_connector::source::SplitImpl;
-use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
+use risingwave_hummock_sdk::change_log::{build_table_change_log_delta, ChangeLogDelta};
+use risingwave_hummock_sdk::sstable_info::SstableInfo;
+use risingwave_hummock_sdk::LocalSstableInfo;
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::catalog::{CreateType, Table};
 use risingwave_pb::common::PbWorkerNode;
@@ -39,15 +41,13 @@ use risingwave_pb::stream_plan::{
     DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
     StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
 };
-use risingwave_pb::stream_service::BarrierCompleteResponse;
 use tracing::warn;

 use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
 use crate::barrier::info::BarrierInfo;
-use crate::barrier::utils::collect_resp_info;
 use crate::barrier::InflightSubscriptionInfo;
 use crate::controller::fragment::InflightFragmentInfo;
-use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
+use crate::hummock::NewTableFragmentInfo;
 use crate::manager::{DdlType, StreamingJob};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
@@ -499,16 +499,16 @@ impl CommandContext {
         Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
     }

-    pub(super) fn collect_commit_epoch_info(
+    pub(super) fn collect_extra_commit_epoch_info(
         &self,
-        info: &mut CommitEpochInfo,
-        resps: HashMap<WorkerId, BarrierCompleteResponse>,
+        synced_ssts: &Vec<LocalSstableInfo>,
+        old_value_ssts: &Vec<SstableInfo>,
         backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
+        tables_to_commit: &mut HashMap<TableId, u64>,
+        new_table_fragment_infos: &mut Vec<NewTableFragmentInfo>,
+        change_log_delta: &mut HashMap<TableId, ChangeLogDelta>,
     ) {
-        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
-            collect_resp_info(resps);
-
-        let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) =
+        let new_table_fragment_info = if let Some(Command::CreateStreamingJob { info, job_type }) =
             &self.command
             && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
         {
@@ -522,9 +522,9 @@ impl CommandContext {
                 table_ids.insert(TableId::new(mv_table_id));
             }

-            vec![NewTableFragmentInfo { table_ids }]
+            Some(NewTableFragmentInfo { table_ids })
         } else {
-            vec![]
+            None
         };

         let mut mv_log_store_truncate_epoch = HashMap::new();
@@ -558,7 +558,7 @@ impl CommandContext {
         }

         let table_new_change_log = build_table_change_log_delta(
-            old_value_ssts.into_iter(),
+            old_value_ssts.iter(),
             synced_ssts.iter().map(|sst| &sst.sst_info),
             must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
             mv_log_store_truncate_epoch.into_iter(),
@@ -566,17 +566,13 @@ impl CommandContext {
         );

         let epoch = self.barrier_info.prev_epoch();
         for table_id in &self.table_ids_to_commit {
-            info.tables_to_commit
+            tables_to_commit
                 .try_insert(*table_id, epoch)
                 .expect("non duplicate");
         }

-        info.sstables.extend(synced_ssts);
-        info.new_table_watermarks.extend(new_table_watermarks);
-        info.sst_to_context.extend(sst_to_context);
-        info.new_table_fragment_infos
-            .extend(new_table_fragment_infos);
-        info.change_log_delta.extend(table_new_change_log);
+        new_table_fragment_infos.extend(new_table_fragment_info);
+        change_log_delta.extend(table_new_change_log);
     }
 }
diff --git a/src/meta/src/barrier/complete_task.rs b/src/meta/src/barrier/complete_task.rs
index 3f7adc571ff37..2ea0dbd656fc8 100644
--- a/src/meta/src/barrier/complete_task.rs
+++ b/src/meta/src/barrier/complete_task.rs
@@ -31,9 +31,9 @@ use crate::barrier::command::CommandContext;
 use crate::barrier::context::GlobalBarrierWorkerContext;
 use crate::barrier::notifier::Notifier;
 use crate::barrier::progress::TrackingJob;
-use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager};
-use crate::barrier::utils::collect_creating_job_commit_epoch_info;
-use crate::hummock::CommitEpochInfo;
+use crate::barrier::rpc::ControlStreamManager;
+use crate::barrier::utils::collect_resp_info;
+use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use crate::manager::MetaSrvEnv;
 use crate::rpc::metrics::GLOBAL_META_METRICS;
 use crate::{MetaError, MetaResult};
@@ -108,6 +108,30 @@ impl CompleteBarrierTask {
         })
         .collect()
     }
+
+    fn graph_to_complete(
+        &self,
+    ) -> impl Iterator<Item = (DatabaseId, Option<TableId>, &'_ HashSet<WorkerId>, u64)> + '_ {
+        self.tasks
+            .iter()
+            .flat_map(|(database_id, (database, creating_jobs))| {
+                database
+                    .iter()
+                    .map(|database| {
+                        (
+                            *database_id,
+                            None,
+                            &database.workers,
+                            database.command.barrier_info.prev_epoch(),
+                        )
+                    })
+                    .chain(
+                        creating_jobs.iter().map(|task| {
+                            (*database_id, Some(task.job_id), &task.workers, task.epoch)
+                        }),
+                    )
+            })
+    }
 }

 impl CompleteBarrierTask {
@@ -195,9 +219,8 @@ pub(super) struct BarrierCompleteOutput {
 }

 pub(super) struct SyncingTask {
-    collecting_partial_graphs: HashMap<(DatabaseId, Option<TableId>), HashSet<WorkerId>>,
-    collected_resps:
-        HashMap<(DatabaseId, Option<TableId>), HashMap<WorkerId, BarrierCompleteResponse>>,
+    node_to_collect: HashSet<WorkerId>,
+    collected_resps: HashMap<WorkerId, BarrierCompleteResponse>,
     task: CompleteBarrierTask,
 }

@@ -207,81 +230,49 @@ impl SyncingTask {
         task: CompleteBarrierTask,
         control_stream_manager: &mut ControlStreamManager,
     ) -> MetaResult<Self> {
-        let mut collected_resps = HashMap::new();
-        let mut collecting_partial_graphs = HashMap::new();
-        for (database_id, (database_task, creating_jobs)) in &task.tasks {
-            if let Some(task) = database_task {
-                collected_resps
-                    .try_insert((*database_id, None), HashMap::new())
-                    .expect("non-duplicate");
-                if !task.workers.is_empty() {
-                    control_stream_manager.complete_barrier(
-                        task_id,
-                        *database_id,
-                        None,
-                        &task.workers,
-                        task.command.barrier_info.prev_epoch(),
-                    )?;
-                    collecting_partial_graphs
-                        .try_insert((*database_id, None),
task.workers.clone()) - .expect("non-duplicate"); - } - } - for task in creating_jobs { - collected_resps - .try_insert((*database_id, Some(task.job_id)), HashMap::new()) - .expect("non-duplicate"); - if !task.workers.is_empty() { - control_stream_manager.complete_barrier( - task_id, - *database_id, - Some(task.job_id), - &task.workers, - task.epoch, - )?; - collecting_partial_graphs - .try_insert((*database_id, Some(task.job_id)), task.workers.clone()) - .expect("non-duplicate"); - } - } - } + let node_to_collect = + control_stream_manager.complete_barrier(task_id, task.graph_to_complete())?; Ok(Self { - collecting_partial_graphs, - collected_resps, + node_to_collect, + collected_resps: HashMap::new(), task, }) } pub(super) fn is_collected(&self) -> bool { - self.collecting_partial_graphs.is_empty() + self.node_to_collect.is_empty() } pub(super) fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) { - let mut resps = self.collected_resps; - let mut commit_info = CommitEpochInfo::default(); - for (database_id, (database_task, creating_jobs)) in &self.task.tasks { + assert!(self.node_to_collect.is_empty()); + let (mut commit_info, old_value_ssts) = collect_resp_info(self.collected_resps); + for (database_task, creating_jobs) in self.task.tasks.values() { if let Some(task) = database_task { - let resps = resps.remove(&(*database_id, None)).expect("should exist"); - task.command.collect_commit_epoch_info( - &mut commit_info, - resps, + task.command.collect_extra_commit_epoch_info( + &commit_info.sstables, + &old_value_ssts, task.backfill_pinned_upstream_log_epoch.clone(), + &mut commit_info.tables_to_commit, + &mut commit_info.new_table_fragment_infos, + &mut commit_info.change_log_delta, ) } for task in creating_jobs { - let resps = resps - .remove(&(*database_id, Some(task.job_id))) - .expect("should exist"); - collect_creating_job_commit_epoch_info( - &mut commit_info, - task.epoch, - resps, - task.tables_to_commit.iter().cloned(), - task.is_first_commit, - ); + task.tables_to_commit.iter().for_each(|table_id| { + commit_info + .tables_to_commit + .try_insert(*table_id, task.epoch) + .expect("non duplicate"); + }); + if task.is_first_commit { + commit_info + .new_table_fragment_infos + .push(NewTableFragmentInfo { + table_ids: task.tables_to_commit.clone(), + }); + }; } } - assert!(resps.is_empty()); (commit_info, self.task) } } @@ -383,31 +374,19 @@ impl CompletingTask { worker_id: WorkerId, resp: BarrierCompleteResponse, ) { - let partial_graph_id = from_partial_graph_id(resp.partial_graph_id); let task = self .syncing_tasks .get_mut(&resp.task_id) .expect("should exist"); + assert!(task.node_to_collect.remove(&worker_id)); task.collected_resps - .get_mut(&partial_graph_id) - .expect("should exist") .try_insert(worker_id, resp) .expect("non-duplicate"); - let remaining_workers = task - .collecting_partial_graphs - .get_mut(&partial_graph_id) - .expect("should exist"); - assert!(remaining_workers.remove(&worker_id)); - if remaining_workers.is_empty() { - task.collecting_partial_graphs.remove(&partial_graph_id); - } } pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool { - self.syncing_tasks.values().any(|task| { - task.collecting_partial_graphs - .values() - .any(|workers| workers.contains(&worker_id)) - }) + self.syncing_tasks + .values() + .any(|task| task.node_to_collect.contains(&worker_id)) } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 646fe98f86d1e..5b84244e23ad2 100644 --- a/src/meta/src/barrier/rpc.rs +++ 
b/src/meta/src/barrier/rpc.rs
@@ -438,30 +438,40 @@ impl ControlStreamManager {
     pub(super) fn complete_barrier(
         &mut self,
         task_id: u64,
-        database_id: DatabaseId,
-        creating_job_id: Option<TableId>,
-        workers: &HashSet<WorkerId>,
-        epoch: u64,
-    ) -> MetaResult<()> {
-        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
-        workers.iter().try_for_each(|worker_id| {
-            self.nodes
-                .get_mut(worker_id)
-                .ok_or_else(|| anyhow!("unconnected node: {}", worker_id))?
-                .handle
-                .send_request(StreamingControlStreamRequest {
-                    request: Some(streaming_control_stream_request::Request::CompleteBarrier(
-                        BarrierCompleteRequest {
-                            task_id,
-                            partial_graph_sync_epochs: HashMap::from_iter([(
-                                partial_graph_id,
-                                epoch,
-                            )]),
-                        },
-                    )),
-                })?;
-            Ok(())
-        })
+        infos: impl Iterator<Item = (DatabaseId, Option<TableId>, &HashSet<WorkerId>, u64)>,
+    ) -> MetaResult<HashSet<WorkerId>> {
+        let mut workers = HashSet::new();
+        let mut worker_request: HashMap<_, HashMap<_, _>> = HashMap::new();
+        for (database_id, creating_job_id, workers, epoch) in infos {
+            let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
+            for worker_id in workers {
+                worker_request
+                    .entry(*worker_id)
+                    .or_default()
+                    .try_insert(partial_graph_id, epoch)
+                    .expect("non-duplicate");
+            }
+        }
+
+        worker_request
+            .into_iter()
+            .try_for_each::<_, Result<_, MetaError>>(|(worker_id, partial_graph_sync_epochs)| {
+                workers.insert(worker_id);
+                self.nodes
+                    .get_mut(&worker_id)
+                    .ok_or_else(|| anyhow!("unconnected node: {}", worker_id))?
+                    .handle
+                    .send_request(StreamingControlStreamRequest {
+                        request: Some(streaming_control_stream_request::Request::CompleteBarrier(
+                            BarrierCompleteRequest {
+                                task_id,
+                                partial_graph_sync_epochs,
+                            },
+                        )),
+                    })?;
+                Ok(())
+            })?;
+        Ok(workers)
     }

     pub(super) fn add_partial_graph(
diff --git a/src/meta/src/barrier/utils.rs b/src/meta/src/barrier/utils.rs
index 3ec6eae381a61..01180df0775a0 100644
--- a/src/meta/src/barrier/utils.rs
+++ b/src/meta/src/barrier/utils.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;

 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
@@ -25,17 +25,11 @@ use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::stream_service::BarrierCompleteResponse;

-use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
+use crate::hummock::CommitEpochInfo;

-#[expect(clippy::type_complexity)]
 pub(super) fn collect_resp_info(
     resps: HashMap<WorkerId, BarrierCompleteResponse>,
-) -> (
-    HashMap<HummockSstableObjectId, u32>,
-    Vec<LocalSstableInfo>,
-    HashMap<TableId, TableWatermarks>,
-    Vec<SstableInfo>,
-) {
+) -> (CommitEpochInfo, Vec<SstableInfo>) {
     let mut sst_to_worker: HashMap<HummockSstableObjectId, u32> = HashMap::new();
     let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
     let mut table_watermarks = Vec::with_capacity(resps.len());
@@ -57,51 +51,26 @@ pub(super) fn collect_resp_info(
     }

     (
-        sst_to_worker,
-        synced_ssts,
-        merge_multiple_new_table_watermarks(
-            table_watermarks
-                .into_iter()
-                .map(|watermarks| {
-                    watermarks
-                        .into_iter()
-                        .map(|(table_id, watermarks)| {
-                            (TableId::new(table_id), TableWatermarks::from(&watermarks))
-                        })
-                        .collect()
-                })
-                .collect_vec(),
-        ),
+        CommitEpochInfo {
+            sstables: synced_ssts,
+            new_table_watermarks: merge_multiple_new_table_watermarks(
+                table_watermarks
+                    .into_iter()
+                    .map(|watermarks| {
+                        watermarks
+                            .into_iter()
+                            .map(|(table_id, watermarks)| {
+                                (TableId::new(table_id), TableWatermarks::from(&watermarks))
+                            })
+                            .collect()
+                    })
+                    .collect_vec(),
+            ),
+            sst_to_context: sst_to_worker,
+            new_table_fragment_infos: vec![],
+            change_log_delta: Default::default(),
+            tables_to_commit: Default::default(),
+        },
         old_value_ssts,
     )
 }
-
-pub(super) fn collect_creating_job_commit_epoch_info(
-    commit_info: &mut CommitEpochInfo,
-    epoch: u64,
-    resps: HashMap<WorkerId, BarrierCompleteResponse>,
-    tables_to_commit: impl Iterator<Item = TableId>,
-    is_first_time: bool,
-) {
-    let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps);
-    assert!(old_value_sst.is_empty());
-    commit_info.sst_to_context.extend(sst_to_context);
-    commit_info.sstables.extend(sstables);
-    commit_info
-        .new_table_watermarks
-        .extend(new_table_watermarks);
-    let tables_to_commit: HashSet<_> = tables_to_commit.collect();
-    tables_to_commit.iter().for_each(|table_id| {
-        commit_info
-            .tables_to_commit
-            .try_insert(*table_id, epoch)
-            .expect("non duplicate");
-    });
-    if is_first_time {
-        commit_info
-            .new_table_fragment_infos
-            .push(NewTableFragmentInfo {
-                table_ids: tables_to_commit,
-            });
-    };
-}
diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs
index 777509d53ed88..688dd615f108f 100644
--- a/src/meta/src/hummock/mock_hummock_meta_client.rs
+++ b/src/meta/src/hummock/mock_hummock_meta_client.rs
@@ -198,10 +198,7 @@ impl HummockMetaClient for MockHummockMetaClient {
             BTreeSet::new()
         };
         let table_change_log = build_table_change_log_delta(
-            sync_result
-                .old_value_ssts
-                .into_iter()
-                .map(|sst| sst.sst_info),
+            sync_result.old_value_ssts.iter().map(|sst| &sst.sst_info),
             sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
             &vec![epoch],
             table_change_log_table_ids
diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs
index cf3ded58b946e..0777beab46651 100644
--- a/src/storage/hummock_sdk/src/change_log.rs
+++ b/src/storage/hummock_sdk/src/change_log.rs
@@ -149,7 +149,7 @@ where
 }

 pub fn build_table_change_log_delta<'a>(
-    old_value_ssts: impl Iterator<Item = SstableInfo>,
+    old_value_ssts: impl Iterator<Item = &'a SstableInfo>,
     new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
     epochs: &Vec<u64>,
     log_store_table_ids: impl Iterator<Item = (u32, u64)>,
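
Note (not part of the patch): the core of this change is that one completion task now sends a single CompleteBarrier request per worker node, carrying the sync epoch of every partial graph hosted on that worker, instead of one request per partial graph. The standalone sketch below illustrates only that grouping step; the function name and the plain u32 stand-ins for the DatabaseId/TableId/WorkerId newtypes are assumptions for illustration, and the control-stream RPC and response collection are omitted.

// Illustrative sketch only: group (partial graph -> sync epoch) pairs by worker.
use std::collections::{HashMap, HashSet};

// Plain stand-ins for the meta-node newtypes (DatabaseId / TableId / WorkerId).
type DatabaseId = u32;
type JobId = u32;
type WorkerId = u32;
type PartialGraphId = (DatabaseId, Option<JobId>);

fn group_sync_epochs_by_worker<'a>(
    infos: impl Iterator<Item = (DatabaseId, Option<JobId>, &'a HashSet<WorkerId>, u64)>,
) -> HashMap<WorkerId, HashMap<PartialGraphId, u64>> {
    let mut worker_request: HashMap<WorkerId, HashMap<PartialGraphId, u64>> = HashMap::new();
    for (database_id, creating_job_id, workers, epoch) in infos {
        let partial_graph_id = (database_id, creating_job_id);
        for worker_id in workers {
            // A partial graph appears at most once per worker within one sync task.
            worker_request
                .entry(*worker_id)
                .or_default()
                .insert(partial_graph_id, epoch);
        }
    }
    worker_request
}

fn main() {
    let workers: HashSet<WorkerId> = [1, 2].into_iter().collect();
    // One database graph plus one creating-job graph, both running on workers 1 and 2.
    let infos = vec![
        (100, None, &workers, 65536u64),
        (100, Some(7), &workers, 65537u64),
    ];
    let grouped = group_sync_epochs_by_worker(infos.into_iter());
    // Each worker receives exactly one request that carries both graphs' epochs.
    assert_eq!(grouped.len(), 2);
    assert_eq!(grouped[&1].len(), 2);
    println!("{grouped:?}");
}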