
Commit

fmt
kwannoel committed Oct 27, 2023
1 parent a99d25c commit be0e5d3
Showing 5 changed files with 85 additions and 84 deletions.
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -289,8 +289,7 @@ impl StreamTableScan {
let ctx = self.base.ctx();
let config = ctx.session_ctx().config();
let rate_limit = config.get_streaming_rate_limit();
let snapshot_read_delay =
config.get_backfill_snapshot_read_delay();
let snapshot_read_delay = config.get_backfill_snapshot_read_delay();

PbStreamNode {
fields: self.schema().to_prost(),
10 changes: 10 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -892,6 +892,7 @@ impl CatalogManager {
}
commit_meta!(self, tables)?;

tracing::debug!(id = ?table.id, "notifying frontend");
let version = self
.notify_frontend(
Operation::Add,
@@ -2517,6 +2518,15 @@ impl CatalogManager {
.await
}

pub async fn table_is_created(&self, table_id: TableId) -> bool {
let guard = self.core.lock().await;
return if let Some(table) = guard.database.tables.get(&table_id) {
table.get_stream_job_status() != Ok(StreamJobStatus::Creating)
} else {
false
};
}

pub async fn get_tables(&self, table_ids: &[TableId]) -> Vec<Table> {
let mut tables = vec![];
let guard = self.core.lock().await;
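The new table_is_created helper presumably lets callers distinguish a background-DDL table that is still backfilling from one whose stream job has finished. Below is a minimal standalone sketch of the same decision logic, using simplified stand-in types rather than the real CatalogManager, its async lock, or the protobuf StreamJobStatus accessor:

use std::collections::HashMap;

// Simplified stand-ins for the meta catalog types; this only models the
// decision logic of the new helper, not the real locking or protobuf access.
#[derive(PartialEq)]
enum StreamJobStatus {
    Creating,
    Created,
}

struct Table {
    stream_job_status: StreamJobStatus,
}

struct Catalog {
    tables: HashMap<u32, Table>,
}

impl Catalog {
    // Mirrors the helper above: an unknown table id counts as "not created",
    // and a known table counts as created only once it has left Creating.
    fn table_is_created(&self, table_id: u32) -> bool {
        match self.tables.get(&table_id) {
            Some(table) => table.stream_job_status != StreamJobStatus::Creating,
            None => false,
        }
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(1, Table { stream_job_status: StreamJobStatus::Creating });
    tables.insert(2, Table { stream_job_status: StreamJobStatus::Created });
    let catalog = Catalog { tables };
    assert!(!catalog.table_is_created(1)); // still backfilling
    assert!(catalog.table_is_created(2));  // stream job finished
    assert!(!catalog.table_is_created(3)); // unknown id
}
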
138 changes: 66 additions & 72 deletions src/meta/src/stream/stream_manager.rs
@@ -263,86 +263,80 @@ impl GlobalStreamManager {
.in_current_span();
tokio::spawn(fut);

let res = try {
while let Some(state) = receiver.recv().await {
match state {
CreatingState::Failed { reason } => {
tracing::debug!(id=?table_id, "stream job failed");
self.creating_job_info.delete_job(table_id).await;
return Err(reason);
}
CreatingState::Canceling { finish_tx } => {
tracing::debug!(id=?table_id, "cancelling streaming job");
if let Ok(table_fragments) = self
.fragment_manager
.select_table_fragments_by_table_id(&table_id)
while let Some(state) = receiver.recv().await {
match state {
CreatingState::Failed { reason } => {
tracing::debug!(id=?table_id, "stream job failed");
self.creating_job_info.delete_job(table_id).await;
return Err(reason);
}
CreatingState::Canceling { finish_tx } => {
tracing::debug!(id=?table_id, "cancelling streaming job");
if let Ok(table_fragments) = self
.fragment_manager
.select_table_fragments_by_table_id(&table_id)
.await
{
// try to cancel buffered creating command.
if self
.barrier_scheduler
.try_cancel_scheduled_create(table_id)
.await
{
// try to cancel buffered creating command.
if self
.barrier_scheduler
.try_cancel_scheduled_create(table_id)
.await
{
tracing::debug!(
"cancelling streaming job {table_id} in buffer queue."
);
let node_actors = table_fragments.worker_actor_ids();
let cluster_info =
self.cluster_manager.get_streaming_cluster_info().await;
let node_actors = node_actors
.into_iter()
.map(|(id, actor_ids)| {
(
cluster_info.worker_nodes.get(&id).cloned().unwrap(),
actor_ids,
)
})
.collect_vec();
let futures = node_actors.into_iter().map(|(node, actor_ids)| {
let request_id = Uuid::new_v4().to_string();
async move {
let client =
self.env.stream_client_pool().get(&node).await?;
let request = DropActorsRequest {
request_id,
actor_ids,
};
client.drop_actors(request).await
}
});
try_join_all(futures).await?;

self.fragment_manager
.drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
table_id,
)))
.await?;
}
if !table_fragments.is_created() {
tracing::debug!(
"cancelling streaming job {table_id} by issue cancel command."
);
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(table_fragments))
.await?;
}
let _ = finish_tx.send(()).inspect_err(|_| {
tracing::warn!("failed to notify cancelled: {table_id}")
tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
let node_actors = table_fragments.worker_actor_ids();
let cluster_info =
self.cluster_manager.get_streaming_cluster_info().await;
let node_actors = node_actors
.into_iter()
.map(|(id, actor_ids)| {
(
cluster_info.worker_nodes.get(&id).cloned().unwrap(),
actor_ids,
)
})
.collect_vec();
let futures = node_actors.into_iter().map(|(node, actor_ids)| {
let request_id = Uuid::new_v4().to_string();
async move {
let client = self.env.stream_client_pool().get(&node).await?;
let request = DropActorsRequest {
request_id,
actor_ids,
};
client.drop_actors(request).await
}
});
self.creating_job_info.delete_job(table_id).await;
return Err(MetaError::cancelled("create".into()));
try_join_all(futures).await?;

self.fragment_manager
.drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
table_id,
)))
.await?;
}
}
CreatingState::Created => {
if !table_fragments.is_created() {
tracing::debug!(
"cancelling streaming job {table_id} by issue cancel command."
);
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(table_fragments))
.await?;
}
let _ = finish_tx.send(()).inspect_err(|_| {
tracing::warn!("failed to notify cancelled: {table_id}")
});
self.creating_job_info.delete_job(table_id).await;
return Ok(());
return Err(MetaError::cancelled("create".into()));
}
}
CreatingState::Created => {
self.creating_job_info.delete_job(table_id).await;
return Ok(());
}
}
};

res
}
Ok(())
}

async fn build_actors(
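In the Canceling branch above, the meta node fans a DropActorsRequest out to every worker node that hosts actors of the cancelled job, then waits on the whole batch with try_join_all, so a single failed RPC fails the cancellation. Here is a self-contained sketch of that fan-out-and-join shape, with a hypothetical drop_actors function standing in for the stream client pool RPC:

use futures::future::try_join_all;

// Hypothetical stand-in for the per-node RPC; the real code resolves worker
// nodes from the cluster manager and sends DropActorsRequest through the
// stream client pool.
struct WorkerNode {
    id: u32,
}

async fn drop_actors(node: WorkerNode, actor_ids: Vec<u32>) -> Result<(), String> {
    println!("node {} dropping actors {:?}", node.id, actor_ids);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let node_actors = vec![
        (WorkerNode { id: 1 }, vec![10, 11]),
        (WorkerNode { id: 2 }, vec![20]),
    ];
    // One future per worker node, awaited together: any error aborts the batch,
    // matching how the cancel path treats a failed drop_actors call.
    let futures = node_actors
        .into_iter()
        .map(|(node, actor_ids)| drop_actors(node, actor_ids));
    try_join_all(futures).await?;
    Ok(())
}
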
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -305,7 +305,7 @@ where
tokio::time::sleep(Duration::from_millis(
self.snapshot_read_delay as u64,
))
.await;
.await;
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
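The reformatted call above is where snapshot_read_delay (set through BACKFILL_SNAPSHOT_READ_DELAY and read in the planner change earlier in this commit) takes effect: the backfill executor sleeps that many milliseconds after each snapshot chunk. A toy sketch of the throttling loop, with an in-memory chunk list standing in for the real snapshot stream:

use std::time::Duration;

// Standalone model of the per-chunk delay: after each snapshot chunk is
// processed, sleep for snapshot_read_delay milliseconds so the backfill
// stays in progress long enough to be observed (or cancelled) by tests.
async fn backfill_snapshot(chunks: Vec<Vec<i32>>, snapshot_read_delay: u64) {
    for chunk in chunks {
        // The real executor also raises current_pos from the chunk's last row.
        println!("processed chunk of {} rows", chunk.len());
        tokio::time::sleep(Duration::from_millis(snapshot_read_delay)).await;
    }
}

#[tokio::main]
async fn main() {
    backfill_snapshot(vec![vec![1, 2, 3], vec![4, 5]], 100).await;
}
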
16 changes: 7 additions & 9 deletions
@@ -24,8 +24,7 @@ const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const SET_STREAMING_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=4000;";
const SET_BACKFILL_SNAPSHOT_READ_DELAY: &str =
"SET BACKFILL_SNAPSHOT_READ_DELAY=100;";
const SET_BACKFILL_SNAPSHOT_READ_DELAY: &str = "SET BACKFILL_SNAPSHOT_READ_DELAY=100;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
@@ -63,11 +62,12 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
tracing::info!("selected streaming jobs to cancel {:?}", ids);
tracing::info!("cancelling streaming jobs");
let ids = ids.split('\n').collect::<Vec<_>>().join(",");
let result = session
.run(&format!("cancel jobs {};", ids))
.await?;
let result = session.run(&format!("cancel jobs {};", ids)).await?;
tracing::info!("cancelled streaming jobs, {:#?}", result);
let ids = result.split('\n').map(|s| s.parse::<u32>().unwrap()).collect_vec();
let ids = result
.split('\n')
.map(|s| s.parse::<u32>().unwrap())
.collect_vec();
Ok(ids)
}
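
cancel_stream_jobs above is mostly string plumbing around the show jobs / cancel jobs statements; the following standalone sketch walks the same round-trip, with hard-coded strings standing in for the session results:

// Standalone model of the id plumbing: job ids arrive one per line, are
// joined with commas into a `cancel jobs ...;` statement, and the ids the
// statement echoes back are parsed into u32s.
fn main() {
    let shown_jobs = "1001\n1002\n1003"; // stand-in for the `show jobs;` output
    let ids = shown_jobs.split('\n').collect::<Vec<_>>().join(",");
    let statement = format!("cancel jobs {};", ids);
    println!("{statement}"); // cancel jobs 1001,1002,1003;

    let cancel_output = "1001\n1002\n1003"; // stand-in for the statement result
    let cancelled: Vec<u32> = cancel_output
        .split('\n')
        .map(|s| s.parse::<u32>().unwrap())
        .collect();
    assert_eq!(cancelled, vec![1001, 1002, 1003]);
}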

@@ -150,9 +150,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
session.run(SEED_TABLE).await?;
session.run(SET_BACKGROUND_DDL).await?;
session.run(SET_STREAMING_RATE_LIMIT).await?;
session
.run(SET_BACKFILL_SNAPSHOT_READ_DELAY)
.await?;
session.run(SET_BACKFILL_SNAPSHOT_READ_DELAY).await?;

for _ in 0..5 {
session.run(CREATE_MV1).await?;
