cherrypick fix(storage): fix compactor task parallelism race #16999

Closed · wants to merge 6 commits
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
@@ -132,7 +132,7 @@ steps:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
timeout_in_minutes: 11
retry: *auto-retry

- label: "end-to-end test (parallel, in-memory) (release)"
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -115,7 +115,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 16
retry: *auto-retry

- label: "end-to-end test for opendal (parallel)"
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

25 changes: 0 additions & 25 deletions e2e_test/ddl/alter_parallelism.slt
@@ -13,9 +13,6 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali
statement ok
create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id;

@@ -97,28 +94,9 @@ select parallelism from sink_parallelism where name = 's';
----
FIXED(4)

statement ok
create subscription subscription1 from t with (retention = '1D');

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
ADAPTIVE

statement ok
alter subscription subscription1 set parallelism = 4;

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
FIXED(4)

statement ok
drop sink s;

statement ok
drop subscription subscription1;

statement ok
drop materialized view m_join;

@@ -179,8 +157,5 @@ drop view mview_parallelism;
statement ok
drop view sink_parallelism;

statement ok
drop view subscription_parallelism;

statement ok
drop view fragment_parallelism;
3 changes: 3 additions & 0 deletions e2e_test/subscription/create_table_and_subscription.slt
@@ -4,5 +4,8 @@ create table t1 (v1 int, v2 int);
statement ok
insert into t1 values (1,2);

statement ok
flush;

statement ok
create subscription sub from t1 with(retention = '1D');
16 changes: 10 additions & 6 deletions e2e_test/subscription/main.py
@@ -35,13 +35,14 @@ def execute_insert(sql,conn):

def check_rows_data(expect_vec,rows,status):
row = rows[0]
value_len = len(row)
for index, value in enumerate(row):
if index == 0:
if index == value_len - 1:
continue
if index == 1:
if index == value_len - 2:
assert value == status,f"expect {value} but got {status}"
continue
assert value == expect_vec[index-2],f"expect {expect_vec[index-2]} but got {value}"
assert value == expect_vec[index],f"expect {expect_vec[index]} but got {value}"

def test_cursor_snapshot():
print(f"test_cursor_snapshot")
@@ -161,13 +162,16 @@ def test_cursor_since_rw_timestamp():
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
rw_timestamp_1 = row[0][0]
valuelen = len(row[0])
rw_timestamp_1 = row[0][valuelen - 1]
check_rows_data([4,4],row,1)
row = execute_query("fetch next from cur",conn)
rw_timestamp_2 = row[0][0] - 1
valuelen = len(row[0])
rw_timestamp_2 = row[0][valuelen - 1] - 1
check_rows_data([5,5],row,1)
row = execute_query("fetch next from cur",conn)
rw_timestamp_3 = row[0][0] + 1
valuelen = len(row[0])
rw_timestamp_3 = row[0][valuelen - 1] + 1
check_rows_data([6,6],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -3849,6 +3849,16 @@ def section_sink_metrics(outer_panels):
),
],
),
panels.timeseries_percentage(
"Log Store Backpressure Ratio",
"",
[
panels.target(
f"avg(rate({metric('log_store_reader_wait_new_future_duration_ns')}[$__rate_interval])) by (connector, sink_id, executor_id) / 1000000000",
"Backpressure @ {{connector}} {{sink_id}} {{executor_id}}",
),
],
),
panels.timeseries_latency(
"Log Store Consume Persistent Log Lag",
"",
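A note on the new panel's expression (an interpretation, not stated in the PR): `rate(log_store_reader_wait_new_future_duration_ns[$__rate_interval])` yields nanoseconds of wait time accumulated per second of wall-clock time, so dividing by 1,000,000,000 converts it to seconds waited per second, i.e. the fraction of time the sink's log reader spends waiting rather than making progress, which is why the panel is typed as a percentage.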
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions proto/catalog.proto
@@ -176,26 +176,25 @@ message Sink {
}

message Subscription {
enum SubscriptionState {
UNSPECIFIED = 0;
INIT = 1;
CREATED = 2;
}
uint32 id = 1;
string name = 2;
string definition = 3;
repeated common.ColumnOrder plan_pk = 4;
repeated int32 distribution_key = 5;
map<string, string> properties = 6;
repeated plan_common.ColumnCatalog column_catalogs = 7;
uint64 retention_seconds = 6;
uint32 database_id = 8;
uint32 schema_id = 9;
repeated uint32 dependent_relations = 10;
uint32 dependent_table_id = 10;
optional uint64 initialized_at_epoch = 11;
optional uint64 created_at_epoch = 12;
uint32 owner = 13;
StreamJobStatus stream_job_status = 14;

optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;

string subscription_from_name = 17;
optional string subscription_internal_table_name = 18;
SubscriptionState subscription_state = 19;
}

message Connection {
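For orientation, here is a minimal sketch of how the reshaped `Subscription` message might be populated from Rust. It assumes the usual prost-generated bindings under `risingwave_pb::catalog`; the crate path and the literal values are illustrative, not taken from this diff.

```rust
use risingwave_pb::catalog::subscription::SubscriptionState;
use risingwave_pb::catalog::Subscription;

// Illustrative only: the subscription now references a single upstream table via
// `dependent_table_id`, stores retention as plain seconds, and tracks its
// lifecycle with the new nested `SubscriptionState` enum.
fn example_subscription() -> Subscription {
    Subscription {
        id: 1,
        name: "sub".to_owned(),
        definition: "CREATE SUBSCRIPTION sub FROM t1 WITH (retention = '1D')".to_owned(),
        retention_seconds: 86_400, // '1D'
        database_id: 1,
        schema_id: 2,
        dependent_table_id: 1001,
        owner: 1,
        subscription_state: SubscriptionState::Init as i32, // prost stores enums as i32
        ..Default::default()
    }
}
```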
1 change: 0 additions & 1 deletion proto/ddl_service.proto
@@ -100,7 +100,6 @@ message DropSinkResponse {

message CreateSubscriptionRequest {
catalog.Subscription subscription = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
}

message CreateSubscriptionResponse {
11 changes: 11 additions & 0 deletions proto/hummock.proto
@@ -833,6 +833,7 @@ service HummockManagerService {
rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
}

message CompactionConfig {
@@ -892,3 +893,13 @@ message BranchedObject {
// Compaction group id the SST belongs to.
uint64 compaction_group_id = 3;
}

message ListChangeLogEpochsRequest {
uint32 table_id = 1;
uint64 min_epoch = 2;
uint32 max_count = 3;
}

message ListChangeLogEpochsResponse {
repeated uint64 epochs = 1;
}
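A sketch of what a caller of the new `ListChangeLogEpochs` RPC could look like, assuming tonic-generated client bindings under `risingwave_pb::hummock`; the client path, endpoint handling, and argument values are assumptions for illustration.

```rust
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::ListChangeLogEpochsRequest;

// Illustrative only: asks the hummock manager for up to `max_count` change-log
// epochs of one table, starting from `min_epoch`.
async fn list_change_log_epochs(endpoint: &str) -> Result<Vec<u64>, tonic::Status> {
    let mut client = HummockManagerServiceClient::connect(endpoint.to_owned())
        .await
        .map_err(|e| tonic::Status::unavailable(e.to_string()))?;
    let resp = client
        .list_change_log_epochs(ListChangeLogEpochsRequest {
            table_id: 1001,
            min_epoch: 0,
            max_count: 32,
        })
        .await?;
    Ok(resp.into_inner().epochs)
}
```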
8 changes: 0 additions & 8 deletions proto/stream_plan.proto
@@ -765,12 +765,6 @@ message OverWindowNode {
OverWindowCachePolicy cache_policy = 5;
}

message SubscriptionNode {
catalog.Subscription subscription_catalog = 1;
// log store should have a table.
catalog.Table log_store_table = 2;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
@@ -814,7 +808,6 @@ message StreamNode {
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SubscriptionNode subscription = 141;
SourceBackfillNode source_backfill = 142;
}
// The id for the operator. This is local per mview.
@@ -911,7 +904,6 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
}

2 changes: 1 addition & 1 deletion src/bench/sink_bench/main.rs
@@ -116,7 +116,7 @@ impl LogReader for MockRangeLogReader {
}
}

async fn truncate(&mut self, _offset: TruncateOffset) -> LogStoreResult<()> {
fn truncate(&mut self, _offset: TruncateOffset) -> LogStoreResult<()> {
Ok(())
}

7 changes: 0 additions & 7 deletions src/common/src/catalog/internal_table.rs
@@ -43,13 +43,6 @@ pub fn valid_table_name(table_name: &str) -> bool {
!INTERNAL_TABLE_NAME.is_match(table_name)
}

pub fn is_subscription_internal_table(subscription_name: &str, table_name: &str) -> bool {
let regex =
Regex::new(format!(r"__internal_{}_(\d+)_subscription_(\d+)", subscription_name).as_str())
.unwrap();
regex.is_match(table_name)
}

pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
dist_key_indices: &[I],
pk_indices: &[I],
6 changes: 0 additions & 6 deletions src/common/src/util/stream_graph_visitor.rs
@@ -200,12 +200,6 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.table, "Sink")
}

// Subscription
NodeBody::Subscription(node) => {
// A Subscription should have a log store
optional!(node.log_store_table, "Subscription")
}

// Now
NodeBody::Now(node) => {
always!(node.state_table, "Now");
9 changes: 0 additions & 9 deletions src/compute/src/server.rs
@@ -13,7 +13,6 @@
// limitations under the License.

use std::net::SocketAddr;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;

@@ -215,12 +214,6 @@ pub async fn compute_node_serve(
));

let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32
* storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

let compactor_context = CompactorContext {
storage_opts,
sstable_store: storage.sstable_store(),
@@ -233,8 +226,6 @@ pub async fn compute_node_serve(
await_tree_reg: await_tree_config
.clone()
.map(new_compaction_await_tree_reg_ref),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};

let (handle, shutdown_sender) = start_compactor(
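The lines removed here show that the compute node used to pre-compute `max_task_parallelism` from `worker_num * compactor_max_task_multiplier` and pass it, together with a separate `running_task_parallelism` counter, into `CompactorContext`. Checking such a counter in one step and bumping it in another is a classic check-then-act pattern that can over-admit tasks when admissions race. The sketch below is illustrative only (it is not the code this PR adds); it contrasts that pattern with a compare-exchange loop that reserves a slot atomically.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Racy admission check (illustrative): two tasks can both pass the `load` test
// before either `fetch_add` lands, exceeding `max` by one.
fn try_acquire_racy(running: &AtomicU32, max: u32) -> bool {
    if running.load(Ordering::SeqCst) < max {
        running.fetch_add(1, Ordering::SeqCst);
        true
    } else {
        false
    }
}

// Race-free admission check (illustrative): the compare-exchange retries until it
// either reserves a slot atomically or observes that no slot is free.
fn try_acquire_atomic(running: &AtomicU32, max: u32) -> bool {
    let mut cur = running.load(Ordering::SeqCst);
    loop {
        if cur >= max {
            return false;
        }
        match running.compare_exchange(cur, cur + 1, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return true,
            Err(actual) => cur = actual,
        }
    }
}
```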
4 changes: 1 addition & 3 deletions src/connector/src/sink/iceberg/log_sink.rs
@@ -123,9 +123,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for IcebergLogSinkerOf<W> {
sink_metrics
.sink_commit_duration_metrics
.observe(start_time.elapsed().as_millis() as f64);
log_reader
.truncate(TruncateOffset::Barrier { epoch })
.await?;
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
current_checkpoint = 0;
} else {
sink_writer.barrier(false).await?;
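This hunk and the sink-bench change above reflect the same interface change: `LogReader::truncate` appears to have become a synchronous method, so implementations drop `async` and call sites drop `.await`. Below is a minimal, self-contained sketch of the new shape using simplified stand-in types; the real trait and `TruncateOffset` in RisingWave have more methods and variants than shown here.

```rust
// Simplified stand-ins for the real types; illustrative only.
type LogStoreResult<T> = Result<T, Box<dyn std::error::Error>>;

#[derive(Debug)]
enum TruncateOffset {
    Barrier { epoch: u64 },
}

trait LogReader {
    // Now synchronous: callers write `reader.truncate(offset)?` with no `.await`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

struct NoopLogReader;

impl LogReader for NoopLogReader {
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        // A real reader would advance its persisted consumption point here.
        let _ = offset;
        Ok(())
    }
}
```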