support cancel background stream job
kwannoel committed Sep 27, 2023
1 parent 4c26b85 commit bdab80e
Showing 13 changed files with 169 additions and 20 deletions.
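For context, a minimal sketch of the user-facing flow this commit enables, assuming a psql client pointed at the frontend on localhost:4566 with database dev (as in the CI script below) and the default root user; the job id 1002 is illustrative, matching the sample output in the script:

# Start a DDL job in the background. Both statements share one session so the
# BACKGROUND_DDL setting applies to the CREATE.
psql -h localhost -p 4566 -d dev -U root <<'SQL'
SET BACKGROUND_DDL = true;
CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
SQL

# Look up the job id in rw_ddl_progress, then cancel it.
psql -h localhost -p 4566 -d dev -U root \
  -c "SELECT ddl_id, progress FROM rw_catalog.rw_ddl_progress;" \
  -c "CANCEL JOBS 1002;"

test_background_ddl_cancel in the script below drives the same sequence through sqllogictest and the cancel_stream_jobs helper.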
73 changes: 70 additions & 3 deletions ci/scripts/run-backfill-tests.sh
@@ -11,6 +11,12 @@
# cargo make ci-start ci-backfill
# ./ci/scripts/run-backfill-tests.sh
# ```
# Example progress:
# dev=> select * from rw_catalog.rw_ddl_progress;
# ddl_id |                 ddl_statement                  | progress |        initialized_at
#--------+------------------------------------------------+----------+-------------------------------
#   1002 | CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t | 56.12%   | 2023-09-27 06:37:06.636+00:00
#(1 row)


set -euo pipefail
@@ -30,6 +36,12 @@ flush() {
run_sql "FLUSH;"
}

cancel_stream_jobs() {
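# Extract the numeric id of the in-progress DDL job from the psql output of
# rw_ddl_progress, so it can be passed to CANCEL JOBS below.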
ID=$(run_sql "select ddl_id from rw_catalog.rw_ddl_progress;" | tail -3 | head -1 | grep -E -o "[0-9]*")
echo "CANCELLING STREAM_JOB: $ID"
run_sql "CANCEL JOBS $ID;"
}

# Test snapshot and upstream read.
test_snapshot_and_upstream_read() {
echo "--- e2e, ci-backfill, test_snapshot_and_upstream_read"
@@ -55,7 +67,7 @@ test_snapshot_and_upstream_read() {

# Test background ddl recovery
test_background_ddl_recovery() {
echo "--- e2e, ci-3streaming-2serving-3fe, test background ddl"
echo "--- e2e, ci-1cn-1fe-with-recovery, test background ddl"
cargo make ci-start ci-1cn-1fe-with-recovery

# Test before recovery
@@ -92,6 +104,58 @@ test_background_ddl_recovery() {
cargo make kill
}

test_background_ddl_cancel() {
echo "--- e2e, ci-1cn-1fe-with-recovery, test background ddl"
cargo make ci-start ci-1cn-1fe-with-recovery

# Test before recovery
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/create_table.slt"
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/create_mv.slt"
sleep 1
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/validate.slt"

cancel_stream_jobs
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/validate_after_cancel.slt"

sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/create_mv.slt"

# Restart
cargo make kill
cargo make dev ci-1cn-1fe-with-recovery

# Recover
sleep 3

cancel_stream_jobs
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/validate_after_cancel.slt"

sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/create_mv.slt"
sleep 1
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/validate.slt"
cancel_stream_jobs
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/cancel/drop.slt"
cargo make kill
}

# Test cancelling foreground ddl
test_foreground_ddl_cancel() {
echo "--- e2e, ci-1cn-1fe-with-recovery, test foreground ddl cancel"
cargo make ci-start ci-1cn-1fe-with-recovery

# Create a base table and start a foreground materialized view
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/create_base_table.slt"
run_sql "CREATE MATERIALIZED VIEW m1 as select * FROM t;" &
sleep 1
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/validate.slt"

cancel_stream_jobs
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/validate_after_cancel.slt"

run_sql "CREATE MATERIALIZED VIEW m1 as select * FROM t;"
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/drop.slt"

cargo make kill
}

# Test foreground ddl should not recover
test_foreground_ddl_no_recover() {
echo "--- e2e, ci-3streaming-2serving-3fe, test background ddl"
@@ -112,6 +176,7 @@ test_foreground_ddl_no_recover() {

# Test after recovery
sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/validate_restart.slt"
sleep 30

sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/drop.slt"

@@ -121,8 +186,10 @@
main() {
set -euo pipefail
# test_snapshot_and_upstream_read
-test_background_ddl_recovery
-test_foreground_ddl_no_recover
+# test_background_ddl_recovery
+test_background_ddl_cancel
+# test_foreground_ddl_no_recover
+# test_foreground_ddl_cancel
}

main
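The cancel tests above use fixed sleeps before querying rw_catalog.rw_ddl_progress, which can flake on slow machines. A hedged sketch of a polling wait that could replace them, reusing the script's run_sql wrapper (the helper name is hypothetical and assumes run_sql prints query results in psql's default format):

# Hypothetical helper: wait until a DDL job shows up in rw_ddl_progress,
# or give up after ~30 seconds.
wait_for_ddl_job() {
  for _ in $(seq 1 30); do
    # The first number in the output is the count(*) value; `|| true` keeps
    # `set -euo pipefail` from aborting if the pipeline yields nothing.
    COUNT=$(run_sql "select count(*) from rw_catalog.rw_ddl_progress;" | grep -E -o "[0-9]+" | head -1 || true)
    if [ "${COUNT:-0}" -gt 0 ]; then
      return 0
    fi
    sleep 1
  done
  echo "timed out waiting for a DDL job to appear" >&2
  return 1
}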
5 changes: 5 additions & 0 deletions e2e_test/background_ddl/cancel/create_mv.slt
@@ -0,0 +1,5 @@
statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
5 changes: 5 additions & 0 deletions e2e_test/background_ddl/cancel/create_table.slt
@@ -0,0 +1,5 @@
statement ok
CREATE TABLE t(v1 int);

statement ok
INSERT INTO t SELECT * FROM generate_series(1, 500000);
2 changes: 2 additions & 0 deletions e2e_test/background_ddl/cancel/drop.slt
@@ -0,0 +1,2 @@
statement ok
DROP TABLE t;
4 changes: 4 additions & 0 deletions e2e_test/background_ddl/cancel/validate.slt
@@ -0,0 +1,4 @@
query I
select count(*) from rw_catalog.rw_ddl_progress
----
1
4 changes: 4 additions & 0 deletions e2e_test/background_ddl/cancel/validate_after_cancel.slt
@@ -0,0 +1,4 @@
query I
select count(*) from rw_catalog.rw_ddl_progress
----
0
4 changes: 4 additions & 0 deletions e2e_test/background_ddl/foreground/validate_after_cancel.slt
@@ -0,0 +1,4 @@
query I
select count(*) from rw_catalog.rw_ddl_progress;
----
0
11 changes: 10 additions & 1 deletion src/meta/src/barrier/command.rs
@@ -36,7 +36,7 @@ use uuid::Uuid;
use super::info::BarrierActorInfo;
use super::trace::TracedEpoch;
use crate::barrier::CommandChanges;
use crate::manager::{FragmentManagerRef, WorkerId};
use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
use crate::MetaResult;
@@ -217,6 +217,7 @@ impl Command {
/// [`Command`].
pub struct CommandContext {
fragment_manager: FragmentManagerRef,
catalog_manager: CatalogManagerRef,

client_pool: StreamClientPoolRef,

@@ -247,6 +248,7 @@ impl CommandContext {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
fragment_manager: FragmentManagerRef,
catalog_manager: CatalogManagerRef,
client_pool: StreamClientPoolRef,
info: BarrierActorInfo,
prev_epoch: TracedEpoch,
@@ -259,6 +261,7 @@
) -> Self {
Self {
fragment_manager,
catalog_manager,
client_pool,
info: Arc::new(info),
prev_epoch,
@@ -663,6 +666,12 @@ impl CommandContext {
Command::CancelStreamingJob(table_fragments) => {
let node_actors = table_fragments.worker_actor_ids();
self.clean_up(node_actors).await?;
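// Revert the table-creation procedure in the catalog for the cancelled job.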
self.catalog_manager
.cancel_create_table_procedure_with_id(
table_fragments.table_id().table_id,
self.fragment_manager.clone(),
)
.await?;
// Drop fragment info in meta store.
self.fragment_manager
.drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
10 changes: 5 additions & 5 deletions src/meta/src/barrier/mod.rs
@@ -697,6 +697,7 @@ impl GlobalBarrierManager {

let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.catalog_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch.clone(),
@@ -994,14 +995,13 @@
};
if let Err(e) = res.as_ref() {
tracing::error!("Failed to finish stream job: {e:?}");
-catalog_manager
-.cancel_create_table_procedure(&table, fragment_manager)
-.await
-.unwrap();
+// catalog_manager
+// .cancel_create_table_procedure(&table, fragment_manager)
+// .await
+// .unwrap();
}
// FIXME: Should copy the functionality from DDLController,
// and call cancel_stream_job here if any part of this failed.
res.unwrap();
});
}
Ok(())
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -193,6 +193,7 @@ impl GlobalBarrierManager {
// Inject the `Initial` barrier to initialize all executors.
let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.catalog_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch.clone(),
28 changes: 28 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -842,12 +842,40 @@ impl CatalogManager {
Ok(version)
}

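/// Cancel table creation by table id: look the table up in the catalog and
/// delegate to `cancel_create_table_procedure_inner`, bailing if it is missing.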
pub async fn cancel_create_table_procedure_with_id(
&self,
table_id: TableId,
fragment_manager: FragmentManagerRef,
) -> MetaResult<()> {
{
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let tables = &mut database_core.tables;
if let Some(table) = tables.get(&table_id).cloned() {
self.cancel_create_table_procedure_inner(core, &table, fragment_manager)
.await?;
return Ok(());
}
}
bail!("Table ID: {table_id} missing when attempting to cancel job")
}

pub async fn cancel_create_table_procedure(
&self,
table: &Table,
fragment_manager: FragmentManagerRef,
) -> MetaResult<()> {
let core = &mut *self.core.lock().await;
self.cancel_create_table_procedure_inner(core, table, fragment_manager)
.await
}

pub async fn cancel_create_table_procedure_inner(
&self,
core: &mut CatalogManagerCore,
table: &Table,
fragment_manager: FragmentManagerRef,
) -> MetaResult<()> {
let database_core = &mut core.database;
let user_core = &mut core.user;
Self::check_table_creating(&database_core.tables, &table);
17 changes: 11 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
@@ -735,9 +735,10 @@ impl DdlController {
match stream_job {
StreamingJob::MaterializedView(table) => {
creating_internal_table_ids.push(table.id);
-self.catalog_manager
-.cancel_create_table_procedure(table, self.fragment_manager.clone())
-.await?;
+// barrier manager will do the cleanup.
+// self.catalog_manager
+// .cancel_create_table_procedure(table, self.fragment_manager.clone())
+// .await?;
}
StreamingJob::Sink(sink) => {
self.catalog_manager
@@ -751,9 +752,13 @@
.cancel_create_table_procedure_with_source(source, table)
.await;
} else {
-self.catalog_manager
-.cancel_create_table_procedure(table, self.fragment_manager.clone())
-.await?;
+// FIXME: Perhaps we still need to do some cleanup here???
+// Or we need to revert the cancel table, and only drop
+// the tables from meta store in barrier manager,
+// and do other cleanups here?
+// self.catalog_manager
+// .cancel_create_table_procedure(table, self.fragment_manager.clone())
+// .await?;
}
}
StreamingJob::Index(index, table) => {
25 changes: 20 additions & 5 deletions src/meta/src/stream/stream_manager.rs
@@ -112,22 +112,25 @@ impl CreatingStreamingJobInfo {
jobs.remove(&job_id);
}

-async fn cancel_jobs(&self, job_ids: Vec<TableId>) -> HashMap<TableId, oneshot::Receiver<()>> {
+async fn cancel_jobs(&self, job_ids: Vec<TableId>) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
let mut jobs = self.streaming_jobs.lock().await;
let mut receivers = HashMap::new();
let mut recovered_job_ids = vec![];
for job_id in job_ids {
if let Some(job) = jobs.get_mut(&job_id)
&& let Some(shutdown_tx) = job.shutdown_tx.take()
{
let (tx, rx) = oneshot::channel();
-if shutdown_tx.send(CreatingState::Canceling{finish_tx: tx}).await.is_ok() {
+if shutdown_tx.send(CreatingState::Canceling { finish_tx: tx }).await.is_ok() {
receivers.insert(job_id, rx);
} else {
tracing::warn!("failed to send canceling state");
}
} else {
recovered_job_ids.push(job_id);
}
}
-receivers
+(receivers, recovered_job_ids)
}
}

@@ -559,7 +562,7 @@ impl GlobalStreamManager {
}

let _reschedule_job_lock = self.reschedule_lock.read().await;
-let receivers = self.creating_job_info.cancel_jobs(table_ids).await;
+let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

let futures = receivers.into_iter().map(|(id, receiver)| async move {
if receiver.await.is_ok() {
@@ -570,7 +573,19 @@
None
}
});
-join_all(futures).await.into_iter().flatten().collect_vec()
+let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();
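// Jobs without a shutdown channel were recovered background jobs; cancel them
// by issuing a `CancelStreamingJob` barrier command instead.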
let fragments = self
.fragment_manager
.select_table_fragments_by_ids(&recovered_job_ids)
.await
.expect("recovered table should have fragment");
for fragment in fragments {
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(fragment))
.await.expect("should be able to cancel recovered stream job");
}
cancelled_ids.extend(recovered_job_ids);
cancelled_ids
}
}

