Skip to content

Commit

Permalink
feat(meta): refactor subscription and cursor with Partial ckpt (#16563)
Browse files Browse the repository at this point in the history
Co-authored-by: William Wen <[email protected]>
  • Loading branch information
xxhZs and wenym1 authored May 7, 2024
1 parent 3a1e259 commit 7df9465
Show file tree
Hide file tree
Showing 57 changed files with 1,084 additions and 1,528 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ steps:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
timeout_in_minutes: 11
retry: *auto-retry

- label: "end-to-end test (parallel, in-memory) (release)"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 16
retry: *auto-retry

- label: "end-to-end test for opendal (parallel)"
Expand Down
25 changes: 0 additions & 25 deletions e2e_test/ddl/alter_parallelism.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali
statement ok
create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id;

Expand Down Expand Up @@ -97,28 +94,9 @@ select parallelism from sink_parallelism where name = 's';
----
FIXED(4)

statement ok
create subscription subscription1 from t with (retention = '1D');

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
ADAPTIVE

statement ok
alter subscription subscription1 set parallelism = 4;

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
FIXED(4)

statement ok
drop sink s;

statement ok
drop subscription subscription1;

statement ok
drop materialized view m_join;

Expand Down Expand Up @@ -179,8 +157,5 @@ drop view mview_parallelism;
statement ok
drop view sink_parallelism;

statement ok
drop view subscription_parallelism;

statement ok
drop view fragment_parallelism;
3 changes: 3 additions & 0 deletions e2e_test/subscription/create_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ create table t1 (v1 int, v2 int);
statement ok
insert into t1 values (1,2);

statement ok
flush;

statement ok
create subscription sub from t1 with(retention = '1D');
16 changes: 10 additions & 6 deletions e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ def execute_insert(sql,conn):

def check_rows_data(expect_vec,rows,status):
row = rows[0]
value_len = len(row)
for index, value in enumerate(row):
if index == 0:
if index == value_len - 1:
continue
if index == 1:
if index == value_len - 2:
assert value == status,f"expect {value} but got {status}"
continue
assert value == expect_vec[index-2],f"expect {expect_vec[index-2]} but got {value}"
assert value == expect_vec[index],f"expect {expect_vec[index]} but got {value}"

def test_cursor_snapshot():
print(f"test_cursor_snapshot")
Expand Down Expand Up @@ -161,13 +162,16 @@ def test_cursor_since_rw_timestamp():
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
rw_timestamp_1 = row[0][0]
valuelen = len(row[0])
rw_timestamp_1 = row[0][valuelen - 1]
check_rows_data([4,4],row,1)
row = execute_query("fetch next from cur",conn)
rw_timestamp_2 = row[0][0] - 1
valuelen = len(row[0])
rw_timestamp_2 = row[0][valuelen - 1] - 1
check_rows_data([5,5],row,1)
row = execute_query("fetch next from cur",conn)
rw_timestamp_3 = row[0][0] + 1
valuelen = len(row[0])
rw_timestamp_3 = row[0][valuelen - 1] + 1
check_rows_data([6,6],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
Expand Down
17 changes: 8 additions & 9 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,26 +176,25 @@ message Sink {
}

message Subscription {
enum SubscriptionState {
UNSPECIFIED = 0;
INIT = 1;
CREATED = 2;
}
uint32 id = 1;
string name = 2;
string definition = 3;
repeated common.ColumnOrder plan_pk = 4;
repeated int32 distribution_key = 5;
map<string, string> properties = 6;
repeated plan_common.ColumnCatalog column_catalogs = 7;
uint64 retention_seconds = 6;
uint32 database_id = 8;
uint32 schema_id = 9;
repeated uint32 dependent_relations = 10;
uint32 dependent_table_id = 10;
optional uint64 initialized_at_epoch = 11;
optional uint64 created_at_epoch = 12;
uint32 owner = 13;
StreamJobStatus stream_job_status = 14;

optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;

string subscription_from_name = 17;
optional string subscription_internal_table_name = 18;
SubscriptionState subscription_state = 19;
}

message Connection {
Expand Down
1 change: 0 additions & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ message DropSinkResponse {

message CreateSubscriptionRequest {
catalog.Subscription subscription = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
}

message CreateSubscriptionResponse {
Expand Down
11 changes: 11 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ service HummockManagerService {
rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
}

message CompactionConfig {
Expand Down Expand Up @@ -892,3 +893,13 @@ message BranchedObject {
// Compaction group id the SST belongs to.
uint64 compaction_group_id = 3;
}

message ListChangeLogEpochsRequest {
uint32 table_id = 1;
uint64 min_epoch = 2;
uint32 max_count = 3;
}

message ListChangeLogEpochsResponse {
repeated uint64 epochs = 1;
}
8 changes: 0 additions & 8 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,6 @@ message OverWindowNode {
OverWindowCachePolicy cache_policy = 5;
}

message SubscriptionNode {
catalog.Subscription subscription_catalog = 1;
// log store should have a table.
catalog.Table log_store_table = 2;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -814,7 +808,6 @@ message StreamNode {
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SubscriptionNode subscription = 141;
SourceBackfillNode source_backfill = 142;
}
// The id for the operator. This is local per mview.
Expand Down Expand Up @@ -911,7 +904,6 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
}

Expand Down
7 changes: 0 additions & 7 deletions src/common/src/catalog/internal_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ pub fn valid_table_name(table_name: &str) -> bool {
!INTERNAL_TABLE_NAME.is_match(table_name)
}

pub fn is_subscription_internal_table(subscription_name: &str, table_name: &str) -> bool {
let regex =
Regex::new(format!(r"__internal_{}_(\d+)_subscription_(\d+)", subscription_name).as_str())
.unwrap();
regex.is_match(table_name)
}

pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
dist_key_indices: &[I],
pk_indices: &[I],
Expand Down
6 changes: 0 additions & 6 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,6 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.table, "Sink")
}

// Subscription
NodeBody::Subscription(node) => {
// A Subscription should have a log store
optional!(node.log_store_table, "Subscription")
}

// Now
NodeBody::Now(node) => {
always!(node.state_table, "Now");
Expand Down
12 changes: 5 additions & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,11 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
let subscription_models: Vec<subscription::ActiveModel> = subscriptions
.into_iter()
.map(|s| {
object_dependencies.extend(s.dependent_relations.iter().map(|id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(*id as _),
used_by: Set(s.id as _),
}
}));
object_dependencies.push(object_dependency::ActiveModel {
id: NotSet,
oid: Set(s.dependent_table_id as _),
used_by: Set(s.id as _),
});
s.into()
})
.collect();
Expand Down
36 changes: 22 additions & 14 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ pub trait CatalogWriter: Send + Sync {
affected_table_change: Option<PbReplaceTablePlan>,
) -> Result<()>;

async fn create_subscription(
&self,
subscription: PbSubscription,
graph: StreamFragmentGraph,
) -> Result<()>;
async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

async fn create_function(&self, function: PbFunction) -> Result<()>;

Expand Down Expand Up @@ -204,6 +200,13 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -339,15 +342,8 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn create_subscription(
&self,
subscription: PbSubscription,
graph: StreamFragmentGraph,
) -> Result<()> {
let version = self
.meta_client
.create_subscription(subscription, graph)
.await?;
async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
let version = self.meta_client.create_subscription(subscription).await?;
self.wait_version(version).await
}

Expand Down Expand Up @@ -568,6 +564,18 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>> {
Ok(self
.meta_client
.list_change_log_epochs(table_id, min_epoch, max_count)
.await?)
}
}

impl CatalogWriterImpl {
Expand Down
Loading

0 comments on commit 7df9465

Please sign in to comment.