Merge branch 'main' into jiawei/pipeline
Stab123 authored May 9, 2024
2 parents 940e7ec + 464f8ef commit a3510a1
Showing 79 changed files with 1,602 additions and 809 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -8,6 +8,7 @@ set -euo pipefail
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

74 changes: 72 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
@@ -183,6 +183,61 @@ create sink invalid_pk_column from t_kafka with (
primary_key = 'id,invalid'
);

### Test sink with key encode ###

statement error sink key encode unsupported: JSON, only TEXT supported
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'id')
format plain encode json (
force_append_only='true'
) key encode json ;

statement error
# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'id, v_varchar')
format plain encode json (
force_append_only='true'
) key encode text ;

statement error
# The key encode is TEXT, but the primary key column v_bytea has type bytea. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, or big int.
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'v_bytea')
format plain encode json (
force_append_only='true'
) key encode text ;

statement ok
create sink sink_text_id from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
primary_key = 'id')
format plain encode json (
force_append_only='true'
) key encode text ;

statement ok
create table t_sink_text_id (id int)
include key as rw_key
with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-text-key-id',
) format plain encode json;

#======

statement ok
insert into t_kafka values
(1, '8DfUFencLe', 31031, 1872, 1872, 26261.416, 23956.39329760601, '2023-04-14 06:27:14.104742', '\x00', '0 second', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
@@ -193,7 +248,6 @@ insert into t_kafka values
(6, 'V4y71v4Gip', 4014, 10844, 28842, 5885.368, 11210.458724794062, '2023-04-13 10:42:02.137742', '\xCAFEBABE', '4 hour', '0001-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(7, 'YIVLnWxHyf', 10324, 12652, 15914, 3946.7434, 10967.182297153104, '2023-04-14 04:41:03.083742', '\xBABEC0DE', '3 day', '0001-01-01', '01:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}');


statement error
create sink si_kafka_without_snapshot as select * from t_kafka with (
connector = 'kafka',
@@ -216,8 +270,24 @@ create sink si_kafka_without_snapshot from t_kafka with (
snapshot = 'false',
);

sleep 1s

query T
select rw_key from t_sink_text_id order by rw_key
----
\x31
\x32
\x33
\x34
\x35
\x36
\x37

statement ok
insert into t_kafka values
(8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}');

statement ok
drop table t_sink_text_id;
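
Note on the `key encode text` tests above: the clause makes the sink write the single primary-key column as its plain-text representation into the Kafka message key, which is why the verification query reads back \x31 through \x37 (ASCII '1' through '7') for ids 1 to 7. A minimal Rust sketch of that idea follows; encode_text_key is illustrative only, not RisingWave's actual encoder.

// Illustrative only: render a single integer primary-key column as the
// plain-text bytes used for the Kafka message key under `key encode text`.
fn encode_text_key(id: i32) -> Vec<u8> {
    id.to_string().into_bytes()
}

fn main() {
    // Matches the rw_key values checked above: '1'..'7' are 0x31..0x37.
    assert_eq!(encode_text_key(1), b"1".to_vec());
    assert_eq!(encode_text_key(7), vec![0x37]);
}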
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/drop_sink.slt
@@ -13,5 +13,8 @@ drop sink si_kafka_upsert_schema;
statement ok
drop sink si_kafka_without_snapshot;

statement ok
drop sink sink_text_id;

statement ok
drop table t_kafka;
7 changes: 1 addition & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -622,12 +622,7 @@ def section_object_storage(outer_panels):
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
f"sum(rate({metric('object_store_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -134,6 +134,7 @@ message SinkFormatDesc {
plan_common.FormatType format = 1;
plan_common.EncodeType encode = 2;
map<string, string> options = 3;
optional plan_common.EncodeType key_encode = 4;
}

// the catalog of the sink. There are two kind of schema here. The full schema is all columns
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -147,6 +147,7 @@ enum EncodeType {
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
ENCODE_TYPE_NONE = 8;
ENCODE_TYPE_TEXT = 9;
}

enum RowFormatType {
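
The proto changes above add an optional key_encode field to SinkFormatDesc and a new ENCODE_TYPE_TEXT variant. A hedged Rust sketch of the kind of frontend check this enables, mirroring the "sink key encode unsupported: JSON, only TEXT supported" error exercised in create_sink.slt; the types below are simplified stand-ins, not the generated proto code.

// Illustrative stand-ins for the generated proto types; not RisingWave's code.
#[derive(Debug, PartialEq)]
enum EncodeType { Json, Text }

struct SinkFormatDesc {
    encode: EncodeType,             // payload encode (proto field 2)
    key_encode: Option<EncodeType>, // new optional key encode (proto field 4)
}

// Only TEXT (or no key encode at all) is accepted for the message key.
fn validate_key_encode(desc: &SinkFormatDesc) -> Result<(), String> {
    match &desc.key_encode {
        None | Some(EncodeType::Text) => Ok(()),
        Some(other) => Err(format!(
            "sink key encode unsupported: {:?}, only TEXT supported",
            other
        )),
    }
}

fn main() {
    let desc = SinkFormatDesc {
        encode: EncodeType::Json,
        key_encode: Some(EncodeType::Json),
    };
    assert_eq!(desc.encode, EncodeType::Json);
    assert!(validate_key_encode(&desc).is_err());
}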
3 changes: 3 additions & 0 deletions src/batch/src/error.rs
@@ -136,6 +136,9 @@ pub enum BatchError {

#[error("Streaming vnode mapping not found for fragment {0}")]
StreamingVnodeMappingNotFound(FragmentId),

#[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
OutOfMemory(u64),
}

// Serialize/deserialize error.
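
The new OutOfMemory variant backs a pattern repeated in the executor hunks below: MemoryContext::add now reports whether the accounted usage still fits the context's limit (and MemoryContext::root takes that limit, u64::MAX in the tests), so callers surface BatchError::OutOfMemory instead of silently exceeding the budget. A simplified, self-contained sketch of the pattern; the real MemoryContext and BatchError differ.

use std::sync::atomic::{AtomicI64, Ordering};

// Simplified stand-in for a memory context with a hard limit in bytes.
struct MemoryContext { used: AtomicI64, limit: u64 }

impl MemoryContext {
    // Account `bytes` and report whether usage is still within the limit.
    fn add(&self, bytes: i64) -> bool {
        let now = self.used.fetch_add(bytes, Ordering::Relaxed) + bytes;
        now.max(0) as u64 <= self.limit
    }
    fn mem_limit(&self) -> u64 { self.limit }
}

#[derive(Debug)]
enum BatchError { OutOfMemory(u64) }

// The shape of the calls added in hash_agg, hash_join, lookup_join_base and
// nested_loop_join: bail out with the configured limit when `add` fails.
fn track(ctx: &MemoryContext, chunk_bytes: i64) -> Result<(), BatchError> {
    if !ctx.add(chunk_bytes) {
        return Err(BatchError::OutOfMemory(ctx.mem_limit()));
    }
    Ok(())
}

fn main() {
    let ctx = MemoryContext { used: AtomicI64::new(0), limit: 1024 };
    assert!(track(&ctx, 512).is_ok());
    assert!(matches!(track(&ctx, 4096), Err(BatchError::OutOfMemory(1024))));
}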
2 changes: 1 addition & 1 deletion src/batch/src/executor/group_top_n.rs
@@ -254,7 +254,7 @@ mod tests {

#[tokio::test]
async fn test_group_top_n_executor() {
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
{
let schema = Schema {
fields: vec![
6 changes: 4 additions & 2 deletions src/batch/src/executor/hash_agg.rs
@@ -250,7 +250,9 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
}
}
// update memory usage
self.mem_context.add(memory_usage_diff);
if !self.mem_context.add(memory_usage_diff) {
Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
}
}

// Don't use `into_iter` here, it may cause memory leak.
@@ -323,7 +325,7 @@ mod tests {

#[tokio::test]
async fn execute_int32_grouped() {
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
{
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
10 changes: 7 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
@@ -234,7 +234,9 @@ impl<K: HashKey> HashJoinExecutor<K> {
let build_chunk = build_chunk?;
if build_chunk.cardinality() > 0 {
build_row_count += build_chunk.cardinality();
self.mem_ctx.add(build_chunk.estimated_heap_size() as i64);
if !self.mem_ctx.add(build_chunk.estimated_heap_size() as i64) {
Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
}
build_side.push(build_chunk);
}
}
@@ -264,7 +266,9 @@ impl<K: HashKey> HashJoinExecutor<K> {
// Only insert key to hash map if it is consistent with the null safe restriction.
if build_key.null_bitmap().is_subset(&null_matched) {
let row_id = RowId::new(build_chunk_id, build_row_id);
self.mem_ctx.add(build_key.estimated_heap_size() as i64);
if !self.mem_ctx.add(build_key.estimated_heap_size() as i64) {
Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
}
next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
}
}
@@ -2204,7 +2208,7 @@ mod tests {
right_executor: BoxedExecutor,
) {
let parent_mem_context =
MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);

{
let join_executor = self.create_join_executor_with_chunk_size_and_executors(
8 changes: 6 additions & 2 deletions src/batch/src/executor/join/lookup_join_base.rs
@@ -132,7 +132,9 @@ impl<K: HashKey> LookupJoinBase<K> {
let build_chunk = build_chunk?;
if build_chunk.cardinality() > 0 {
build_row_count += build_chunk.cardinality();
self.mem_ctx.add(build_chunk.estimated_heap_size() as i64);
if !self.mem_ctx.add(build_chunk.estimated_heap_size() as i64) {
Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
}
build_side.push(build_chunk);
}
}
@@ -160,7 +162,9 @@ impl<K: HashKey> LookupJoinBase<K> {
// restriction.
if build_key.null_bitmap().is_subset(&null_matched) {
let row_id = RowId::new(build_chunk_id, build_row_id);
self.mem_ctx.add(build_key.estimated_heap_size() as i64);
if !self.mem_ctx.add(build_key.estimated_heap_size() as i64) {
Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
}
hash_key_heap_size += build_key.estimated_heap_size() as i64;
next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
}
4 changes: 3 additions & 1 deletion src/batch/src/executor/join/nested_loop_join.rs
@@ -98,7 +98,9 @@ impl NestedLoopJoinExecutor {
for chunk in self.left_child.execute() {
let c = chunk?;
trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
self.mem_context.add(c.estimated_heap_size() as i64);
if !self.mem_context.add(c.estimated_heap_size() as i64) {
Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
}
ret.push(c);
}
ret
16 changes: 12 additions & 4 deletions src/batch/src/executor/merge_sort_exchange.rs
@@ -121,7 +121,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx

// Check whether there is indeed a chunk and there is a visible row sitting at `row_idx`
// in the chunk before calling this function.
fn push_row_into_heap(&mut self, source_idx: usize, row_idx: usize) {
fn push_row_into_heap(&mut self, source_idx: usize, row_idx: usize) -> Result<()> {
assert!(source_idx < self.source_inputs.len());
let chunk_ref = self.source_inputs[source_idx].as_ref().unwrap();
self.min_heap.push(HeapElem::new(
Expand All @@ -131,6 +131,14 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
row_idx,
None,
));

if self.min_heap.mem_context().check_memory_usage() {
Ok(())
} else {
Err(BatchError::OutOfMemory(
self.min_heap.mem_context().mem_limit(),
))
}
}
}

@@ -166,7 +174,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
// exchange, therefore we are sure that there is at least
// one visible row.
let next_row_idx = chunk.next_visible_row_idx(0);
self.push_row_into_heap(source_idx, next_row_idx.unwrap());
self.push_row_into_heap(source_idx, next_row_idx.unwrap())?;
}
}

@@ -201,13 +209,13 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
let possible_next_row_idx = cur_chunk.next_visible_row_idx(row_idx + 1);
match possible_next_row_idx {
Some(next_row_idx) => {
self.push_row_into_heap(child_idx, next_row_idx);
self.push_row_into_heap(child_idx, next_row_idx)?;
}
None => {
self.get_source_chunk(child_idx).await?;
if let Some(chunk) = &self.source_inputs[child_idx] {
let next_row_idx = chunk.next_visible_row_idx(0);
self.push_row_into_heap(child_idx, next_row_idx.unwrap());
self.push_row_into_heap(child_idx, next_row_idx.unwrap())?;
}
}
}
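
The change above makes push_row_into_heap fallible: after pushing, it consults the heap's memory context and returns BatchError::OutOfMemory when the limit is exceeded, and the call sites now propagate the error with `?`. A simplified sketch of that push-then-check flow; MemCtx and the explicit charge() stand in for the accounting the executor's heap does through its MemoryContext.

use std::collections::BinaryHeap;

struct MemCtx { used: i64, limit: u64 }

impl MemCtx {
    fn charge(&mut self, bytes: i64) { self.used += bytes; }
    fn check_memory_usage(&self) -> bool { self.used.max(0) as u64 <= self.limit }
    fn mem_limit(&self) -> u64 { self.limit }
}

#[derive(Debug)]
enum BatchError { OutOfMemory(u64) }

// Push first, then check, so the element that tipped the budget is already
// accounted for when the error is reported.
fn push_row_into_heap(
    heap: &mut BinaryHeap<i32>,
    ctx: &mut MemCtx,
    row: i32,
) -> Result<(), BatchError> {
    heap.push(row);
    ctx.charge(std::mem::size_of::<i32>() as i64);
    if ctx.check_memory_usage() {
        Ok(())
    } else {
        Err(BatchError::OutOfMemory(ctx.mem_limit()))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    let mut ctx = MemCtx { used: 0, limit: 8 };
    assert!(push_row_into_heap(&mut heap, &mut ctx, 42).is_ok());
    assert!(push_row_into_heap(&mut heap, &mut ctx, 7).is_ok());
    assert!(push_row_into_heap(&mut heap, &mut ctx, 9).is_err());
}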
Remaining changed files are not rendered.