Commit

add again
wenym1 committed Nov 21, 2024
1 parent 34bcdbc · commit f311539
Showing 7 changed files with 60 additions and 66 deletions.
ci/workflows/main-cron.yml: 4 changes (2 additions, 2 deletions)
@@ -391,7 +391,7 @@ steps:

- label: "recovery test (madsim)"
key: "recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 65m ci/scripts/deterministic-recovery-test.sh"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 120m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
@@ -404,7 +404,7 @@
mount-buildkite-agent: true
# Only upload zipped files, otherwise the logs is too much.
- ./ci/plugins/upload-failure-logs-zipped
-    timeout_in_minutes: 70
+    timeout_in_minutes: 120
retry: *auto-retry

# Ddl statements will randomly run with background_ddl.
proto/stream_service.proto: 3 changes (0 additions, 3 deletions)
@@ -58,9 +58,6 @@ message BarrierCompleteResponse {
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
-  uint64 partial_graph_id = 8;
-  // prev_epoch of barrier
-  uint64 epoch = 9;
}

message WaitEpochCommitRequest {
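Note on the proto change: with `partial_graph_id` and `epoch` removed, a `BarrierCompleteResponse` no longer names a single graph/epoch pair. Judging from the barrier_manager.rs changes below, the remaining `task_id` field is what correlates the response with the `CompleteBarrier` request, whose `partial_graph_sync_epochs` already lists every pair. A minimal sketch of the shape change (field names from this diff, other fields elided; an illustration, not the generated prost code):

```rust
// Before: each completion response pinned down one graph and one epoch.
struct BarrierCompleteResponseBefore {
    task_id: u64,
    partial_graph_id: u64,
    epoch: u64, // prev_epoch of the barrier
    // sstable / watermark fields elided
}

// After: task_id alone identifies which CompleteBarrier request finished.
struct BarrierCompleteResponseAfter {
    task_id: u64,
    // sstable / watermark fields elided
}
```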
src/compute/src/rpc/service/monitor_service.rs: 8 changes (7 additions, 1 deletion)
@@ -86,10 +86,16 @@ impl MonitorService for MonitorServiceImpl {
Default::default()
};

+        let mut barrier_traces_next_key = 0;
+        let barrier_traces_next_key = &mut barrier_traces_next_key;
let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
reg.collect::<BarrierAwait>()
.into_iter()
-                .map(|(k, v)| (k.prev_epoch, v.to_string()))
+                .map(|(k, v)| {
+                    let key = *barrier_traces_next_key;
+                    *barrier_traces_next_key += 1;
+                    (key, format!("{:?}", (k.sync_graph_epochs, v.to_string())))
+                })
.collect()
} else {
Default::default()
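Since the await-tree key is now a `Vec` of graph/epoch pairs rather than a single `prev_epoch`, the trace map needs a synthetic integer key, which the counter above supplies. The same numbering could be written with `enumerate`; a sketch under the assumption that the map is keyed by an integer type, as the inferred counter suggests (types are simplified stand-ins):

```rust
use std::collections::HashMap;

// Stand-in for the await-tree key type in this file.
struct BarrierAwait {
    sync_graph_epochs: Vec<(u64, u64)>,
}

// Number collected traces 0, 1, 2, ... instead of threading a counter.
fn collect_traces(entries: Vec<(BarrierAwait, String)>) -> HashMap<u64, String> {
    entries
        .into_iter()
        .enumerate()
        .map(|(i, (k, v))| (i as u64, format!("{:?}", (k.sync_graph_epochs, v))))
        .collect()
}
```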
src/stream/src/task/barrier_manager.rs: 92 changes (38 additions, 54 deletions)
@@ -50,7 +50,7 @@ pub use progress::CreateMviewProgressReporter;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
-use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
+use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
InitRequest, InitialPartialGraph, Request,
};
@@ -330,10 +330,10 @@ impl LocalBarrierWorker {
),
});
}
-                (partial_graph_id, barrier, result) = rw_futures_util::pending_on_none(self.await_epoch_completed_futures.next()) => {
+                result = rw_futures_util::pending_on_none(self.await_epoch_completed_futures.next()) => {
match result {
Ok(result) => {
-                            self.on_epoch_completed(partial_graph_id, barrier.epoch.prev, result);
+                            self.on_epoch_completed(result);
}
Err(err) => {
self.notify_other_failure(err, "failed to complete epoch").await;
@@ -402,16 +402,13 @@ impl LocalBarrierWorker {
Ok(())
}
Request::CompleteBarrier(req) => {
-                assert_eq!(req.partial_graph_sync_epochs.len(), 1);
-                let (partial_graph_id, prev_epoch) = req
-                    .partial_graph_sync_epochs
-                    .into_iter()
-                    .next()
-                    .expect("checked");
self.complete_barrier(
req.task_id,
-                    PartialGraphId::new(partial_graph_id),
-                    prev_epoch,
+                    req.partial_graph_sync_epochs
+                        .iter()
+                        .map(|(partial_graph_id, epoch)| {
+                            (PartialGraphId::new(*partial_graph_id), *epoch)
+                        }),
);
Ok(())
}
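The removed `assert_eq!` shows what changed here: a `CompleteBarrier` request used to carry exactly one `(partial_graph_id, prev_epoch)` entry and may now carry several, all forwarded to `complete_barrier`. A small sketch of the new request shape (a plain `HashMap` stands in for the prost-generated map; the values are hypothetical):

```rust
use std::collections::HashMap;

fn main() {
    // Two partial graphs completing in one request; a single-entry map
    // reproduces the old one-graph-per-request behavior.
    let partial_graph_sync_epochs: HashMap<u64, u64> =
        [(1, 65536), (2, 65792)].into_iter().collect();

    for (partial_graph_id, prev_epoch) in &partial_graph_sync_epochs {
        println!("sync graph {partial_graph_id} at prev_epoch {prev_epoch}");
    }
}
```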
@@ -494,37 +491,28 @@ mod await_epoch_completed_future {
use risingwave_hummock_sdk::SyncResult;

use crate::error::StreamResult;
-    use crate::executor::Barrier;
use crate::task::{await_tree_key, BarrierCompleteResult, PartialGraphId};

-    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
-        + 'static;
+    pub(super) type AwaitEpochCompletedFuture =
+        impl Future<Output = StreamResult<BarrierCompleteResult>> + 'static;

pub(super) fn instrument_complete_barrier_future(
task_id: u64,
-        partial_graph_id: PartialGraphId,
complete_barrier_future: BoxFuture<'static, StreamResult<SyncResult>>,
-        barrier: Barrier,
+        sync_graph_epochs: Vec<(PartialGraphId, u64)>,
barrier_await_tree_reg: Option<&await_tree::Registry>,
) -> AwaitEpochCompletedFuture {
-        let prev_epoch = barrier.epoch.prev;
let future = complete_barrier_future.map(move |result| {
-            (
-                partial_graph_id,
-                barrier,
-                result.map(|sync_result| BarrierCompleteResult {
-                    task_id,
-                    sync_result,
-                }),
-            )
+            result.map(|sync_result| BarrierCompleteResult {
+                task_id,
+                sync_result,
+            })
});
if let Some(reg) = barrier_await_tree_reg {
-            reg.register(
-                await_tree_key::BarrierAwait { prev_epoch },
-                format!("SyncEpoch({})", prev_epoch),
-            )
-            .instrument(future)
-            .left_future()
+            let span = format!("SyncEpoch({:?})", sync_graph_epochs);
+            reg.register(await_tree_key::BarrierAwait { sync_graph_epochs }, span)
+                .instrument(future)
+                .left_future()
} else {
future.right_future()
}
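A side note on the `left_future()` / `right_future()` pair this diff keeps: they wrap the two branches in `futures::future::Either`, giving both arms a single concrete type so the `type AwaitEpochCompletedFuture = impl Future<...>` alias above has one defining expression. A self-contained sketch of the pattern (names are illustrative, not from the commit):

```rust
use futures::future::FutureExt;
use std::future::Future;

async fn work() -> u32 {
    42
}

// Both branches unify as Either<WorkFut, WorkFut>, which itself implements
// Future, so the function can name one opaque return type.
fn maybe_instrumented(instrument: bool) -> impl Future<Output = u32> {
    if instrument {
        // An instrumenting wrapper would go around `work()` in the real code.
        work().left_future()
    } else {
        work().right_future()
    }
}
```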
@@ -538,26 +526,26 @@ use risingwave_storage::StateStoreImpl;
fn sync_epoch(
state_store: &StateStoreImpl,
streaming_metrics: &StreamingMetrics,
-    prev_epoch: u64,
-    table_ids: HashSet<TableId>,
+    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
let timer = streaming_metrics.barrier_sync_latency.start_timer();
let hummock = state_store.as_hummock().cloned();
+    let sync_table_epochs_clone = sync_table_epochs.clone();
let future = async move {
if let Some(hummock) = hummock {
-            hummock.sync(vec![(prev_epoch, table_ids)]).await
+            hummock.sync(sync_table_epochs_clone).await
} else {
Ok(SyncResult::default())
}
};
future
.instrument_await(format!("sync_epoch (epoch {})", prev_epoch))
.instrument_await(format!("sync_epoch (epoch {:?})", sync_table_epochs))
.inspect_ok(move |_| {
timer.observe_duration();
})
.map_err(move |e| {
tracing::error!(
-                prev_epoch,
+                ?sync_table_epochs,
error = %e.as_report(),
"Failed to sync state store",
);
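`sync_epoch` now hands the state store a batch of `(epoch, table set)` pairs in a single `sync` call; the pre-change behavior is the one-element vector. A sketch of the call shape, assuming `sync` keeps the `Vec<(HummockEpoch, HashSet<TableId>)>` signature this diff implies (types simplified):

```rust
use std::collections::HashSet;

type HummockEpoch = u64; // matches risingwave_hummock_sdk's alias
type TableId = u32;      // simplified stand-in for the catalog TableId

fn main() {
    // Old shape: vec![(prev_epoch, table_ids)] with exactly one entry.
    // New shape: every graph's epoch and table set batched together.
    let sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)> = vec![
        (65536, [1, 2].into_iter().collect()),
        (65792, [3].into_iter().collect()),
    ];
    assert_eq!(sync_table_epochs.len(), 2); // then: hummock.sync(sync_table_epochs).await
}
```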
@@ -570,39 +558,37 @@ impl LocalBarrierWorker {
fn complete_barrier(
&mut self,
task_id: u64,
-        partial_graph_id: PartialGraphId,
-        prev_epoch: u64,
+        sync_graph_epochs: impl Iterator<Item = (PartialGraphId, u64)>,
) {
+        let sync_graph_epochs = sync_graph_epochs.collect_vec();
{
-            let (barrier, table_ids) = self
-                .state
-                .pop_barrier_to_complete(partial_graph_id, prev_epoch);

let complete_barrier_future = sync_epoch(
&self.actor_manager.env.state_store(),
&self.actor_manager.streaming_metrics,
-                prev_epoch,
-                table_ids.expect("should be Some on BarrierKind::Checkpoint"),
+                sync_graph_epochs
+                    .iter()
+                    .map(|(partial_graph_id, prev_epoch)| {
+                        let (barrier, table_ids) = self
+                            .state
+                            .pop_barrier_to_complete(*partial_graph_id, *prev_epoch);
+                        assert!(barrier.kind.is_checkpoint());
+                        (barrier.epoch.prev, table_ids)
+                    })
+                    .collect_vec(),
);

self.await_epoch_completed_futures.push_back({
instrument_complete_barrier_future(
task_id,
-                    partial_graph_id,
complete_barrier_future,
-                    barrier,
+                    sync_graph_epochs,
self.actor_manager.await_tree_reg.as_ref(),
)
});
}
}

-    fn on_epoch_completed(
-        &mut self,
-        partial_graph_id: PartialGraphId,
-        epoch: u64,
-        result: BarrierCompleteResult,
-    ) {
+    fn on_epoch_completed(&mut self, result: BarrierCompleteResult) {
let BarrierCompleteResult {
task_id,
sync_result,
@@ -619,8 +605,6 @@ impl LocalBarrierWorker {
streaming_control_stream_response::Response::CompleteBarrier(
BarrierCompleteResponse {
task_id,
-                        partial_graph_id: partial_graph_id.into(),
-                        epoch,
synced_sstables: synced_sstables
.into_iter()
.map(
src/stream/src/task/barrier_manager/managed_state.rs: 11 changes (8 additions, 3 deletions)
@@ -612,7 +612,7 @@ impl ManagedBarrierState {
&mut self,
partial_graph_id: PartialGraphId,
prev_epoch: u64,
-    ) -> (Barrier, Option<HashSet<TableId>>) {
+    ) -> (Barrier, HashSet<TableId>) {
self.graph_states
.get_mut(&partial_graph_id)
.expect("should exist")
@@ -682,7 +682,7 @@ impl PartialGraphManagedBarrierState {
None
}

-    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> (Barrier, Option<HashSet<TableId>>) {
+    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> (Barrier, HashSet<TableId>) {
let (popped_prev_epoch, barrier_state) = self
.epoch_barrier_state_map
.pop_first()
@@ -696,7 +696,12 @@
assert!(barrier_state.barrier.kind.is_checkpoint());

assert_matches!(barrier_state.inner, ManagedBarrierStateInner::AwaitComplete);
-        (barrier_state.barrier, barrier_state.table_ids)
+        (
+            barrier_state.barrier,
+            barrier_state
+                .table_ids
+                .expect("should be Some on checkpoint barrier"),
+        )
}
}
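Note: this moves the `Option` unwrap to the point where the barrier is popped. `pop_barrier_to_complete` already asserts the popped barrier is a checkpoint, so it can return a plain `HashSet<TableId>`, and callers such as `complete_barrier` in barrier_manager.rs no longer repeat the `expect` at each call site.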

src/stream/src/task/mod.rs: 2 changes (1 addition, 1 deletion)
@@ -40,7 +40,7 @@ pub type UpDownActorIds = (ActorId, ActorId);
pub type UpDownFragmentIds = (FragmentId, FragmentId);

#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
-struct PartialGraphId(u64);
+pub struct PartialGraphId(u64);

impl PartialGraphId {
fn new(id: u64) -> Self {
src/stream/src/task/stream_manager.rs: 6 changes (4 additions, 2 deletions)
@@ -75,14 +75,16 @@ pub type ActorHandle = JoinHandle<()>;
pub type AtomicU64Ref = Arc<AtomicU64>;

pub mod await_tree_key {
+    use crate::task::PartialGraphId;
+
/// Await-tree key type for actors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Actor(pub crate::task::ActorId);

/// Await-tree key type for barriers.
-    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BarrierAwait {
-        pub prev_epoch: u64,
+        pub sync_graph_epochs: Vec<(PartialGraphId, u64)>,
}
}
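Note: dropping `Copy` here is forced rather than stylistic, since `Vec<(PartialGraphId, u64)>` is not `Copy` and a struct containing it cannot derive `Copy`. `Clone`, `Hash`, and `Eq` remain, which is what the await-tree registry key usage in barrier_manager.rs requires.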

