Skip to content

Commit

Permalink
notify mv
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 5, 2024
1 parent 6ef9193 commit 0676389
Showing 1 changed file with 37 additions and 28 deletions.
65 changes: 37 additions & 28 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ impl CatalogManager {
StreamingJob::MaterializedView(table) => {
creating_internal_table_ids.push(table.id);
self.finish_create_materialized_view_procedure(internal_tables, table)
.await?
.await?;
}
StreamingJob::Sink(sink, target_table) => {
let sink_id = sink.id;
Expand Down Expand Up @@ -1293,41 +1293,50 @@ impl CatalogManager {
&self,
mut internal_tables: Vec<Table>,
mut table: Table,
) -> MetaResult<NotificationVersion> {
) -> MetaResult<()> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let tables = &mut database_core.tables;
if cfg!(not(test)) {
Self::check_table_creating(tables, &table)?;
}
let mut tables = BTreeMapTransaction::new(tables);
let version = try {
if cfg!(not(test)) {
Self::check_table_creating(tables, &table)?;
}
let mut tables = BTreeMapTransaction::new(tables);

table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, tables)?;
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, tables)?;

tracing::debug!(id = ?table.id, "notifying frontend");
let version = self
.notify_frontend(
Operation::Update,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
relation_info: RelationInfo::Table(table.to_owned()).into(),
}]
.into_iter()
.chain(internal_tables.into_iter().map(|internal_table| Relation {
relation_info: RelationInfo::Table(internal_table).into(),
}))
.collect_vec(),
}),
)
.await;
tracing::debug!(id = ?table.id, "notifying frontend");
let version = self
.notify_frontend(
Operation::Update,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
relation_info: RelationInfo::Table(table.to_owned()).into(),
}]
.into_iter()
.chain(internal_tables.into_iter().map(|internal_table| Relation {
relation_info: RelationInfo::Table(internal_table).into(),
}))
.collect_vec(),
}),
)
.await;
version
};

Ok(version)
if let Some(tx) = core.table_id_to_tx.remove(&table.id) {
tx.send(version).unwrap();
} else {
core.table_id_to_version.insert(table.id, version);
}

Ok(())
}

/// Used to cleanup `CREATE MATERIALIZED VIEW` state in stream manager.
Expand Down

0 comments on commit 0676389

Please sign in to comment.