Skip to content

Commit

Permalink
Revert "Revert "notify internal table catalog for v2""
Browse files Browse the repository at this point in the history
This reverts commit 339f089.
  • Loading branch information
kwannoel committed Jun 28, 2024
1 parent e9525b2 commit fff3849
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
19 changes: 17 additions & 2 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,13 @@ impl CatalogController {
pub async fn create_internal_table_catalog(
&self,
job_id: ObjectId,
is_mv: bool,
internal_tables: Vec<PbTable>,
) -> MetaResult<HashMap<u32, u32>> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let mut table_id_map = HashMap::new();
for table in internal_tables {
for table in &internal_tables {
let table_id = Self::create_object(
&txn,
ObjectType::Table,
Expand All @@ -340,13 +341,27 @@ impl CatalogController {
.await?
.oid;
table_id_map.insert(table.id, table_id as u32);
let mut table: table::ActiveModel = table.into();
let mut table: table::ActiveModel = table.clone().into();
table.table_id = Set(table_id as _);
table.belongs_to_job_id = Set(Some(job_id as _));
table.fragment_id = NotSet;
Table::insert(table).exec(&txn).await?;
}
txn.commit().await?;
if is_mv {
let relations = internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.to_owned())),
})
.collect_vec();
let _version = self
.notify_frontend(
Operation::Delete,
Info::RelationGroup(RelationGroup { relations }),
)
.await;
}

Ok(table_id_map)
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use risingwave_common::current_cluster_version;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
use risingwave_pb::ddl_service::TableJobType;
use strum::EnumDiscriminants;
use strum::{EnumDiscriminants, EnumIs};

use crate::model::FragmentId;

// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants)]
#[derive(Debug, Clone, EnumDiscriminants, EnumIs)]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ impl DdlController {
let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
let table_id_map = mgr
.catalog_controller
.create_internal_table_catalog(streaming_job.id() as _, internal_tables)
.create_internal_table_catalog(
streaming_job.id() as _,
streaming_job.is_materialized_view(),
internal_tables,
)
.await?;
fragment_graph.refill_internal_table_ids(table_id_map);

Expand Down

0 comments on commit fff3849

Please sign in to comment.