Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 5, 2024
1 parent 462275f commit 9474aa1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ echo "--- Download artifacts"
download-and-decompress-artifact risingwave_simulation .
chmod +x ./risingwave_simulation

export RUST_LOG="risingwave_meta::barrier::recovery=debug,\
export RUST_LOG="info,risingwave_meta::barrier::recovery=debug,\
risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
Expand Down
10 changes: 9 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,15 @@ impl GlobalBarrierManagerContext {

// Clean dirty fragments.
let stream_job_ids = mgr.catalog_manager.list_stream_job_ids().await?;
let stream_job_ids = &stream_job_ids;
let mut all_table_fragments = Vec::<u32>::new();
// let ptr = ((&mut all_table_fragments) as *mut Vec<u32>) as usize;
let to_drop_table_fragments = mgr
.fragment_manager
.list_dirty_table_fragments(|tf| {
// // SAFETY: trust me
// let all_table_fragments: &mut Vec<_> = unsafe { &mut *(ptr as *mut Vec<u32>) };
all_table_fragments.push(tf.table_id().table_id);
!stream_job_ids.contains(&tf.table_id().table_id)
})
.await;
Expand All @@ -89,7 +95,9 @@ impl GlobalBarrierManagerContext {
.collect();
debug!(
?stream_job_ids,
"clean dirty table fragments: {:?}", to_drop_streaming_ids
?all_table_fragments,
"clean dirty table fragments: {:?}",
to_drop_streaming_ids
);

let _unregister_table_ids = mgr
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl FragmentManager {

pub async fn list_dirty_table_fragments(
&self,
check_dirty: impl Fn(&TableFragments) -> bool,
mut check_dirty: impl FnMut(&TableFragments) -> bool,
) -> Vec<TableFragments> {
self.core
.read()
Expand Down

0 comments on commit 9474aa1

Please sign in to comment.