Skip to content

Commit

Permalink
handle tx on finish create index and sink
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 26, 2024
1 parent 8d46f74 commit 4ef0b7b
Showing 1 changed file with 60 additions and 48 deletions.
108 changes: 60 additions & 48 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3135,37 +3135,37 @@ impl CatalogManager {
mut internal_tables: Vec<Table>,
mut index: Index,
mut table: Table,
) -> MetaResult<NotificationVersion> {
) -> MetaResult<()> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let key = (table.database_id, table.schema_id, index.name.clone());
let version = try {
let key = (table.database_id, table.schema_id, index.name.clone());

let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
assert!(
!indexes.contains_key(&index.id)
&& database_core.in_progress_creation_tracker.contains(&key),
"index must be in creating procedure"
);
let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
assert!(
!indexes.contains_key(&index.id)
&& database_core.in_progress_creation_tracker.contains(&key),
"index must be in creating procedure"
);

database_core.in_progress_creation_tracker.remove(&key);
database_core
.in_progress_creation_streaming_job
.remove(&table.id);
database_core.in_progress_creation_tracker.remove(&key);
database_core
.in_progress_creation_streaming_job
.remove(&table.id);

index.stream_job_status = PbStreamJobStatus::Created.into();
indexes.insert(index.id, index.clone());
index.stream_job_status = PbStreamJobStatus::Created.into();
indexes.insert(index.id, index.clone());

table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, indexes, tables)?;
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, indexes, tables)?;

let version = self
.notify_frontend(
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: vec![
Expand All @@ -3183,9 +3183,15 @@ impl CatalogManager {
.collect_vec(),
}),
)
.await;
.await
};
if let Some(tx) = core.table_id_to_tx.remove(&index.id) {
tx.send(version).unwrap();
} else {
core.table_id_to_version.insert(index.id, version);
}

Ok(version)
Ok(())
}

pub async fn start_create_sink_procedure(&self, sink: &Sink) -> MetaResult<()> {
Expand Down Expand Up @@ -3221,33 +3227,33 @@ impl CatalogManager {
&self,
mut internal_tables: Vec<Table>,
mut sink: Sink,
) -> MetaResult<NotificationVersion> {
) -> MetaResult<()> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let key = (sink.database_id, sink.schema_id, sink.name.clone());
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks);
assert!(
!sinks.contains_key(&sink.id)
&& database_core.in_progress_creation_tracker.contains(&key),
"sink must be in creating procedure"
);
let version = try {
let key = (sink.database_id, sink.schema_id, sink.name.clone());
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks);
assert!(
!sinks.contains_key(&sink.id)
&& database_core.in_progress_creation_tracker.contains(&key),
"sink must be in creating procedure"
);

database_core.in_progress_creation_tracker.remove(&key);
database_core
.in_progress_creation_streaming_job
.remove(&sink.id);
database_core.in_progress_creation_tracker.remove(&key);
database_core
.in_progress_creation_streaming_job
.remove(&sink.id);

sink.stream_job_status = PbStreamJobStatus::Created.into();
sinks.insert(sink.id, sink.clone());
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, sinks, tables)?;
sink.stream_job_status = PbStreamJobStatus::Created.into();
sinks.insert(sink.id, sink.clone());
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, sinks, tables)?;

let version = self
.notify_frontend(
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
Expand All @@ -3260,9 +3266,15 @@ impl CatalogManager {
.collect_vec(),
}),
)
.await;
.await
};

Ok(version)
if let Some(tx) = core.table_id_to_tx.remove(&sink.id) {
tx.send(version).unwrap();
} else {
core.table_id_to_version.insert(sink.id, version);
}
Ok(())
}

pub async fn cancel_create_sink_procedure(
Expand Down

0 comments on commit 4ef0b7b

Please sign in to comment.