Skip to content

Commit

Permalink
fix(meta): Fix sink_into_table causes mv's depended_subscription_ids …
Browse files Browse the repository at this point in the history
…to be empty. (#17664)
  • Loading branch information
xxhZs authored and xiangjinwu committed Jul 12, 2024
1 parent de4af9b commit 3765be3
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 14 deletions.
14 changes: 13 additions & 1 deletion e2e_test/subscription/create_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,16 @@ statement ok
flush;

statement ok
create subscription sub from t1 with(retention = '1D');
create subscription sub from t1 with(retention = '1D');

statement ok
create table t2 (v1 int, v2 int);

statement ok
create table t3 (v1 int primary key, v2 int);

statement ok
create subscription sub2 from t3 with(retention = '1D');

statement ok
create sink s1 into t3 from t2;
14 changes: 13 additions & 1 deletion e2e_test/subscription/drop_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,16 @@ statement ok
drop subscription sub;

statement ok
drop table t1;
drop table t1;

statement ok
drop sink s1;

statement ok
drop subscription sub2;

statement ok
drop table t3;

statement ok
drop table t2;
24 changes: 23 additions & 1 deletion e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def test_cursor_with_table_alter():
drop_table_subscription()

def test_cursor_fetch_n():
print(f"test_cursor_with_table_alter")
print(f"test_cursor_fetch_n")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
Expand Down Expand Up @@ -304,6 +304,27 @@ def test_cursor_fetch_n():
check_rows_data([10,100],row[3],3)
drop_table_subscription()

def test_rebuild_table():
print(f"test_rebuild_table")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub2",conn)
execute_insert("insert into t2 values(1,1)",conn)
execute_insert("flush",conn)
execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
execute_insert("flush",conn)
row = execute_query("fetch 4 from cur",conn)
assert len(row) == 3
check_rows_data([1,1],row[0],1)
check_rows_data([1,1],row[1],4)
check_rows_data([1,100],row[2],3)

if __name__ == "__main__":
test_cursor_snapshot()
test_cursor_op()
Expand All @@ -313,3 +334,4 @@ def test_cursor_fetch_n():
test_cursor_since_begin()
test_cursor_with_table_alter()
test_cursor_fetch_n()
test_rebuild_table()
24 changes: 24 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2673,6 +2673,30 @@ impl CatalogController {
Ok(subscription)
}

pub async fn get_mv_depended_subscriptions(
&self,
) -> MetaResult<HashMap<risingwave_common::catalog::TableId, HashMap<u32, u64>>> {
let inner = self.inner.read().await;
let subscription_objs = Subscription::find()
.find_also_related(Object)
.all(&inner.db)
.await?;
let mut map = HashMap::new();
// Write object at the same time we write subscription, so we must be able to get obj
for subscription in subscription_objs
.into_iter()
.map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into())
{
let subscription: PbSubscription = subscription;
map.entry(risingwave_common::catalog::TableId::from(
subscription.dependent_table_id,
))
.or_insert(HashMap::new())
.insert(subscription.id, subscription.retention_seconds);
}
Ok(map)
}

pub async fn find_creating_streaming_job_ids(
&self,
infos: Vec<PbCreatingJobInfo>,
Expand Down
16 changes: 16 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3815,6 +3815,22 @@ impl CatalogManager {
Ok(subscription.clone())
}

pub async fn get_mv_depended_subscriptions(
&self,
) -> MetaResult<HashMap<risingwave_common::catalog::TableId, HashMap<u32, u64>>> {
let guard = self.core.lock().await;
let mut map = HashMap::new();
for subscription in guard.database.subscriptions.values() {
map.entry(risingwave_common::catalog::TableId::from(
subscription.dependent_table_id,
))
.or_insert(HashMap::new())
.insert(subscription.id, subscription.retention_seconds);
}

Ok(map)
}

pub async fn get_created_table_ids(&self) -> Vec<u32> {
let guard = self.core.lock().await;
guard
Expand Down
9 changes: 6 additions & 3 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,11 +822,14 @@ impl MetadataManager {
}
}

#[expect(clippy::unused_async)]
pub async fn get_mv_depended_subscriptions(
&self,
) -> MetaResult<HashMap<TableId, HashMap<u32, u64>>> {
// TODO(subscription): support the correct logic when supporting L0 log store subscriptions
Ok(HashMap::new())
match self {
MetadataManager::V1(mgr) => mgr.catalog_manager.get_mv_depended_subscriptions().await,
MetadataManager::V2(mgr) => {
mgr.catalog_controller.get_mv_depended_subscriptions().await
}
}
}
}
6 changes: 3 additions & 3 deletions src/meta/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ pub use stream_manager::*;
pub(crate) fn to_build_actor_info(
actor: StreamActor,
subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
fragment_table_id: TableId,
subscription_depend_table_id: TableId,
) -> BuildActorInfo {
BuildActorInfo {
actor: Some(actor),
related_subscriptions: subscriptions
.get(&fragment_table_id)
.get(&subscription_depend_table_id)
.into_iter()
.map(|subscriptions| {
(
fragment_table_id.table_id,
subscription_depend_table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions.keys().cloned().collect(),
},
Expand Down
22 changes: 17 additions & 5 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ impl GlobalStreamManager {
table_fragments: &TableFragments,
building_locations: &Locations,
existing_locations: &Locations,
subscription_depend_table_id: TableId,
) -> MetaResult<()> {
let actor_map = table_fragments.actor_map();

Expand Down Expand Up @@ -367,7 +368,7 @@ impl GlobalStreamManager {
to_build_actor_info(
actor_map[actor_id].clone(),
&subscriptions,
table_fragments.table_id(),
subscription_depend_table_id,
)
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -406,8 +407,13 @@ impl GlobalStreamManager {
let mut replace_table_command = None;
let mut replace_table_id = None;

self.build_actors(&table_fragments, &building_locations, &existing_locations)
.await?;
self.build_actors(
&table_fragments,
&building_locations,
&existing_locations,
table_fragments.table_id(),
)
.await?;
tracing::debug!(
table_id = %table_fragments.table_id(),
"built actors finished"
Expand All @@ -418,6 +424,7 @@ impl GlobalStreamManager {
&table_fragments,
&context.building_locations,
&context.existing_locations,
context.old_table_fragments.table_id(),
)
.await?;

Expand Down Expand Up @@ -504,8 +511,13 @@ impl GlobalStreamManager {
existing_locations,
}: ReplaceTableContext,
) -> MetaResult<()> {
self.build_actors(&table_fragments, &building_locations, &existing_locations)
.await?;
self.build_actors(
&table_fragments,
&building_locations,
&existing_locations,
old_table_fragments.table_id(),
)
.await?;

let dummy_table_id = table_fragments.table_id();
let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?;
Expand Down

0 comments on commit 3765be3

Please sign in to comment.