
Commit

reorganize
wenym1 committed Nov 22, 2024
1 parent 83771ae commit 596b11c
Showing 4 changed files with 110 additions and 104 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -31,7 +31,7 @@ use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamin
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{
BarrierCompleteOutput, CompleteBarrierTask, CreatingJobCompleteBarrierTask,
BarrierCommitOutput, CompleteBarrierTask, CreatingJobCompleteBarrierTask,
DatabaseCompleteBarrierTask,
};
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
@@ -66,7 +66,7 @@ impl CheckpointControl {

pub(crate) fn ack_completed(
&mut self,
output: BarrierCompleteOutput,
output: BarrierCommitOutput,
control_stream_manager: &mut ControlStreamManager,
) {
self.hummock_version_stats = output.hummock_version_stats;
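Taken together with the `complete_task.rs` changes below, the rename draws a clean line between a barrier being *completed* (every worker has sent its `BarrierCompleteResponse`) and being *committed* (the collected epochs are persisted and acked back through `ack_completed`). A minimal sketch of that two-stage ordering, using simplified stand-in types rather than the real meta-node structures:

```rust
use std::collections::{BTreeMap, HashSet};

type WorkerId = u32;
type TaskId = u64;

/// Stage 1: a barrier task is "completing" until every worker responds.
struct CompletingTask {
    node_to_collect: HashSet<WorkerId>,
}

/// Stand-in for the payload handed to stage 2 (committing).
struct CommitInfo;

struct CompletingTasks {
    tasks: BTreeMap<TaskId, CompletingTask>,
}

impl CompletingTasks {
    /// Record one worker's response; the task is complete once the set drains.
    fn on_resp(&mut self, task_id: TaskId, worker: WorkerId) {
        let task = self.tasks.get_mut(&task_id).expect("task should exist");
        assert!(task.node_to_collect.remove(&worker), "duplicate response");
    }

    /// Pop the earliest task, but only if it is fully collected.
    /// Popping in `BTreeMap` key order preserves barrier ordering.
    fn next_completed(&mut self) -> Option<(TaskId, CommitInfo)> {
        let (_, task) = self.tasks.first_key_value()?;
        if task.node_to_collect.is_empty() {
            let (id, _) = self.tasks.pop_first().expect("non-empty");
            Some((id, CommitInfo))
        } else {
            None
        }
    }
}
```

Keeping pending tasks in a `BTreeMap` keyed by task id is what lets the earliest barrier commit first; the real `next_completed_task` in the diff below follows the same shape.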
126 changes: 64 additions & 62 deletions src/meta/src/barrier/complete_task.rs
@@ -134,9 +134,9 @@ impl CompleteBarrierTask {
}
}

impl CompleteBarrierTask {
pub(super) async fn complete_barrier(
self,
impl CommittingTask {
pub(super) async fn commit_barrier(
task: CompleteBarrierTask,
commit_info: CommitEpochInfo,
context: &impl GlobalBarrierWorkerContext,
env: MetaSrvEnv,
@@ -146,7 +146,7 @@ impl CompleteBarrierTask {
.barrier_wait_commit_latency
.start_timer();
let version_stats = context.commit_epoch(commit_info).await?;
for command_ctx in self
for command_ctx in task
.tasks
.values()
.flat_map(|(command, _)| command.as_ref().map(|task| &task.command))
@@ -162,22 +162,22 @@ impl CompleteBarrierTask {
let version_stats = match result {
Ok(version_stats) => version_stats,
Err(e) => {
for notifier in self.notifiers {
for notifier in task.notifiers {
notifier.notify_collection_failed(e.clone());
}
return Err(e);
}
};
self.notifiers.into_iter().for_each(|notifier| {
task.notifiers.into_iter().for_each(|notifier| {
notifier.notify_collected();
});
try_join_all(
self.finished_jobs
task.finished_jobs
.into_iter()
.map(|finished_job| context.finish_creating_job(finished_job)),
)
.await?;
for task in self.tasks.into_values().flat_map(|(task, _)| task) {
for task in task.tasks.into_values().flat_map(|(task, _)| task) {
let duration_sec = task.enqueue_time.stop_and_record();
Self::report_complete_event(&env, duration_sec, &task.command);
GLOBAL_META_METRICS
@@ -189,9 +189,7 @@ impl CompleteBarrierTask {

Ok(version_stats)
}
}

impl CompleteBarrierTask {
fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
// Record barrier latency in event log.
use risingwave_pb::meta::event_log;
@@ -211,20 +209,20 @@ impl CompleteBarrierTask {
}
}

pub(super) struct BarrierCompleteOutput {
pub(super) struct BarrierCommitOutput {
#[expect(clippy::type_complexity)]
/// `database_id` -> (`Some(database_graph_committed_epoch)`, vec(`creating_job_id`, `creating_job_committed_epoch`, `is_finished`)])
pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64, bool)>)>,
pub hummock_version_stats: HummockVersionStats,
}
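The nested map documented above is dense; here is a sketch of how a consumer might walk it. The key and value types are simplified stand-ins, and the two "ack" actions are hypothetical placeholders rather than the real `CheckpointControl` API:

```rust
use std::collections::HashMap;

type DatabaseId = u32;
type TableId = u32;

/// `database_id` -> (optional committed epoch of the database graph,
/// plus one (job_id, committed_epoch, is_finished) entry per creating job).
type EpochsToAck = HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64, bool)>)>;

fn ack_all(epochs_to_ack: EpochsToAck) {
    for (database_id, (database_epoch, creating_jobs)) in epochs_to_ack {
        if let Some(epoch) = database_epoch {
            // Hypothetical: ack the database graph's committed epoch.
            println!("db {database_id}: ack epoch {epoch}");
        }
        for (job_id, epoch, is_finished) in creating_jobs {
            // Creating streaming jobs are acked per job; a finished job can
            // then be promoted out of its "creating" state.
            println!("db {database_id}: job {job_id} epoch {epoch} finished={is_finished}");
        }
    }
}
```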

pub(super) struct SyncingTask {
pub(super) struct CompletingTask {
node_to_collect: HashSet<WorkerId>,
collected_resps: HashMap<WorkerId, BarrierCompleteResponse>,
task: CompleteBarrierTask,
}

impl SyncingTask {
impl CompletingTask {
fn new(
task_id: u64,
task: CompleteBarrierTask,
@@ -239,11 +237,11 @@ impl SyncingTask {
})
}

pub(super) fn is_collected(&self) -> bool {
fn is_completed(&self) -> bool {
self.node_to_collect.is_empty()
}

pub(super) fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) {
fn into_commit_info(self) -> (CommitEpochInfo, CompleteBarrierTask) {
assert!(self.node_to_collect.is_empty());
let (mut commit_info, old_value_ssts) = collect_resp_info(self.collected_resps);
for (database_task, creating_jobs) in self.task.tasks.values() {
@@ -277,18 +275,16 @@ impl SyncingTask {
}
}

pub(super) struct CompletingTask {
pub(super) struct CompletingTasks {
next_task_id: u64,
pub(super) committing_task: CommittingTask,
pub(super) syncing_tasks: BTreeMap<u64, SyncingTask>,
tasks: BTreeMap<u64, CompletingTask>,
}

impl CompletingTask {
impl CompletingTasks {
pub(super) fn new() -> Self {
Self {
next_task_id: 0,
committing_task: CommittingTask::None,
syncing_tasks: Default::default(),
tasks: Default::default(),
}
}

@@ -299,42 +295,70 @@ impl CompletingTask {
) -> MetaResult<()> {
let task_id = self.next_task_id;
self.next_task_id += 1;
let task = SyncingTask::new(task_id, task, control_stream_manager)?;
self.syncing_tasks.insert(task_id, task);
let task = CompletingTask::new(task_id, task, control_stream_manager)?;
self.tasks.insert(task_id, task);
Ok(())
}

pub(super) fn next_completed_barrier<'a>(
pub(super) fn next_completed_task(&mut self) -> Option<(CommitEpochInfo, CompleteBarrierTask)> {
if let Some((_, task)) = self.tasks.first_key_value()
&& task.is_completed()
{
let (_, task) = self.tasks.pop_first().expect("non-empty");
Some(task.into_commit_info())
} else {
None
}
}

pub(super) fn on_barrier_complete_resp(
&mut self,
worker_id: WorkerId,
resp: BarrierCompleteResponse,
) {
let task = self.tasks.get_mut(&resp.task_id).expect("should exist");
assert!(task.node_to_collect.remove(&worker_id));
task.collected_resps
.try_insert(worker_id, resp)
.expect("non-duplicate");
}

pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
self.tasks
.values()
.any(|task| task.node_to_collect.contains(&worker_id))
}
}
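This is the collection half of the split; the committing half below takes the earliest fully-collected task and spawns the actual commit, so collection of later barriers keeps progressing concurrently. A hedged sketch of that handoff, with simplified types (`tokio::spawn` assumes a running Tokio runtime):

```rust
use tokio::task::JoinHandle;

/// Mirror of the handoff: at most one commit runs at a time.
enum Committing {
    None,
    Running(JoinHandle<Result<u64, String>>), // stand-in result type
}

fn maybe_start_commit(
    state: &mut Committing,
    next_completed: Option<u64>, // stand-in for (CommitEpochInfo, task)
) {
    if let Committing::None = state {
        if let Some(epoch) = next_completed {
            *state = Committing::Running(tokio::spawn(async move {
                // Hypothetical commit body: persist `epoch`, then return it.
                Ok(epoch)
            }));
        }
    }
}
```

Running the commit on a spawned task keeps the control loop free to keep feeding responses into `CompletingTasks` while the previous barrier is still being written out.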

impl CommittingTask {
pub(super) fn next_committed_barrier<'a>(
&'a mut self,
completing_tasks: &mut CompletingTasks,
context: &Arc<impl GlobalBarrierWorkerContext>,
env: &MetaSrvEnv,
) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
) -> impl Future<Output = MetaResult<BarrierCommitOutput>> + 'a {
// If there is no completing barrier, try to start completing the earliest barrier if
// it has been collected.
if let CommittingTask::None = &self.committing_task {
if let Some((_, task)) = self.syncing_tasks.first_key_value()
&& task.is_collected()
{
let (_, task) = self.syncing_tasks.pop_first().expect("non-empty");
let (commit_info, task) = task.into_commit_info();
if let CommittingTask::None = &self {
if let Some((commit_info, task)) = completing_tasks.next_completed_task() {
let epochs_to_ack = task.epochs_to_ack();
let context = context.clone();
let env = env.clone();
let join_handle = tokio::spawn(async move {
task.complete_barrier(commit_info, &*context, env).await
CommittingTask::commit_barrier(task, commit_info, &*context, env).await
});
self.committing_task = CommittingTask::Committing {
*self = CommittingTask::Committing {
epochs_to_ack,
join_handle,
};
}
}

self.next_completed_barrier_inner()
self.next_committed_barrier_inner()
}

async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
let CommittingTask::Committing { join_handle, .. } = &mut self.committing_task else {
async fn next_committed_barrier_inner(&mut self) -> MetaResult<BarrierCommitOutput> {
let CommittingTask::Committing { join_handle, .. } = self else {
return pending().await;
};

@@ -347,46 +371,24 @@ impl CompletingTask {
};
// It's important to reset the completing_command after await no matter the result is err
// or not, and otherwise the join handle will be polled again after ready.
let next_completing_command_status = if let Err(e) = &join_result {
let next_committing_task_status = if let Err(e) = &join_result {
CommittingTask::Err(e.clone())
} else {
CommittingTask::None
};
let completed_task =
replace(&mut self.committing_task, next_completing_command_status);
let committed_task = replace(self, next_committing_task_status);
let hummock_version_stats = join_result?;

must_match!(completed_task, CommittingTask::Committing {
must_match!(committed_task, CommittingTask::Committing {
epochs_to_ack,
..
} => {
Ok(BarrierCompleteOutput {
Ok(BarrierCommitOutput {
epochs_to_ack,
hummock_version_stats,
})
})
}
}
}

pub(super) fn on_barrier_complete_resp(
&mut self,
worker_id: WorkerId,
resp: BarrierCompleteResponse,
) {
let task = self
.syncing_tasks
.get_mut(&resp.task_id)
.expect("should exist");
assert!(task.node_to_collect.remove(&worker_id));
task.collected_resps
.try_insert(worker_id, resp)
.expect("non-duplicate");
}

pub(super) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
self.syncing_tasks
.values()
.any(|task| task.node_to_collect.contains(&worker_id))
}
}
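The `CommittingTask` enum itself is declared outside the hunks shown here; from the match arms above (`None`, `Committing { epochs_to_ack, join_handle }`, `Err(..)`) its shape can be inferred roughly as below, with simplified field types. The `replace(self, ...)` in `next_committed_barrier_inner` is the standard move-out-of-`&mut self` trick: the state is reset before the result is inspected, so a resolved join handle is never polled again.

```rust
use std::collections::HashMap;
use std::mem::replace;
use tokio::task::JoinHandle;

// Inferred shape only; the real definition is not part of this diff.
enum CommittingTask {
    None,
    Committing {
        epochs_to_ack: HashMap<u32, u64>, // simplified key/value types
        join_handle: JoinHandle<Result<(), String>>,
    },
    Err(String),
}

impl CommittingTask {
    fn take_if_committing(
        &mut self,
    ) -> Option<(HashMap<u32, u64>, JoinHandle<Result<(), String>>)> {
        // Move the whole state machine out, leaving `None` behind.
        match replace(self, CommittingTask::None) {
            CommittingTask::Committing {
                epochs_to_ack,
                join_handle,
            } => Some((epochs_to_ack, join_handle)),
            other => {
                *self = other; // restore non-matching states unchanged
                None
            }
        }
    }
}
```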
40 changes: 21 additions & 19 deletions src/meta/src/barrier/rpc.rs
@@ -15,7 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::future::poll_fn;
use std::task::Poll;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
@@ -38,7 +38,6 @@ use risingwave_pb::stream_service::{
InjectBarrierRequest, StreamingControlStreamRequest,
};
use risingwave_rpc_client::StreamingControlHandle;
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::time::{sleep, timeout};
use tokio_retry::strategy::ExponentialBackoff;
@@ -170,16 +169,17 @@ impl ControlStreamManager {
*self = Self::new(self.env.clone());
}

async fn next_response_inner(
fn poll_next_response(
&mut self,
) -> Option<(
cx: &mut Context<'_>,
) -> Poll<(
WorkerId,
MetaResult<streaming_control_stream_response::Response>,
)> {
if self.nodes.is_empty() {
return None;
return Poll::Pending;
}
let (worker_id, result) = poll_fn(|cx| {
let result: Poll<(WorkerId, MetaResult<_>)> = {
for (worker_id, node) in &mut self.nodes {
match node.handle.response_stream.poll_next_unpin(cx) {
Poll::Ready(result) => {
@@ -213,18 +213,17 @@ impl ControlStreamManager {
}
}
Poll::Pending
})
.await;
};

if let Err(err) = &result {
if let Poll::Ready((worker_id, Err(err))) = &result {
let node = self
.nodes
.remove(&worker_id)
.remove(worker_id)
.expect("should exist when get shutdown resp");
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
}

Some((worker_id, result))
result
}

pub(super) async fn next_response(
Expand All @@ -233,7 +232,7 @@ impl ControlStreamManager {
WorkerId,
MetaResult<streaming_control_stream_response::Response>,
) {
pending_on_none(self.next_response_inner()).await
poll_fn(|cx| self.poll_next_response(cx)).await
}
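`std::future::poll_fn` is the bridge between the two styles: `poll_next_response` stays a plain poll method that can be driven manually, while `next_response` remains an ordinary `async fn` for callers. The same pattern in isolation, on a toy counter rather than RisingWave code:

```rust
use std::future::poll_fn;
use std::task::{Context, Poll};

struct Counter {
    remaining: u32,
}

impl Counter {
    /// Manual poll method: ready only once the budget is used up.
    fn poll_done(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(0)
        } else {
            self.remaining -= 1;
            // Self-wake so the executor polls again immediately; a real
            // implementation would register the waker with an event source.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    /// Async wrapper, exactly the shape of `next_response` above.
    async fn done(&mut self) -> u32 {
        poll_fn(|cx| self.poll_done(cx)).await
    }
}
```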

pub(super) async fn collect_errors(
@@ -244,14 +243,17 @@ impl ControlStreamManager {
let mut errors = vec![(worker_id, first_err)];
#[cfg(not(madsim))]
{
            let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
                while let Some((worker_id, result)) = self.next_response_inner().await {
                    if let Err(e) = result {
                        errors.push((worker_id, e));
                    }
                }
            })
            .await;
            if !self.nodes.is_empty() {
                let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
                    loop {
                        let (worker_id, result) = self.next_response().await;
                        if let Err(e) = result {
                            errors.push((worker_id, e));
                        }
                    }
                })
                .await;
            }
}
tracing::debug!(?errors, "collected stream errors");
errors
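One behavioral subtlety: `poll_next_response` returns `Poll::Pending` when `self.nodes` is empty, so awaiting `next_response` with no live nodes can only end via the surrounding timeout. The new `!self.nodes.is_empty()` guard in `collect_errors` therefore skips the wait entirely when there is nothing left to collect. A toy demonstration (the timeout value is an assumption, not the real `COLLECT_ERROR_TIMEOUT`):

```rust
use std::future::pending;
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let nodes_empty = true; // pretend every node has already been removed
    let collect_error_timeout = Duration::from_millis(100); // assumed value

    if !nodes_empty {
        // Without the guard we would sit here for the full timeout, since
        // `pending()` (like polling an empty node set) never resolves.
        let _ = timeout(collect_error_timeout, pending::<()>()).await;
    }
    println!("skipped the wait: no nodes left to collect errors from");
}
```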