sync multiple graphs at once
wenym1 committed Nov 21, 2024
1 parent ca83819 commit 34bcdbc
Showing 7 changed files with 137 additions and 186 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -565,7 +565,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 20
timeout_in_minutes: 25
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
38 changes: 17 additions & 21 deletions src/meta/src/barrier/command.rs
@@ -23,7 +23,9 @@ use risingwave_common::must_match;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::change_log::{build_table_change_log_delta, ChangeLogDelta};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
@@ -39,15 +41,13 @@ use risingwave_pb::stream_plan::{
DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::hummock::NewTableFragmentInfo;
use crate::manager::{DdlType, StreamingJob};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
@@ -499,16 +499,16 @@ impl CommandContext {
Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
}

pub(super) fn collect_commit_epoch_info(
pub(super) fn collect_extra_commit_epoch_info(
&self,
info: &mut CommitEpochInfo,
resps: HashMap<WorkerId, BarrierCompleteResponse>,
synced_ssts: &Vec<LocalSstableInfo>,
old_value_ssts: &Vec<SstableInfo>,
backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
tables_to_commit: &mut HashMap<TableId, u64>,
new_table_fragment_infos: &mut Vec<NewTableFragmentInfo>,
change_log_delta: &mut HashMap<TableId, ChangeLogDelta>,
) {
let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
collect_resp_info(resps);

let new_table_fragment_infos = if let Some(Command::CreateStreamingJob { info, job_type }) =
let new_table_fragment_info = if let Some(Command::CreateStreamingJob { info, job_type }) =
&self.command
&& !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
{
@@ -522,9 +522,9 @@ impl CommandContext {
table_ids.insert(TableId::new(mv_table_id));
}

vec![NewTableFragmentInfo { table_ids }]
Some(NewTableFragmentInfo { table_ids })
} else {
vec![]
None
};

let mut mv_log_store_truncate_epoch = HashMap::new();
@@ -558,25 +558,21 @@ impl CommandContext {
}

let table_new_change_log = build_table_change_log_delta(
old_value_ssts.into_iter(),
old_value_ssts.iter(),
synced_ssts.iter().map(|sst| &sst.sst_info),
must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
mv_log_store_truncate_epoch.into_iter(),
);

let epoch = self.barrier_info.prev_epoch();
for table_id in &self.table_ids_to_commit {
info.tables_to_commit
tables_to_commit
.try_insert(*table_id, epoch)
.expect("non duplicate");
}

info.sstables.extend(synced_ssts);
info.new_table_watermarks.extend(new_table_watermarks);
info.sst_to_context.extend(sst_to_context);
info.new_table_fragment_infos
.extend(new_table_fragment_infos);
info.change_log_delta.extend(table_new_change_log);
new_table_fragment_infos.extend(new_table_fragment_info);
change_log_delta.extend(table_new_change_log);
}
}

141 changes: 60 additions & 81 deletions src/meta/src/barrier/complete_task.rs
@@ -31,9 +31,9 @@ use crate::barrier::command::CommandContext;
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager};
use crate::barrier::utils::collect_creating_job_commit_epoch_info;
use crate::hummock::CommitEpochInfo;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::collect_resp_info;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};
@@ -108,6 +108,30 @@ impl CompleteBarrierTask {
})
.collect()
}

fn graph_to_complete(
&self,
) -> impl Iterator<Item = (DatabaseId, Option<TableId>, &'_ HashSet<WorkerId>, u64)> + '_ {
self.tasks
.iter()
.flat_map(|(database_id, (database, creating_jobs))| {
database
.iter()
.map(|database| {
(
*database_id,
None,
&database.workers,
database.command.barrier_info.prev_epoch(),
)
})
.chain(
creating_jobs.iter().map(|task| {
(*database_id, Some(task.job_id), &task.workers, task.epoch)
}),
)
})
}
}
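
The graph_to_complete helper added above leans on the fact that Option implements IntoIterator: a database with no barrier task of its own contributes nothing before its creating-job graphs are chained in. Below is a minimal standalone sketch of the same flattening idiom; it is not part of the diff, and the types are simplified stand-ins for the meta-service structs.

use std::collections::HashMap;

type DatabaseId = u32;
type JobId = u32;

// Hypothetical, simplified stand-in for CompleteBarrierTask::tasks:
// (optional per-database barrier epoch, epochs of creating jobs).
type Tasks = HashMap<DatabaseId, (Option<u64>, Vec<(JobId, u64)>)>;

/// Flatten each database's optional task plus its creating-job tasks into one
/// stream of (database, optional job, epoch) tuples. Option::iter() yields
/// zero or one items, so a database without a task is skipped transparently.
fn graphs(tasks: &Tasks) -> impl Iterator<Item = (DatabaseId, Option<JobId>, u64)> + '_ {
    tasks.iter().flat_map(|(database_id, (database, jobs))| {
        database
            .iter()
            .map(move |epoch| (*database_id, None, *epoch))
            .chain(
                jobs.iter()
                    .map(move |(job_id, epoch)| (*database_id, Some(*job_id), *epoch)),
            )
    })
}

fn main() {
    let tasks = Tasks::from([
        (1, (Some(100), vec![(7, 90)])), // database graph plus one creating job
        (2, (None, vec![(8, 80)])),      // no database-level task, job graph only
    ]);
    assert_eq!(graphs(&tasks).count(), 3);
}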

impl CompleteBarrierTask {
@@ -195,9 +219,8 @@ pub(super) struct BarrierCompleteOutput {
}

pub(super) struct SyncingTask {
collecting_partial_graphs: HashMap<(DatabaseId, Option<TableId>), HashSet<WorkerId>>,
collected_resps:
HashMap<(DatabaseId, Option<TableId>), HashMap<WorkerId, BarrierCompleteResponse>>,
node_to_collect: HashSet<WorkerId>,
collected_resps: HashMap<WorkerId, BarrierCompleteResponse>,
task: CompleteBarrierTask,
}

@@ -207,81 +230,49 @@ impl SyncingTask {
task: CompleteBarrierTask,
control_stream_manager: &mut ControlStreamManager,
) -> MetaResult<Self> {
let mut collected_resps = HashMap::new();
let mut collecting_partial_graphs = HashMap::new();
for (database_id, (database_task, creating_jobs)) in &task.tasks {
if let Some(task) = database_task {
collected_resps
.try_insert((*database_id, None), HashMap::new())
.expect("non-duplicate");
if !task.workers.is_empty() {
control_stream_manager.complete_barrier(
task_id,
*database_id,
None,
&task.workers,
task.command.barrier_info.prev_epoch(),
)?;
collecting_partial_graphs
.try_insert((*database_id, None), task.workers.clone())
.expect("non-duplicate");
}
}
for task in creating_jobs {
collected_resps
.try_insert((*database_id, Some(task.job_id)), HashMap::new())
.expect("non-duplicate");
if !task.workers.is_empty() {
control_stream_manager.complete_barrier(
task_id,
*database_id,
Some(task.job_id),
&task.workers,
task.epoch,
)?;
collecting_partial_graphs
.try_insert((*database_id, Some(task.job_id)), task.workers.clone())
.expect("non-duplicate");
}
}
}
let node_to_collect =
control_stream_manager.complete_barrier(task_id, task.graph_to_complete())?;
Ok(Self {
collecting_partial_graphs,
collected_resps,
node_to_collect,
collected_resps: HashMap::new(),
task,
})
}

pub(super) fn is_collected(&self) -> bool {
self.collecting_partial_graphs.is_empty()
self.node_to_collect.is_empty()
}

pub(super) fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) {
let mut resps = self.collected_resps;
let mut commit_info = CommitEpochInfo::default();
for (database_id, (database_task, creating_jobs)) in &self.task.tasks {
assert!(self.node_to_collect.is_empty());
let (mut commit_info, old_value_ssts) = collect_resp_info(self.collected_resps);
for (database_task, creating_jobs) in self.task.tasks.values() {
if let Some(task) = database_task {
let resps = resps.remove(&(*database_id, None)).expect("should exist");
task.command.collect_commit_epoch_info(
&mut commit_info,
resps,
task.command.collect_extra_commit_epoch_info(
&commit_info.sstables,
&old_value_ssts,
task.backfill_pinned_upstream_log_epoch.clone(),
&mut commit_info.tables_to_commit,
&mut commit_info.new_table_fragment_infos,
&mut commit_info.change_log_delta,
)
}
for task in creating_jobs {
let resps = resps
.remove(&(*database_id, Some(task.job_id)))
.expect("should exist");
collect_creating_job_commit_epoch_info(
&mut commit_info,
task.epoch,
resps,
task.tables_to_commit.iter().cloned(),
task.is_first_commit,
);
task.tables_to_commit.iter().for_each(|table_id| {
commit_info
.tables_to_commit
.try_insert(*table_id, task.epoch)
.expect("non duplicate");
});
if task.is_first_commit {
commit_info
.new_table_fragment_infos
.push(NewTableFragmentInfo {
table_ids: task.tables_to_commit.clone(),
});
};
}
}
assert!(resps.is_empty());
(commit_info, self.task)
}
}
@@ -383,31 +374,19 @@ impl CompletingTask {
worker_id: WorkerId,
resp: BarrierCompleteResponse,
) {
let partial_graph_id = from_partial_graph_id(resp.partial_graph_id);
let task = self
.syncing_tasks
.get_mut(&resp.task_id)
.expect("should exist");
assert!(task.node_to_collect.remove(&worker_id));
task.collected_resps
.get_mut(&partial_graph_id)
.expect("should exist")
.try_insert(worker_id, resp)
.expect("non-duplicate");
let remaining_workers = task
.collecting_partial_graphs
.get_mut(&partial_graph_id)
.expect("should exist");
assert!(remaining_workers.remove(&worker_id));
if remaining_workers.is_empty() {
task.collecting_partial_graphs.remove(&partial_graph_id);
}
}

pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
self.syncing_tasks.values().any(|task| {
task.collecting_partial_graphs
.values()
.any(|workers| workers.contains(&worker_id))
})
self.syncing_tasks
.values()
.any(|task| task.node_to_collect.contains(&worker_id))
}
}
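
into_commit_info now folds every database graph and creating job of the task into a single CommitEpochInfo, so the duplicate-checked inserts are what guarantee that each table is committed at exactly one epoch per sync task. A minimal standalone sketch of that merge invariant follows; the types are simplified stand-ins, and a plain insert-plus-assert replaces the HashMap::try_insert used in the diff (a nightly-only std API).

use std::collections::HashMap;

type TableId = u32;

/// Simplified stand-in for CommitEpochInfo: only the table -> epoch map.
#[derive(Default)]
struct CommitInfo {
    tables_to_commit: HashMap<TableId, u64>,
}

/// Merge one graph's tables into the shared commit info. A table may be
/// committed only once per sync task, so a duplicate insertion panics,
/// mirroring try_insert(...).expect("non duplicate") in the diff.
fn commit_tables(info: &mut CommitInfo, tables: &[TableId], epoch: u64) {
    for table_id in tables {
        assert!(
            info.tables_to_commit.insert(*table_id, epoch).is_none(),
            "table {table_id} committed twice in one sync task"
        );
    }
}

fn main() {
    let mut info = CommitInfo::default();
    // A database graph and a creating job commit at different epochs,
    // but their table sets must be disjoint.
    commit_tables(&mut info, &[1, 2], 100);
    commit_tables(&mut info, &[3], 90);
    assert_eq!(info.tables_to_commit.len(), 3);
}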
58 changes: 34 additions & 24 deletions src/meta/src/barrier/rpc.rs
@@ -438,30 +438,40 @@ impl ControlStreamManager {
pub(super) fn complete_barrier(
&mut self,
task_id: u64,
database_id: DatabaseId,
creating_job_id: Option<TableId>,
workers: &HashSet<WorkerId>,
epoch: u64,
) -> MetaResult<()> {
let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
workers.iter().try_for_each(|worker_id| {
self.nodes
.get_mut(worker_id)
.ok_or_else(|| anyhow!("unconnected node: {}", worker_id))?
.handle
.send_request(StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::CompleteBarrier(
BarrierCompleteRequest {
task_id,
partial_graph_sync_epochs: HashMap::from_iter([(
partial_graph_id,
epoch,
)]),
},
)),
})?;
Ok(())
})
infos: impl Iterator<Item = (DatabaseId, Option<TableId>, &HashSet<WorkerId>, u64)>,
) -> MetaResult<HashSet<WorkerId>> {
let mut workers = HashSet::new();
let mut worker_request: HashMap<_, HashMap<_, _>> = HashMap::new();
for (database_id, creating_job_id, workers, epoch) in infos {
let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
for worker_id in workers {
worker_request
.entry(*worker_id)
.or_default()
.try_insert(partial_graph_id, epoch)
.expect("non-duplicate");
}
}

worker_request
.into_iter()
.try_for_each::<_, Result<_, MetaError>>(|(worker_id, partial_graph_sync_epochs)| {
workers.insert(worker_id);
self.nodes
.get_mut(&worker_id)
.ok_or_else(|| anyhow!("unconnected node: {}", worker_id))?
.handle
.send_request(StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::CompleteBarrier(
BarrierCompleteRequest {
task_id,
partial_graph_sync_epochs,
},
)),
})?;
Ok(())
})?;
Ok(workers)
}

pub(super) fn add_partial_graph(
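The reworked complete_barrier above is where multiple graphs get synced in one round trip: all (database, creating job) partial graphs are grouped by worker, and each worker receives a single BarrierCompleteRequest whose partial_graph_sync_epochs maps every partial graph that worker participates in to the epoch it should sync. A standalone sketch of just that grouping step is below; the types are simplified stand-ins, and the real code additionally looks up each worker's control-stream handle and builds the protobuf request.

use std::collections::{HashMap, HashSet};

type WorkerId = u32;
type PartialGraphId = u64;

/// Group (partial graph, workers, epoch) triples into one sync-epoch map per
/// worker, so each worker gets a single request covering every partial graph
/// it participates in. Also returns the set of workers to collect responses
/// from (node_to_collect in the diff).
fn group_by_worker(
    graphs: impl Iterator<Item = (PartialGraphId, HashSet<WorkerId>, u64)>,
) -> (
    HashMap<WorkerId, HashMap<PartialGraphId, u64>>,
    HashSet<WorkerId>,
) {
    let mut worker_request: HashMap<WorkerId, HashMap<PartialGraphId, u64>> = HashMap::new();
    for (partial_graph_id, workers, epoch) in graphs {
        for worker_id in workers {
            let prev = worker_request
                .entry(worker_id)
                .or_default()
                .insert(partial_graph_id, epoch);
            // Each partial graph appears at most once per task.
            assert!(prev.is_none(), "duplicate partial graph in one task");
        }
    }
    let node_to_collect = worker_request.keys().copied().collect();
    (worker_request, node_to_collect)
}

fn main() {
    let graphs = vec![
        (1, HashSet::from([10, 11]), 100), // a database graph on workers 10 and 11
        (2, HashSet::from([11]), 90),      // a creating-job graph on worker 11 only
    ];
    let (requests, node_to_collect) = group_by_worker(graphs.into_iter());
    // Worker 11 receives one request that syncs both graphs at once.
    assert_eq!(requests[&11].len(), 2);
    assert_eq!(node_to_collect.len(), 2);
}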
(Diffs for the remaining changed files are not shown.)