resolve conflicts
chenzl25 committed Jul 12, 2024
2 parents 6529c8a + 45c9e2b commit 90e3ca4
Showing 88 changed files with 2,146 additions and 244 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions Cargo.toml
@@ -334,8 +334,6 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e
 tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
 tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
 futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
-# patch: unlimit 4MB message size for grpc client
-etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
 # patch to remove preserve_order from serde_json
 deno_core = { git = "https://github.com/bakjos/deno_core", rev = "9b241c6" }
 # patch to user reqwest 0.12.2
4 changes: 2 additions & 2 deletions Makefile.toml
@@ -710,10 +710,10 @@ if ! ${TMUX} ls &>/dev/null ; then
   exit 0
 fi
-# Kill other components with Ctrl+C/Ctrl+D
+# Kill tmux components with Ctrl+C
 ${TMUX} list-windows -F "#{window_name} #{pane_id}" \
   | awk '{ print $2 }' \
-  | xargs -I {} ${TMUX} send-keys -t {} C-c C-d
+  | xargs -I {} ${TMUX} send-keys -t {} C-c
 # Stop docker components
 containers=$(docker ps -a -q -f name=risedev- 2>/dev/null) || true
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion e2e_test/subscription/create_table_and_subscription.slt
@@ -8,4 +8,16 @@ statement ok
 flush;

 statement ok
-create subscription sub from t1 with(retention = '1D');
+create subscription sub from t1 with(retention = '1D');
+
+statement ok
+create table t2 (v1 int, v2 int);
+
+statement ok
+create table t3 (v1 int primary key, v2 int);
+
+statement ok
+create subscription sub2 from t3 with(retention = '1D');
+
+statement ok
+create sink s1 into t3 from t2;
14 changes: 13 additions & 1 deletion e2e_test/subscription/drop_table_and_subscription.slt
@@ -2,4 +2,16 @@ statement ok
 drop subscription sub;

 statement ok
-drop table t1;
+drop table t1;
+
+statement ok
+drop sink s1;
+
+statement ok
+drop subscription sub2;
+
+statement ok
+drop table t3;
+
+statement ok
+drop table t2;
24 changes: 23 additions & 1 deletion e2e_test/subscription/main.py
@@ -262,7 +262,7 @@ def test_cursor_with_table_alter():
     drop_table_subscription()

 def test_cursor_fetch_n():
-    print(f"test_cursor_with_table_alter")
+    print(f"test_cursor_fetch_n")
     create_table_subscription()
     conn = psycopg2.connect(
         host="localhost",
@@ -304,6 +304,27 @@ def test_cursor_fetch_n():
     check_rows_data([10,100],row[3],3)
     drop_table_subscription()

+def test_rebuild_table():
+    print(f"test_rebuild_table")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("declare cur subscription cursor for sub2",conn)
+    execute_insert("insert into t2 values(1,1)",conn)
+    execute_insert("flush",conn)
+    execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
+    execute_insert("flush",conn)
+    row = execute_query("fetch 4 from cur",conn)
+    assert len(row) == 3
+    check_rows_data([1,1],row[0],1)
+    check_rows_data([1,1],row[1],4)
+    check_rows_data([1,100],row[2],3)
+
 if __name__ == "__main__":
     test_cursor_snapshot()
     test_cursor_op()
@@ -313,3 +334,4 @@ def test_cursor_fetch_n():
     test_cursor_since_begin()
     test_cursor_with_table_alter()
     test_cursor_fetch_n()
+    test_rebuild_table()
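The new `test_rebuild_table` drives changes through the `t2` → `s1` → `t3` sink path and fetches them from `sub2`, asserting both the row values and the op code the subscription cursor attaches to each change. A minimal sketch of what `check_rows_data` is assumed to verify (the real helper lives elsewhere in `main.py`; the op encoding below — 1 for insert, 4 for the old image of an update, 3 for the new image — is inferred from the calls above):

```python
# Hedged sketch of check_rows_data, assuming a fetched row lays out the table's
# value columns first, followed by an op column (and a trailing rw_timestamp).
# Op codes inferred from the test: 1 = insert, 4 = update (old row), 3 = update (new row).
def check_rows_data(expected_values, row, expected_op):
    for i, expected in enumerate(expected_values):
        assert row[i] == expected, f"column {i}: expected {expected}, got {row[i]}"
    assert row[len(expected_values)] == expected_op  # assumed op column position
```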
57 changes: 36 additions & 21 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -460,44 +460,59 @@ def section_compaction(outer_panels):
                     ],
                 ),
                 panels.timeseries_bytes(
-                    "Hummock Sstable Size",
-                    "Total bytes gotten from sstable_bloom_filter, for observing bloom_filter size",
+                    "Hummock Sstable Bloom Filter Size",
+                    "For observing bloom_filter size, sstable file size, sstable block size etc.",
                     [
                         panels.target(
-                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_bloom_filter_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_bloom_filter_size_count')}[$__rate_interval])) > 0",
-                            "avg_meta - {{%s}} @ {{%s}}"
-                            % (COMPONENT_LABEL, NODE_LABEL),
+                            f"histogram_quantile(0.50, sum(rate({metric('compactor_sstable_bloom_filter_size_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
+                            "bloom_filter_size_p50 - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
                         ),
                         panels.target(
-                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_file_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_file_size_count')}[$__rate_interval])) > 0",
-                            "avg_file - {{%s}} @ {{%s}}"
-                            % (COMPONENT_LABEL, NODE_LABEL),
+                            f"histogram_quantile(0.90, sum(rate({metric('compactor_sstable_bloom_filter_size_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
+                            "bloom_filter_size_p90 - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
                         ),
                     ],
                 ),
                 panels.timeseries_bytes(
-                    "Hummock Sstable Item Size",
-                    "Total bytes gotten from sstable_avg_key_size, for observing sstable_avg_key_size",
+                    "Hummock Sstable File Size",
+                    "For observing sstable file size",
                     [
                         panels.target(
-                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_key_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_key_size_count')}[$__rate_interval])) > 0",
-                            "avg_key_size - {{%s}} @ {{%s}}"
-                            % (COMPONENT_LABEL, NODE_LABEL),
+                            f"histogram_quantile(0.50, sum(rate({metric('compactor_sstable_file_size_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
+                            "sstable_file_size_p50 - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
                         ),
                         panels.target(
-                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_value_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_value_size_count')}[$__rate_interval]))",
-                            "avg_value_size - {{%s}} @ {{%s}}"
-                            % (COMPONENT_LABEL, NODE_LABEL),
+                            f"histogram_quantile(0.90, sum(rate({metric('compactor_sstable_file_size_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
+                            "sstable_file_size_p90 - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
                         ),
                     ],
                 ),
-                panels.timeseries_count(
-                    "Hummock Sstable Stat",
-                    "Avg count gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
+                panels.timeseries_bytes(
+                    "Hummock Sstable Block Size",
+                    "For observing sstable block size",
                     [
                         panels.target(
-                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_distinct_epoch_count_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_distinct_epoch_count_count')}[$__rate_interval])) > 0",
-                            "avg_epoch_count - {{%s}} @ {{%s}}"
+                            f"histogram_quantile(0.50, sum(rate({metric('compactor_sstable_block_size_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
+                            "sstable_block_size_p50 - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
                         ),
+                        panels.target(
+                            f"histogram_quantile(0.90, sum(rate({metric('compactor_sstable_block_size_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
+                            "sstable_block_size_p90 - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
+                        ),
+                    ],
+                ),
+                panels.timeseries_bytes(
+                    "Hummock Sstable Avg Key And Value Count",
+                    "For observing avg key and value count",
+                    [
+                        panels.target(
+                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_key_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_key_size_count')}[$__rate_interval])) > 0",
+                            "avg_key_size - {{%s}} @ {{%s}}"
+                            % (COMPONENT_LABEL, NODE_LABEL),
+                        ),
+                        panels.target(
+                            f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_value_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL})(rate({metric('compactor_sstable_avg_value_size_count')}[$__rate_interval]))",
+                            "avg_value_size - {{%s}} @ {{%s}}"
+                            % (COMPONENT_LABEL, NODE_LABEL),
+                        ),
+                    ],
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions java/connector-node/risingwave-connector-test/pom.xml
@@ -28,6 +28,11 @@
     <artifactId>risingwave-sink-deltalake</artifactId>
     <scope>test</scope>
   </dependency>
+  <dependency>
+    <groupId>com.risingwave</groupId>
+    <artifactId>risingwave-sink-iceberg</artifactId>
+    <scope>test</scope>
+  </dependency>
   <dependency>
     <groupId>com.risingwave</groupId>
     <artifactId>s3-common</artifactId>
4 changes: 0 additions & 4 deletions java/connector-node/risingwave-sink-iceberg/pom.xml
@@ -74,10 +74,6 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-metastore</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
5 changes: 0 additions & 5 deletions java/pom.xml
@@ -387,11 +387,6 @@
       <artifactId>hive-metastore</artifactId>
       <version>${hive.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
3 changes: 3 additions & 0 deletions proto/batch_plan.proto
@@ -28,6 +28,7 @@ message RowSeqScanNode {

   // The pushed down `batch_limit`. Max rows needed to return.
   optional uint64 limit = 6;
+  optional plan_common.AsOf as_of = 7;
 }

 message SysRowSeqScanNode {
@@ -300,6 +301,7 @@ message LocalLookupJoinNode {
   // Null safe means it treats `null = null` as true.
   // Each key pair can be null safe independently. (left_key, right_key, null_safe)
   repeated bool null_safe = 11;
+  optional plan_common.AsOf as_of = 12;
 }

 // RFC: A new schedule way for distributed lookup join
@@ -316,6 +318,7 @@ message DistributedLookupJoinNode {
   // Null safe means it treats `null = null` as true.
   // Each key pair can be null safe independently. (left_key, right_key, null_safe)
   repeated bool null_safe = 9;
+  optional plan_common.AsOf as_of = 10;
 }

 message UnionNode {}
1 change: 1 addition & 0 deletions proto/common.proto
@@ -103,6 +103,7 @@ message BatchQueryEpoch {
     uint64 committed = 1;
     uint64 current = 2;
     uint64 backup = 3;
+    uint64 time_travel = 4;
   }
 }
13 changes: 12 additions & 1 deletion proto/hummock.proto
@@ -104,7 +104,7 @@ message GroupDelta {
     IntraLevelDelta intra_level = 1;
     GroupConstruct group_construct = 2;
     GroupDestroy group_destroy = 3;
-    GroupMetaChange group_meta_change = 4;
+    GroupMetaChange group_meta_change = 4 [deprecated = true];
     GroupTableChange group_table_change = 5 [deprecated = true];
   }
 }
@@ -571,6 +571,7 @@ message VacuumTask {
 // Scan object store to get candidate orphan SSTs.
 message FullScanTask {
   uint64 sst_retention_time_sec = 1;
+  optional string prefix = 2;
 }

 // Cancel compact task
@@ -614,6 +615,7 @@ message ReportFullScanTaskResponse {

 message TriggerFullGCRequest {
   uint64 sst_retention_time_sec = 1;
+  optional string prefix = 2;
 }

 message TriggerFullGCResponse {
@@ -821,6 +823,14 @@ message CancelCompactTaskResponse {
   bool ret = 1;
 }

+message GetVersionByEpochRequest {
+  uint64 epoch = 1;
+}
+
+message GetVersionByEpochResponse {
+  HummockVersion version = 1;
+}
+
 service HummockManagerService {
   rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
   rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
@@ -861,6 +871,7 @@ service HummockManagerService {
   rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
   rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
   rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
+  rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
 }

 message CompactionConfig {
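`GetVersionByEpoch` gives callers the Hummock version that was visible at a given epoch, which is the piece time-travel reads need. A rough sketch of invoking the new RPC from Python, assuming stubs generated by grpcio-tools from `proto/hummock.proto` and a hypothetical local meta endpoint:

```python
import grpc

# Assumed to be generated via `python -m grpc_tools.protoc` from proto/hummock.proto.
import hummock_pb2
import hummock_pb2_grpc

def fetch_version_at_epoch(meta_addr: str, epoch: int):
    """Return the HummockVersion visible at `epoch` (meta_addr is hypothetical)."""
    with grpc.insecure_channel(meta_addr) as channel:
        stub = hummock_pb2_grpc.HummockManagerServiceStub(channel)
        resp = stub.GetVersionByEpoch(hummock_pb2.GetVersionByEpochRequest(epoch=epoch))
        return resp.version
```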
15 changes: 15 additions & 0 deletions proto/plan_common.proto
@@ -98,6 +98,21 @@ message StorageTableDesc {
   optional uint32 retention_seconds = 11;
 }

+message AsOf {
+  message ProcessTime {}
+  message Timestamp {
+    int64 timestamp = 1;
+  }
+  message Version {
+    int64 version = 1;
+  }
+  oneof as_of_type {
+    ProcessTime process_time = 1;
+    Timestamp timestamp = 2;
+    Version version = 3;
+  }
+}
+
 // Represents a table in external database for CDC scenario
 message ExternalTableDesc {
   uint32 table_id = 1;
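`AsOf` is the plan-level carrier for an `AS OF` clause: a scan can be pinned to the current processing time, a fixed timestamp, or an explicit version, and the batch plan nodes above now thread it down to the storage layer. A hedged sketch of issuing such a read through psycopg2, in the style of `e2e_test/subscription/main.py` (the exact `FOR SYSTEM_TIME AS OF` form accepted here is an assumption):

```python
import psycopg2

conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev")
with conn.cursor() as cur:
    # Assumed syntax: pin the scan to a unix timestamp in seconds, which would be
    # planned as AsOf.timestamp and executed against a time_travel batch epoch.
    cur.execute("select * from t3 for system_time as of 1720742400")
    print(cur.fetchall())
```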
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
@@ -154,6 +154,13 @@ pub enum BatchError {
         #[backtrace]
         opendal::Error,
     ),
+
+    #[error("Failed to execute time travel query")]
+    TimeTravel(
+        #[source]
+        #[backtrace]
+        anyhow::Error,
+    ),
 }

 // Serialize/deserialize error.
25 changes: 21 additions & 4 deletions src/batch/src/executor/join/distributed_lookup_join.rs
@@ -36,8 +36,8 @@ use risingwave_storage::{dispatch_state_store, StateStore};
 use crate::error::Result;
 use crate::executor::join::JoinType;
 use crate::executor::{
-    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor,
-    ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase,
+    unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder,
+    BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase,
 };
 use crate::task::{BatchTaskContext, ShutdownToken};

@@ -93,6 +93,24 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
             NodeBody::DistributedLookupJoin
         )?;

+        // as_of takes precedence
+        let as_of = distributed_lookup_join_node
+            .as_of
+            .as_ref()
+            .map(AsOf::try_from)
+            .transpose()?;
+        let query_epoch = as_of
+            .map(|a| {
+                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
+                tracing::debug!(epoch, "time travel");
+                risingwave_pb::common::BatchQueryEpoch {
+                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
+                        epoch,
+                    )),
+                }
+            })
+            .unwrap_or_else(|| source.epoch());
+
         let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?);
         let condition = match distributed_lookup_join_node.get_condition() {
             Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
@@ -179,12 +197,11 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
         let vnodes = Some(TableDistribution::all_vnodes());
         dispatch_state_store!(source.context().state_store(), state_store, {
             let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
-
             let inner_side_builder = InnerSideExecutorBuilder::new(
                 outer_side_key_types,
                 inner_side_key_types.clone(),
                 lookup_prefix_len,
-                source.epoch(),
+                query_epoch,
                 vec![],
                 table,
                 chunk_size,
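The builder converts the `AS OF` timestamp into a `BatchQueryEpoch` carrying the new `TimeTravel` variant instead of the scheduler-assigned snapshot epoch. A back-of-the-envelope sketch of what `unix_timestamp_sec_to_epoch` is assumed to compute (RisingWave epochs are understood to store a physical timestamp in milliseconds shifted above 16 low sequence bits; treat the constants and the base date as assumptions):

```python
EPOCH_PHYSICAL_SHIFT = 16  # assumption: low 16 bits reserved for sequencing

def unix_timestamp_sec_to_epoch(sec: int) -> int:
    # Assumes milliseconds counted from the unix epoch; the real implementation
    # may rebase onto a RisingWave-specific start date before shifting.
    return (sec * 1000) << EPOCH_PHYSICAL_SHIFT
```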